EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/364937 )
Change subject: [WIP] Take the desired number of samples in data_pipeline.py
......................................................................

[WIP] Take the desired number of samples in data_pipeline.py

Sampling in the data pipeline was controlled by having the user provide the
number of normalized queries to operate on, but that isn't a value the user
ever cares about. What they actually care about is how many rows end up in
the output data per wiki.

TODO: This keeps blowing out the memory limits on some executors, but I'm
not sure why yet.

Change-Id: Ic4f1abc9e33cdc9c3d7bc346355a750c8ce1ef68
---
M mjolnir/cli/data_pipeline.py
M mjolnir/norm_query.py
M mjolnir/sampling.py
3 files changed, 59 insertions(+), 50 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/37/364937/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index e79db75..10199dd 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -26,7 +26,7 @@
 }
 
 
-def main(sc, sqlContext, input_dir, output_dir, wikis, queries_per_wiki,
+def main(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
          min_sessions_per_query, search_cluster, brokers):
     # TODO: Should this jar have to be provided on the command line instead?
     sqlContext.sql("ADD JAR /mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
@@ -63,10 +63,8 @@
     df_sampled = (
         mjolnir.sampling.sample(
             df_norm,
-            wikis=wikis,
             seed=54321,
-            queries_per_wiki=queries_per_wiki,
-            min_sessions_per_query=min_sessions_per_query)
+            samples_per_wiki=samples_per_wiki)
         # Explode source into a row per displayed hit
         .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 'hit_page_id'))
         .drop('hit_page_ids')
@@ -165,8 +163,8 @@
         default='hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*',
         help='Input path, prefixed with hdfs://, to query and click data')
     parser.add_argument(
-        '-q', '--queries-per-wiki', dest='queries_per_wiki', type=int, default=20000,
-        help='The number of normalized queries, per wiki, to operate on')
+        '-q', '--samples-per-wiki', dest='samples_per_wiki', type=int, default=1000000,
+        help='The approximate number of rows in the final result per-wiki.')
     parser.add_argument(
         '-s', '--min-sessions', dest='min_sessions_per_query', type=int, default=10,
         help='The minimum number of sessions per normalized query')
diff --git a/mjolnir/norm_query.py b/mjolnir/norm_query.py
index 301ad15..181d700 100644
--- a/mjolnir/norm_query.py
+++ b/mjolnir/norm_query.py
@@ -112,7 +112,7 @@
     return zip([row.query for row in source], groups)
 
 
-def transform(df, url_list, indices=None, batch_size=30, top_n=5, min_sessions_per_query=35,
+def transform(df, url_list, indices=None, batch_size=15, top_n=5, min_sessions_per_query=35,
               session_factory=requests.Session):
     """Group together similar results in df
 
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index c9f9a7c..c6f3aa0 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -82,7 +82,7 @@
     return splits + [float('inf')]
 
 
-def _sample_queries(df, wikis, num_buckets=100, samples_desired=10000, seed=None):
+def _sample_queries(df, wiki_percents, num_buckets=100, seed=None):
     """Sample down a unique list of (wiki, norm_query_id, num_sessions)
 
     Given a dataset of unique queries, sample it down to samples_desired per wiki
@@ -99,14 +99,11 @@
     ----------
     df : pyspark.sql.DataFrame
         Input dataframe containing (wiki, norm_query_id, num_sessions) fields.
-    wikis : list of strings
-        List of wikis to generate samples for.
+    wiki_percents : dict
+        Map from wikiid to the fraction of norm_query_ids to use from that wiki.
     num_buckets : int, optional
         The number of buckets to divide each wiki's queries into. An equal
         number of queries will be sampled from each bucket. (Default: 100)
-    samples_desired : int, optional
-        The approximate total number of samples to return per wiki.
-        (Default: 10000)
     seed : int or None, optional
         Seed used for random sampling. (Default: None)
 
@@ -123,25 +120,17 @@
     wiki_splits = {}
     # Map from (wikiid, split) -> % of samples needed. Used by RDD.sampleByKey
     wiki_fractions = {}
-    for wiki in wikis:
-        df_wiki = df.where(df.wikiid == wiki).cache()
-        try:
-            num_rows = df_wiki.count()
-            # If we have less than the desired amount of data no sampling is needed
-            if num_rows < samples_desired:
-                wiki_fractions[(wiki, float('inf'))] = 1.
-                wiki_splits[wiki] = [float('inf')]
-                continue
+    for wiki, fraction in wiki_percents.items():
+        # If we have less than the desired amount of data no sampling is needed
+        if fraction >= 1.:
+            wiki_fractions[(wiki, float('inf'))] = 1.
+            wiki_splits[wiki] = [float('inf')]
+            continue
 
-            # Number of source rows expected in each bucket
-            bucket_rows = float(num_rows) / num_buckets
-            # Fraction of rows needed from each bucket
-            bucket_fraction = bucket_samples / bucket_rows
-            wiki_splits[wiki] = _calc_splits(df_wiki, num_buckets)
-            for split in wiki_splits[wiki]:
-                wiki_fractions[(wiki, split)] = bucket_fraction
-        finally:
-            df_wiki.unpersist()
+        df_wiki = df.where(df.wikiid == wiki)
+        wiki_splits[wiki] = _calc_splits(df_wiki, num_buckets)
+        for split in wiki_splits[wiki]:
+            wiki_fractions[(wiki, split)] = fraction
 
     def to_pair_rdd(row):
         splits = wiki_splits[row.wikiid]
@@ -163,8 +152,7 @@
         .toDF(['wikiid', 'norm_query_id']))
 
 
-def sample(df, wikis, seed=None, queries_per_wiki=10000,
-           min_sessions_per_query=35, max_queries_per_ip_day=50):
+def sample(df, seed=None, samples_per_wiki=1000000):
     """Choose a representative sample of queries from input dataframe.
 
     Takes in the unsampled query click logs and filters it down into a smaller
@@ -177,21 +165,13 @@
     ----------
     df : pyspark.sql.DataFrame
         Input dataframe with columns wikiid, query, and session_id.
-    wikis : set of strings
-        The set of wikis to sample for. Many wikis will not have enough data
-        to generate reasonable ml models. TODO: Should we instead define a
-        minimum size to require?
     seed : int or None, optional
         The random seed used when sampling. If None a seed will be chosen
         randomly. (Default: None)
-    queries_per_wiki : int, optional
-        The desired number of distinct normalized queries per wikiid in the
+    samples_per_wiki : int, optional
+        The desired number of distinct (query, hit_page_id) pairs in the
         output. This constraint is approximate and the returned number
-        of queries may slightly vary per wiki. (Default: 10000)
-    min_sessions_per_query : int, optional
-        Require each chosen query to have at least this many sessions per
-        query. This is necessary To train the DBN later in the pipeline.
-        (Default: 35)
+        of queries may vary per wiki. (Default: 1000000)
 
     Returns
     -------
@@ -199,12 +179,39 @@
         The input DataFrame with all columns it origionally had sampled down
         based on the provided constraints.
""" - mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query_id', 'session_id']) + mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_ids', 'norm_query_id', 'session_id']) + + # We need this df twice, and by default the df coming in here is from + # mjolnir.norm_query which is quite expensive. + df.cache() + + # Figure out the percentage of each wiki's norm_query_id's we need to approximately + # have hits_per_wiki final training samples. + hit_page_id_counts = ( + df + .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id')) + # We could groupBy('wikiid').agg(F.countDistinct('query', 'hit_page_id')) + # directly, but this causes spark to blow out memory limits by + # collecting too much data on too few executors. + .groupBy('wikiid', 'query') + .agg(F.countDistinct('hit_page_id').alias('num_hit_page_ids')) + .groupBy('wikiid') + .agg(F.sum('num_hit_page_ids').alias('num_hit_page_ids')) + .collect()) + + wiki_percents = {} + needs_sampling = False + for row in hit_page_id_counts: + wiki_percents[row.wikiid] = min(1., float(hits_per_wiki) / row.num_hit_page_ids) + if wiki_percents[row.wikiid] < 1.: + needs_sampling = True + + if not needs_sampling: + return df # Aggregate down into a unique set of (wikiid, norm_query_id) and add in a - # count of the number of unique sessions per pair. Filter on the number - # of sessions as we need some minimum number of sessions per query to train - # the DBN + # count of the number of unique sessions per pair. We will sample per-strata + # based on percentiles of num_sessions. df_queries_unique = ( df .groupBy('wikiid', 'norm_query_id') @@ -214,7 +221,11 @@ # Spark will eventually throw this away in an LRU fashion. .cache()) - df_queries_sampled = _sample_queries(df_queries_unique, wikis, samples_desired=queries_per_wiki, seed=seed) + # materialize df_queries_unique so we can unpersist the input df + df_queries_unique.count() + df.unpersist() - # Select the rows chosen by sampling from the filtered df + df_queries_sampled = _sample_queries(df_queries_unique, wiki_percents, seed=seed) + + # Select the rows chosen by sampling from the input df return df.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query_id']) -- To view, visit https://gerrit.wikimedia.org/r/364937 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic4f1abc9e33cdc9c3d7bc346355a750c8ce1ef68 Gerrit-PatchSet: 1 Gerrit-Project: search/MjoLniR Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits