EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/364937 )

Change subject: [WIP] Take the desired number of samples in data_pipeline.py
......................................................................

[WIP] Take the desired number of samples in data_pipeline.py

Sampling in the data pipeline was previously controlled by having the
user provide the number of normalized queries to work with, but this
isn't a value the user actually cares about. What they care about is
how many rows end up in the output data per wiki.
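
Roughly speaking, the new option becomes a per-wiki sampling fraction
(sketch only; rows_for_wiki stands in for that wiki's count of distinct
(query, hit_page_id) pairs and is not a name used in this patch):

    fraction = min(1.0, float(samples_per_wiki) / rows_for_wiki)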

TODO: This keeps blowing out the memory limits on some executors,
but I'm not sure why yet.

Change-Id: Ic4f1abc9e33cdc9c3d7bc346355a750c8ce1ef68
---
M mjolnir/cli/data_pipeline.py
M mjolnir/norm_query.py
M mjolnir/sampling.py
3 files changed, 59 insertions(+), 50 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/37/364937/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index e79db75..10199dd 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -26,7 +26,7 @@
 }
 
 
-def main(sc, sqlContext, input_dir, output_dir, wikis, queries_per_wiki,
+def main(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
          min_sessions_per_query, search_cluster, brokers):
     # TODO: Should this jar have to be provided on the command line instead?
     sqlContext.sql("ADD JAR 
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
@@ -63,10 +63,8 @@
     df_sampled = (
         mjolnir.sampling.sample(
             df_norm,
-            wikis=wikis,
             seed=54321,
-            queries_per_wiki=queries_per_wiki,
-            min_sessions_per_query=min_sessions_per_query)
+            samples_per_wiki=samples_per_wiki)
         # Explode source into a row per displayed hit
         .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 'hit_page_id'))
         .drop('hit_page_ids')
@@ -165,8 +163,8 @@
         default='hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*',
         help='Input path, prefixed with hdfs://, to query and click data')
     parser.add_argument(
-        '-q', '--queries-per-wiki', dest='queries_per_wiki', type=int, default=20000,
-        help='The number of normalized queries, per wiki, to operate on')
+        '-q', '--samples-per-wiki', dest='samples_per_wiki', type=int, default=1000000,
+        help='The approximate number of rows in the final result per-wiki.')
     parser.add_argument(
         '-s', '--min-sessions', dest='min_sessions_per_query', type=int, default=10,
         help='The minimum number of sessions per normalized query')
diff --git a/mjolnir/norm_query.py b/mjolnir/norm_query.py
index 301ad15..181d700 100644
--- a/mjolnir/norm_query.py
+++ b/mjolnir/norm_query.py
@@ -112,7 +112,7 @@
     return zip([row.query for row in source], groups)
 
 
-def transform(df, url_list, indices=None, batch_size=30, top_n=5, min_sessions_per_query=35,
+def transform(df, url_list, indices=None, batch_size=15, top_n=5, min_sessions_per_query=35,
               session_factory=requests.Session):
     """Group together similar results in df
 
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index c9f9a7c..c6f3aa0 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -82,7 +82,7 @@
     return splits + [float('inf')]
 
 
-def _sample_queries(df, wikis, num_buckets=100, samples_desired=10000, seed=None):
+def _sample_queries(df, wiki_percents, num_buckets=100, seed=None):
     """Sample down a unique list of (wiki, norm_query_id, num_sessions)
 
     Given a dataset of unique queries, sample it down to samples_desired per wiki
@@ -99,14 +99,11 @@
     ----------
     df : pyspark.sql.DataFrame
         Input dataframe containing (wiki, norm_query_id, num_sessions) fields.
-    wikis : list of strings
-        List of wikis to generate samples for.
+    wiki_percents : dict
+        Map from wikiid to the fraction of norm_query_ids to use from that wiki.
     num_buckets : int, optional
         The number of buckets to divide each wiki's queries into. An equal number
         of queries will be sampled from each bucket. (Default: 100)
-    samples_desired : int, optional
-        The approximate total number of samples to return per wiki.
-        (Default: 10000)
     seed : int or None, optional
         Seed used for random sampling. (Default: None)
 
@@ -123,25 +120,17 @@
     wiki_splits = {}
     # Map from (wikiid, split) -> % of samples needed. Used by RDD.sampleByKey
     wiki_fractions = {}
-    for wiki in wikis:
-        df_wiki = df.where(df.wikiid == wiki).cache()
-        try:
-            num_rows = df_wiki.count()
-            # If we have less than the desired amount of data no sampling is needed
-            if num_rows < samples_desired:
-                wiki_fractions[(wiki, float('inf'))] = 1.
-                wiki_splits[wiki] = [float('inf')]
-                continue
+    for wiki, fraction in wiki_percents.items():
+        # If we have less than the desired amount of data no sampling is needed
+        if fraction >= 1.:
+            wiki_fractions[(wiki, float('inf'))] = 1.
+            wiki_splits[wiki] = [float('inf')]
+            continue
 
-            # Number of source rows expected in each bucket
-            bucket_rows = float(num_rows) / num_buckets
-            # Fraction of rows needed from each bucket
-            bucket_fraction = bucket_samples / bucket_rows
-            wiki_splits[wiki] = _calc_splits(df_wiki, num_buckets)
-            for split in wiki_splits[wiki]:
-                wiki_fractions[(wiki, split)] = bucket_fraction
-        finally:
-            df_wiki.unpersist()
+        df_wiki = df.where(df.wikiid == wiki)
+        wiki_splits[wiki] = _calc_splits(df_wiki, num_buckets)
+        for split in wiki_splits[wiki]:
+            wiki_fractions[(wiki, split)] = fraction
 
     def to_pair_rdd(row):
         splits = wiki_splits[row.wikiid]
@@ -163,8 +152,7 @@
         .toDF(['wikiid', 'norm_query_id']))
 
 
-def sample(df, wikis, seed=None, queries_per_wiki=10000,
-           min_sessions_per_query=35, max_queries_per_ip_day=50):
+def sample(df, seed=None, samples_per_wiki=1000000):
     """Choose a representative sample of queries from input dataframe.
 
     Takes in the unsampled query click logs and filters it down into a smaller
@@ -177,21 +165,13 @@
     ----------
     df : pyspark.sql.DataFrame
         Input dataframe with columns wikiid, query, and session_id.
-    wikis : set of strings
-        The set of wikis to sample for. Many wikis will not have enough data
-        to generate reasonable ml models. TODO: Should we instead define a
-        minimum size to require?
     seed : int or None, optional
         The random seed used when sampling. If None a seed will be chosen
         randomly. (Default: None)
-    queries_per_wiki : int, optional
-        The desired number of distinct normalized queries per wikiid in the
+    samples_per_wiki : int, optional
+        The desired number of distinct (query, hit_page_id) pairs in the
         output. This constraint is approximate and the returned number
-        of queries may slightly vary per wiki. (Default: 10000)
-    min_sessions_per_query : int, optional
-        Require each chosen query to have at least this many sessions per
-        query. This is necessary To train the DBN later in the pipeline.
-        (Default: 35)
+        of samples may vary per wiki. (Default: 1000000)
 
     Returns
     -------
@@ -199,12 +179,39 @@
         The input DataFrame with all columns it originally had, sampled down
         based on the provided constraints.
     """
-    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query_id', 'session_id'])
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_ids', 'norm_query_id', 'session_id'])
+
+    # We need this df twice, and by default the df coming in here is from
+    # mjolnir.norm_query which is quite expensive.
+    df.cache()
+
+    # Figure out the percentage of each wiki's norm_query_ids we need to approximately
+    # have samples_per_wiki final training samples.
+    hit_page_id_counts = (
+        df
+        .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id'))
+        # We could groupBy('wikiid').agg(F.countDistinct('query', 'hit_page_id'))
+        # directly, but this causes spark to blow out memory limits by
+        # collecting too much data on too few executors.
+        .groupBy('wikiid', 'query')
+        .agg(F.countDistinct('hit_page_id').alias('num_hit_page_ids'))
+        .groupBy('wikiid')
+        .agg(F.sum('num_hit_page_ids').alias('num_hit_page_ids'))
+        .collect())
+
+    wiki_percents = {}
+    needs_sampling = False
+    for row in hit_page_id_counts:
+        wiki_percents[row.wikiid] = min(1., float(samples_per_wiki) / row.num_hit_page_ids)
+        if wiki_percents[row.wikiid] < 1.:
+            needs_sampling = True
+
+    if not needs_sampling:
+        return df
 
     # Aggregate down into a unique set of (wikiid, norm_query_id) and add in a
-    # count of the number of unique sessions per pair. Filter on the number
-    # of sessions as we need some minimum number of sessions per query to train
-    # the DBN
+    # count of the number of unique sessions per pair. We will sample per-strata
+    # based on percentiles of num_sessions.
     df_queries_unique = (
         df
         .groupBy('wikiid', 'norm_query_id')
@@ -214,7 +221,11 @@
         # Spark will eventually throw this away in an LRU fashion.
         .cache())
 
-    df_queries_sampled = _sample_queries(df_queries_unique, wikis, samples_desired=queries_per_wiki, seed=seed)
+    # materialize df_queries_unique so we can unpersist the input df
+    df_queries_unique.count()
+    df.unpersist()
 
-    # Select the rows chosen by sampling from the filtered df
+    df_queries_sampled = _sample_queries(df_queries_unique, wiki_percents, seed=seed)
+
+    # Select the rows chosen by sampling from the input df
     return df.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query_id'])
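
For illustration only, not part of the patch: a minimal, self-contained
sketch of the per-wiki fraction calculation above, run against toy data
on a local Spark session. The wiki names, queries, and hit_page_ids are
made up, and samples_per_wiki is set tiny so the toy data actually
triggers sampling.

    # Toy version of the two-step aggregation in sample(): count distinct
    # hit_page_ids per (wikiid, query) first, then sum per wikiid, instead
    # of a single countDistinct per wiki that concentrates data on too few
    # executors.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.master('local[2]').getOrCreate()
    samples_per_wiki = 3  # tiny on purpose so the toy data gets sampled

    df = spark.createDataFrame(
        [('enwiki', 'apple', [1, 2, 3]),
         ('enwiki', 'banana', [2, 4]),
         ('frwiki', 'pomme', [7])],
        ['wikiid', 'query', 'hit_page_ids'])

    hit_page_id_counts = (
        df
        .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id'))
        .groupBy('wikiid', 'query')
        .agg(F.countDistinct('hit_page_id').alias('num_hit_page_ids'))
        .groupBy('wikiid')
        .agg(F.sum('num_hit_page_ids').alias('num_hit_page_ids'))
        .collect())

    wiki_percents = {
        row.wikiid: min(1., float(samples_per_wiki) / row.num_hit_page_ids)
        for row in hit_page_id_counts}
    print(wiki_percents)  # {'enwiki': 0.6, 'frwiki': 1.0}

The resulting wiki_percents dict is what _sample_queries now consumes; the
actual per-stratum sampling of norm_query_ids is still done with
RDD.sampleByKey as in the existing code.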

-- 
To view, visit https://gerrit.wikimedia.org/r/364937
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic4f1abc9e33cdc9c3d7bc346355a750c8ce1ef68
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
