Mforns has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/355601 )

Change subject: Add script to purge old mediawiki data snapshots
......................................................................

Add script to purge old mediawiki data snapshots

Bug: T162034
Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369
---
A bin/refinery-drop-mediawiki-snapshots
1 file changed, 231 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery 
refs/changes/01/355601/1

diff --git a/bin/refinery-drop-mediawiki-snapshots 
b/bin/refinery-drop-mediawiki-snapshots
new file mode 100644
index 0000000..aab19d0
--- /dev/null
+++ b/bin/refinery-drop-mediawiki-snapshots
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Note: You should make sure to put refinery/python on your PYTHONPATH.
+#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
+
+"""
+Automatically drops old partitions from the mediawiki raw and historical
+tables. See AFFECTED_TABLES dict for a comprehensive list.
+
+As these data sets are historical (they span from the beginning of time
+to latest import), the dimension used to determine which partitions need
+to be removed is not time, it's "snapshot". By default only 2 snapshots
+will be kept: the current snapshot plus the immediately previous one.
+
+Note: Private snapshots (snapshot=YYYY-MM_private) are not affected
+by this script.
+
+Usage: refinery-drop-mediawiki-snapshots [options]
+
+Options:
+    -h --help                       Show this help message and exit.
+    -s --keep-snapshots=<n>         Keep the <n> most recent snapshots. 
[default: 2]
+    -o --hive-options=<options>     Any valid Hive CLI options you want to 
pass to Hive commands.
+                                    Example: '--auxpath 
/path/to/hive-serdes-1.0-SNAPSHOT.jar'
+    -v --verbose                    Turn on verbose debug logging.
+    -n --dry-run                    Don't actually drop any partitions, just 
output Hive queries.
+"""
+
+
+from docopt import docopt
+from refinery.util import HiveUtils, HdfsUtils
+import datetime
+import logging
+import os
+import re
+import sys
+
+
# Snapshot-partitioned mediawiki tables managed by this script,
# keyed by database name; each value is the list of table names.
AFFECTED_TABLES = {
    'wmf_raw': [
        'mediawiki_archive',
        'mediawiki_ipblocks',
        'mediawiki_logging',
        'mediawiki_page',
        'mediawiki_project_namespace_map',
        'mediawiki_revision',
        'mediawiki_user',
        'mediawiki_user_groups',
    ],
    'wmf': [
        'mediawiki_history',
        'mediawiki_page_history',
        'mediawiki_user_history',
    ],
}

# Subset of tables that are additionally partitioned by wiki_db
# (i.e. partition specs look like snapshot=YYYY-MM,wiki_db=enwiki).
WIKI_DB_TABLES = [
    'mediawiki_archive',
    'mediawiki_ipblocks',
    'mediawiki_logging',
    'mediawiki_page',
    'mediawiki_revision',
    'mediawiki_user',
    'mediawiki_user_groups',
]

# Convenience module-level HiveUtils handle; (re)assigned in main
# once per database being processed.
hive = None
+
+
# Returns the age in days of a given partition spec.
TODAYS_ORDINAL = datetime.datetime.now().toordinal()
# BUG FIX: the month group must match 2 digits, not 4 — with {4} this
# pattern could never match a 'snapshot=YYYY-MM' spec.
SPEC_DATE_REGEX = re.compile(r'snapshot=(?P<year>[0-9]{4})-(?P<month>[0-9]{2})')
def get_partition_age(partition):
    """Return the age in days of the given partition spec string.

    Relies on the module-level `hive` handle (a HiveUtils instance,
    set in main) to parse the snapshot date out of the spec.
    """
    partition_datetime = hive.partition_datetime_from_spec(
        partition,
        SPEC_DATE_REGEX
    )
    return TODAYS_ORDINAL - partition_datetime.toordinal()
+
# Returns the partition specs to be dropped for a given hive table,
# keeping the <keep_snapshots> most recent snapshots.
def get_partitions_to_drop(table, keep_snapshots):
    """Return the list of partition specs to drop from `table`.

    Private snapshots (ending in '_private') are never selected.
    Uses the module-level `hive` handle set in main.
    """
    partitions = hive.partitions(table)
    spec_separator = HiveUtils.partition_spec_separator

    # For tables partitioned also by wiki_db discard that dimension:
    # snapshot=2017-01,wiki_db=enwiki => snapshot=2017-01
    if table in WIKI_DB_TABLES:
        partitions = list({p.split(spec_separator)[0] for p in partitions})

    # Filter out private snapshots. BUG FIX: use a list comprehension
    # instead of filter() — under Python 3 filter() returns an iterator,
    # which has no .sort() method.
    partitions = [p for p in partitions if not p.endswith('_private')]

    # Select partitions to drop (keep the most recent <keep_snapshots> ones).
    partitions.sort(key=get_partition_age)
    partitions_to_drop = partitions[keep_snapshots:]

    # HACK: For tables partitioned by wiki_db, add wiki_db!='' to snapshot
    # spec, so that HiveUtils deletes the whole snapshot partition with all
    # wiki_db sub-partitions in it.
    if table in WIKI_DB_TABLES:
        partitions_to_drop = [
            spec_separator.join([p, "wiki_db!=''"])
            for p in partitions_to_drop
        ]
    return partitions_to_drop
