EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/350501 )

Change subject: Update spark requirement to 2.1.0
......................................................................

Update spark requirement to 2.1.0

To utilize xgboost4j-spark we will need to be using spark 2.1.0.
Updates existing code to be compatible.

Change-Id: Id610df4516b181de55e60e53dddd5f0c9a4dabaf
---
M bootstrap-vm.sh
M mjolnir/dbn.py
M mjolnir/sampling.py
D mjolnir/spark/ml.py
4 files changed, 24 insertions(+), 96 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/01/350501/1

diff --git a/bootstrap-vm.sh b/bootstrap-vm.sh
index dd8f202..de6422a 100644
--- a/bootstrap-vm.sh
+++ b/bootstrap-vm.sh
@@ -2,52 +2,43 @@
 
 set -e
 
-cat >/etc/apt/sources.list.d/cloudera.list <<EOD
-# Packages for Cloudera's Distribution for Hadoop, Version 5.10.0, on Ubuntu 
14.04 amd64
-deb [arch=amd64] http://archive.cloudera.com/cdh5/debian/jessie/amd64/cdh 
jessie-cdh5.10.0 contrib
-deb-src http://archive.cloudera.com/cdh5/debian/jessie/amd64/cdh 
jessie-cdh5.10.0 contrib
-EOD
-
-cat >/etc/apt/preferences.d/cloudera.pref <<EOD
-Package: *
-Pin: release o=Cloudera, l=Cloudera
-Pin-Priority: 501
-EOD
-
-wget -q https://archive.cloudera.com/cdh5/ubuntu/trusty/amd64/cdh/archive.key 
-O /root/cloudera-archive.key
-apt-key add /root/cloudera-archive.key
-
 apt-get update
 apt-get install -q -y -o Dpkg::Options::="--force-confdef" -o 
Dpkg::Options::="--force-confold" \
-    spark-python \
     openjdk-7-jre-headless \
     python-virtualenv
 
+cd /opt
+wget -qO - http://d4kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.6.tgz 
| tar -zxvf
+ln -s /opt/spark-2.1.0-bin-hadoop2.6/bin/pyspark /usr/local/bin
 # findspark needs a SPARK_HOME to setup pyspark
 cat >/etc/profile.d/spark.sh <<EOD
-SPARK_HOME=/usr/lib/spark
+SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.6
 export SPARK_HOME
 EOD
 
 # pyspark wants to put a metastore_db directory in /vagrant, put it somewhere else
-cat >/etc/spark/conf/hive-site.xml <<EOD
+cat >/opt/spark-2.1.0-bin-hadoop2.6/conf/hive-site.xml <<EOD
 <configuration>
    <property>
       <name>hive.metastore.warehouse.dir</name>
       <value>/tmp/</value>
       <description>location of default database for the warehouse</description>
    </property>
+   <property>
+      <name>javax.jdo.option.ConnectionURL</name>
+      <value>jdbc:derby:;databaseName=/tmp/metastore_db;create=true</value>
+   </property>
 </configuration>
 EOD
 
 # pyspark wants to put a derby.log in /vagrant as well, put it elsewhere
-cat >> /etc/spark/conf/spark-defaults.conf <<EOD
+cat >> /opt/spark-2.1.0-bin-hadoop2.6/conf/spark-defaults.conf <<EOD
 spark.driver.extraJavaOptions=-Dderby.stream.error.file=/tmp/derby.log
 EOD
 
 if [ ! -d /vagrant/venv ]; then
     cd /vagrant
     mkdir venv
-    virtualenv venv
+    virtualenv -p /usr/bin/python2.7 venv
     venv/bin/pip install tox
 fi
diff --git a/mjolnir/dbn.py b/mjolnir/dbn.py
index 958f302..96b37ca 100644
--- a/mjolnir/dbn.py
+++ b/mjolnir/dbn.py
@@ -188,7 +188,7 @@
         # of grouping into python, but that could just as well end up worse?
         .repartition(num_partitions, 'wikiid', 'norm_query')
         # Run each partition through the DBN to generate relevance scores.
-        .mapPartitions(train_partition)
+        .rdd.mapPartitions(train_partition)
         # Convert the rdd of tuples back into a DataFrame so the fields all
         # have a name.
         .toDF(['wikiid', 'norm_query', 'hit_page_id', 'relevance']))
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index 8c3f2e2..4adb578 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -9,7 +9,6 @@
 
 import bisect
 import mjolnir.spark
-from mjolnir.spark.ml import QuantileDiscretizer
 import pyspark
 from pyspark.sql import functions as F
 from pyspark.sql.column import Column, _to_java_column
@@ -49,7 +48,7 @@
 def _calc_splits(df, num_buckets=100):
     """Calculate the right edge of num_session buckets
 
-    Utilizes QuantileDiscretizer to bucketize the source. Due to the available
+    Utilizes approxQuantile to bucketize the source. Due to the available
     implementation this has to be done per-wiki, so is perhaps slower than
     necessary. For current dataset sizes that isn't an issue.
 
@@ -72,13 +71,15 @@
         of queries per bucket.
     """
 
