Elukey has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/355604 )
Change subject: [WIP] Add the eventlogging_cleaner script and base package ...................................................................... [WIP] Add the eventlogging_cleaner script and base package Bug: T156933 Change-Id: Ia428f39832f0b22c586c3ee7484953ae93fbc8d3 --- A AUTHORS A LICENSE.txt A README.md A eventlogging_cleaner/__init__.py A eventlogging_cleaner/eventlogging_cleaner.py A setup.py 6 files changed, 396 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/software/analytics-eventlogging-maintenance refs/changes/04/355604/1 diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..cbafc5d --- /dev/null +++ b/AUTHORS @@ -0,0 +1,2 @@ +Luca Toscano <ltosc...@wikimedia.org> +Marcel Ruiz Forns <mfo...@wikimedia.org> diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..c7a22c6 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,15 @@ +Copyright (c) 2016-17 Wikimedia Foundation Inc. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..0b162f8 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +analytics-eventlogging-maintenance documentation +============================= + +TODO + +Installation +------------ + +This software is known to work correctly with python 2.7 and 3.5 + +From Source +~~~~~~~~~~~ + +.. 
code:: bash + + $ python setup.py install + +Usage +----- + +TODO + +Limitations +----------- + +TODO diff --git a/eventlogging_cleaner/__init__.py b/eventlogging_cleaner/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/eventlogging_cleaner/__init__.py diff --git a/eventlogging_cleaner/eventlogging_cleaner.py b/eventlogging_cleaner/eventlogging_cleaner.py new file mode 100644 index 0000000..04abdd7 --- /dev/null +++ b/eventlogging_cleaner/eventlogging_cleaner.py @@ -0,0 +1,319 @@ +import argparse +import collections +import csv +from datetime import datetime, timedelta +import logging +import os +import pymysql +import re +import sys + + +DATE_FORMAT = '%Y%m%d%H%M%S' +BATCH_SIZE = 1000 + +# Fields that are always present due to the EventLogging Capsule. +# These ones are automatically whitelisted due to their importance. +COMMON_PERSISTENT_FIELDS = ['id', 'uuid', 'timestamp'] + +log = logging.getLogger(__name__) + + +class Database(object): + + def __init__(self, db_host, db_port, db_name): + self.db_host = db_host + self.db_name = db_name + self.db_port = db_port + self.log = logging.getLogger(self.__class__.__name__) + self.connection = pymysql.connect( + host=db_host, + port=db_port, + db=db_name, + user='root', + password='root', + autocommit=True, + charset='utf8', + use_unicode=True + ) + + def execute(self, command, commit=False, dry_run=False): + """ + Sends a single sql command to the server instance, + returns metadata about the execution and the resulting data. + """ + cursor = self.connection.cursor() + result = { + "query": command, + "host": self.db_host, + "port": self.db_port, + "database": self.db_name + } + try: + if dry_run: + self.log.info(( + "We will *NOT* execute \"{}\" on {}:{}/{} because " + "this is a dry run." 
+ ).format(command, self.db_host, self.db_port, self.db_name)) + result.update({ + "success": True, + "fields": [], + "rows": [], + "numrows": 0 + }) + return result + else: + log.info("Executing command: " + command) + cursor.execute(command) + + fields = None + rows = None + if cursor.rowcount > 0: + rows = cursor.fetchall() + fields = [] if not cursor.description else tuple([x[0] for x in cursor.description]) + numrows = cursor.rowcount + cursor.close() + + result.update({ + "success": True, + "fields": fields, + "rows": rows, + "numrows": numrows + }) + return result + + except (pymysql.err.ProgrammingError, + pymysql.err.OperationalError) as e: + cursor.close() + result.update({ + "success": False, + "errno": e.args[0], + "errmsg": e.args[1] + }) + return result + + def get_all_tables(self): + command = ( + "SELECT table_name " + "FROM information_schema.tables " + "WHERE table_schema = '{}'" + ).format(self.db_name) + result = self.execute(command) + if not result['rows']: + log.error('No table found in database ' + self.db_name) + return [] + return [row[0] for row in result['rows']] + + def get_table_fields(self, table): + command = 'DESCRIBE {}'.format(table) + result = self.execute(command) + return [row[0] for row in result['rows']] + + def close_connection(self): + try: + self.connection.close() + except (pymysql.err.ProgrammingError, + pymysql.err.OperationalError): + log.exception("Failed to close the connection to the DB") + + +class Terminator(object): + + def __init__(self, database, whitelist, newer_than, older_than, dry_run): + self.reference_time = datetime.utcnow() + self.database = database + self.whitelist = whitelist + self.start = self.relative_ts(newer_than) + self.end = self.relative_ts(older_than) + self.dry_run = dry_run + + def relative_ts(self, days): + return (self.reference_time - timedelta(days=days))\ + .strftime(DATE_FORMAT) + + def purge(self, table): + """ + Drop all the rows in a given table with timestamp between + self.start 
and self.end. + """ + command = ( + "DELETE FROM {} " + "WHERE timestamp >= '{}' AND timestamp < '{}' " + "LIMIT {}" + ).format(table, self.start, self.end, BATCH_SIZE) + result = self.database.execute(command, self.dry_run) + while result['numrows'] > 0: + result = self.database.execute(command, self.dry_run) + + def _get_old_uuids(self, table, offset): + """ + Return a list of uuids between self.start and self.end limiting + the batch with an offset. + """ + command = ( + "SELECT uuid from {0} WHERE timestamp >= '{1}' AND timestamp < '{2}' " + "LIMIT {3} OFFSET {4}" + ).format(table, self.start, self.end, BATCH_SIZE, offset) + result = self.database.execute(command, self.dry_run) + if result['rows']: + return ["'" + x[0] + "'" for x in result['rows']] + else: + return [] + + def sanitize(self, table): + """ + Set all the fields not in the whitelist (for a given table) to NULL. + """ + fields = self.database.get_table_fields(table) + fields_to_keep = self.whitelist[table] + COMMON_PERSISTENT_FIELDS + fields_to_purge = filter(lambda f: f not in fields_to_keep, fields) + values_string = ','.join([field + ' = NULL' for field in fields_to_purge]) + offset = 0 + uuids = self._get_old_uuids(table, offset) + command_template = ( + "UPDATE {0} " + "SET {1} " + "WHERE uuid IN ({2})" + ).format(table, values_string, '{}') + while uuids and len(uuids) > 0: + result = self.database.execute(command_template.format(",".join(uuids)), self.dry_run) + if result['numrows'] == 0: + log.error('No rows updated after executing the SQL command, aborting.') + return + if len(uuids) < BATCH_SIZE: + # Avoid an extra SQL query to the database if the number of + # uuids returned is less than BATCH_SIZE, since this value + # means that we have already reached the last batch of uuids + # to sanitize. 
+ uuids = [] + else: + offset += BATCH_SIZE + uuids = self._get_old_uuids(table, offset) + + +def parse_whitelist(rows): + """Parse rows containing tables and their attributes to whitelist + + Returns a hashmap with the following format: + - each key is a table name + - each value is a list of whitelisted fields + { + "tableName1": ["fieldName1", "fieldName2", ...], + "tableName2": [...], + ... + } + """ + whitelist_hash = collections.defaultdict(list) + allowed_tablename_format = re.compile("^[A-Za-z0-9_]+$") + allowed_fieldname_format = re.compile("^[A-Za-z0-9_.]+$") + lineno = 0 + for row in rows: + lineno += 1 + if len(row) != 2: + raise RuntimeError('Error in the whitelist, line %d: ' + '2 elements per row allowed ' + '(tab to separate them).' % lineno) + + table_name = row[0].strip() + field_name = row[1].strip() + + if not allowed_tablename_format.match(table_name): + raise RuntimeError('Error in the whitelist: table name {} not ' + 'following the allowed format (^[A-Za-z0-9_]+$)' + .format(table_name)) + + if not allowed_fieldname_format.match(field_name): + raise RuntimeError('Error in the whitelist: field {} not ' + 'following the allowed format ' + '(^[A-Za-z0-9_.]+$)' + .format(field_name)) + + if field_name not in whitelist_hash[table_name]: + whitelist_hash[table_name].append(field_name) + else: + raise RuntimeError('Error in the whitelist: field {} ' + 'is listed multiple times.' 
+ .format(field_name)) + + return whitelist_hash + + +if __name__ == '__main__': + # Define argument parser + parser = argparse.ArgumentParser(description='EventLogging data ' + 'retention script') + parser.add_argument('dbhostname', help='The target db hostname to purge') + parser.add_argument('--whitelist', default="whitelist.tsv", + help='The full path of the TSV whitelist file ' + '(default: whitelist.tsv)') + parser.add_argument('--dbport', default=3306, + help='The target db port (default: 3306)') + parser.add_argument('--dbname', default='log', + help='The EventLogging database name (default: log)') + parser.add_argument('--older-than', dest='older_than', default=90, + help='Delete logs older than this number of days' + ' (default: 90)') + parser.add_argument('--newer-than', dest='newer_than', default=120, + help='Delete logs newer than this number of days' + ' (default: 91)') + parser.add_argument('--dry-run', dest='dry_run', action='store_true', + help='Only print sql commands without executing them') + parser.add_argument('--logfile', dest='logfile', default=None, + help='Redirect the script\'s output to a file rather ' + 'than stdout') + args = parser.parse_args() + + log_format = ('%(levelname)s: line %(lineno)d: %(message)s') + + if args.logfile: + logging.basicConfig( + filename=args.filename, + level=logging.INFO, + format=log_format + ) + else: + logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format=log_format + ) + + # Args basic checks + if not os.path.exists(args.whitelist): + log.error('The whitelist filepath provided does not exist') + exit(1) + + try: + # Parse whitelist file + with open(args.whitelist, 'r') as whitelist_fd: + lines = csv.reader(whitelist_fd, delimiter='\t') + whitelist = parse_whitelist(lines) + + # Connect to the database + database = Database(args.dbhostname, int(args.dbport), args.dbname) + + # Apply the retention policy to each table + tables = database.get_all_tables() + terminator = Terminator( + 
database, + whitelist, + args.newer_than, + args.older_than, + args.dry_run + ) + # Two purging methods: + # 1) if the table is not in the whitelist it means that no field + # needs to be preserved, hence the rows can just be deleted. + # 2) if the table is in the whitelist, the rows need to be updated + # with all the fields not whitelisted set as NULL. + for table in tables: + if table not in whitelist: + terminator.purge(table) + else: + terminator.sanitize(table) + + database.close_connection() + + except Exception as e: + log.exception("Exception while running main") + exit(1) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..76ba957 --- /dev/null +++ b/setup.py @@ -0,0 +1,34 @@ +from setuptools import setup, find_packages +import os + +here = os.path.abspath(os.path.dirname(__file__)) +README = open(os.path.join(here, 'README.rst')).read() + +version = '0.0.1' + +install_requires = [ + 'PyMySQL>=0.7.11', +] + +test_requires = [ + 'mock', + 'nose', +] + +setup( + name='eventlogging_cleaner', + version=version, + description="Scripts used for EventLogging DB maintenance", + long_description=README, + author='Luca Toscano', + author_email='ltosc...@wikimedia.org', + url='', + license='GPL', + packages=find_packages(), + zip_safe=False, + install_requires=install_requires, + tests_require=test_requires, + test_suite='nose.collector', + entry_points={ + }, +) -- To view, visit https://gerrit.wikimedia.org/r/355604 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia428f39832f0b22c586c3ee7484953ae93fbc8d3 Gerrit-PatchSet: 1 Gerrit-Project: operations/software/analytics-eventlogging-maintenance Gerrit-Branch: master Gerrit-Owner: Elukey <ltosc...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits