EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/350501 )
Change subject: Update spark requirement to 2.1.0
......................................................................

Update spark requirement to 2.1.0

To utilize xgboost4j-spark we will need to be using spark 2.1.0.
Updates existing code to be compatible.

Change-Id: Id610df4516b181de55e60e53dddd5f0c9a4dabaf
---
M bootstrap-vm.sh
M mjolnir/dbn.py
M mjolnir/sampling.py
D mjolnir/spark/ml.py
4 files changed, 24 insertions(+), 96 deletions(-)


git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/01/350501/1

diff --git a/bootstrap-vm.sh b/bootstrap-vm.sh
index dd8f202..de6422a 100644
--- a/bootstrap-vm.sh
+++ b/bootstrap-vm.sh
@@ -2,52 +2,43 @@
 set -e

-cat >/etc/apt/sources.list.d/cloudera.list <<EOD
-# Packages for Cloudera's Distribution for Hadoop, Version 5.10.0, on Ubuntu 14.04 amd64
-deb [arch=amd64] http://archive.cloudera.com/cdh5/debian/jessie/amd64/cdh jessie-cdh5.10.0 contrib
-deb-src http://archive.cloudera.com/cdh5/debian/jessie/amd64/cdh jessie-cdh5.10.0 contrib
-EOD
-
-cat >/etc/apt/preferences.d/cloudera.pref <<EOD
-Package: *
-Pin: release o=Cloudera, l=Cloudera
-Pin-Priority: 501
-EOD
-
-wget -q https://archive.cloudera.com/cdh5/ubuntu/trusty/amd64/cdh/archive.key -O /root/cloudera-archive.key
-apt-key add /root/cloudera-archive.key
-
 apt-get update
 apt-get install -q -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" \
-    spark-python \
     openjdk-7-jre-headless \
     python-virtualenv

+cd /opt
+wget -qO - http://d4kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.6.tgz | tar -zxvf -
+ln -s /opt/spark-2.1.0-bin-hadoop2.6/bin/pyspark /usr/local/bin

 # findspark needs a SPARK_HOME to setup pyspark
 cat >/etc/profile.d/spark.sh <<EOD
-SPARK_HOME=/usr/lib/spark
+SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.6
 export SPARK_HOME
 EOD

 # pyspark wants to put a metastore_db directory in /vagrant, put it somewhere else
-cat >/etc/spark/conf/hive-site.xml <<EOD
+cat >/opt/spark-2.1.0-bin-hadoop2.6/conf/hive-site.xml <<EOD
 <configuration>
   <property>
     <name>hive.metastore.warehouse.dir</name>
     <value>/tmp/</value>
     <description>location of default database for the warehouse</description>
   </property>
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:derby:;databaseName=/tmp/metastore_db;create=true</value>
+  </property>
 </configuration>
 EOD

 # pyspark wants to put a derby.log in /vagrant as well, put it elsewhere
-cat >> /etc/spark/conf/spark-defaults.conf <<EOD
+cat >> /opt/spark-2.1.0-bin-hadoop2.6/conf/spark-defaults.conf <<EOD
 spark.driver.extraJavaOptions=-Dderby.stream.error.file=/tmp/derby.log
 EOD

 if [ ! -d /vagrant/venv ]; then
     cd /vagrant
     mkdir venv
-    virtualenv venv
+    virtualenv -p /usr/bin/python2.7 venv
     venv/bin/pip install tox
 fi
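The SPARK_HOME exported above is what lets the test environment locate the new
Spark tarball. As a rough sketch of how that plugs together in a pyspark session
(the app name below is illustrative, not something from this repo):

    # findspark reads SPARK_HOME and puts pyspark on sys.path; the
    # session name is hypothetical.
    import findspark
    findspark.init()

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('mjolnir-smoke-test').getOrCreate()
    print(spark.version)  # expect '2.1.0' with the tarball installed above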
diff --git a/mjolnir/dbn.py b/mjolnir/dbn.py
index 958f302..96b37ca 100644
--- a/mjolnir/dbn.py
+++ b/mjolnir/dbn.py
@@ -188,7 +188,7 @@
         # of grouping into python, but that could just as well end up worse?
         .repartition(num_partitions, 'wikiid', 'norm_query')
         # Run each partition through the DBN to generate relevance scores.
-        .mapPartitions(train_partition)
+        .rdd.mapPartitions(train_partition)
         # Convert the rdd of tuples back into a DataFrame so the fields all
         # have a name.
         .toDF(['wikiid', 'norm_query', 'hit_page_id', 'relevance']))

diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index 8c3f2e2..4adb578 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -9,7 +9,6 @@
 import bisect
 import mjolnir.spark
-from mjolnir.spark.ml import QuantileDiscretizer
 import pyspark
 from pyspark.sql import functions as F
 from pyspark.sql.column import Column, _to_java_column
@@ -49,7 +48,7 @@
 def _calc_splits(df, num_buckets=100):
     """Calculate the right edge of num_session buckets

-    Utilizes QuantileDiscretizer to bucketize the source. Due to the available
+    Utilizes approxQuantile to bucketize the source. Due to the available
     implementation this has to be done per-wiki, so is perhaps slower than
     necessary. For current dataset sizes that isn't an issue.

@@ -72,13 +71,15 @@
     of queries per bucket.
     """
-    qdf = QuantileDiscretizer(numBuckets=num_buckets,
-                              inputCol='num_sessions',
-                              outputCol='bucket')
-    # QuantileDiscretizer.getSplits actually outputs num_buckets, plus -inf
-    # at the beginning and +inf at the end. We will bucketize with the
-    # bisect library so chop off the -inf as we only need inf on one end.
-    return qdf.fit(df).getSplits()[1:]
+    percentiles = [x/float(num_buckets) for x in range(1, num_buckets)]
+    # With 100 buckets, there will be buckets at .01, .02, etc. This specifies
+    # percentile .01 must be the value between .009 and .011
+    relative_error = 1. / (num_buckets * 10)
+    splits = df.approxQuantile('num_sessions', percentiles, relative_error)
+
+    # range(1, num_buckets) returned num_buckets-1 values. This final inf captures
+    # everything from the last bucket to the end.
+    return splits + [float('inf')]


 def _sample_queries(df, wikis, num_buckets=100, samples_desired=10000, seed=None):
@@ -152,7 +153,7 @@
         return ((row.wikiid, split), (row.wikiid, row.norm_query))

     return (
-        df
+        df.rdd
         # To use sampleByKey we need to convert to a PairRDD which has keys matching
         # those used in wiki_fractions.
         .map(to_pair_rdd)
@@ -231,7 +232,7 @@
         # Spark will eventually throw this away in an LRU fashion.
         .cache())

-    df_queries_sampled = _sample_queries(df_queries_unique, wikis, samples_desired=queries_per_wiki)
+    df_queries_sampled = _sample_queries(df_queries_unique, wikis, samples_desired=queries_per_wiki, seed=seed)

     # Select the rows chosen by sampling from the filtered df
     return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query'])
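Two Spark 2.x API details drive the changes above: DataFrame no longer proxies RDD
methods such as map and mapPartitions, hence the explicit .rdd hops, and
DataFrame.approxQuantile(col, probabilities, relativeError) now computes split
points directly. A self-contained sketch of the new bucketing logic, using toy
data rather than anything from the real pipeline:

    # Toy illustration of approxQuantile + bisect bucketing; data is made up.
    import bisect
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(i,) for i in range(1, 1001)], ['num_sessions'])

    num_buckets = 10
    percentiles = [x/float(num_buckets) for x in range(1, num_buckets)]
    splits = df.approxQuantile('num_sessions', percentiles, 0.01)
    splits.append(float('inf'))  # catch-all right edge for the last bucket

    # bisect maps a value to the index of the first split >= it, i.e. its bucket.
    assert bisect.bisect_left(splits, 1) == 0
    assert bisect.bisect_left(splits, 10**9) == num_buckets - 1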
- """ - - @keyword_only - def __init__(self, numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, - handleInvalid="error"): - """ - __init__(self, numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, \ - handleInvalid="error") - """ - super(QuantileDiscretizer, self).__init__() - self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.QuantileDiscretizer", - self.uid) - self.numBuckets = Param( - self, "numBuckets", "Maximum number of buckets (quantiles, or " + - "categories) into which data points are grouped. Must be >= 2.") - self._setDefault(numBuckets=2) - kwargs = self.__init__._input_kwargs - self.setParams(**kwargs) - - @keyword_only - def setParams(self, numBuckets=2, inputCol=None, outputCol=None): - """ - setParams(self, numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, \ - handleInvalid="error") - Set the params for the QuantileDiscretizer - """ - kwargs = self.setParams._input_kwargs - return self._set(**kwargs) - - def setNumBuckets(self, value): - """ - Sets the value of :py:attr:`numBuckets`. - """ - return self._set(numBuckets=value) - - def getNumBuckets(self): - """ - Gets the value of numBuckets or its default value. - """ - return self.getOrDefault(self.numBuckets) - - def _create_model(self, java_model): - """ - Private method to convert the java_model to a Python model. - """ - return Bucketizer(splits=list(java_model.getSplits()), - inputCol=self.getInputCol(), - outputCol=self.getOutputCol()) -- To view, visit https://gerrit.wikimedia.org/r/350501 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id610df4516b181de55e60e53dddd5f0c9a4dabaf Gerrit-PatchSet: 1 Gerrit-Project: search/MjoLniR Gerrit-Branch: master Gerrit-Owner: EBernhardson <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
