Elukey has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/355604 )

Change subject: [WIP] Add the eventlogging_cleaner script and base package
......................................................................

[WIP] Add the eventlogging_cleaner script and base package

Bug: T156933
Change-Id: Ia428f39832f0b22c586c3ee7484953ae93fbc8d3
---
A AUTHORS
A LICENSE.txt
A README.md
A eventlogging_cleaner/__init__.py
A eventlogging_cleaner/eventlogging_cleaner.py
A setup.py
6 files changed, 396 insertions(+), 0 deletions(-)


  git pull 
ssh://gerrit.wikimedia.org:29418/operations/software/analytics-eventlogging-maintenance
 refs/changes/04/355604/1

diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..cbafc5d
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,2 @@
+Luca Toscano <ltosc...@wikimedia.org>
+Marcel Ruiz Forns <mfo...@wikimedia.org>
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..c7a22c6
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,15 @@
+Copyright (c) 2016-17 Wikimedia Foundation Inc.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0b162f8
--- /dev/null
+++ b/README.md
@@ -0,0 +1,26 @@
+analytics-eventlogging-maintenance documentation
+================================================
+
+TODO
+
+Installation
+------------
+
+This software is known to work correctly with python 2.7 and 3.5
+
+### From Source
+
+```bash
+$ python setup.py install
+```
+
+Usage
+-----
+
+TODO
+
+Limitations
+-----------
+
+TODO
diff --git a/eventlogging_cleaner/__init__.py b/eventlogging_cleaner/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eventlogging_cleaner/__init__.py
diff --git a/eventlogging_cleaner/eventlogging_cleaner.py 
b/eventlogging_cleaner/eventlogging_cleaner.py
new file mode 100644
index 0000000..04abdd7
--- /dev/null
+++ b/eventlogging_cleaner/eventlogging_cleaner.py
@@ -0,0 +1,319 @@
+import argparse
+import collections
+import csv
+from datetime import datetime, timedelta
+import logging
+import os
+import pymysql
+import re
+import sys
+
+
# Timestamp format used in EventLogging tables, e.g. '20170523103045'.
DATE_FORMAT = '%Y%m%d%H%M%S'
# Maximum number of rows touched by a single DELETE/UPDATE statement.
BATCH_SIZE = 1000

# Fields that are always present due to the EventLogging Capsule.
# These ones are automatically whitelisted due to their importance.
COMMON_PERSISTENT_FIELDS = ['id', 'uuid', 'timestamp']