+
# Age in days of a partition directory, derived from the
# 'snapshot=YYYY-MM' component of its path.
PATH_DATE_REGEX = re.compile(r'snapshot=([0-9]{4}-[0-9]{2})')
PATH_DATE_FORMAT = '%Y-%m'
def get_directory_age(path):
    """Return the age in days of the given partition directory path."""
    directory_datetime = hive.partition_datetime_from_path(
        path, PATH_DATE_REGEX, PATH_DATE_FORMAT)
    return TODAYS_ORDINAL - directory_datetime.toordinal()
+
# Returns the directories to be removed for a given hive table,
# keeping the <keep_snapshots> most recent snapshot directories.
def get_directories_to_remove(table, keep_snapshots):
    """Return the list of HDFS snapshot directories to remove for `table`.

    Private snapshots (ending in '_private') are never selected.
    Uses the module-level `hive` handle set in main.
    """
    table_location = hive.table_location(table)

    # Get partition directories.
    glob = os.path.join(table_location, '*')
    directories = HdfsUtils.ls(glob, include_children=False)

    # Filter out private snapshots. BUG FIX: use a list comprehension
    # instead of filter() — under Python 3 filter() returns an iterator,
    # which has no .sort() method.
    directories = [d for d in directories if not d.endswith('_private')]

    # Select directories to drop (keep the most recent <keep_snapshots> ones).
    directories.sort(key=get_directory_age)
    return directories[keep_snapshots:]
+
# Exits with an error if selected partitions and directories do not match.
def check_partitions_vs_directories(partitions, directories):
    """Cross-check the snapshot sets selected from Hive metadata vs HDFS.

    Exits the process with status 1 (after logging) on any mismatch,
    to avoid dropping metadata and data inconsistently.
    """
    spec_separator = HiveUtils.partition_spec_separator
    partition_snapshots = set([p.split(spec_separator)[0] for p in partitions])
    # BUG FIX: os.path.blah does not exist — use os.path.basename to
    # extract the 'snapshot=YYYY-MM' component from each directory path.
    directory_snapshots = set([os.path.basename(d) for d in directories])
    if partition_snapshots != directory_snapshots:
        # BUG FIX: format indices were {1}/{2}, but only two positional
        # arguments are supplied — that would raise IndexError.
        logging.error(
            'Selected partitions extracted from table specs ({0}) '
            'does not match selected partitions extracted from data paths ({1}).'
            .format(partition_snapshots, directory_snapshots)
        )
        sys.exit(1)
+
# Drop the given hive table partitions (if dry_run, just print the DDL).
def drop_partitions(table, partitions, dry_run):
    """Drop `partitions` from `table`, or print the DDL when dry_run."""
    if not partitions:
        logging.info(
            'No partitions need to be dropped for table {0}.{1}'
            .format(hive.database, table)
        )
        return
    if dry_run:
        print(hive.drop_partitions_ddl(table, partitions))
        return
    logging.info(
        'Dropping {0} partitions from table {1}.{2}'
        .format(len(partitions), hive.database, table)
    )
    hive.drop_partitions(table, partitions)
+
# Remove the given data directories (if dry_run, just print the command).
def remove_directories(table, directories, dry_run):
    """Remove `directories` from HDFS, or print the rm command when dry_run."""
    table_location = hive.table_location(table)
    if not directories:
        logging.info('No directories need to be removed for {0}'.format(table_location))
        return
    if dry_run:
        print('hdfs dfs -rm -R ' + ' '.join(directories))
        return
    logging.info('Removing {0} directories from {1}.'
        .format(len(directories), table_location)
    )
    HdfsUtils.rm(' '.join(directories))
+
+
if __name__ == '__main__':
    # Parse command line arguments (see module docstring for options).
    arguments = docopt(__doc__)
    keep_snapshots = int(arguments['--keep-snapshots'])
    hive_options = arguments['--hive-options']
    verbose = arguments['--verbose']
    dry_run = arguments['--dry-run']

    # Setup logging (DEBUG when --verbose is given).
    log_level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=log_level,
                        format='%(asctime)s %(levelname)-6s %(message)s',
                        datefmt='%Y-%m-%dT%H:%M:%S')

    # Check arguments: always keep at least the current snapshot plus
    # the immediately previous one.
    if keep_snapshots < 2:
        logging.error("Option '--keep-snapshots' must be greater than or equal to 2.")
        sys.exit(1)

    for database, tables in AFFECTED_TABLES.items():
        # Rebind the module-level convenience handle. BUG FIX: the
        # 'global' statement is meaningless at module level (it is only
        # needed inside a function body); a plain assignment suffices.
        hive = HiveUtils(database, hive_options)

        # Apply the cleaning to each table.
        for table in tables:
            partitions = get_partitions_to_drop(table, keep_snapshots)
            directories = get_directories_to_remove(table, keep_snapshots)
            # Abort before dropping anything if metadata and data disagree.
            check_partitions_vs_directories(partitions, directories)
            drop_partitions(table, partitions, dry_run)
            remove_directories(table, directories, dry_run)

-- 
To view, visit https://gerrit.wikimedia.org/r/355601
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to