Mforns has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/355601 )
Change subject: Add script to purge old mediawiki data snapshots ...................................................................... Add script to purge old mediawiki data snapshots Bug: T162034 Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369 --- A bin/refinery-drop-mediawiki-snapshots 1 file changed, 231 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/01/355601/1 diff --git a/bin/refinery-drop-mediawiki-snapshots b/bin/refinery-drop-mediawiki-snapshots new file mode 100644 index 0000000..aab19d0 --- /dev/null +++ b/bin/refinery-drop-mediawiki-snapshots @@ -0,0 +1,231 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Note: You should make sure to put refinery/python on your PYTHONPATH. +# export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python + +""" +Automatically drops old partitions from the mediawiki raw and historical +tables. See AFFECTED_TABLES dict for a comprehensive list. + +As these data sets are historical (they span from the beginning of time +to latest import), the dimension used to determine which partitions need +to be removed is not time, it's "snapshot". By default only 2 snapshots +will be kept: the current snapshot plus the immediately previous one. + +Note: Private snapshots (snapshot=YYYY-MM_private) are not affected +by this script. 
+ +Usage: refinery-drop-mediawiki-snapshots [options] + +Options: + -h --help Show this help message and exit. + -s --keep-snapshots=<n> Keep the <n> most recent snapshots. [default: 2] + -o --hive-options=<options> Any valid Hive CLI options you want to pass to Hive commands. + Example: '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar' + -v --verbose Turn on verbose debug logging. + -n --dry-run Don't actually drop any partitions, just output Hive queries. +""" + + +from docopt import docopt +from refinery.util import HiveUtils, HdfsUtils +import datetime +import logging +import os +import re +import sys + + +# Tables that have mediawiki snapshots to be managed +# key: database, value: list of tables +AFFECTED_TABLES = { + 'wmf_raw': [ + 'mediawiki_archive', + 'mediawiki_ipblocks', + 'mediawiki_logging', + 'mediawiki_page', + 'mediawiki_project_namespace_map', + 'mediawiki_revision', + 'mediawiki_user', + 'mediawiki_user_groups' + ], + 'wmf': [ + 'mediawiki_history', + 'mediawiki_page_history', + 'mediawiki_user_history' + ] +} + +# Tables partitioned by wiki_db in addition to by snapshot +WIKI_DB_TABLES = [ + 'mediawiki_archive', + 'mediawiki_ipblocks', + 'mediawiki_logging', + 'mediawiki_page', + 'mediawiki_revision', + 'mediawiki_user', + 'mediawiki_user_groups' +] + +# Convenience global, will be initialized in main +hive = None + + +# Returns the age in days of a given partition spec +TODAYS_ORDINAL = datetime.datetime.now().toordinal() +SPEC_DATE_REGEX = re.compile(r'snapshot=(?P<year>[0-9]{4})-(?P<month>[0-9]{2})') +def get_partition_age(partition): + partition_datetime = hive.partition_datetime_from_spec( + partition, + SPEC_DATE_REGEX + ) + return TODAYS_ORDINAL - partition_datetime.toordinal() + +# Returns the partitions to be dropped given a hive table +def get_partitions_to_drop(table, keep_snapshots): + partitions = hive.partitions(table) + spec_separator = HiveUtils.partition_spec_separator + + # For tables partitioned also by wiki_db discard that dimension + 
# snapshot=2017-01,wiki_db=enwiki => snapshot=2017-01 + if table in WIKI_DB_TABLES: + snapshots = set([]) + for partition in partitions: + snapshot = partition.split(spec_separator)[0] + snapshots.add(snapshot) + partitions = list(snapshots) + + # Filter out private snapshots + partitions = filter(lambda p: not p.endswith('_private'), partitions) + + # Select partitions to drop (keep the most recent <keep_snapshots> ones) + partitions.sort(key=get_partition_age) + partitions_to_drop = partitions[keep_snapshots:] + + # HACK: For tables partitioned by wiki_db, add wiki_db!='' to snapshot spec, + # so that HiveUtils deletes the whole snapshot partition with all wiki_db + # sub-partitions in it. + if table in WIKI_DB_TABLES: + partitions_to_drop = [ + spec_separator.join([p, "wiki_db!=''"]) + for p in partitions_to_drop + ] + return partitions_to_drop + +# Returns the age in days of a given partition directory +PATH_DATE_REGEX = re.compile(r'snapshot=([0-9]{4}-[0-9]{2})') +PATH_DATE_FORMAT = '%Y-%m' +def get_directory_age(path): + directory_datetime = hive.partition_datetime_from_path( + path, + PATH_DATE_REGEX, + PATH_DATE_FORMAT + ) + return TODAYS_ORDINAL - directory_datetime.toordinal() + +# Returns the directories to be removed given a hive table +def get_directories_to_remove(table, keep_snapshots): + table_location = hive.table_location(table) + + # Get partition directories + glob = os.path.join(table_location, '*') + directories = HdfsUtils.ls(glob, include_children=False) + + # Filter out private snapshots + directories = filter(lambda d: not d.endswith('_private'), directories) + + # Select directories to drop (keep the most recent <keep_snapshots> ones) + directories.sort(key=get_directory_age) + return directories[keep_snapshots:] + +# Raises an error if partitions and directories do not match +def check_partitions_vs_directories(partitions, directories): + spec_separator = HiveUtils.partition_spec_separator + partition_snapshots = 
set([p.split(spec_separator)[0] for p in partitions]) + directory_snapshots = set([os.path.basename(d) for d in directories]) + if partition_snapshots != directory_snapshots: + logging.error( + 'Selected partitions extracted from table specs ({0}) ' + 'does not match selected partitions extracted from data paths ({1}).' + .format(partition_snapshots, directory_snapshots) + ) + sys.exit(1) + +# Drop given hive table partitions (if dry_run, just print) +def drop_partitions(table, partitions, dry_run): + if partitions: + if dry_run: + print(hive.drop_partitions_ddl(table, partitions)) + else: + logging.info( + 'Dropping {0} partitions from table {1}.{2}' + .format(len(partitions), hive.database, table) + ) + hive.drop_partitions(table, partitions) + else: + logging.info( + 'No partitions need to be dropped for table {0}.{1}' + .format(hive.database, table) + ) + +# Remove given data directories (if dry_run, just print) +def remove_directories(table, directories, dry_run): + table_location = hive.table_location(table) + if directories: + if dry_run: + print('hdfs dfs -rm -R ' + ' '.join(directories)) + else: + logging.info('Removing {0} directories from {1}.' 
+ .format(len(directories), table_location) + ) + HdfsUtils.rm(' '.join(directories)) + else: + logging.info('No directories need to be removed for {0}'.format(table_location)) + + +if __name__ == '__main__': + # Parse arguments + arguments = docopt(__doc__) + keep_snapshots = int(arguments['--keep-snapshots']) + hive_options = arguments['--hive-options'] + verbose = arguments['--verbose'] + dry_run = arguments['--dry-run'] + + # Setup logging + log_level = logging.INFO + if verbose: + log_level = logging.DEBUG + logging.basicConfig(level=log_level, + format='%(asctime)s %(levelname)-6s %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S') + + # Check arguments + if keep_snapshots < 2: + logging.error('Option \'--keep-snapshots\' must be greater or equal than 2.') + sys.exit(1) + + for database, tables in AFFECTED_TABLES.items(): + # Instantiate HiveUtils + global hive + hive = HiveUtils(database, hive_options) + + # Apply the cleaning to each table + for table in tables: + partitions = get_partitions_to_drop(table, keep_snapshots) + directories = get_directories_to_remove(table, keep_snapshots) + check_partitions_vs_directories(partitions, directories) + drop_partitions(table, partitions, dry_run) + remove_directories(table, directories, dry_run) -- To view, visit https://gerrit.wikimedia.org/r/355601 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369 Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery Gerrit-Branch: master Gerrit-Owner: Mforns <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
