"""Util functions to cleanup disk space."""
import gzip
import logging
import os
import shutil
import time
import types
from argparse import ArgumentDefaultsHelpFormatter
from datetime import datetime, timedelta
from sbws.globals import fail_hard
from sbws.util.filelock import DirectoryLock
from sbws.util.timestamp import unixts_to_dt_obj
log = logging.getLogger(__name__)
def gen_parser(sub):
    """
    Helper function for the broader argument parser generating code that adds
    in all the possible command line arguments for the cleanup command.

    :param argparse._SubParsersAction sub: what to add a sub-parser to
    """
    # Fixed: removed the stray "[docs]" doc-scrape artifact that made the
    # def line invalid Python, and repaired the garbled help description.
    d = (
        "Compress and delete old results and/or v3bw files. "
        "Configuration options are read to determine which are old files."
    )
    p = sub.add_parser(
        "cleanup", description=d, formatter_class=ArgumentDefaultsHelpFormatter
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't actually compress or delete anything",
    )
    p.add_argument(
        "--no-results", action="store_true", help="Do not clean results files"
    )
    p.add_argument(
        "--no-v3bw", action="store_true", help="Do not clean v3bw files"
    )
def _get_files_mtime_older_than(dname, days_delta, extensions):
    """Yield paths under ``dname`` whose modification time lies more than
    ``days_delta`` days in the past and whose extension is one of
    ``extensions`` (each given with a leading dot)."""
    assert os.path.isdir(dname)
    assert isinstance(days_delta, int)
    assert isinstance(extensions, list)
    for extension in extensions:
        assert isinstance(extension, str)
        assert extension[0] == "."
    # Anything last modified before this cutoff counts as "old".
    now = datetime.utcfromtimestamp(time.time())
    cutoff = now - timedelta(days=days_delta)
    for dirpath, _dirnames, filenames in os.walk(dname):
        for filename in filenames:
            path = os.path.join(dirpath, filename)
            extension = os.path.splitext(path)[1]
            if extension not in extensions:
                log.debug(
                    "Ignoring %s because its extension is not in %s",
                    path,
                    extensions,
                )
                continue
            # Rely on the filesystem mtime instead of parsing a date out
            # of the file name; do not follow symlinks.
            mtime = os.stat(path, follow_symlinks=False).st_mtime
            if unixts_to_dt_obj(mtime) < cutoff:
                yield path
def _delete_files(dname, files, dry_run=True):
    """Delete every path produced by the ``files`` generator.

    With ``dry_run`` the candidates are only logged, never removed."""
    assert os.path.isdir(dname)
    assert isinstance(files, types.GeneratorType)
    # Hold the directory lock so no concurrent sbws process races us.
    with DirectoryLock(dname):
        for path in files:
            log.info("Deleting %s", path)
            # Every candidate must live inside dname.
            assert os.path.commonprefix([dname, path]) == dname
            if dry_run:
                continue
            os.remove(path)
def _compress_files(dname, files, dry_run=True):
    """Gzip-compress every path produced by the ``files`` generator,
    removing the uncompressed original afterwards.

    With ``dry_run`` the candidates are only logged, nothing is touched."""
    assert os.path.isdir(dname)
    assert isinstance(files, types.GeneratorType)
    # Hold the directory lock so no concurrent sbws process races us.
    with DirectoryLock(dname):
        for path in files:
            log.info("Compressing %s", path)
            # Every candidate must live inside dname.
            assert os.path.commonprefix([dname, path]) == dname
            if not dry_run:
                gz_path = path + ".gz"
                with open(path, "rt") as src, gzip.open(gz_path, "wt") as dst:
                    shutil.copyfileobj(src, dst)
                os.remove(path)
def _check_validity_periods_v3bw(compress_after_days, delete_after_days):
    """Sanity-check the configured v3bw retention periods.

    Return True when compression waits at least 1 day and deletion waits
    longer than compression; otherwise abort via ``fail_hard``."""
    if 1 <= compress_after_days < delete_after_days:
        return True
    fail_hard(
        "v3bw files should only be compressed after 1 day and deleted "
        "after a bigger number of days."
    )
def _clean_v3bw_files(args, conf):
    """Delete and then compress old v3bw files per the configuration.

    :param argparse.Namespace args: command line arguments
    :param configparser.ConfigParser conf: parsed config files
    """
    v3bw_dname = conf.getpath("paths", "v3bw_dname")
    if not os.path.isdir(v3bw_dname):
        fail_hard("%s does not exist", v3bw_dname)
    compress_after_days = conf.getint(
        "cleanup", "v3bw_files_compress_after_days"
    )
    delete_after_days = conf.getint("cleanup", "v3bw_files_delete_after_days")
    _check_validity_periods_v3bw(compress_after_days, delete_after_days)
    # Delete first so files about to disappear are never compressed.
    _delete_files(
        v3bw_dname,
        _get_files_mtime_older_than(
            v3bw_dname, delete_after_days, [".v3bw", ".gz"]
        ),
        dry_run=args.dry_run,
    )
    # In a dry run nothing was actually deleted above, so compression will
    # also list the files that would have been removed.
    _compress_files(
        v3bw_dname,
        _get_files_mtime_older_than(
            v3bw_dname, compress_after_days, [".v3bw"]
        ),
        dry_run=args.dry_run,
    )
def _clean_result_files(args, conf):
    """Delete and then compress old result files per the configuration.

    :param argparse.Namespace args: command line arguments
    :param configparser.ConfigParser conf: parsed config files
    """
    datadir = conf.getpath("paths", "datadir")
    if not os.path.isdir(datadir):
        fail_hard("%s does not exist", datadir)
    compress_after_days = conf.getint(
        "cleanup", "data_files_compress_after_days"
    )
    delete_after_days = conf.getint("cleanup", "data_files_delete_after_days")
    # Delete first so files about to disappear are never compressed.
    _delete_files(
        datadir,
        _get_files_mtime_older_than(datadir, delete_after_days, [".txt", ".gz"]),
        dry_run=args.dry_run,
    )
    # In a dry run nothing was actually deleted above, so compression will
    # also list the files that would have been removed.
    _compress_files(
        datadir,
        _get_files_mtime_older_than(datadir, compress_after_days, [".txt"]),
        dry_run=args.dry_run,
    )
def main(args, conf):
    """
    Main entry point in to the cleanup command.

    :param argparse.Namespace args: command line arguments
    :param configparser.ConfigParser conf: parsed config files
    """
    # Fixed: removed the stray "[docs]" doc-scrape artifact that made the
    # def line invalid Python.
    if not args.no_results:
        _clean_result_files(args, conf)
    if not args.no_v3bw:
        _clean_v3bw_files(args, conf)