DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/366005 )
Change subject: Finish implementation of zero-feature cli arg
......................................................................

Finish implementation of zero-feature cli arg

The zero-feature cli argument to training_pipeline.py was only half
finished; complete the implementation by actually calling the backend
function. Also fix a couple of small problems identified by flake8, and
failures in test cases due to recently merged code.

Change-Id: Iddbd75eb6d55f059ed95a4aab19586da47d6064d
---
M mjolnir/cli/training_pipeline.py
M mjolnir/feature_engineering.py
M mjolnir/test/test_sampling.py
M mjolnir/training/tuning.py
4 files changed, 33 insertions(+), 19 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved


diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 7e91457..fc08b8d 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -21,7 +21,7 @@

 def main(sc, sqlContext, input_dir, output_dir, wikis, target_node_evaluations,
-         num_workers, num_cv_jobs, num_folds, test_dir):
+         num_workers, num_cv_jobs, num_folds, test_dir, zero_features):
     if os.path.exists(output_dir):
         logging.error('Output directory (%s) already exists' % (output_dir))
@@ -44,6 +44,10 @@
             print ''
             continue

+        if zero_features:
+            df_hits_with_features = mjolnir.feature_engineering.zero_features(
+                df_hits_with_features, zero_features)
+
         # Explore a hyperparameter space. Skip the most expensive part of tuning,
         # increasing the # of trees, with target_node_evaluations=None
         tune_results = mjolnir.training.xgboost.tune(
diff --git a/mjolnir/feature_engineering.py b/mjolnir/feature_engineering.py
index 8301547..9f86a53 100644
--- a/mjolnir/feature_engineering.py
+++ b/mjolnir/feature_engineering.py
@@ -41,6 +41,7 @@
     """
     features = df.schema['features'].metadata['features']
     idxs = [features.index(name) for name in feature_names]
+
     def zero_features(feat):
         raw = feat.toArray()
         for idx in idxs:
@@ -63,6 +64,7 @@
         pyspark.sql.DataFrame
     """
     features = df.schema['features'].metadata['features']
+
     def extract_feature(features, idx):
         return features.toArray()[idx]
     extract_feature_udf = F.udf(extract_feature, pyspark.sql.types.FloatType())
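The zero_features helper that the pipeline now actually calls appears above only as diff fragments. For context, here is a minimal self-contained sketch of how such a helper can be assembled in PySpark; it is pieced together from those fragments rather than copied from mjolnir, so the VectorUDT return type, the Vectors.dense conversion, and the metadata handling are assumptions:

    import pyspark.sql.functions as F
    from pyspark.ml.linalg import Vectors, VectorUDT

    def zero_features(df, feature_names):
        # The list of feature names is assumed to live in the column
        # metadata, matching df.schema['features'].metadata['features']
        # in the fragments above.
        features = df.schema['features'].metadata['features']
        idxs = [features.index(name) for name in feature_names]

        def zero(feat):
            # Copy the vector into a mutable array and zero the
            # selected positions.
            raw = feat.toArray()
            for idx in idxs:
                raw[idx] = 0.0
            return Vectors.dense(raw)

        zero_udf = F.udf(zero, VectorUDT())
        # Caveat: replacing the column this way drops its metadata; the
        # real helper may need to re-attach it for downstream consumers.
        return df.withColumn('features', zero_udf('features'))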
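A hypothetical usage of that sketch, assuming an active SparkSession named spark; the feature names are made up, and attaching metadata through Column.alias is supported in newer PySpark releases:

    from pyspark.ml.linalg import Vectors

    df = spark.createDataFrame([(Vectors.dense([1.0, 2.0, 3.0]),)], ['features'])
    # Attach the feature-name metadata the helper expects.
    meta = {'features': ['title_match', 'popularity', 'recency']}
    df = df.withColumn(
        'features', F.col('features').alias('features', metadata=meta))

    # Zero a single feature by name; the vector becomes [1.0, 0.0, 3.0].
    df_zeroed = zero_features(df, ['popularity'])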
diff --git a/mjolnir/test/test_sampling.py b/mjolnir/test/test_sampling.py
index 0ac46f6..910bc17 100644
--- a/mjolnir/test/test_sampling.py
+++ b/mjolnir/test/test_sampling.py
@@ -19,8 +19,8 @@
         ('foo', 'e', 5, 'eee', list(range(3))),
     ]).toDF(['wikiid', 'query', 'norm_query_id', 'session_id', 'hit_page_ids'])

-    sampled = mjolnir.sampling.sample(df, ['foo'], samples_per_wiki=100,
-                                      min_sessions_per_query=1, seed=12345).collect()
+    sampled = mjolnir.sampling.sample(df, samples_per_wiki=100,
+                                      seed=12345).collect()
     # The sampling rate should have been chosen as 1.0, so we should have all data
     # regardless of probabilities.
     assert len(sampled) == 5
@@ -49,26 +49,34 @@
             num_sessions = max(1, min(100, int(a * math.pow(x+1, k)) + 10))
             for j in xrange(0, num_sessions):
                 session_id = "%s_%s_%s" % (wiki, q, str(j))
-                rows.append((wiki, q, session_id, 1))
+                rows.append((wiki, q, x, session_id, list(range(3))))

-    df = spark_context.parallelize(rows).toDF(['wikiid', 'norm_query_id', 'session_id', 'q_by_ip_day'])
-    queries_per_wiki = 100
-    df_sampled = mjolnir.sampling.sample(df, [wiki for (wiki, _, _) in wikis],
-                                         queries_per_wiki=queries_per_wiki,
-                                         min_sessions_per_query=10, seed=12345)
-    sampled = df_sampled.collect()
+    df = (
+        spark_context.parallelize(rows)
+        .toDF(['wikiid', 'query', 'norm_query_id', 'session_id', 'hit_page_ids']))

-    ratio_of_sessions = len(sampled) / len(rows)
-    expected_ratio_of_sessions = queries_per_wiki / len(queries)
-    # assert the overall sampling matches constraint on ratio
-    assert abs(ratio_of_sessions - expected_ratio_of_sessions) < 0.01
+    samples_per_wiki = 1000
+    # Using a constant seed ensures deterministic testing. Because this code
+    # actually relies on the law of large numbers, and we do not have large
+    # numbers here, many seeds probably fail.
+    df_sampled = mjolnir.sampling.sample(df, samples_per_wiki=samples_per_wiki,
+                                         seed=12345)
+    sampled = (
+        df_sampled
+        .select('wikiid', 'query', F.explode('hit_page_ids').alias('hit_page_id'))
+        .drop_duplicates()
+        .groupBy('wikiid')
+        .agg(F.count(F.lit(1)).alias('num_samples'))
+        .collect())
+
+    total_samples_desired = len(wikis) * samples_per_wiki
+    total_samples = sum([r.num_samples for r in sampled])
+    assert abs(total_samples - total_samples_desired) / float(total_samples_desired) < 0.05

     # Test each wiki also meets the constraint
     for (wiki, _, _) in wikis:
         # ratio of rows
-        sampled_num_rows = len([r for r in sampled if r.wikiid == wiki])
-        orig_num_rows = len([r for r in rows if r[0] == wiki])
-        ratio_of_sessions = sampled_num_rows / orig_num_rows
-        assert abs(ratio_of_sessions - expected_ratio_of_sessions) < 0.01, wiki
+        sampled_num_rows = sum([r.num_samples for r in sampled if r.wikiid == wiki])
+        assert abs(sampled_num_rows - samples_per_wiki) / float(samples_per_wiki) < 0.05

     # assert correlation between sessions per query
     orig_grouped = (
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index 8548f5d..2799db6 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -83,7 +83,7 @@
         .collect())

     df_splits = (
-        sc.parallelize(split_rows(rows))
+        df._sc.parallelize(split_rows(rows))
         .toDF(['wikiid', 'norm_query_id', output_column]))

     return df.join(df_splits, how='inner', on=['wikiid', 'norm_query_id'])

--
To view, visit https://gerrit.wikimedia.org/r/366005
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iddbd75eb6d55f059ed95a4aab19586da47d6064d
Gerrit-PatchSet: 2
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: DCausse <dcau...@wikimedia.org>