DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/381250 )
Change subject: Some scripts to automate training and model upload
......................................................................

Some scripts to automate training and model upload

Add a script above the current cli scripts to automate the configuration we
need to train on many different wikis. Uses a YAML config file to know what
to do for various wikis. I might have gone too far with the configuration:
now pretty much everything for each command, except the bare-bones names of
the commands to be executed, comes from the config file. This makes it quite
flexible and made adding new commands (such as pyspark) relatively easy, but
it might have just shifted the difficulties elsewhere...

The upload script could be improved, but should work as a first
implementation.

Change-Id: I19e02ac51b155eba7b9e296d203e03e7499a996d
---
M docs/feature_engineering.rst
M docs/running-in-analytics.rst
A example_train.yaml
M mjolnir/__main__.py
M mjolnir/test/conftest.py
A mjolnir/test/fixtures/build_mjolnir_utility/basic.expect
A mjolnir/test/fixtures/build_mjolnir_utility/basic.test
A mjolnir/test/fixtures/build_mjolnir_utility/minimum.expect
A mjolnir/test/fixtures/build_mjolnir_utility/minimum.test
A mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.expect
A mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.test
A mjolnir/test/fixtures/build_spark_command/fully_featured.expect
A mjolnir/test/fixtures/build_spark_command/fully_featured.test
A mjolnir/test/fixtures/build_spark_command/minimum.expect
A mjolnir/test/fixtures/build_spark_command/minimum.test
A mjolnir/test/fixtures/load_config/applies_templating.expect
A mjolnir/test/fixtures/load_config/applies_templating.test
A mjolnir/test/fixtures/load_config/example_train.expect
A mjolnir/test/fixtures/load_config/example_train.test
A mjolnir/test/fixtures/load_config/merge.expect
A mjolnir/test/fixtures/load_config/merge.test
A mjolnir/test/fixtures/load_config/minimum_allowed.expect
A mjolnir/test/fixtures/load_config/minimum_allowed.test
A mjolnir/test/fixtures/load_config/minimum_allowed_command.expect
A mjolnir/test/fixtures/load_config/minimum_allowed_command.test
A mjolnir/test/utilities/test_spark.py
A mjolnir/utilities/spark.py
A mjolnir/utilities/upload.py
28 files changed, 1,855 insertions(+), 161 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved


diff --git a/docs/feature_engineering.rst b/docs/feature_engineering.rst
index 8de4f25..e9c06ff 100644
--- a/docs/feature_engineering.rst
+++ b/docs/feature_engineering.rst
@@ -22,12 +22,7 @@
 Open a pyspark shell with mjolnir::

-    PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/pyspark \
-        --jars /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --driver-class-path /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --master yarn \
-        --files /usr/lib/libhdfs.so.0.0.0 \
-        --archives 'mjolnir_venv.zip#venv'
+    venv/bin/mjolnir-utilities.py spark --config example_train.yaml shell

 Load the feature file into a dataframe. Specifying the schema allows naming
 the columns and ensuring they are converted to the correct types::
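The body of that example is elided by the diff hunk. Purely as an
illustration of schema-based loading in the shell above (where `spark` is the
provided SparkSession; the column names and path here are invented
placeholders, not the repository's actual example)::

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    schema = StructType([
        StructField('wikiid', StringType()),   # placeholder columns,
        StructField('query', StringType()),    # for illustration only
        StructField('label', DoubleType()),
    ])
    df = spark.read.schema(schema).csv('hdfs://analytics-hadoop/path/to/features')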
@@ -70,12 +65,7 @@
 Open a pyspark shell with mjolnir::

-    PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/pyspark \
-        --jars /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --driver-class-path /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --master yarn \
-        --files /usr/lib/libhdfs.so.0.0.0 \
-        --archives 'mjolnir_venv.zip#venv'
+    venv/bin/mjolnir-utilities.py spark --config example_train.yaml shell

 Load all the relevant query strings into an rdd::

@@ -134,23 +124,12 @@
 Again we have two new datasets with an additional feature that can be
 evaluated with training_pipeline.py. These new features can be tested
 together directly::

-    PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/spark-submit \
-        --jars /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --driver-class-path /home/ebernhardson/mjolnir-0.1-jar-with-dependencies.jar \
-        --master yarn \
-        --files /usr/lib/libhdfs.so.0.0.0 \
-        --conf spark.dynamicAllocation.maxExecutors=105 \
-        --conf spark.sql.autoBroadcastJoinThreshold=-1 \
-        --conf spark.task.cpus=4 \
-        --conf spark.yarn.executor.memoryOverhead=1536 \
-        --executor-memory 2G i\
-        --executor-cores 4 \
-        --archives 'mjolnir_venv.zip#venv' \
-        venv/bin/mjolnir-utilities.py training_pipeline \
-        -i hdfs://analytics-hadoop/user/ebernhardson/mjolnir/1193k_with_num_query_tokens \
-        -o /home/ebernhardson/training_size/1193k_with_num_query_tokens \
-        -t hdfs://analytics-hadoop/user/ebernhardson/mjolnir/test_with_num_query_tokens
-        -w 1 -c 100 -f 5 enwiki
+    venv/bin/mjolnir-utilities.py spark \
+        --config example_train.yaml \
+        --marker num_query_tokens \
+        --template-var training_data_path=user/ebernhardson/mjolnir/1193k_with_num_query_tokens \
+        train

 The new features can also be tested individually by using the `--zero-feature`
 argument to training_pipeline.py to zero out the feature not being evaluated.

diff --git a/docs/running-in-analytics.rst b/docs/running-in-analytics.rst
index 0731565..f728052 100644
--- a/docs/running-in-analytics.rst
+++ b/docs/running-in-analytics.rst
@@ -17,7 +17,7 @@
 * driver - This is the JVM that is in control of the spark job and orchestrates
   everything. This instance does not generally get assigned any tasks, instead
   only being responsible for orchestrating the delivery of tasks to executors. For
-  the instructions in this document this is always on stat1004, but it is
+  the instructions in this document this is always on stat1005, but it is
   possible for the driver to be created inside the hadoop cluster instead.

 * executor - Also known as workers. These instances do the heavy lifting of executing

@@ -35,21 +35,23 @@
 As MjoLniR is still in its early days there is not a defined process to deploy
 to the analytics cluster. This will be worked out, but for now it is all
 manual. Some caveats:

-* MjoLniR must be run from a debian jessie based host. The is because the hadoop workers
-  are also running debian jessie and the python binary on ubuntu will not run properly on them.
+* MjoLniR must be run from a debian jessie or newer based host. This is
+  because the hadoop workers are also running debian jessie and the python
+  binary on ubuntu will not run properly on them.

-* The only debian jessie based host available as a spark driver is stat1004.eqiad.wmnet.
+* The best host to run everything from is stat1005, a debian stretch based
+  machine.

 * MjoLniR requires spark 2.1.0, but the default installed version in the hadoop
   cluster is 1.6.0. You will need to fetch spark from apache and uncompress
-  the archive in your home directory on stat1004.
+  the archive in your home directory on stat1005.

 * MjoLniR requires a python virtualenv containing all the appropriate library
-  dependencies. Unfortunately it is not possible yet to build the virtualenv on
-  stat1004 due to some missing compile time dependencies. See https://gerrit.wikimedia.org/r/348669
+  dependencies. Pip shipped with debian stretch is able to utilize wheel
+  packages, which makes installing all our dependencies a breeze.

-* MjoLniR requires a jar built in the /jvm directory of this repository. This is a fat jar
-  containing other dependencies, such as xgboost and kafka streaming.
+* MjoLniR requires a jar built in the /jvm directory of this repository, along
+  with a few others. These are all deployed to archiva and referenced from the
+  configuration for the `spark` utility.

 * Some conflict with dependencies installed in the analytics cluster may cause kafka
   streaming to fail with `kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.broker`.

@@ -71,65 +73,53 @@
 Setting everything up
 =====================

-Taking the above into account, the process to get started is basically:
+Taking the above into account, the process to get started is basically (all
+run from stat1005.eqiad.wmnet):

-Build a virtualenv inside the vagrant box configured with this repository::
+Clone mjolnir, build a virtualenv, and zip it up for deployment to spark
+executors::

-    cd /vagrant
-    virtualenv mjolnir_venv
-    mjolnir_venv/bin/pip install .
-    cd mjolnir_venv
+    cd ~
+    git clone https://gerrit.wikimedia.org/r/search/MjoLniR mjolnir
+    cd mjolnir
+    virtualenv venv
+    venv/bin/pip install .
+    cd venv
     zip -qr ../mjolnir_venv.zip .
     cd ..
-    rm -rf mjolnir_venv

-That zip file needs to be copied over to stat1004, along with the appropriate jar::
-
-    scp mjolnir_venv.zip stat1004.eqiad.wmnet:~/
-    scp jvm/target/mjolnir-0.1-jar-with-dependencies.jar stat1004.eqiad.wmnet:~/
-
-SSH into stat1004::
-
-    ssh stat1004.eqiad.wmnet
-
-Make a directory just for mjolnir stuff::
-
-    mkdir ~/mjolnir
-
-Move the files we copied to the machine into it::
-
-    mv mjolnir_venv.zip mjolnir/
-    mv mjolnir-0.1-jar-with-dependencies.jar mjolnir/
-
-Pull down spark 2.1 and decompress that there too::
-
-    cd mjolnir/
+Pull down spark 2.1 and decompress that into your home directory::
+
+    cd ~
     https_proxy=http://webproxy.eqiad.wmnet:8080 wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.6.tgz
     tar -zxf spark-2.1.0-bin-hadoop2.6.tgz

-We need to uncompress mjolnir_venv.zip so that it is accessible both in the
-driver and the hadoop workers::
+Upgrading mjolnir
+=================

-    mkdir venv
+Upgrading mjolnir in your analytics checkout is fairly painless; run the
+following commands. If you need dependencies as well, leave off the --no-deps
+argument and be sure to set an appropriate https_proxy environment variable::
+
+    cd ~/mjolnir
+    git pull --ff-only
+    venv/bin/pip install --upgrade --no-deps .
     cd venv
-    unzip -q ../mjolnir_venv.zip
+    zip -qr ../mjolnir_venv.zip .
+    cd ..

-Running an interactive shell
-============================
+The configuration file
+======================

-Finally we should be ready to run things. Lets start first with the pyspark
-REPL to see things are working::

+The configuration file, located at `example_train.yaml`, helps automate the
+relatively tedious task of running spark command lines. Both spark and
+mjolnir take an incredible number of arguments that all have to be configured
+just so for training to work out. The high level design of this file is to
+have global and per-profile configuration, and then to have defined commands.
+See the doc comments in mjolnir/utilities/spark.py for more information. Many
+things in this file are templated even where they might not strictly need to
+be, so that they can be overridden from the command line.
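As a rough sketch of the templating idea (the helper name here is an
assumption; the real implementation lives in mjolnir/utilities/spark.py and
additionally lets template variables reference each other), values are
expanded with python %-style interpolation over the merged configuration::

    def apply_templates(value, template_vars):
        # Recursively expand %(name)s placeholders in all string values.
        if isinstance(value, dict):
            return {k: apply_templates(v, template_vars) for k, v in value.items()}
        if isinstance(value, str):
            return value % template_vars
        return value

    apply_templates({'jars': '%(this)s/hithere/%(some)s.jar'},
                    {'this': 'that/foobar/qwerty', 'some': 'thing'})
    # {'jars': 'that/foobar/qwerty/hithere/thing.jar'}

This matches the behavior exercised by the applies_templating fixture further
down in this change.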
This script takes a few options, but for simplicity here we -pass only a few of the available parameters: +data_pipeline.py arguments: * -i The input directory containing the query click data. It is unlikely you will ever need to use a different value than shown here. @@ -208,28 +158,8 @@ the *hot*spare* search cluster. Pointing this at the currently active cluster could cause increased latency for our users. -Running training_pipeline.py -============================ - -The commandline for kicking off training looks like:: - - PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf ~/spark-2.1.0-bin-hadoop2.6/bin/spark-submit \ - --repositories https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored \ - --packages ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2-SNAPSHOT,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \ - --master yarn --files /usr/lib/libhdfs.so.0.0.0 \ - --archives 'mjolnir_venv.zip#venv' \ - --conf spark.dynamicAllocation.maxExecutors=105 \ - --conf spark.sql.autoBroadcastJoinThreshold=-1 \ - --conf spark.task.cpus=4 \ - --conf spark.yarn.executor.memoryOverhead=1536 \ - --executor-memory 2G \ - --executor-cores 4 \ - venv/bin/mjolnir-utilities.py training_pipeline \ - -i hdfs://analytics-hadoop/user/ebernhardson/mjolnir/1193k_with_one_hot_wp10 \ - -o ~/training_size/1193k_with_one_hot_wp10 \ - -w 1 -c 100 -f 5 enwiki - -This includes a few more arguments than the interactive shell did. These are: +training_pipeline.py takes a few more arguments, mostly related to having +an appropriate amount of resources available for training: * --conf spark.dynamicAllocation.maxExecutors=105 - The training process can use an incredible amount of resources on the cluster if allowed to. Generally @@ -267,10 +197,6 @@ for each executor. With the current cluster configuration 4 is the maximum that can be requested. Must be the same as spark.task.cpus above when training -* venv/bin/mjolnir-utilities.py training_pipeline - This is the script to run - on the driver to actually run the spark job. This will call the utility - script at mjolnir.utilities.training_pipeline. - * -i ... - Tells the training pipeline where to find the training data. This must be on HDFS and should be the output of the `data_pipeline.py` script. @@ -292,11 +218,86 @@ variance increasing this to 11 will make the training take longer but might have more accurate statistics. -* enwiki - Finally we take a list of wikis to train models for. Each wiki is trained on its own, - and a training dataset can contain features for multiple wikis. +Running an interactive shell +============================ + +Finally we should be ready to run things. Lets start first with the pyspark +REPL to see things are working:: + + ssh stat1005.eqiad.wmnet + cd mjolnir/ + venv/bin/mjolnir-utilities.py --config example_config.yaml shell + +After a bunch of output, some warnings, perhaps a few exceptions printed out +(normal, they are usually related to trying to find a port to run the web ui +on), you will be greated with a prompt. It should look something like:: + + Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /__ / .__/\_,_/_/ /_/\_\ version 2.1.0 + /_/ + + Using Python version 2.7.9 (default, Jun 29 2016 13:08:31) + SparkSession available as 'spark'. + >>> + +From here you can do anything you could do when programming mjolnir. 
+Running data_pipeline.py
+========================
+
+The commandline for kicking off the data pipeline looks like::
+
+    cd ~/mjolnir
+    venv/bin/mjolnir-utilities.py spark --config example_train.yaml collect enwiki
+
+Providing enwiki at the very end limits data collection to a single wiki.
+Leave this parameter off to collect data for all wikis configured in
+example_train.yaml.
+
+With the default configuration this will store the data in hdfs at::
+
+    hdfs://analytics-hadoop/user/<username>/mjolnir/<Ymd>
+
+Running training_pipeline.py
+============================
+
+The commandline for kicking off training looks like::
+
+    venv/bin/mjolnir-utilities.py spark --config example_train.yaml train enwiki
+
+Similar to the collection phase, providing enwiki at the very end limits
+training to a single wiki. Leave this parameter off to train for all
+configured wikis (that exist in the data).
+
+By default this will look for data in hdfs at the same location that the
+`collect` script stores data. Because this uses the current date as the name
+it will not work correctly the next day. With the `example_train.yaml` file
+you can override this location to point at a previous day like so::
+
+    venv/bin/mjolnir-utilities.py spark \
+        --config example_train.yaml \
+        --template-var training_data_path=user/ebernhardson/mjolnir/20171023 \
+        train enwiki
+
+Running both together
+=====================
+
+The commandline for kicking off a full data collect and training in one go
+looks like::
+
+    venv/bin/mjolnir-utilities.py spark --config example_train.yaml collect_and_train enwiki
+
+Same as before, the final argument is the wiki to limit data collection and
+training to.

 Resource usage in the hadoop cluster
 ====================================

+TODO

 Help! There are exceptions everywhere!

diff --git a/example_train.yaml b/example_train.yaml
new file mode 100644
index 0000000..b73551c
--- /dev/null
+++ b/example_train.yaml
@@ -0,0 +1,195 @@
+# Configuration shared by all training groups and commands
+global:
+  environment:
+    PYSPARK_PYTHON: venv/bin/python
+    SPARK_CONF_DIR: /etc/spark/conf
+    SPARK_HOME: "%(HOME)s/spark-%(spark_version)s-bin-hadoop2.6"
+  template_vars:
+    spark_version: 2.1.0
+    # Path to the spark-submit application
+    spark_submit: "%(SPARK_HOME)s/bin/spark-submit"
+    # Local path to zip'd virtualenv which will be shipped to executors
+    mjolnir_venv_zip: "%(mjolnir_dir)s/mjolnir_venv.zip"
+    # Local path to python script for running mjolnir utilities
+    mjolnir_utility_path: "%(mjolnir_dir)s/venv/bin/mjolnir-utilities.py"
+    # Path inside hdfs to the training data
+    training_data_path: "user/%(USER)s/mjolnir/%(marker)s"
+    # Fully qualified HDFS path to the training data
+    hdfs_training_data_path: "hdfs://analytics-hadoop/%(training_data_path)s"
+    # Fully qualified local path to the training data
+    local_training_data_path: "/mnt/hdfs/%(training_data_path)s"
+    # Base directory used to build path to write training output to
+    base_training_output_dir: "%(HOME)s/training_size"
+    # Number of cpu cores to assign per task. Must evenly divide
+    # cores_per_executor. Spark can't take advantage of this being > 1, but
+    # xgboost can.
+    cores_per_task: 1
+    # Number of cpu cores to request per executor. If cores_per_task is
+    # less than this spark will run multiple tasks per executor in separate
+    # threads.
+    cores_per_executor: 1
+    # Size of JVM heap on executors
+    executor_memory: 2G
+    # Additional memory allocated by yarn beyond executor_memory. This
+    # accounts for off-heap data structures both in the JVM itself, and
+    # those created via JNI for xgboost. Primarily this is set here so it
+    # can be overridden from the command line.
+    executor_memory_overhead: 512
+    # Used by the data pipeline to decide the minimum number of sessions
+    # to group together. Setting this too high on low volume wikis will
+    # result in little to no training data.
+    min_sessions_per_query: 10
+  # Files that must exist to run
+  paths:
+    dir_exist: !!set
+      ? "%(SPARK_CONF_DIR)s"
+    file_exist: !!set
+      ? "%(mjolnir_venv_zip)s"
+      ? "%(mjolnir_utility_path)s"
+      ? "%(spark_submit)s"
+      ? "%(PYSPARK_PYTHON)s"
+  spark_args:
+    master: yarn
+    # TODO: When is this necessary?
+    files: /usr/lib/libhdfs.so.0.0.0
+    # Ship the mjolnir virtualenv to executors and decompress it to ./venv
+    archives: "%(mjolnir_venv_zip)s#venv"
+    executor-cores: "%(cores_per_executor)s"
+    executor-memory: "%(executor_memory)s"
+    # Source our jvm dependencies from archiva.
+    repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored
+    packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:%(spark_version)s
+  spark_conf:
+    spark.task.cpus: "%(cores_per_task)s"
+    spark.yarn.executor.memoryOverhead: "%(executor_memory_overhead)s"
+    # While undesirable, we can't disable the public (maven central) repository
+    # until spark 2.2, which depends on java 8 (and our cluster is still on
+    # java 7)
+    spark.driver.extraJavaOptions: "-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080"
+  commands:
+    pyspark:
+      spark_command: "%(SPARK_HOME)s/bin/pyspark"
+    # Shell used to test model training
+    pyspark_train:
+      spark_command: "%(SPARK_HOME)s/bin/pyspark"
+      template_vars:
+        cores_per_executor: 4
+        cores_per_task: 4
+        executor_memory: 2G
+        executor_memory_overhead: 6144
+    data_pipeline:
+      spark_command: "%(SPARK_HOME)s/bin/spark-submit"
+      mjolnir_utility_path: "%(mjolnir_utility_path)s"
+      mjolnir_utility: data_pipeline
+      paths:
+        dir_not_exist: !!set
+          ? "%(local_training_data_path)s"
+      cmd_args:
+        input: hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*
+        output-dir: "%(hdfs_training_data_path)s"
+        # Maximum number of training observations per-wiki. We usually get a
+        # bit less than requested; 35M turns into 25 or 30M.
+        samples-per-wiki: 35000000
+        search-cluster: codfw
+        min-sessions: "%(min_sessions_per_query)s"
+    training_pipeline:
+      spark_command: "%(SPARK_HOME)s/bin/spark-submit"
+      mjolnir_utility_path: "%(mjolnir_utility_path)s"
+      mjolnir_utility: training_pipeline
+      paths:
+        dir_exist: !!set
+          # TODO: Would be nice if we could specify paths.dir_exist for
+          # input training data, but it's evaluated before data_pipeline is
+          # called when doing collect_and_train.
+          ? "%(base_training_output_dir)s"
+      spark_args:
+        driver-memory: 3G
+      spark_conf:
+        # Disabling auto broadcast join prevents memory explosion when spark
+        # mis-predicts the size of a dataframe. (where does this happen?)
+ spark.sql.autoBroadcastJoinThreshold: -1 + # Adjusting up executor idle timeout from 60s to 180s is a bit greedy, + # but prevents a whole bunch of log spam from spark killing executors + # between CV runs + spark.dynamicAllocation.executorIdleTimeout: 180s + cmd_args: + input: "%(hdfs_training_data_path)s" + output: "%(base_training_output_dir)s/%(marker)s_%(profile_name)s" + +# Individual training groups +profiles: + large: + # 12M to 30M observations. 4M to 12M per executor. + # Approximately 63 executors, 378 cores, 756GB memory + wikis: + - enwiki + - dewiki + commands: + training_pipeline: + template_vars: + cores_per_executor: 6 + cores_per_task: 6 + executor_memory: 3G + executor_memory_overhead: 9216 + spark_conf: + spark.dynamicAllocation.maxExecutors: 65 + cmd_args: + workers: 3 + cv-jobs: 22 + folds: 3 + final-trees: 100 + + medium: + # 4M to 12M observations per executor. + # Approximately 70 executors, 420 cores, 840GB memory + wikis: + - itwiki + - ptwiki + - frwiki + - ruwiki + commands: + training_pipeline: + template_vars: + cores_per_executor: 6 + cores_per_task: 6 + executor_memory: 3G + executor_memory_overhead: 9216 + spark_conf: + spark.dynamicAllocation.maxExecutors: 75 + cmd_args: + workers: 1 + cv-jobs: 70 + folds: 5 + final-trees: 100 + + small: + # 100k to 4M observations per executor. Way overprovisioned + # Approximately 100 executors, 400 cores, 800G memory. + wikis: + - svwiki + - fawiki + - idwiki + - viwiki + - nowiki + - hewiki + - kowiki + - fiwiki + - jawiki + - arwiki + - itwiki + - nlwiki + - zhwiki + - plwiki + commands: + training_pipeline: + template_vars: + cores_per_executor: 4 + cores_per_task: 4 + executor_memory: 2G + executor_memory_overhead: 6144 + spark_conf: + spark.dynamicAllocation.maxExecutors: 105 + cmd_args: + workers: 1 + cv-jobs: 100 + folds: 5 + final-trees: 500 diff --git a/mjolnir/__main__.py b/mjolnir/__main__.py index ae56958..f1b2c8f 100644 --- a/mjolnir/__main__.py +++ b/mjolnir/__main__.py @@ -3,8 +3,8 @@ Utilities: -* train Executes the full data + training pipeline based -* on a configuration definition +* spark Wrapper around spark-submit and a configuration file +* to run commands in expected environments. * upload Upload trained models to elasticsearch * data_pipeline Individual spark job for converting click data into * labeled training data. diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py index 0c16fa4..dff4406 100644 --- a/mjolnir/test/conftest.py +++ b/mjolnir/test/conftest.py @@ -35,21 +35,20 @@ SparkContext for tests """ - # TODO: This is much too specialized to the vagrant test environment quiet_log4j() + # Pull appropriate jvm dependencies from archiva. Would be nice + # if we could provide this in SparkConf, but in 2.1.x there isn't + # a way. + os.environ['PYSPARK_SUBMIT_ARGS'] = '--repositories %s pyspark-shell' % ( + ','.join(['https://archiva.wikimedia.org/repository/%s' % (repo) + for repo in ['releases', 'snapshots', 'mirrored']])) conf = ( SparkConf() .setMaster("local[2]") .setAppName("pytest-pyspark-local-testing") - # Pull appropriate jvm dependencies from archiva. - # TODO: How to use a local version of mjolnir jar? 
- .set('spark.jars.ivy', ','.join([ - 'https://archiva.wikimedia.org/repository/releases', - 'https://archiva.wikimedia.org/repository/snapshots', - 'https://archiva.wikimedia.org/repository/mirrored'])) # Maven coordinates of jvm dependencies .set('spark.jars.packages', ','.join([ - 'ml.dmlc.xgboost4j-spark:0.7-wmf-1', + 'ml.dmlc:xgboost4j-spark:0.7-wmf-1', 'org.wikimedia.search:mjolnir:0.2', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0'])) # By default spark will shuffle to 200 partitions, which is diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/basic.expect b/mjolnir/test/fixtures/build_mjolnir_utility/basic.expect new file mode 100644 index 0000000..3bd5331 --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/basic.expect @@ -0,0 +1,10 @@ +- venv/bin/mjolnir-utilities.py +- data_pipeline +- --input +- hdfs://some-cluster/path/to/my/data/input +- --output-dir +- hdfs://some-cluster/path/to/my/data/output +- --samples-per-wiki +- '123456789' +- --search-cluster +- codfw diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/basic.test b/mjolnir/test/fixtures/build_mjolnir_utility/basic.test new file mode 100644 index 0000000..5325657 --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/basic.test @@ -0,0 +1,10 @@ +mjolnir_utility_path: venv/bin/mjolnir-utilities.py +mjolnir_utility: data_pipeline +cmd_args: + input: hdfs://some-cluster/path/to/my/data/input + output-dir: hdfs://some-cluster/path/to/my/data/output + samples-per-wiki: 123456789 + search-cluster: codfw + + + diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/minimum.expect b/mjolnir/test/fixtures/build_mjolnir_utility/minimum.expect new file mode 100644 index 0000000..b2cc2c1 --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/minimum.expect @@ -0,0 +1,2 @@ +- venv/bin/mjolnir-utilities.py +- data_pipeline diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/minimum.test b/mjolnir/test/fixtures/build_mjolnir_utility/minimum.test new file mode 100644 index 0000000..dd4cd91 --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/minimum.test @@ -0,0 +1,4 @@ +mjolnir_utility_path: venv/bin/mjolnir-utilities.py +mjolnir_utility: data_pipeline +cmd_args: {} + diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.expect b/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.expect new file mode 100644 index 0000000..32e2b7b --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.expect @@ -0,0 +1,6 @@ +- venv/bin/mjolnir-utilities.py +- data_pipeline +- --foo +- bar +- --foo +- baz diff --git a/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.test b/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.test new file mode 100644 index 0000000..8017f59 --- /dev/null +++ b/mjolnir/test/fixtures/build_mjolnir_utility/multiple_args.test @@ -0,0 +1,7 @@ +mjolnir_utility_path: venv/bin/mjolnir-utilities.py +mjolnir_utility: data_pipeline +cmd_args: + foo: + - bar + - baz + diff --git a/mjolnir/test/fixtures/build_spark_command/fully_featured.expect b/mjolnir/test/fixtures/build_spark_command/fully_featured.expect new file mode 100644 index 0000000..886ef43 --- /dev/null +++ b/mjolnir/test/fixtures/build_spark_command/fully_featured.expect @@ -0,0 +1,11 @@ +- /some/where/that/has/spark-submit +- --conf +- org.apache.spark.foo=bar +- --conf +- some.other.path=has data +- --archives +- wozers.zip#my_stuff +- --driver-class-path +- /some/path/foo.jar,/other/place/bang.jar +- --jars +- 
/some/path/foo.jar,/other/place/bang.jar diff --git a/mjolnir/test/fixtures/build_spark_command/fully_featured.test b/mjolnir/test/fixtures/build_spark_command/fully_featured.test new file mode 100644 index 0000000..b6ce108 --- /dev/null +++ b/mjolnir/test/fixtures/build_spark_command/fully_featured.test @@ -0,0 +1,9 @@ +spark_command: /some/where/that/has/spark-submit +spark_conf: + org.apache.spark.foo: bar + some.other.path: has data +spark_args: + jars: /some/path/foo.jar,/other/place/bang.jar + driver-class-path: /some/path/foo.jar,/other/place/bang.jar + archives: wozers.zip#my_stuff + diff --git a/mjolnir/test/fixtures/build_spark_command/minimum.expect b/mjolnir/test/fixtures/build_spark_command/minimum.expect new file mode 100644 index 0000000..ee5b439 --- /dev/null +++ b/mjolnir/test/fixtures/build_spark_command/minimum.expect @@ -0,0 +1 @@ +- /path/to/pyspark diff --git a/mjolnir/test/fixtures/build_spark_command/minimum.test b/mjolnir/test/fixtures/build_spark_command/minimum.test new file mode 100644 index 0000000..8a6fc79 --- /dev/null +++ b/mjolnir/test/fixtures/build_spark_command/minimum.test @@ -0,0 +1 @@ +spark_command: /path/to/pyspark diff --git a/mjolnir/test/fixtures/load_config/applies_templating.expect b/mjolnir/test/fixtures/load_config/applies_templating.expect new file mode 100644 index 0000000..a1cc5bc --- /dev/null +++ b/mjolnir/test/fixtures/load_config/applies_templating.expect @@ -0,0 +1,14 @@ +global_config: + commands: {} + wikis: [] +profiles: + basic: + commands: + foo: + environment: + HOME: /home/pytest + USER: pytest + thing: foobar + keys_can_be_about_anything: + jars: that/foobar/qwerty/hithere/thing.jar + wikis: [] diff --git a/mjolnir/test/fixtures/load_config/applies_templating.test b/mjolnir/test/fixtures/load_config/applies_templating.test new file mode 100644 index 0000000..1985faf --- /dev/null +++ b/mjolnir/test/fixtures/load_config/applies_templating.test @@ -0,0 +1,16 @@ +# Most basic test that environment and template vars are available for templating +global: + environment: + thing: foobar + template_vars: + keyboard: qwerty + this: that/%(thing)s/%(keyboard)s +profiles: + basic: + template_vars: + some: thing + commands: + foo: + keys_can_be_about_anything: + jars: "%(this)s/hithere/%(some)s.jar" + diff --git a/mjolnir/test/fixtures/load_config/example_train.expect b/mjolnir/test/fixtures/load_config/example_train.expect new file mode 100644 index 0000000..a423aa4 --- /dev/null +++ b/mjolnir/test/fixtures/load_config/example_train.expect @@ -0,0 +1,588 @@ +global_config: + commands: + data_pipeline: + cmd_args: + input: hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=* + min-sessions: '10' + output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + samples-per-wiki: '35000000' + search-cluster: codfw + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: data_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + dir_not_exist: !!set + /mnt/hdfs/user/pytest/mjolnir/marker: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: 
/usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark_train: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '4' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '4' + spark.yarn.executor.memoryOverhead: '6144' + training_pipeline: + cmd_args: + input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + output: /home/pytest/training_size/marker_global + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: training_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + /home/pytest/training_size: null + file_exist: !!set + 
/home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + driver-memory: 3G + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.dynamicAllocation.executorIdleTimeout: 180s + spark.sql.autoBroadcastJoinThreshold: '-1' + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + wikis: [] +profiles: + large: + commands: + data_pipeline: + cmd_args: + input: hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=* + min-sessions: '10' + output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + samples-per-wiki: '35000000' + search-cluster: codfw + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: data_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + dir_not_exist: !!set + /mnt/hdfs/user/pytest/mjolnir/marker: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: 
https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark_train: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '4' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '4' + spark.yarn.executor.memoryOverhead: '6144' + training_pipeline: + cmd_args: + cv-jobs: '22' + final-trees: '100' + folds: '3' + input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + output: /home/pytest/training_size/marker_large + workers: '3' + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: training_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + /home/pytest/training_size: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + driver-memory: 3G + executor-cores: '6' + executor-memory: 3G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.dynamicAllocation.executorIdleTimeout: 180s + spark.dynamicAllocation.maxExecutors: '65' + spark.sql.autoBroadcastJoinThreshold: '-1' + spark.task.cpus: '6' + spark.yarn.executor.memoryOverhead: '9216' + wikis: + - enwiki + - dewiki + medium: + commands: + data_pipeline: + cmd_args: + input: hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=* + min-sessions: '10' + output-dir: 
hdfs://analytics-hadoop/user/pytest/mjolnir/marker + samples-per-wiki: '35000000' + search-cluster: codfw + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: data_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + dir_not_exist: !!set + /mnt/hdfs/user/pytest/mjolnir/marker: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark_train: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '4' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: 
/home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '4' + spark.yarn.executor.memoryOverhead: '6144' + training_pipeline: + cmd_args: + cv-jobs: '70' + final-trees: '100' + folds: '5' + input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + output: /home/pytest/training_size/marker_medium + workers: '1' + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: training_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + /home/pytest/training_size: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + driver-memory: 3G + executor-cores: '6' + executor-memory: 3G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.dynamicAllocation.executorIdleTimeout: 180s + spark.dynamicAllocation.maxExecutors: '75' + spark.sql.autoBroadcastJoinThreshold: '-1' + spark.task.cpus: '6' + spark.yarn.executor.memoryOverhead: '9216' + wikis: + - itwiki + - ptwiki + - frwiki + - ruwiki + small: + commands: + data_pipeline: + cmd_args: + input: hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=* + min-sessions: '10' + output-dir: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + samples-per-wiki: '35000000' + search-cluster: codfw + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: data_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + dir_not_exist: !!set + /mnt/hdfs/user/pytest/mjolnir/marker: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + 
-Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '1' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '1' + spark.yarn.executor.memoryOverhead: '512' + pyspark_train: + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + paths: + dir_exist: !!set + /etc/spark/conf: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + executor-cores: '4' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/pyspark + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.task.cpus: '4' + spark.yarn.executor.memoryOverhead: '6144' + training_pipeline: + cmd_args: + cv-jobs: '100' + final-trees: '500' + folds: '5' + input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker + output: /home/pytest/training_size/marker_small + workers: '1' + environment: + HOME: /home/pytest + PYSPARK_PYTHON: venv/bin/python + SPARK_CONF_DIR: /etc/spark/conf + SPARK_HOME: /home/pytest/spark-2.1.0-bin-hadoop2.6 + USER: pytest + mjolnir_utility: training_pipeline + mjolnir_utility_path: /vagrant/venv/bin/mjolnir-utilities.py + paths: + dir_exist: !!set + /etc/spark/conf: null + /home/pytest/training_size: null + file_exist: !!set + /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit: null + /vagrant/mjolnir_venv.zip: null + /vagrant/venv/bin/mjolnir-utilities.py: null + venv/bin/python: null + spark_args: + archives: /vagrant/mjolnir_venv.zip#venv + driver-memory: 3G + executor-cores: '4' + executor-memory: 2G + files: /usr/lib/libhdfs.so.0.0.0 + master: yarn + packages: 
ml.dmlc:xgboost4j-spark:0.7-wmf-1,org.wikimedia.search:mjolnir:0.2,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 + repositories: https://archiva.wikimedia.org/repository/releases,https://archiva.wikimedia.org/repository/snapshots,https://archiva.wikimedia.org/repository/mirrored + spark_command: /home/pytest/spark-2.1.0-bin-hadoop2.6/bin/spark-submit + spark_conf: + spark.driver.extraJavaOptions: -Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 + -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080 + spark.dynamicAllocation.executorIdleTimeout: 180s + spark.dynamicAllocation.maxExecutors: '105' + spark.sql.autoBroadcastJoinThreshold: '-1' + spark.task.cpus: '4' + spark.yarn.executor.memoryOverhead: '6144' + wikis: + - svwiki + - fawiki + - idwiki + - viwiki + - nowiki + - hewiki + - kowiki + - fiwiki + - jawiki + - arwiki + - itwiki + - nlwiki + - zhwiki + - plwiki diff --git a/mjolnir/test/fixtures/load_config/example_train.test b/mjolnir/test/fixtures/load_config/example_train.test new file mode 120000 index 0000000..0eb5d82 --- /dev/null +++ b/mjolnir/test/fixtures/load_config/example_train.test @@ -0,0 +1 @@ +../../../../example_train.yaml \ No newline at end of file diff --git a/mjolnir/test/fixtures/load_config/merge.expect b/mjolnir/test/fixtures/load_config/merge.expect new file mode 100644 index 0000000..f572943 --- /dev/null +++ b/mjolnir/test/fixtures/load_config/merge.expect @@ -0,0 +1,31 @@ +global_config: + commands: + some_command: + environment: + HOME: /home/pytest + USER: pytest + a: global + b: global + c: global + w: global + x: global + y: global_command + z: global_command + wikis: [] +profiles: + randomly_named_profile: + commands: + some_command: + environment: + HOME: /home/pytest + USER: pytest + a: global + b: profile + c: command + results_of_environment_merge: global profile command + results_of_template_merge: global profile command + w: global + x: profile + y: global_command + z: command + wikis: [] diff --git a/mjolnir/test/fixtures/load_config/merge.test b/mjolnir/test/fixtures/load_config/merge.test new file mode 100644 index 0000000..c48778e --- /dev/null +++ b/mjolnir/test/fixtures/load_config/merge.test @@ -0,0 +1,42 @@ +# Demonstrate correct merge of data from global +# to profile and then command. 
+global:
+  environment:
+    a: global
+    b: global
+    c: global
+  template_vars:
+    d: global
+    e: global
+    f: global
+  w: global
+  x: global
+  y: global
+  z: global
+  commands:
+    some_command:
+      y: global_command
+      z: global_command
+
+profiles:
+  randomly_named_profile:
+    environment:
+      b: profile
+      c: profile
+    template_vars:
+      e: profile
+      f: profile
+    x: profile
+    y: profile
+    z: profile
+    commands:
+      some_command:
+        environment:
+          c: command
+        template_vars:
+          f: command
+        results_of_environment_merge: "%(a)s %(b)s %(c)s"
+        results_of_template_merge: "%(d)s %(e)s %(f)s"
+        z: command
+
+
diff --git a/mjolnir/test/fixtures/load_config/minimum_allowed.expect b/mjolnir/test/fixtures/load_config/minimum_allowed.expect
new file mode 100644
index 0000000..3520af2
--- /dev/null
+++ b/mjolnir/test/fixtures/load_config/minimum_allowed.expect
@@ -0,0 +1,4 @@
+global_config:
+  commands: {}
+  wikis: []
+profiles: {}
diff --git a/mjolnir/test/fixtures/load_config/minimum_allowed.test b/mjolnir/test/fixtures/load_config/minimum_allowed.test
new file mode 100644
index 0000000..2dddb3b
--- /dev/null
+++ b/mjolnir/test/fixtures/load_config/minimum_allowed.test
@@ -0,0 +1,5 @@
+# The minimum parsable configuration file. Not particularly useful, but asserts
+# we aren't expecting particular things.
+global: {}
+profiles: {}
+
diff --git a/mjolnir/test/fixtures/load_config/minimum_allowed_command.expect b/mjolnir/test/fixtures/load_config/minimum_allowed_command.expect
new file mode 100644
index 0000000..adcf861
--- /dev/null
+++ b/mjolnir/test/fixtures/load_config/minimum_allowed_command.expect
@@ -0,0 +1,11 @@
+global_config:
+  commands: {}
+  wikis: []
+profiles:
+  not_good_at_naming_things:
+    commands:
+      qux:
+        environment:
+          HOME: /home/pytest
+          USER: pytest
+    wikis: []
diff --git a/mjolnir/test/fixtures/load_config/minimum_allowed_command.test b/mjolnir/test/fixtures/load_config/minimum_allowed_command.test
new file mode 100644
index 0000000..51c51b8
--- /dev/null
+++ b/mjolnir/test/fixtures/load_config/minimum_allowed_command.test
@@ -0,0 +1,8 @@
+# The minimum parsable configuration file. Not particularly useful, but asserts
+# we aren't expecting particular things.
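+# Note that HOME and USER are still merged into each command's environment,
+# as shown in the matching .expect file above.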
+global: {}
+profiles:
+  not_good_at_naming_things:
+    commands:
+      qux: {}
+
diff --git a/mjolnir/test/utilities/test_spark.py b/mjolnir/test/utilities/test_spark.py
new file mode 100644
index 0000000..ccea823
--- /dev/null
+++ b/mjolnir/test/utilities/test_spark.py
@@ -0,0 +1,93 @@
+import glob
+import mjolnir.utilities.spark
+import os
+import pytest
+import yaml
+
+
[email protected]("a,b,expected", [
+    ({}, {}, {}),
+    ({'a': 1}, {}, {'a': 1}),
+    ({}, {'b': 2}, {'b': 2}),
+    ({'a': 2}, {'a': 3}, {'a': 3}),
+    ({'a': 1, 'b': 2}, {'b': 3}, {'a': 1, 'b': 3}),
+])
+def test_dict_merge(a, b, expected):
+    assert mjolnir.utilities.spark.dict_merge(a, b) == expected
+
+
[email protected]("template_vars,environment,expected", [
+    # Most basic run through
+    ({}, {}, {}),
+    # Variables can reference each other
+    ({'a': 'foo%(b)s', 'b': 'bar'}, {}, {'a': 'foobar', 'b': 'bar'}),
+    # Variables can reference each other from different sources
+    ({'a': 'foo%(b)s'}, {'b': 'bar'}, {'a': 'foobar', 'b': 'bar'}),
+    # template vars take precedence over environment (and vars are cast to strings)
+    ({'a': 1}, {'a': 2}, {'a': '1'}),
+])
+def test_build_template_vars(template_vars, environment, expected):
+    res = mjolnir.utilities.spark.build_template_vars(template_vars, environment, 'MARKER')
+    print res
+    # This varies depending on the runner, just drop it
+    del res['mjolnir_dir']
+    # Not testing this for simplicity
+    del res['marker']
+    assert res == expected
+
+
+def sort_fixture(d):
+    if not isinstance(d, dict):
+        return d
+    return {k: sort_fixture(v) for k, v in sorted(d.items(), key=lambda x: x[0])}
+
+
+def generate_fixtures(test_name):
+    # Load fixture files for the named test; they're too verbose to put inline
+    tests = []
+    fixtures_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'fixtures')
+    for test_file in glob.glob(os.path.join(fixtures_dir, test_name, '*.test')):
+        expect_file = os.path.splitext(test_file)[0] + '.expect'
+        tests.append((test_file, expect_file))
+    return ('test_file,expect_file', tests)
+
+
+def compare_fixture(expect_file, generated):
+    as_yaml = yaml.dump(sort_fixture(generated), default_flow_style=False)
+    if os.path.isfile(expect_file):
+        with open(expect_file, 'r') as f:
+            current = sort_fixture(yaml.load(as_yaml))
+            fixture = sort_fixture(yaml.load(f))
+            assert current == fixture
+    else:
+        with open(expect_file, 'w') as f:
+            f.write(as_yaml)
+
+
[email protected](*generate_fixtures('load_config'))
+def test_load_config(monkeypatch, test_file, expect_file):
+    monkeypatch.setenv('HOME', '/home/pytest')
+    monkeypatch.setenv('USER', 'pytest')
+
+    with open(test_file, 'r') as f:
+        global_config, profiles = mjolnir.utilities.spark.load_config(f, 'marker', {})
+    compare_fixture(expect_file, {
+        'global_config': global_config,
+        'profiles': profiles
+    })
+
+
[email protected](*generate_fixtures('build_spark_command'))
+def test_build_spark_command(test_file, expect_file):
+    with open(test_file, 'r') as f:
+        test = yaml.load(f)
+    cmd = mjolnir.utilities.spark.build_spark_command(test)
+    compare_fixture(expect_file, cmd)
+
+
[email protected](*generate_fixtures('build_mjolnir_utility'))
+def test_build_mjolnir_utility(test_file, expect_file):
+    with open(test_file, 'r') as f:
+        test = yaml.load(f)
+    cmd = mjolnir.utilities.spark.build_mjolnir_utility(test)
+    compare_fixture(expect_file, cmd)
diff --git a/mjolnir/utilities/spark.py b/mjolnir/utilities/spark.py
new file mode 100644
index 0000000..1ac7114
--- /dev/null
+++ b/mjolnir/utilities/spark.py
@@ -0,0 +1,553 @@
+"""
+Configuration based runner for spark commands
+
+Running spark commands, and our own spark pipelines, requires an
+amazing number of arguments to set up. Delegate those arguments
+to a config file that varies by command and by wiki.
+
+The high level design here is to have a configuration file that
+defines how to run various spark commands, and a process of templating
+and configuration merging to define how to run those commands. The
+basic structure of the configuration file is:
+
+    global:
+        <config>
+        commands:
+            bar:
+                <config>
+    profiles:
+        foo:
+            wikis: ["bazwiki", "bangwiki"]
+            <config>
+            commands:
+                bar:
+                    <config>
+
+Configuration at the global level, profile level, and command level is
+merged with precedence given to more specific items. Most notably this
+means that global per-command <config> overrides top-level profile config.
+
+This final configuration is then used to define how to call spark. Most of
+the complexity has been pushed to the configuration file where it's hopefully
+easier to deal with. Only time will tell.
+"""
+
+from __future__ import absolute_import
+import argparse
+import datetime
+import os
+import pprint
+import subprocess
+import sys
+import yaml
+
+
+def dict_merge(base, override):
+    """Recursively merge two dictionaries
+
+    Parameters
+    ----------
+    base : dict
+        Base dictionary for merge
+    override : dict
+        Dictionary to override values from base with
+
+    Returns
+    -------
+    dict
+        Merged dictionary of base and overrides
+    """
+    # recursively merges base and override, using override
+    # where applicable
+    merged = base.copy()
+    for k, v in override.items():
+        if k in merged:
+            if isinstance(merged[k], dict) and isinstance(v, dict):
+                merged[k] = dict_merge(merged[k], v)
+            elif isinstance(merged[k], set) and isinstance(v, set):
+                merged[k] = merged[k].union(v)
+            else:
+                merged[k] = v
+        else:
+            merged[k] = v
+    return merged
+
+
+def build_template_vars(template_vars, environment, marker):
+    """Build the final template vars used in templating
+
+    Parameters
+    ----------
+    template_vars : dict
+        configuration specified to be the base template variables. These
+        take precedence over any other way to specify template variables.
+    environment : dict
+        configuration specified to be part of the environment. Merged
+        into the output, but with preference given to the template_vars
+        argument above. Merging allows performing shell-like substitution
+        of, for example, HOME.
+    marker : str
+        A unique marker for this run. Its use isn't hard coded, but it is
+        generally used in paths to have unique input/output directories.
+
+    Returns
+    -------
+    dict
+    """
+    template_var_defaults = {
+        'marker': marker,
+        # TODO: This is wrong when running via the virtualenv?
+        'mjolnir_dir': os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))),
+    }
+
+    # Merge environment into template vars giving precedence to template
+    template_vars = dict_merge(environment, template_vars)
+    # Merge in templating defaults
+    template_vars = dict_merge(template_var_defaults, template_vars)
+    # Everything must be strings to pass into subprocess.check_call:
+    template_vars = {k: str(v) for k, v in template_vars.items()}
+
+    # String replacement inside template_vars itself. This is a while
+    # loop because a string may be replaced with another string that
+    # contains unresolved templating vars.
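+    # For example, a hypothetical {'a': '%(b)s!', 'b': '%(c)s', 'c': 'x'}
+    # needs two passes before 'a' fully resolves to 'x!', so keep
+    # substituting until no '%(' markers remain (bounded below to avoid
+    # looping forever on circular references).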
+    i = 0
+    while any(['%(' in value for value in template_vars.values()]):
+        template_vars = {k: v % template_vars for k, v in template_vars.items()}
+        i += 1
+        if i > 20:
+            print template_vars
+            raise Exception('Stuck in a loop resolving template variables')
+
+    return template_vars
+
+
+def apply_templating_recursive(data, template_vars):
+    if isinstance(data, dict):
+        return {k: apply_templating_recursive(v, template_vars) for k, v in data.items()}
+    elif isinstance(data, list):
+        return [apply_templating_recursive(v, template_vars) for v in data]
+    elif isinstance(data, set):
+        return set([apply_templating_recursive(v, template_vars) for v in data])
+    else:
+        try:
+            return str(data) % template_vars
+        except ValueError:
+            raise ValueError('Failed formatting: %s' % (str(data)))
+
+
+def load_config(stream, marker, template_overrides):
+    """Load a configuration file and transform it into our final configuration
+
+    Mostly applies a series of merges so command definitions override profile
+    definitions which override global definitions. Then applies template variables
+    in the form of %(foo)s using basic python string formatting.
+
+    Parameters
+    ----------
+    stream : file
+        A file-like stream to read the configuration from
+    marker : str
+        A unique marker for this run. Its use isn't hard coded, but it is
+        generally used in paths to have unique input/output directories.
+    template_overrides : dict
+        Overrides applied to all templating variables. Generally these
+        come from the command line.
+
+    Returns
+    -------
+    global_profile : dict
+        Top-level profile containing all globally configured items
+    profiles : dict
+        Named profiles. The values in this dict are all the same shape
+        as global_profile above but specialized to a particular set of
+        wikis.
+    """
+    config = yaml.load(stream)
+
+    global_profile = config['global'].copy()
+    # Merge in specific pieces of global environment, preferring the actual
+    # environment over config
+    global_profile['environment'] = dict_merge(global_profile.get('environment', {}), {
+        'HOME': os.environ['HOME'],
+        'USER': os.environ['USER'],
+    })
+
+    def build_profile(name, profile):
+        # Apply profile overrides to global config
+        profile = dict_merge(global_profile, profile)
+        # Template vars should not propagate deeper into the stack.
+        template_vars = dict_merge(profile.pop('template_vars', {}), template_overrides)
+        # create an explicit profile name var to reference
+        if 'profile_name' in template_vars:
+            raise Exception("profile_name defined externally")
+        template_vars['profile_name'] = name
+        # Take the commands and wikis. These will be returned, everything else will be merged
+        # into the commands
+        commands = profile.pop('commands', {})
+        wikis = profile.pop('wikis', [])
+
+        # Merge everything else (paths, environment, etc) into each command, with
+        # the command taking precedence, and apply templating.
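+        # For example, a profile-level 'environment' dict is merged beneath each
+        # command's own 'environment', so a command only overrides the keys that differ.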
+        for name, config in commands.items():
+            merged = dict_merge(profile, config)
+            command_template_vars = build_template_vars(
+                dict_merge(template_vars, merged.pop('template_vars', {})),
+                merged['environment'], marker)
+            commands[name] = apply_templating_recursive(merged, command_template_vars)
+
+        return {
+            "wikis": wikis,
+            "commands": commands,
+        }
+
+    profiles = {name: build_profile(name, profile) for name, profile in config['profiles'].items()}
+    global_profile = build_profile('global', global_profile)
+
+    return global_profile, profiles
+
+
+def validate_profiles(config):
+    """Validate each profile has the minimum amount of information required"""
+    for name, profile in config.items():
+        # Assert a minimum viable level of per-group configuration
+        # TODO: Would be nice if somehow we could also verify there aren't
+        # extra unused properties that the user thinks will do something but don't
+        validate_config_level(name + '.', profile, {
+            'wikis': [],
+            'commands': {
+                'pyspark': ['spark_command'],
+                'data_pipeline': ['spark_command', 'mjolnir_utility_path', 'mjolnir_utility'],
+                'training_pipeline': {
+                    # Using an empty array requires the key exists, even if it doesn't
+                    # contain sub-properties.
+                    'spark_command': [],
+                    'mjolnir_utility_path': [],
+                    'mjolnir_utility': [],
+                    'spark_conf': [
+                        'spark.dynamicAllocation.maxExecutors',
+                        'spark.yarn.executor.memoryOverhead',
+                        'spark.task.cpus'
+                    ],
+                    'spark_args': ['executor-memory', 'executor-cores'],
+                    'cmd_args': ['workers', 'cv-jobs', 'folds', 'final-trees']
+                }
+            }
+        })
+
+        train = profile['commands']['training_pipeline']
+        if train['spark_conf']['spark.task.cpus'] != train['spark_args']['executor-cores']:
+            raise Exception('Expected spark.task.cpus to equal executor-cores in group %s' % (name))
+
+
+def validate_config_level(prefix, config, requirements):
+    """Recursively validate that a dict has the expected keys"""
+    if type(requirements) is dict:
+        for key, sub_requirements in requirements.items():
+            if key not in config:
+                raise Exception('Expected %s%s to exist in configuration file but have %s'
+                                % (prefix, key, config.keys()))
+            validate_config_level('%s%s.' % (prefix, key), config[key], sub_requirements)
+    elif type(requirements) is list:
+        for key in requirements:
+            if key not in config:
+                raise Exception('Expected %s%s to exist in configuration file but have %s'
+                                % (prefix, key, config))
+    else:
+        raise Exception('Unexpected requirements type: %s' % (type(requirements)))
+
+
+def check_defaults(profile, sub_commands):
+    """Check lists of paths that should or should not exist
+
+    Parameters
+    ----------
+    profile : dict
+        A single profile (either global or named) created by load_config
+    sub_commands : list of str
+        A list of commands to verify correctness of within the profile
+
+    Returns
+    -------
+    set of str
+        Set of errors found in the profile
+    """
+    errors = set()
+
+    def negate(func):
+        return lambda x: not func(x)
+
+    functions = {
+        'dir_exist': (negate(os.path.isdir), 'Missing Directory: %s'),
+        'dir_not_exist': (os.path.isdir, 'Directory should not exist: %s'),
+        'file_exist': (negate(os.path.isfile), 'Missing File: %s'),
+        'file_not_exist': (os.path.isfile, 'File should not exist: %s'),
+    }
+
+    for k, v in functions.items():
+        func, message = v
+        for command in sub_commands:
+            try:
+                for path in profile['commands'][command]['paths'][k]:
+                    if func(path):
+                        errors.add(message % path)
+            except KeyError:
+                pass
+
+    return errors
+
+
+def pretty_print_cli(args, env):
+    """Make a long command line barely readable when printed.
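+
+    For long commands, environment variables are printed first and the
+    arguments are broken into tab-indented continuation lines (roughly one
+    flag and its value per line); short commands stay on a single line.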
+
+    Not suitable for copy/paste. Just look and understand.
+    """
+    args = list(args)
+    output = []
+    if env is not None:
+        output.append('%s \\' % (' '.join(["%s=%s" % (k, v) for k, v in env.items()])))
+    if sum([len(x) for x in args]) < 80:
+        output.append(' '.join(args))
+    else:
+        output.append('\t%s' % (args.pop(0)))
+        while args:
+            line = [args.pop(0)]
+            if args and line[0][:2] == '--':
+                line.append(args.pop(0))
+            else:
+                while args and args[0][0] != '-' and sum([len(x) for x in line]) < 60:
+                    line.append(args.pop(0))
+            output[-1] += ' \\'
+            output.append('\t%s' % (' '.join(line)))
+    print '\n'.join(output)
+
+
+def build_mjolnir_utility(config):
+    """Build arguments for calling mjolnir-utilities.py
+
+    Parameters
+    ----------
+    config : dict
+        Configuration of the individual command to run
+    """
+    args = [config['mjolnir_utility_path'], config['mjolnir_utility']]
+
+    try:
+        # While sorting is not strictly necessary, dicts have non-deterministic
+        # ordering which makes testing difficult
+        for k, v in sorted(config['cmd_args'].items(), key=lambda x: x[0]):
+            for item in (v if isinstance(v, (list, set)) else [v]):
+                args.append('--' + k)
+                args.append(str(item))
+    except KeyError:
+        pass
+
+    return args
+
+
+def build_spark_command(config):
+    """Build a configuration based command line to run something in spark
+
+    Puts together a command line for submitting spark jobs to the
+    cluster. Injects spark_conf and spark_args from the profile
+    for the specified command into the command line.
+
+    Parameters
+    ----------
+    config : dict
+        Command configuration to build spark arguments for
+    """
+    args = [config['spark_command']]
+    try:
+        for k, v in sorted(config['spark_conf'].items(), key=lambda x: x[0]):
+            args.append('--conf')
+            args.append('%s=%s' % (k, str(v)))
+    except KeyError:
+        pass
+
+    try:
+        for k, v in sorted(config['spark_args'].items(), key=lambda x: x[0]):
+            args.append('--' + k)
+            args.append(str(v))
+    except KeyError:
+        pass
+
+    return args
+
+
+def subprocess_check_call(args, env=None):
+    """Helper that only runs the command when not doing a dry run"""
+    print "Running Command:"
+    pretty_print_cli(args, env=env)
+    if DRY_RUN:
+        return 0
+    else:
+        # check_call raises CalledProcessError on a non-zero exit code
+        return subprocess.check_call(args, env=env)
+
+
+# Past here are the actual command definitions
+
+def collect(global_profile, profiles):
+    """Run mjolnir data pipeline"""
+    all_wikis = [wiki for group in profiles.values() for wiki in group['wikis']]
+    config = global_profile['commands']['data_pipeline']
+
+    cmd = build_spark_command(config) + build_mjolnir_utility(config) + all_wikis
+    subprocess_check_call(cmd, env=config['environment'])
+
+
+def train(global_profile, profiles):
+    """Run mjolnir training pipeline"""
+    for name, profile in profiles.items():
+        config = profile['commands']['training_pipeline']
+        cmd = build_spark_command(config) + build_mjolnir_utility(config) + profile['wikis']
+        subprocess_check_call(cmd, env=config['environment'])
+
+
+def collect_and_train(global_profile, profiles):
+    """Run data and training pipelines"""
+    collect(global_profile, profiles)
+    train(global_profile, profiles)
+
+    # Cleanup training data from hdfs
+    # TODO when ready
+    hdfs_training_data_path = global_profile['commands']['data_pipeline']['cmd_args']['output-dir']
+    subprocess_check_call(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_training_data_path])
+
+
+def shell(command, global_profile, profiles):
+    """Start the pyspark shell"""
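+    # 'command' selects which configured command block to run, e.g. 'pyspark'
+    # for a plain shell or 'pyspark_train' for one sized for training.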
+    config = global_profile['commands'][command]
+    cmd = build_spark_command(config)
+    subprocess_check_call(cmd, env=config['environment'])
+
+
+class KeyValueAction(argparse.Action):
+    "Allows specifying multiple k=v parameters on the command line"
+    def __init__(self, *args, **kwargs):
+        kwargs['default'] = {}
+        kwargs['type'] = KeyValueAction.check
+        super(KeyValueAction, self).__init__(*args, **kwargs)
+
+    def __call__(self, parser, namespace, values, option_string=None):
+        # Need to wrap in dict to get a copy so we don't change the provided default
+        current = dict(getattr(namespace, self.dest, {}))
+        k, v = values
+        current[k] = v
+        setattr(namespace, self.dest, current)
+
+    @staticmethod
+    def check(value):
+        if '=' not in value:
+            raise argparse.ArgumentTypeError('%s is not a k=v string' % (value))
+        # maxsplit of 1 so values are allowed to contain '='
+        return tuple(value.split('=', 1))
+
+
+def parse_arguments(argv, available_commands):
+    parser = argparse.ArgumentParser(
+        description='Run pre-configured spark-submit commands')
+
+    parser.add_argument(
+        '-c', '--config', dest='config', type=str, required=True,
+        help='Path to yaml configuration file.')
+    parser.add_argument(
+        '-t', '--template-var', dest='template_vars', action=KeyValueAction,
+        metavar='var=override', help='Override template variables')
+    parser.add_argument(
+        '-m', '--marker', dest='marker', type=str,
+        default=datetime.date.today().strftime("%Y%m%d"),
+        help='Marker to tag training and output directories with. Defaults to ymd.')
+    parser.add_argument(
+        '-d', '--dry-run', dest='dry_run', action='store_true',
+        help="Print the commands that would be run, but don't run them")
+    parser.add_argument(
+        '--debug', dest='debug', action='store_true',
+        help='Print the command definition and exit')
+    parser.set_defaults(dry_run=False)
+    parser.add_argument(
+        'command', metavar='command', type=str, choices=set(available_commands),
+        help='Command to run: ' + ', '.join(available_commands))
+    parser.add_argument(
+        'wikis', metavar='wiki', type=str, nargs='*', default=[],
+        help='Limit wikis to this list')
+
+    args = parser.parse_args(argv)
+
+    return dict(vars(args))
+
+
+def main(argv=None):
+    commands = {
+        'collect': {
+            'func': collect,
+            'needed': ['data_pipeline'],
+        },
+        'train': {
+            'func': train,
+            'needed': ['training_pipeline'],
+        },
+        'collect_and_train': {
+            'func': collect_and_train,
+            'needed': ['data_pipeline', 'training_pipeline'],
+        },
+        'shell': {
+            'func': lambda x, y: shell('pyspark', x, y),
+            'needed': []
+        },
+        'shell_train': {
+            'func': lambda x, y: shell('pyspark_train', x, y),
+            'needed': []
+        }
+    }
+    # We could probably auto-generate better help from the __doc__ strings of
+    # the functions for each command, but save that for another day
+    args = parse_arguments(argv, commands.keys())
+
+    # Easiest to just make this global instead of passing it around
+    global DRY_RUN
+    DRY_RUN = args['dry_run']
+    # Merge global config with training group config
+    with open(args['config'], 'r') as f:
+        global_profile, profiles = load_config(f, args['marker'], args['template_vars'])
+
+    # Filter to selected wikis
+    if args['wikis']:
+        wikis = set(args['wikis'])
+        for name, group in profiles.items():
+            group['wikis'] = [wiki for wiki in group['wikis'] if wiki in wikis]
+        # Filter out groups with no defined wikis
+        profiles = {name: group for name, group in profiles.items() if group['wikis']}
+
+    if args['debug']:
+        print "Global Config: "
+        pprint.pprint(global_profile)
+        print "\n\nProfiles:"
+        pprint.pprint(profiles)
+    else:
+        command = commands[args['command']]
+
+        # This is necessary because the path to the virtualenv needs to be the same
+        # locally as it is on the remote executors. This places the venv at ./venv.
+        # Probably something less hard coded could be done, but this works for now.
+        # This needs to occur before validation so checks against PYSPARK_PYTHON
+        # can verify the correct availability.
+        # TODO: This may not be right, and isn't overridable ...
+        mjolnir_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+        os.chdir(mjolnir_dir)
+
+        # Early-exit if there are configuration problems
+        validate_profiles(profiles)
+        errors = check_defaults(global_profile, command['needed'])
+        for name, profile in profiles.items():
+            errors = errors.union(check_defaults(profile, command['needed']))
+        if errors:
+            for error in errors:
+                print error
+            if not DRY_RUN:
+                sys.exit(1)
+
+        # Finally do the thing
+        command['func'](global_profile, profiles)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/mjolnir/utilities/upload.py b/mjolnir/utilities/upload.py
new file mode 100644
index 0000000..87b58ea
--- /dev/null
+++ b/mjolnir/utilities/upload.py
@@ -0,0 +1,93 @@
+from __future__ import absolute_import
+import argparse
+import glob
+import json
+import os
+import pprint
+import re
+import requests
+
+
+def upload_models(input_dir, wikis):
+    fnames = glob.glob('%s/model_*.json' % (input_dir))
+    wikis = set(wikis)
+    for fname in fnames:
+        print fname
+        match_wiki = re.search(r'model_([^.]+)\.json', os.path.basename(fname))
+        if match_wiki is None:
+            print '\tFailed to parse wiki from %s.' % (os.path.basename(fname))
+            print '\tSkipping.'
+            print ''
+            continue
+
+        wiki = match_wiki.group(1)
+        if wikis and wiki not in wikis:
+            continue
+        # Take the portion of the directory name before the first _ as part of the model name
+        name = os.path.dirname(fname).split('_')[0]
+        # TODO: v1?
+        model_name = '%s_%s_v1' % (name, wiki)
+
+        answer = raw_input('\tUpload model %s? ' % (model_name)).lower()
+        if answer != 'y':
+            print '\tSkipping.'
+            print ''
+            continue
+
+        with open(fname, 'r') as f:
+            model = json.loads(f.read())
+
+        req = {
+            "name": model_name,
+            "model": {
+                "type": "model/xgboost+json",
+                "definition": model
+            },
+            "validation": {
+                "index": wiki,
+                # TODO: hard coded params..
+                "params": {
+                    "query_string": "example query string"
+                }
+            }
+        }
+
+        # TODO: Hardcoded cluster hosts
+        for host in ['elastic2020.codfw.wmnet', 'elastic1020.eqiad.wmnet']:
+            print '\tUploading to %s' % (host)
+            res = requests.post('http://%s:9200/_ltr/_featureset/enwiki_v1/_createmodel' % (host),
+                                data=json.dumps(req), headers={'Content-Type': 'application/json'})
+
+            try:
+                parsed = res.json()
+                if 'result' in parsed and parsed['result'] == 'created':
+                    print 'Created on %s!' % (host)
+                else:
+                    pprint.pprint(parsed)
+                    break
+            except ValueError:
+                print res.text
+                break
+
+
+def parse_arguments(argv):
+    parser = argparse.ArgumentParser(description='Upload models to elasticsearch')
+    parser.add_argument(
+        '-i', '--input', dest='input_dir', required=True,
+        help='Input directory generated by training_pipeline to upload models from')
+    parser.add_argument(
+        'wikis', metavar='wikis', type=str, nargs='*',
+        help='Wikis to upload models for. Empty for all models in input directory')
+    args = parser.parse_args(argv)
+    return dict(vars(args))
+
+
+def main(argv=None):
+    args = parse_arguments(argv)
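+    # parse_arguments returns a dict keyed by argument name, so unpack it
+    # as keyword arguments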
+    upload_models(**args)
+
+
+if __name__ == "__main__":
+    main()

--
To view, visit https://gerrit.wikimedia.org/r/381250
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I19e02ac51b155eba7b9e296d203e03e7499a996d
Gerrit-PatchSet: 15
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