# Module-level logger; classes create their own named loggers as well.
log = logging.getLogger(__name__)
+
+
class Database(object):
    """Thin wrapper around a pymysql connection to a single database.

    Provides helpers to run SQL commands and to inspect the schema,
    returning plain dictionaries instead of raising on SQL errors.
    """

    def __init__(self, db_host, db_port, db_name):
        self.db_host = db_host
        self.db_name = db_name
        self.db_port = db_port
        self.log = logging.getLogger(self.__class__.__name__)
        # NOTE(review): hard-coded root/root credentials are placeholders;
        # they must be replaced (e.g. read from a config file) before
        # this script is used in production.
        self.connection = pymysql.connect(
            host=db_host,
            port=db_port,
            db=db_name,
            user='root',
            password='root',
            autocommit=True,
            charset='utf8',
            use_unicode=True
        )

    def execute(self, command, commit=False, dry_run=False):
        """
        Sends a single sql command to the server instance,
        returns metadata about the execution and the resulting data.

        Arguments:
        command -- the SQL statement to execute
        commit  -- unused, kept for backward compatibility (the
                   connection is opened with autocommit=True)
        dry_run -- when True, only log the command without executing it

        Returns a dict with success/fields/rows/numrows on success, or
        success=False plus errno/errmsg on pymysql errors (no raise).
        """
        result = {
            "query": command,
            "host": self.db_host,
            "port": self.db_port,
            "database": self.db_name
        }
        if dry_run:
            # No cursor is created on this path, so nothing leaks.
            self.log.info((
                "We will *NOT* execute \"{}\" on {}:{}/{} because "
                "this is a dry run."
            ).format(command, self.db_host, self.db_port, self.db_name))
            result.update({
                "success": True,
                "fields": [],
                "rows": [],
                "numrows": 0
            })
            return result

        cursor = self.connection.cursor()
        try:
            self.log.info("Executing command: " + command)
            cursor.execute(command)

            fields = None
            rows = None
            if cursor.rowcount > 0:
                rows = cursor.fetchall()
                fields = [] if not cursor.description else tuple(
                    [x[0] for x in cursor.description])
            numrows = cursor.rowcount

            result.update({
                "success": True,
                "fields": fields,
                "rows": rows,
                "numrows": numrows
            })
            return result

        except (pymysql.err.ProgrammingError,
                pymysql.err.OperationalError) as e:
            result.update({
                "success": False,
                "errno": e.args[0],
                "errmsg": e.args[1]
            })
            return result
        finally:
            # Release the cursor on both the success and the error path.
            cursor.close()

    def get_all_tables(self):
        """Return the list of table names in the configured database."""
        command = (
            "SELECT table_name "
            "FROM information_schema.tables "
            "WHERE table_schema = '{}'"
        ).format(self.db_name)
        result = self.execute(command)
        if not result['rows']:
            self.log.error('No table found in database ' + self.db_name)
            return []
        return [row[0] for row in result['rows']]

    def get_table_fields(self, table):
        """Return the column names of the given table (empty on failure)."""
        command = 'DESCRIBE {}'.format(table)
        result = self.execute(command)
        # 'rows' is None when the query failed or returned nothing:
        # guard it so callers always get a list back.
        return [row[0] for row in result['rows'] or []]

    def close_connection(self):
        """Close the underlying connection, logging (not raising) errors."""
        try:
            self.connection.close()
        except (pymysql.err.ProgrammingError,
                pymysql.err.OperationalError):
            self.log.exception("Failed to close the connection to the DB")
+
+
class Terminator(object):
    """Apply the EventLogging retention policy to database tables.

    Rows with timestamp between `newer_than` and `older_than` days ago
    are either deleted (purge) or have every non-whitelisted field set
    to NULL (sanitize), working in batches of BATCH_SIZE rows.
    """

    def __init__(self, database, whitelist, newer_than, older_than, dry_run):
        self.reference_time = datetime.utcnow()
        self.database = database
        self.whitelist = whitelist
        # start is the older bound (newer_than days ago), end the more
        # recent one (older_than days ago), both in DATE_FORMAT.
        self.start = self.relative_ts(newer_than)
        self.end = self.relative_ts(older_than)
        self.dry_run = dry_run

    def relative_ts(self, days):
        """Return the DATE_FORMAT timestamp of `days` days ago."""
        return (self.reference_time - timedelta(days=days))\
            .strftime(DATE_FORMAT)

    def purge(self, table):
        """
        Drop all the rows in a given table with timestamp between
        self.start and self.end, in batches of BATCH_SIZE.
        """
        command = (
            "DELETE FROM {} "
            "WHERE timestamp >= '{}' AND timestamp < '{}' "
            "LIMIT {}"
        ).format(table, self.start, self.end, BATCH_SIZE)
        # dry_run MUST be passed by keyword: the second positional
        # parameter of Database.execute is `commit`, not `dry_run`.
        result = self.database.execute(command, dry_run=self.dry_run)
        # 'numrows' is absent when the query failed, hence .get().
        while result.get('numrows', 0) > 0:
            result = self.database.execute(command, dry_run=self.dry_run)

    def _get_old_uuids(self, table, offset):
        """
        Return a list of quoted uuids between self.start and self.end,
        limiting the batch with an offset.
        """
        command = (
            "SELECT uuid from {0} WHERE timestamp >= '{1}' "
            "AND timestamp < '{2}' "
            "LIMIT {3} OFFSET {4}"
        ).format(table, self.start, self.end, BATCH_SIZE, offset)
        result = self.database.execute(command, dry_run=self.dry_run)
        if result['rows']:
            return ["'" + x[0] + "'" for x in result['rows']]
        return []

    def sanitize(self, table):
        """
        Set all the fields not in the whitelist (for a given table) to NULL.
        """
        fields = self.database.get_table_fields(table)
        fields_to_keep = self.whitelist[table] + COMMON_PERSISTENT_FIELDS
        fields_to_purge = [f for f in fields if f not in fields_to_keep]
        if not fields_to_purge:
            # Every field is whitelisted: nothing to NULL out, and an
            # UPDATE with an empty SET clause would be invalid SQL.
            log.info('All fields of table %s are whitelisted, '
                     'nothing to sanitize', table)
            return
        values_string = ','.join(
            [field + ' = NULL' for field in fields_to_purge])
        offset = 0
        uuids = self._get_old_uuids(table, offset)
        command_template = (
            "UPDATE {0} "
            "SET {1} "
            "WHERE uuid IN ({2})"
        ).format(table, values_string, '{}')
        while uuids:
            result = self.database.execute(
                command_template.format(",".join(uuids)),
                dry_run=self.dry_run)
            # .get() protects against failed queries, whose result dict
            # carries no 'numrows' key.
            if result.get('numrows', 0) == 0:
                log.error('No rows updated after executing the SQL '
                          'command, aborting.')
                return
            if len(uuids) < BATCH_SIZE:
                # Avoid an extra SQL query to the database if the number of
                # uuids returned are less than BATCH_SIZE, since this value
                # means that we have already reached the last batch of uuids
                # to sanitize.
                uuids = []
            else:
                offset += BATCH_SIZE
                uuids = self._get_old_uuids(table, offset)
