DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/385108 )

Change subject: Register CLI commands with setup.py
......................................................................


Register CLI commands with setup.py

Having to reach all the way into the venv to find our cli scripts is
error-prone at best. Link them up into the venv's bin/ directory so we
don't have to do that.

Change-Id: I65e5db0c6a984e58b79ca1118f2d94da0a2783d6
---
M docs/feature_engineering.rst
M docs/running-in-analytics.rst
A mjolnir/__main__.py
R mjolnir/utilities/__init__.py
R mjolnir/utilities/data_pipeline.py
R mjolnir/utilities/kafka_daemon.py
R mjolnir/utilities/training_pipeline.py
M setup.py
8 files changed, 103 insertions(+), 28 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved



diff --git a/docs/feature_engineering.rst b/docs/feature_engineering.rst
index 41416a0..8de4f25 100644
--- a/docs/feature_engineering.rst
+++ b/docs/feature_engineering.rst
@@ -110,15 +110,15 @@
                 })
                 return (row.query, len(res.json()['tokens']))
             pool = multiprocessing.dummy.Pool(10, init)
-            return pool.imap_unordered(rows)
+            return pool.imap_unordered(process_one, rows)
         return f
 
 Apply that function to each partition of the rdd. This is somewhat expensive to
 compute, taking ~75s against 500k unique queries in ~30 partitions, and it will
 be used multiple times so we cache it in memory::
 
-    df_num_plain_tokens = rdd.mapPartitions(gen_collect_partition('enwiki_content', 'text_search')).toDF(['query', 'num_text_tokens'])
-    df_num_text_tokens = rdd.mapPartitions(gen_collect_partition('enwiki_content', 'plain_search')).toDF(['query', 'num_plain_tokens'])
+    df_num_text_tokens = rdd.mapPartitions(gen_collect_partition('enwiki_content', 'text_search')).toDF(['query', 'num_text_tokens'])
+    df_num_plain_tokens = rdd.mapPartitions(gen_collect_partition('enwiki_content', 'plain_search')).toDF(['query', 'num_plain_tokens'])
     df_num_tokens = df_num_plain_tokens.join(df_num_text_tokens, how='inner', on=['query']).cache()
 
 Merge our new feature into the existing data sets and write them out to hdfs::
@@ -146,7 +146,7 @@
        --executor-memory 2G \
         --executor-cores 4 \
         --archives 'mjolnir_venv.zip#venv' \
-        venv/lib/python2.7/site-packages/mjolnir/cli/training_pipeline.py \
+        venv/bin/mjolnir-utilities.py training_pipeline \
        -i hdfs://analytics-hadoop/user/ebernhardson/mjolnir/1193k_with_num_query_tokens \
        -o /home/ebernhardson/training_size/1193k_with_num_query_tokens \
        -t hdfs://analytics-hadoop/user/ebernhardson/mjolnir/test_with_num_query_tokens
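
For reference, Pool.imap_unordered takes the worker function as its first
argument, which is what the corrected docs line above now passes. A minimal
self-contained sketch of the same thread-pool pattern, with illustrative
names that are not from the mjolnir codebase::

    import multiprocessing.dummy

    def process_one(x):
        # Stand-in for the per-query HTTP request in the docs example.
        return x * 2

    # multiprocessing.dummy provides a thread-backed Pool with the same
    # API as the process-based Pool.
    pool = multiprocessing.dummy.Pool(4)
    # imap_unordered(func, iterable) yields results as workers finish.
    for result in pool.imap_unordered(process_one, range(10)):
        print(result)
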
diff --git a/docs/running-in-analytics.rst b/docs/running-in-analytics.rst
index bea481b..e8cf6e3 100644
--- a/docs/running-in-analytics.rst
+++ b/docs/running-in-analytics.rst
@@ -187,7 +187,7 @@
                --master yarn \
                --files /usr/lib/libhdfs.so.0.0.0 \
                --archives 'mjolnir_venv.zip#venv' \
-               venv/lib/python2.7/site-packages/mjolnir/cli/data_pipeline.py \
+               venv/bin/mjolnir-utilities.py data_pipeline \
               -i 'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*' \
                -o hdfs://analytics-hadoop/user/${USER}/mjolnir/training_data \
                -c codfw \
@@ -224,7 +224,7 @@
                --conf spark.yarn.executor.memoryOverhead=1536 \
                --executor-memory 2G \
                --executor-cores 4 \
-               venv/lib/python2.7/site-packages/mjolnir/cli/training_pipeline.py \
+               venv/bin/mjolnir-utilities.py training_pipeline \
               -i hdfs://analytics-hadoop/user/ebernhardson/mjolnir/1193k_with_one_hot_wp10 \
                -o ~/training_size/1193k_with_one_hot_wp10 \
                -w 1 -c 100 -f 5 enwiki
@@ -267,9 +267,9 @@
  for each executor. With the current cluster configuration 4 is the maximum that
   can be requested. Must be the same as spark.task.cpus above when training
 
-* venv/lib/python2.7/site-packages/mjolnir/cli/training_pipeline.py - This is the
-  script to run on the driver to actually run the spark job. Reaching into venv
-  like this is perhaps undesirable but gets the job done
+* venv/bin/mjolnir-utilities.py training_pipeline - This is the script to run
+  on the driver to launch the spark job. It dispatches to the utility module
+  mjolnir.utilities.training_pipeline.
 
* -i ... - Tells the training pipeline where to find the training data. This must be
   on HDFS and should be the output of the `data_pipeline.py` script.
diff --git a/mjolnir/__main__.py b/mjolnir/__main__.py
new file mode 100644
index 0000000..ae56958
--- /dev/null
+++ b/mjolnir/__main__.py
@@ -0,0 +1,57 @@
+"""
+Provides access to a set of utilities for running mjolnir pipelines.
+
+Utilities:
+
+* train                Executes the full data + training pipeline based
+*                      on a configuration definition
+* upload               Upload trained models to elasticsearch
+* data_pipeline        Individual spark job for converting click data into
+*                      labeled training data.
+* training_pipeline    Individual spark job for turning labeled training data
+*                      into xgboost models.
+* kafka_daemon         Daemon side of feature collection via kafka
+
+Usage:
+    mjolnir (-h | --help)
+    mjolnir <utility> [-h|--help]
+"""  # noqa
+
+from __future__ import absolute_import
+import sys
+import traceback
+from importlib import import_module
+
+USAGE = """Usage:
+    mjolnir (-h | --help)
+    mjolnir <utility> [-h|--help]\n"""
+
+
+def main(args=None):
+    if args is None:
+        args = sys.argv[1:]
+
+    if not len(args):
+        sys.stderr.write(USAGE)
+        sys.exit(1)
+    elif args[0] in ("-h", "--help"):
+        sys.stderr.write(__doc__ + "\n")
+        sys.exit(1)
+    elif args[0][:1] == "-":
+        sys.stderr.write(USAGE)
+        sys.exit(1)
+
+    module_name = args.pop(0)
+    try:
+        module = import_module(".utilities." + module_name,
+                               package="mjolnir")
+    except ImportError:
+        sys.stderr.write(traceback.format_exc())
+        sys.stderr.write("Could not find utility %s" % (module_name))
+        sys.exit(1)
+
+    module.main(args)
+
+
+if __name__ == "__main__":
+    main()
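
The dispatcher above imports mjolnir.utilities.<utility> and hands the
remaining arguments to its main(argv). A hypothetical new utility
(illustrative only, not part of this change) would plug in like this::

    # mjolnir/utilities/example.py (hypothetical)
    from __future__ import absolute_import
    import argparse

    def parse_arguments(argv):
        parser = argparse.ArgumentParser(description='Example utility')
        parser.add_argument('-n', '--name', default='world')
        return dict(vars(parser.parse_args(argv)))

    def main(argv=None):
        # argv=None falls through to sys.argv[1:] inside argparse, so the
        # module works standalone as well as via the dispatcher.
        args = parse_arguments(argv)
        print('hello %s' % args['name'])

    if __name__ == '__main__':
        main()

It would then be reachable as: venv/bin/mjolnir-utilities.py example -n there
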
diff --git a/mjolnir/cli/__init__.py b/mjolnir/utilities/__init__.py
similarity index 100%
rename from mjolnir/cli/__init__.py
rename to mjolnir/utilities/__init__.py
diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/utilities/data_pipeline.py
similarity index 96%
rename from mjolnir/cli/data_pipeline.py
rename to mjolnir/utilities/data_pipeline.py
index ab86ffe..62ec121 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -29,9 +29,9 @@
 }
 
 
-def main(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
-         min_sessions_per_query, search_cluster, brokers, ltr_feature_definitions,
-         samples_size_tolerance):
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
+                 min_sessions_per_query, search_cluster, brokers, ltr_feature_definitions,
+                 samples_size_tolerance):
     # TODO: Should this jar have to be provided on the command line instead?
     sqlContext.sql("ADD JAR 
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
     sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
@@ -200,7 +200,7 @@
     df_hits_with_features.write.parquet(output_dir)
 
 
-def parse_arguments():
+def parse_arguments(argv):
     parser = argparse.ArgumentParser(description='...')
     parser.add_argument(
         '-i', '--input', dest='input_dir', type=str,
@@ -240,15 +240,15 @@
         'wikis', metavar='wiki', type=str, nargs='+',
         help='A wiki to generate features and labels for')
 
-    args = parser.parse_args()
+    args = parser.parse_args(argv)
     if args.samples_size_tolerance < 0 or args.samples_size_tolerance > 1:
         raise ValueError('--sample-size-tolerance must be between 0 and 1')
 
     return dict(vars(args))
 
 
-if __name__ == "__main__":
-    args = parse_arguments()
+def main(argv=None):
+    args = parse_arguments(argv)
     if args['very_verbose']:
         logging.basicConfig(level=logging.DEBUG)
     elif args['verbose']:
@@ -262,4 +262,8 @@
     # human decipherable output
     sc.setLogLevel('WARN')
     sqlContext = HiveContext(sc)
-    main(sc, sqlContext, **args)
+    run_pipeline(sc, sqlContext, **args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/mjolnir/cli/kafka_daemon.py b/mjolnir/utilities/kafka_daemon.py
similarity index 92%
rename from mjolnir/cli/kafka_daemon.py
rename to mjolnir/utilities/kafka_daemon.py
index 1bbc4e6..1d1e3a3 100644
--- a/mjolnir/cli/kafka_daemon.py
+++ b/mjolnir/utilities/kafka_daemon.py
@@ -10,7 +10,7 @@
 import mjolnir.kafka.daemon
 
 
-def parse_arguments():
+def parse_arguments(argv):
     parser = argparse.ArgumentParser(description='...')
     parser.add_argument(
         '-b', '--brokers', dest='brokers', required=True, type=lambda x: x.split(','),
@@ -29,12 +29,12 @@
     parser.add_argument(
         '-vv', '--very-verbose', dest='very_verbose', default=False, action='store_true',
         help='Increase logging to DEBUG')
-    args = parser.parse_args()
+    args = parser.parse_args(argv)
     return dict(vars(args))
 
 
-if __name__ == '__main__':
-    args = parse_arguments()
+def main(argv=None):
+    args = parse_arguments(argv)
     if args['very_verbose']:
         logging.basicConfig(level=logging.DEBUG)
     elif args['verbose']:
@@ -44,3 +44,7 @@
     del args['verbose']
     del args['very_verbose']
     mjolnir.kafka.daemon.Daemon(**args).run()
+
+
+if __name__ == '__main__':
+    main()
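
Because parse_arguments now takes argv explicitly, the argument handling can
be exercised without touching sys.argv. A minimal sketch, assuming mjolnir is
installed and -b is the only required option::

    from mjolnir.utilities import kafka_daemon

    # -b is split on commas (type=lambda x: x.split(',')), so one argument
    # yields a list of brokers.
    args = kafka_daemon.parse_arguments(['-b', 'kafka1001:9092,kafka1002:9092'])
    assert args['brokers'] == ['kafka1001:9092', 'kafka1002:9092']
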
diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
similarity index 96%
rename from mjolnir/cli/training_pipeline.py
rename to mjolnir/utilities/training_pipeline.py
index 0539858..26640ef 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -50,8 +50,8 @@
     return dict(summary)
 
 
-def main(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees,
-         num_workers, num_cv_jobs, num_folds, test_dir, zero_features):
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees,
+                 num_workers, num_cv_jobs, num_folds, test_dir, zero_features):
     for wiki in wikis:
         print 'Training wiki: %s' % (wiki)
         df_hits_with_features = (
@@ -143,7 +143,7 @@
         print ''
 
 
-def parse_arguments():
+def parse_arguments(argv):
     parser = argparse.ArgumentParser(description='Train XGBoost ranking models')
     parser.add_argument(
         '-i', '--input', dest='input_dir', type=str, required=True,
@@ -187,14 +187,14 @@
         'wikis', metavar='wiki', type=str, nargs='+',
         help='A wiki to perform model training for.')
 
-    args = parser.parse_args()
+    args = parser.parse_args(argv)
     if args.num_cv_jobs is None:
         args.num_cv_jobs = args.num_folds
     return dict(vars(args))
 
 
-if __name__ == "__main__":
-    args = parse_arguments()
+def main(argv=None):
+    args = parse_arguments(argv)
     if args['very_verbose']:
         logging.basicConfig(level=logging.DEBUG)
     elif args['verbose']:
@@ -220,10 +220,14 @@
     os.mkdir(output_dir)
 
     try:
-        main(sc, sqlContext, **args)
+        run_pipeline(sc, sqlContext, **args)
     except:  # noqa: E722
         # If the directory we created is still empty delete it
         # so it doesn't need to be manually re-created
         if not len(glob.glob(os.path.join(output_dir, '*'))):
             os.rmdir(output_dir)
         raise
+
+
+if __name__ == "__main__":
+    main()
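
The final hunk above keeps a failed run from leaving behind an empty output
directory: the directory is created up front and removed again if the
pipeline raises before writing anything. The same pattern in isolation, with
illustrative names::

    import glob
    import os

    def run_with_cleanup(output_dir, job):
        os.mkdir(output_dir)
        try:
            job(output_dir)
        except:  # noqa: E722 - the exception is re-raised, nothing is swallowed
            # If the directory we created is still empty delete it so a
            # retry does not trip over os.mkdir.
            if not len(glob.glob(os.path.join(output_dir, '*'))):
                os.rmdir(output_dir)
            raise
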
diff --git a/setup.py b/setup.py
index 4456776..d2f22f1 100644
--- a/setup.py
+++ b/setup.py
@@ -7,6 +7,7 @@
     'clickmodels',
     'requests',
     'kafka',
+    'pyyaml',
     'hyperopt',
     # hyperopt requires networkx < 2.0, but doesn't say so
     'networkx<2.0',
@@ -29,6 +30,11 @@
     author_email='[email protected]',
     description='A plumbing library for Machine Learned Ranking at Wikimedia',
     license='MIT',
+    entry_points={
+        'console_scripts': [
+            'mjolnir-utilities.py = mjolnir.__main__:main',
+        ],
+    },
     packages=find_packages(),
     include_package_data=True,
     data_files=['README.rst'],
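
At install time setuptools turns each console_scripts entry into a small
wrapper in the venv's bin/ directory, which is what the updated docs invoke.
The generated stub varies by setuptools version, and the distribution name
below is assumed (setup.py's name field is not shown in this diff), but it is
roughly::

    #!/path/to/venv/bin/python
    # EASY-INSTALL-ENTRY-SCRIPT: 'mjolnir','console_scripts','mjolnir-utilities.py'
    import sys
    from pkg_resources import load_entry_point

    if __name__ == '__main__':
        sys.exit(
            load_entry_point('mjolnir', 'console_scripts', 'mjolnir-utilities.py')()
        )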

-- 
To view, visit https://gerrit.wikimedia.org/r/385108
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I65e5db0c6a984e58b79ca1118f2d94da0a2783d6
Gerrit-PatchSet: 8
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