-    qdf = QuantileDiscretizer(numBuckets=num_buckets,
-                              inputCol='num_sessions',
-                              outputCol='bucket')
-    # QuantileDiscretizer.getSplits actually outputs num_buckets, plus -inf
-    # at the beginning and +inf at the end. We will bucketize with the
-    # bisect library so chop off the -inf as we only need inf on one end.
-    return qdf.fit(df).getSplits()[1:]
+    percentiles = [x/float(num_buckets) for x in range(1, num_buckets)]
+    # With 100 buckets, there will be buckets at .01, .02, etc. This specifies
+    # that percentile .01 must be a value between percentiles .009 and .011
+    relative_error = 1. / (num_buckets * 10)
+    splits = df.approxQuantile('num_sessions', percentiles, relative_error)
+
+    # range(1, num_buckets) returned num_buckets-1 values. This final inf captures
+    # everything from the last bucket to the end.
+    return splits + [float('inf')]
 
 
 def _sample_queries(df, wikis, num_buckets=100, samples_desired=10000, 
seed=None):
@@ -152,7 +153,7 @@
         return ((row.wikiid, split), (row.wikiid, row.norm_query))
 
     return (
-        df
+        df.rdd
         # To use sampleByKey we need to convert to a PairRDD which has keys matching
         # those used in wiki_fractions.
         .map(to_pair_rdd)
@@ -231,7 +232,7 @@
         # Spark will eventually throw this away in an LRU fashion.
         .cache())
 
-    df_queries_sampled = _sample_queries(df_queries_unique, wikis, 
samples_desired=queries_per_wiki)
+    df_queries_sampled = _sample_queries(df_queries_unique, wikis, 
samples_desired=queries_per_wiki, seed=seed)
 
     # Select the rows chosen by sampling from the filtered df
     return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 
'norm_query'])
diff --git a/mjolnir/spark/ml.py b/mjolnir/spark/ml.py
deleted file mode 100644
index 4dabd99..0000000
--- a/mjolnir/spark/ml.py
+++ /dev/null
@@ -1,64 +0,0 @@
-"""Backports of pyspark.ml.* from pyspark 2.x to 1.6.0"""
-
-from pyspark.ml.feature import Bucketizer
-from pyspark.ml.param.shared import Param, HasInputCol, HasOutputCol
-from pyspark.ml.util import keyword_only
-from pyspark.ml.wrapper import JavaEstimator
-
-
-class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol):
-    """
-    .. note:: Experimental. Backported from 2.x
-
-    `QuantileDiscretizer` takes a column with continuous features and outputs 
a column with binned
-    categorical features. The number of bins can be set using the 
:py:attr:`numBuckets` parameter.
-    The bin ranges are chosen using an approximate algorithm (see the scala 
documentation for a
-    detailed description).
-    """
-
-    @keyword_only
-    def __init__(self, numBuckets=2, inputCol=None, outputCol=None, 
relativeError=0.001,
-                 handleInvalid="error"):
-        """
-        __init__(self, numBuckets=2, inputCol=None, outputCol=None, 
relativeError=0.001, \
-                 handleInvalid="error")
-        """
-        super(QuantileDiscretizer, self).__init__()
-        self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.QuantileDiscretizer",
-                                            self.uid)
-        self.numBuckets = Param(
-            self, "numBuckets", "Maximum number of buckets (quantiles, or " +
-            "categories) into which data points are grouped. Must be >= 2.")
-        self._setDefault(numBuckets=2)
-        kwargs = self.__init__._input_kwargs
-        self.setParams(**kwargs)
-
-    @keyword_only
-    def setParams(self, numBuckets=2, inputCol=None, outputCol=None):
-        """
-        setParams(self, numBuckets=2, inputCol=None, outputCol=None, 
relativeError=0.001, \
-                  handleInvalid="error")
-        Set the params for the QuantileDiscretizer
-        """
-        kwargs = self.setParams._input_kwargs
-        return self._set(**kwargs)
-
-    def setNumBuckets(self, value):
-        """
-        Sets the value of :py:attr:`numBuckets`.
-        """
-        return self._set(numBuckets=value)
-
-    def getNumBuckets(self):
-        """
-        Gets the value of numBuckets or its default value.
-        """
-        return self.getOrDefault(self.numBuckets)
-
-    def _create_model(self, java_model):
-        """
-        Private method to convert the java_model to a Python model.
-        """
-        return Bucketizer(splits=list(java_model.getSplits()),
-                          inputCol=self.getInputCol(),
-                          outputCol=self.getOutputCol())

-- 
To view, visit https://gerrit.wikimedia.org/r/350501
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id610df4516b181de55e60e53dddd5f0c9a4dabaf
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to