Added logic to add rules and write to disk
This commit is contained in:
parent
a58a6f3085
commit
dd5195ac9f
1 changed files with 61 additions and 21 deletions
78
tmpufw
78
tmpufw
|
@ -17,17 +17,19 @@ Arguments:
|
||||||
__author__ = 'Joshua Sherman'
|
__author__ = 'Joshua Sherman'
|
||||||
__file__ = 'tmpufw'
|
__file__ = 'tmpufw'
|
||||||
__license__ = 'MIT'
|
__license__ = 'MIT'
|
||||||
__status__ = 'Production'
|
__status__ = 'Development'
|
||||||
__version__ = '1.0.0'
|
__version__ = '1.0.0'
|
||||||
|
|
||||||
import argparse
|
from argparse import ArgumentParser
|
||||||
import os
|
from os import makedirs, path
|
||||||
from subprocess import call
|
from parsedatetime import Calendar
|
||||||
import sys
|
from subprocess import CalledProcessError, check_output, STDOUT
|
||||||
|
from sys import exit
|
||||||
|
from time import mktime
|
||||||
|
|
||||||
class tmpufw(object):
|
class tmpufw(object):
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description = 'Temporarily apply `ufw` rules')
|
parser = ArgumentParser(description = 'Temporarily apply `ufw` rules')
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.parser.add_argument('-s', '--status', action = 'store_true', help = 'show rule list with expirations')
|
self.parser.add_argument('-s', '--status', action = 'store_true', help = 'show rule list with expirations')
|
||||||
|
@ -37,10 +39,9 @@ class tmpufw(object):
|
||||||
self.parser.add_argument('-t', '--ttl', default = '30 days', help = 'time to live for the rule')
|
self.parser.add_argument('-t', '--ttl', default = '30 days', help = 'time to live for the rule')
|
||||||
args = self.parser.parse_args()
|
args = self.parser.parse_args()
|
||||||
|
|
||||||
if args.status and (args.clean or args.position or args.rule):
|
if args.status:
|
||||||
self.error('the --status flag must be used by itself')
|
exit('TODO display rules and expirations')
|
||||||
if args.clean and (args.position or args.rule or args.status):
|
|
||||||
self.error('the --clean flag must be used by itself')
|
|
||||||
elif args.clean:
|
elif args.clean:
|
||||||
# TODO Check for PID
|
# TODO Check for PID
|
||||||
# TODO If PID exists, exit
|
# TODO If PID exists, exit
|
||||||
|
@ -54,23 +55,62 @@ class tmpufw(object):
|
||||||
# TODO If expiration is in the future, add rule to tmp file
|
# TODO If expiration is in the future, add rule to tmp file
|
||||||
# TODO Move tmp file to rules file
|
# TODO Move tmp file to rules file
|
||||||
# TODO Remove PID
|
# TODO Remove PID
|
||||||
sys.exit('TODO clean up expired rules')
|
exit('TODO clean up expired rules')
|
||||||
elif args.rule and args.ttl:
|
|
||||||
# TODO Add the rule to `ufw`
|
|
||||||
ufw = ['ufw', 'position', args.position, args.rule]
|
|
||||||
|
|
||||||
# TODO Convert the TTL to a timestamp
|
elif args.rule:
|
||||||
# TODO Add the rule and the timestamp to the end of the rules file
|
rules_file = '/usr/local/share/' + __file__ + '/rules'
|
||||||
# TODO Check if the ufw rule is in fact valid (ufw has a --dry-run flag)
|
rules_path = path.dirname(rules_file)
|
||||||
|
|
||||||
|
if not path.exists(rules_path):
|
||||||
|
makedirs(rules_path)
|
||||||
|
|
||||||
|
# Converts the TTL to a timestamp
|
||||||
|
cal = Calendar()
|
||||||
|
timestamp = mktime(cal.parse(args.ttl)[0])
|
||||||
|
|
||||||
|
# Writes the rule to the rules file
|
||||||
|
try:
|
||||||
# TODO Check if rule already exists and update it instead of adding it again
|
# TODO Check if rule already exists and update it instead of adding it again
|
||||||
sys.exit('TODO add rule to the database')
|
|
||||||
|
handle = open(rules_file, 'a')
|
||||||
|
handle.write(str(timestamp) + ' ' + args.rule)
|
||||||
|
handle.write("\n")
|
||||||
|
handle.close()
|
||||||
|
|
||||||
|
except IOError:
|
||||||
|
self.error('Unable to write to the rules file: ' + rules_file)
|
||||||
|
|
||||||
|
# Attempts to add the rule to `ufw`
|
||||||
|
try:
|
||||||
|
self.ufw_execute('insert ' + str(args.position) + ' ' + args.rule)
|
||||||
|
|
||||||
|
except CalledProcessError as error:
|
||||||
|
# Catches an error when attempting to add a rule to an empty database
|
||||||
|
if error.output == b"ERROR: Invalid position '1'\n":
|
||||||
|
try:
|
||||||
|
self.ufw_execute(args.rule)
|
||||||
|
|
||||||
|
except CalledProcessError as error:
|
||||||
|
self.ufw_error(error)
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.ufw_error(error)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.error('no arguments specified')
|
self.error('no arguments specified')
|
||||||
|
|
||||||
def error(self, message):
|
def error(self, message):
|
||||||
self.parser.print_usage()
|
self.parser.print_usage()
|
||||||
print(__file__ + ': error: ' + message)
|
print(__file__ + ': error: ' + message)
|
||||||
sys.exit(2)
|
exit(2)
|
||||||
|
|
||||||
|
def ufw_execute(self, rule):
|
||||||
|
for arg in [' --dry-run ', ' ']:
|
||||||
|
command = 'ufw' + arg + rule
|
||||||
|
check_output(command, stderr = STDOUT, shell = True)
|
||||||
|
|
||||||
|
def ufw_error(self, error):
|
||||||
|
self.error('ufw: ' + error.output.decode(encoding = 'UTF-8'))
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
tmpufw()
|
tmpufw()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue