162 lines
5.2 KiB
Python
Executable file
162 lines
5.2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Temporarily apply `ufw` rules
|
|
|
|
This script allows you to add rules to `ufw` (Uncomplicated Firewall) with a
|
|
time to live. You can then run the script as a cronjob (with the --clean flag)
|
|
to clean up (remove) the expired rules.
|
|
|
|
Arguments:
|
|
-h, --help show the help message and exit
|
|
-s, --status show rule list with expirations
|
|
-c, --clean clean up expired rules
|
|
-r RULE, --rule RULE rule to be added to `ufw`
|
|
-p POSITION, --position POSITION position to add the rule
|
|
-t TTL, --ttl TTL time to live for the rule
|
|
"""
|
|
__author__ = 'Joshua Sherman'
|
|
__file__ = 'tmpufw'
|
|
__license__ = 'MIT'
|
|
__status__ = 'Production'
|
|
__version__ = '1.0.0'
|
|
|
|
from argparse import ArgumentParser
|
|
from datetime import datetime
|
|
from os import getpid, makedirs, path, remove
|
|
from parsedatetime import Calendar
|
|
from shutil import move
|
|
from subprocess import CalledProcessError, check_output, STDOUT
|
|
from sys import exit
|
|
from time import mktime, time
|
|
|
|
class tmpufw(object):
|
|
parser = ArgumentParser(description = 'Temporarily apply `ufw` rules')
|
|
|
|
def __init__(self):
|
|
self.parser.add_argument('-s', '--status', action = 'store_true', help = 'show rule list with expirations')
|
|
self.parser.add_argument('-c', '--clean', action = 'store_true', help = 'clean up expired rules')
|
|
self.parser.add_argument('-r', '--rule', help = 'rule to be added to `ufw`')
|
|
self.parser.add_argument('-p', '--position', default = 1, help = 'position to add the rule')
|
|
self.parser.add_argument('-t', '--ttl', default = '30 days', help = 'time to live for the rule')
|
|
args = self.parser.parse_args()
|
|
|
|
# Our file names
|
|
pid_file = '/var/run/' + __file__ + '.pid'
|
|
rules_file = '/usr/local/share/' + __file__ + '/rules'
|
|
tmp_rules_file = '/tmp/' + __file__ + '-rules'
|
|
|
|
if args.status:
|
|
if path.exists(rules_file):
|
|
try:
|
|
print("Expiration\t\tRule")
|
|
print('=' * 80)
|
|
|
|
# Loops through the rules lines
|
|
for line in open(rules_file, 'r'):
|
|
# Breaks apart line into expiration timestamp and rule
|
|
timestamp, rule = line.strip("\n").split(' ', 1)
|
|
|
|
print(str(datetime.fromtimestamp(float(timestamp))) + "\t" + rule)
|
|
except IOError:
|
|
self.error('unable to read from the rules file: ' + rules_file)
|
|
else:
|
|
self.error('there are no rules to display')
|
|
elif args.clean:
|
|
# Checks for PID file
|
|
if path.exists(pid_file):
|
|
self.error(__file__ + ' is already running')
|
|
else:
|
|
# Creates the PID file
|
|
try:
|
|
handle = open(pid_file, 'w')
|
|
handle.write(str(getpid()))
|
|
handle.close()
|
|
except IOError:
|
|
self.error('unable to create PID file: ' + pid_file)
|
|
|
|
# Checks for the rules file
|
|
if path.exists(rules_file):
|
|
# Opens the temporary rules file
|
|
try:
|
|
handle = open(tmp_rules_file, 'a')
|
|
except IOError:
|
|
self.error('unable to write to the tmp rules file: ' + tmp_rules_file)
|
|
|
|
try:
|
|
current_time = time()
|
|
|
|
# Loops through the rules lines
|
|
for line in open(rules_file, 'r'):
|
|
# Breaks apart line into expiration timestamp and rule
|
|
timestamp, rule = line.strip("\n").split(' ', 1)
|
|
|
|
# Checks if rule has expired
|
|
if current_time < float(timestamp):
|
|
handle.write(line)
|
|
print(str(datetime.fromtimestamp(time())) + "\tskipped rule\t" + rule)
|
|
else:
|
|
try:
|
|
self.ufw_execute('delete ' + rule)
|
|
print(str(datetime.fromtimestamp(time())) + "\tdeleted rule\t" + rule)
|
|
except CalledProcessError as error:
|
|
self.ufw_error(error)
|
|
|
|
handle.close()
|
|
|
|
# Moves the tmp file to the rules file
|
|
move(tmp_rules_file, rules_file)
|
|
except IOError:
|
|
self.error('unable to from the read rules file: ' + rules_file)
|
|
|
|
# Removes the PID
|
|
remove(pid_file)
|
|
elif args.rule:
|
|
rules_path = path.dirname(rules_file)
|
|
|
|
if not path.exists(rules_path):
|
|
makedirs(rules_path)
|
|
|
|
# Converts the TTL to a timestamp
|
|
cal = Calendar()
|
|
timestamp = mktime(cal.parse(args.ttl)[0])
|
|
|
|
# Writes the rule to the rules file
|
|
try:
|
|
# TODO Check if rule already exists and update it instead of adding it again
|
|
handle = open(rules_file, 'a')
|
|
handle.write(str(timestamp) + ' ' + args.rule)
|
|
handle.write("\n")
|
|
handle.close()
|
|
except IOError:
|
|
self.error('unable to write to the rules file: ' + rules_file)
|
|
|
|
# Attempts to add the rule to `ufw`
|
|
try:
|
|
self.ufw_execute('insert ' + str(args.position) + ' ' + args.rule)
|
|
except CalledProcessError as error:
|
|
# Catches an error when attempting to add a rule to an empty database
|
|
if error.output == b"ERROR: Invalid position '1'\n":
|
|
try:
|
|
self.ufw_execute(args.rule)
|
|
except CalledProcessError as error:
|
|
self.ufw_error(error)
|
|
else:
|
|
self.ufw_error(error)
|
|
else:
|
|
self.error('no arguments specified')
|
|
|
|
def error(self, message):
|
|
self.parser.print_usage()
|
|
print(__file__ + ': error: ' + message)
|
|
exit(2)
|
|
|
|
def ufw_execute(self, rule):
|
|
for arg in [' --dry-run ', ' ']:
|
|
command = 'ufw' + arg + rule
|
|
check_output(command, stderr = STDOUT, shell = True)
|
|
|
|
def ufw_error(self, error):
|
|
self.error('ufw: ' + error.output.decode(encoding = 'UTF-8'))
|
|
|
|
if __name__ == '__main__':
|
|
tmpufw()
|