DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/366005 )

Change subject: Finish implementation of zero-feature cli arg
......................................................................


Finish implementation of zero-feature cli arg

The zero-feature cli argument to training_pipeline.py
was only half finished; complete the implementation by
actually calling the backend function.

Also fix a couple of small problems identified by flake8
and test case failures caused by recently merged code.

Change-Id: Iddbd75eb6d55f059ed95a4aab19586da47d6064d
---
M mjolnir/cli/training_pipeline.py
M mjolnir/feature_engineering.py
M mjolnir/test/test_sampling.py
M mjolnir/training/tuning.py
4 files changed, 33 insertions(+), 19 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved



diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 7e91457..fc08b8d 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -21,7 +21,7 @@
 
 
 def main(sc, sqlContext, input_dir, output_dir, wikis, target_node_evaluations,
-         num_workers, num_cv_jobs, num_folds, test_dir):
+         num_workers, num_cv_jobs, num_folds, test_dir, zero_features):
 
     if os.path.exists(output_dir):
         logging.error('Output directory (%s) already exists' % (output_dir))
@@ -44,6 +44,10 @@
             print ''
             continue
 
+        if zero_features:
+            df_hits_with_features = mjolnir.feature_engineering.zero_features(
+                    df_hits_with_features, zero_features)
+
         # Explore a hyperparameter space. Skip the most expensive part of tuning,
         # increasing the # of trees, with target_node_evaluations=None
         tune_results = mjolnir.training.xgboost.tune(
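
Note: for context, a minimal sketch of how a zero-features option might be
wired into the CLI and handed to main(). The actual argparse definition is not
part of this change, so the option name and parsing below are assumptions.

import argparse


def parse_args():
    # Hypothetical sketch only: the real --zero-features option is not shown
    # in this diff. Assumes a comma separated list of feature names.
    parser = argparse.ArgumentParser(description='mjolnir training pipeline (sketch)')
    parser.add_argument(
        '--zero-features', dest='zero_features', default=None,
        type=lambda s: s.split(','),
        help='Feature names to zero out before tuning, comma separated')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    # main(sc, sqlContext, ..., zero_features=args.zero_features)
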
diff --git a/mjolnir/feature_engineering.py b/mjolnir/feature_engineering.py
index 8301547..9f86a53 100644
--- a/mjolnir/feature_engineering.py
+++ b/mjolnir/feature_engineering.py
@@ -41,6 +41,7 @@
     """
     features = df.schema['features'].metadata['features']
     idxs = [features.index(name) for name in feature_names]
+
     def zero_features(feat):
         raw = feat.toArray()
         for idx in idxs:
@@ -63,6 +64,7 @@
     pyspark.sql.DataFrame
     """
     features = df.schema['features'].metadata['features']
+
     def extract_feature(features, idx):
         return features.toArray()[idx]
     extract_feature_udf = F.udf(extract_feature, pyspark.sql.types.FloatType())
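
Note: the body of zero_features() past the visible context is not shown above.
As a rough sketch, zeroing selected entries of a Spark ML vector column could
look like the following; the Vectors.dense()/withColumn() wiring and the use of
pyspark.ml.linalg (rather than pyspark.mllib.linalg) are assumptions, and
re-attaching the feature-name metadata is left out.

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F


def zero_features(df, feature_names):
    # Resolve the positions of the named features from the column metadata,
    # mirroring the snippet in the diff above.
    features = df.schema['features'].metadata['features']
    idxs = [features.index(name) for name in feature_names]

    def zero(feat):
        raw = feat.toArray()
        for idx in idxs:
            raw[idx] = 0.0
        return Vectors.dense(raw)

    zero_udf = F.udf(zero, VectorUDT())
    return df.withColumn('features', zero_udf('features'))
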
diff --git a/mjolnir/test/test_sampling.py b/mjolnir/test/test_sampling.py
index 0ac46f6..910bc17 100644
--- a/mjolnir/test/test_sampling.py
+++ b/mjolnir/test/test_sampling.py
@@ -19,8 +19,8 @@
         ('foo', 'e', 5, 'eee', list(range(3))),
     ]).toDF(['wikiid', 'query', 'norm_query_id', 'session_id', 'hit_page_ids'])
 
-    sampled = mjolnir.sampling.sample(df, ['foo'], samples_per_wiki=100,
-                                      min_sessions_per_query=1, seed=12345).collect()
+    sampled = mjolnir.sampling.sample(df, samples_per_wiki=100,
+                                      seed=12345).collect()
     # The sampling rate should have been chosen as 1.0, so we should have all data
     # regardless of probabilities.
     assert len(sampled) == 5
@@ -49,26 +49,34 @@
             num_sessions = max(1, min(100, int(a * math.pow(x+1, k)) + 10))
             for j in xrange(0, num_sessions):
                 session_id = "%s_%s_%s" % (wiki, q, str(j))
-                rows.append((wiki, q, session_id, 1))
+                rows.append((wiki, q, x, session_id, list(range(3))))
 
-    df = spark_context.parallelize(rows).toDF(['wikiid', 'norm_query_id', 'session_id', 'q_by_ip_day'])
-    queries_per_wiki = 100
-    df_sampled = mjolnir.sampling.sample(df, [wiki for (wiki, _, _) in wikis],
-                                         queries_per_wiki=queries_per_wiki,
-                                         min_sessions_per_query=10, seed=12345)
-    sampled = df_sampled.collect()
+    df = (
+        spark_context.parallelize(rows)
+        .toDF(['wikiid', 'query', 'norm_query_id', 'session_id', 'hit_page_ids']))
 
-    ratio_of_sessions = len(sampled) / len(rows)
-    expected_ratio_of_sessions = queries_per_wiki / len(queries)
-    # assert the overall sampling matches constraint on ratio
-    assert abs(ratio_of_sessions - expected_ratio_of_sessions) < 0.01
+    samples_per_wiki = 1000
+    # Using a constant seed ensures deterministic testing. Because this code
+    # actually relies on the law of large numbers, and we do not have large
+    # numbers here, many seeds probably fail.
+    df_sampled = mjolnir.sampling.sample(df, samples_per_wiki=samples_per_wiki,
+                                         seed=12345)
+    sampled = (
+        df_sampled
+        .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id'))
+        .drop_duplicates()
+        .groupBy('wikiid')
+        .agg(F.count(F.lit(1)).alias('num_samples'))
+        .collect())
+
+    total_samples_desired = len(wikis) * samples_per_wiki
+    total_samples = sum([r.num_samples for r in sampled])
+    assert abs(total_samples - total_samples_desired) / float(total_samples_desired) < 0.05
     # Test each wiki also meets the constraint
     for (wiki, _, _) in wikis:
         # ratio of rows
-        sampled_num_rows = len([r for r in sampled if r.wikiid == wiki])
-        orig_num_rows = len([r for r in rows if r[0] == wiki])
-        ratio_of_sessions = sampled_num_rows / orig_num_rows
-        assert abs(ratio_of_sessions - expected_ratio_of_sessions) < 0.01, wiki
+        sampled_num_rows = sum([r.num_samples for r in sampled if r.wikiid == wiki])
+        assert abs(sampled_num_rows - samples_per_wiki) / float(samples_per_wiki) < 0.05
 
     # assert correlation between sessions per query
     orig_grouped = (
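
Note: as a standalone illustration, the per-wiki check the test now performs
boils down to the sketch below; the helper names are for illustration only.

from pyspark.sql import functions as F


def count_samples_per_wiki(df_sampled):
    # Explode hit_page_ids so each distinct (wikiid, query, hit_page_id)
    # triple counts once, then tally per wiki, as in the updated test.
    return (
        df_sampled
        .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id'))
        .drop_duplicates()
        .groupBy('wikiid')
        .agg(F.count(F.lit(1)).alias('num_samples'))
        .collect())


def assert_close_to_target(per_wiki_counts, samples_per_wiki, tolerance=0.05):
    # Each wiki should land within 5% of the requested sample count.
    for row in per_wiki_counts:
        off_by = abs(row.num_samples - samples_per_wiki) / float(samples_per_wiki)
        assert off_by < tolerance, row.wikiid
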
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index 8548f5d..2799db6 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -83,7 +83,7 @@
         .collect())
 
     df_splits = (
-        sc.parallelize(split_rows(rows))
+        df._sc.parallelize(split_rows(rows))
         .toDF(['wikiid', 'norm_query_id', output_column]))
 
     return df.join(df_splits, how='inner', on=['wikiid', 'norm_query_id'])
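
Note: the tuning.py change above swaps a free-standing sc for the context
carried by the DataFrame itself. Below is a minimal sketch of that pattern,
with parallelize_splits as a hypothetical wrapper name; df.rdd.context is the
public equivalent of the internal df._sc used in the patch.

def parallelize_splits(df, split_rows, output_column):
    # Recover the SparkContext from the DataFrame rather than threading a
    # separate `sc` argument through the call chain.
    sc = df.rdd.context  # the patch uses the internal df._sc attribute
    df_splits = (
        sc.parallelize(split_rows)
        .toDF(['wikiid', 'norm_query_id', output_column]))
    return df.join(df_splits, how='inner', on=['wikiid', 'norm_query_id'])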

-- 
To view, visit https://gerrit.wikimedia.org/r/366005
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iddbd75eb6d55f059ed95a4aab19586da47d6064d
Gerrit-PatchSet: 2
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: DCausse <dcau...@wikimedia.org>