+
+
def parse_whitelist(rows):
    """Parse rows containing tables and their attributes to whitelist.

    Arguments:
    rows -- iterable of (table_name, field_name) pairs, typically
            produced by csv.reader over the whitelist TSV file.

    Returns a hashmap with the following format:
    - each key is a table name
    - each value is a list of whitelisted fields
    {
        "tableName1": ["fieldName1", "fieldName2", ...],
        "tableName2": [...],
        ...
    }

    Raises RuntimeError on malformed rows, names not matching the
    allowed formats, or duplicated fields.
    """
    whitelist_hash = collections.defaultdict(list)
    allowed_tablename_format = re.compile(r"^[A-Za-z0-9_]+$")
    allowed_fieldname_format = re.compile(r"^[A-Za-z0-9_.]+$")
    # enumerate replaces the hand-rolled line counter of the original.
    for lineno, row in enumerate(rows, start=1):
        if len(row) != 2:
            raise RuntimeError('Error in the whitelist, line %d: '
                               '2 elements per row allowed '
                               '(tab to separate them).' % lineno)

        table_name = row[0].strip()
        field_name = row[1].strip()

        if not allowed_tablename_format.match(table_name):
            raise RuntimeError('Error in the whitelist: table name {} not '
                               'following the allowed format (^[A-Za-z0-9_]+$)'
                               .format(table_name))

        if not allowed_fieldname_format.match(field_name):
            raise RuntimeError('Error in the whitelist: field {} not '
                               'following the allowed format '
                               '(^[A-Za-z0-9_.]+$)'
                               .format(field_name))

        if field_name in whitelist_hash[table_name]:
            raise RuntimeError('Error in the whitelist: field {} '
                               'is listed multiple times.'
                               .format(field_name))
        whitelist_hash[table_name].append(field_name)

    return whitelist_hash
+
+
if __name__ == '__main__':
    # Define argument parser
    parser = argparse.ArgumentParser(description='EventLogging data '
                                                 'retention script')
    parser.add_argument('dbhostname', help='The target db hostname to purge')
    parser.add_argument('--whitelist', default="whitelist.tsv",
                        help='The full path of the TSV whitelist file '
                             '(default: whitelist.tsv)')
    parser.add_argument('--dbport', type=int, default=3306,
                        help='The target db port (default: 3306)')
    parser.add_argument('--dbname', default='log',
                        help='The EventLogging database name (default: log)')
    # type=int is required on the day counters: timedelta(days=...)
    # raises TypeError when handed the string argparse returns by default.
    parser.add_argument('--older-than', dest='older_than', type=int,
                        default=90,
                        help='Delete logs older than this number of days'
                        ' (default: 90)')
    parser.add_argument('--newer-than', dest='newer_than', type=int,
                        default=120,
                        help='Delete logs newer than this number of days'
                        ' (default: 120)')
    parser.add_argument('--dry-run', dest='dry_run', action='store_true',
                        help='Only print sql commands without executing them')
    parser.add_argument('--logfile', dest='logfile', default=None,
                        help='Redirect the script\'s output to a file rather '
                             'than stdout')
    args = parser.parse_args()

    log_format = '%(levelname)s: line %(lineno)d: %(message)s'

    if args.logfile:
        logging.basicConfig(
            # args.logfile, not args.filename: the parser defines no
            # `filename` attribute (the original raised AttributeError).
            filename=args.logfile,
            level=logging.INFO,
            format=log_format
        )
    else:
        logging.basicConfig(
            stream=sys.stdout,
            level=logging.INFO,
            format=log_format
        )

    # Args basic checks
    if not os.path.exists(args.whitelist):
        log.error('The whitelist filepath provided does not exist')
        sys.exit(1)

    if args.newer_than <= args.older_than:
        log.error('--newer-than must be greater than --older-than to '
                  'select a non-empty time range')
        sys.exit(1)

    try:
        # Parse whitelist file
        with open(args.whitelist, 'r') as whitelist_fd:
            lines = csv.reader(whitelist_fd, delimiter='\t')
            whitelist = parse_whitelist(lines)

        # Connect to the database
        database = Database(args.dbhostname, int(args.dbport), args.dbname)

        # Apply the retention policy to each table
        tables = database.get_all_tables()
        terminator = Terminator(
            database,
            whitelist,
            args.newer_than,
            args.older_than,
            args.dry_run
        )
        # Two purging methods:
        # 1) if the table is not in the whitelist it means that no field
        #    needs to be preserved, hence the rows can just be deleted.
        # 2) if the table is in the whitelist, the rows need to be updated
        #    with all the fields not whitelisted set as NULL.
        for table in tables:
            if table not in whitelist:
                terminator.purge(table)
            else:
                terminator.sanitize(table)

        database.close_connection()

    except Exception:
        log.exception("Exception while running main")
        sys.exit(1)
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..76ba957
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,34 @@
+from setuptools import setup, find_packages
+import os
+
here = os.path.abspath(os.path.dirname(__file__))
# The change adds README.md (not README.rst): opening the wrong filename
# would make every `python setup.py ...` invocation fail with IOError.
# The context manager also guarantees the file handle is closed.
with open(os.path.join(here, 'README.md')) as readme_fd:
    README = readme_fd.read()

version = '0.0.1'

# Runtime dependencies.
install_requires = [
    'PyMySQL>=0.7.11',
]

# Test-only dependencies.
test_requires = [
    'mock',
    'nose',
]

setup(
    name='eventlogging_cleaner',
    version=version,
    description="Scripts used for EventLogging DB maintenance",
    long_description=README,
    author='Luca Toscano',
    author_email='ltosc...@wikimedia.org',
    url='',
    license='GPL',
    packages=find_packages(),
    zip_safe=False,
    install_requires=install_requires,
    tests_require=test_requires,
    test_suite='nose.collector',
    entry_points={
    },
)

-- 
To view, visit https://gerrit.wikimedia.org/r/355604
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia428f39832f0b22c586c3ee7484953ae93fbc8d3
Gerrit-PatchSet: 1
Gerrit-Project: operations/software/analytics-eventlogging-maintenance
Gerrit-Branch: master
Gerrit-Owner: Elukey <ltosc...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to