EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/350332 )

Change subject: [WIP] train/evaluate models with xgboost4j-spark
......................................................................

[WIP] train/evaluate models with xgboost4j-spark

This is far from perfect; notably it avoids the spark.ml integration
of xgboost4j-spark, as that doesn't (currently) support ranking tasks.
A PR targeting spark 2.3 is being worked on, but it still needs some
work.

This accomplishes the following:
* Convert a dataframe resulting from feature collection into the format
  expected by xgboost
* Train a model with arbitrary parameters and return the model
* Evaluate the model against a dataframe using NDCG@k
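
Roughly, the intended usage (mirroring the _example helper in
mjolnir/training/xgboost.py; the path and parameters here are only
illustrative):

    import mjolnir.training.xgboost as mjolnir_xgboost

    df = sqlContext.read.parquet(
        'hdfs://analytics-hadoop/user/ebernhardson/mjolnir/features')
    df_vec = mjolnir_xgboost.cols_to_vector(df, num_partitions=40)
    model = mjolnir_xgboost.train(df_vec, num_rounds=40, num_workers=40,
                                  params={'eta': 0.3, 'objective': 'rank:pairwise'})
    print(model.evaluate(df_vec, k=10))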

TODO:
* Dump the resulting model, parse into a tree and offer it for export
* Setup cross-validation
* Setup test/train splits

Not doable (yet):
* xgboost4j-spark doesn't support weights on rows. This is supported
  in xgboost4j, so it might be possible to work up an upstream patch
  if someone feels like diving into scala.

Change-Id: Ia749b85e1626c3aaa3b1c006d06d2d0720443d8f
---
M bootstrap-vm.sh
A mjolnir/training/xgboost.py
2 files changed, 207 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/32/350332/1

diff --git a/bootstrap-vm.sh b/bootstrap-vm.sh
index dd8f202..6453983 100644
--- a/bootstrap-vm.sh
+++ b/bootstrap-vm.sh
@@ -21,7 +21,12 @@
 apt-get install -q -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" \
     spark-python \
     openjdk-7-jre-headless \
-    python-virtualenv
+    python-virtualenv \
+    git-core \
+    build-essential \
+    libopenblas-dev \
+    gfortran
+
 
 # findspark needs a SPARK_HOME to setup pyspark
 cat >/etc/profile.d/spark.sh <<EOD
@@ -51,3 +56,14 @@
     virtualenv venv
     venv/bin/pip install tox
 fi
+
+# Grab and compile xgboost. Using git because there are multiple
+# dependencies to grab and this is just easier
+git clone https://github.com/dmlc/xgboost.git /tmp/xgboost
+cd /tmp/xgboost
+git checkout v0.60
+git submodule update --init --recursive
+cd jvm-packages
+mvn package
+# Copy the library into our virtualenv
+cp -rf xgboost4j-spark/target/xgboost4j-spark-0.7-jar-with-dependencies.jar /vagrant
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
new file mode 100644
index 0000000..f17dc8e
--- /dev/null
+++ b/mjolnir/training/xgboost.py
@@ -0,0 +1,190 @@
+from functools import reduce
+import math
+from pyspark.ml.linalg import Vectors
+from pyspark.sql.column import _to_seq
+from pyspark.sql import DataFrame, Window
+from pyspark.sql import functions as F
+from pyspark.sql.types import DoubleType
+
+# Command line:
+# PYSPARK_PYTHON=/usr/bin/python3.4 SPARK_CONF_DIR=/etc/spark/conf bin/pyspark --jars ~/xgboost4j-spark-0.7-jar-with-dependencies.jar --driver-class-path ~/xgboost4j-spark-0.7-jar-with-dependencies.jar --master yarn --files /usr/lib/libhdfs.so.0.0.0,xgboost.py --executor-cores 4 --num-executors 40 --conf spark.task.cpus=4
+
+def _example(sqlContext, num_workers=40, num_rounds=40):
+    df = sqlContext.read.parquet('hdfs://analytics-hadoop/user/ebernhardson/mjolnir/features')
+    df_vec = cols_to_vector(df, num_workers).cache()
+    model = train(df_vec, num_rounds, num_workers, {'eta': 0.3, 'objective': 'rank:pairwise'})
+    return model.evaluate(df_vec)
+
+
+def _calc_groups(index, rows):
+    """Calculate per-partition group information
+   
+    Emits the number of sequential rows with equal (wikiid, query) columns. The
+    partitions must be pre-sorted for this to result in valid xgboost group
+    data.
+
+    Parameters
+    ----------
+    index : int
+        Partition index
+    rows : iter
+        Rows in the partition, sorted by (wikiid, query)
+    
+    Yields
+    ------
+    index : int
+        Partition index
+    count : int
+        Number of sequential rows with same (wikiid, query) columns.
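+
+    Examples
+    --------
+    Illustrative only; plain namedtuples stand in for spark Row objects:
+
+    >>> from collections import namedtuple
+    >>> Row = namedtuple('Row', ['wikiid', 'query'])
+    >>> rows = [Row('enwiki', 'foo')] * 3 + [Row('enwiki', 'bar')] * 2
+    >>> list(_calc_groups(0, rows))
+    [(0, 3), (0, 2)]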
+    """
+    group = None
+    count = 0
+    for row in rows:
+        row_group = (row.wikiid, row.query)
+        if row_group == group:
+            count += 1
+        else:
+            group = row_group
+            if count > 0:
+                yield index, count
+            count = 1
+    yield index, count
+
+
+def cols_to_vector(df, num_partitions):
+    """Preprocessing for dataframes passed into xgboost
+
+    Prepares a dataframe for use in xgboost training or evaluation.
+
+    TODO: It seems like Vectors.dense should be handled by feature collection
+    phase, but then we lose the name of the features. It's possible to attach
+    metadata to a schema, but that is lost if the column is modified in any
+    way. Might be reasonable though?
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+    num_partitions : int
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+        Dataframe with wikiid, query, relevance and features columns, where
+        features is a dense vector built from the feature_* columns.
+    """
+    features = [f for f in df.columns if f[:8] == 'feature_']
+    features_notnan = [~F.isnan(F.col(f)) for f in features]
+    nan_condition = reduce(lambda x, y: x & y, features_notnan)
+    return (
+        df
+        .where(nan_condition)
+        # TODO: Can we use DataFrame.repartition and DataFrame.sort? Or will that shuffle?
+        # Emit a pair with our partitioning key
+        .rdd.map(lambda r: (r.wikiid + r.query, (r.wikiid, r.query, r.relevance, Vectors.dense([r[f] for f in features]))))
+        # Shuffle into num_partitions partitions, sorted by the (wikiid, query)
+        # key within each partition as _calc_groups expects
+        .repartitionAndSortWithinPartitions(num_partitions)
+        # drop partitioning key
+        .map(lambda pair: pair[1])
+        .toDF(['wikiid', 'query', 'relevance', 'features']))
+
+def train(df, num_rounds, num_workers, params):
+    """Train a single xgboost model
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Training data
+    num_rounds : int
+        Number of boosting rounds to train
+    num_workers : int
+        Number of workers to utilize
+    params : dict
+        Parameters to pass on to xgboost
+
+    Returns
+    -------
+    XGBoostModel
+    """
+    df.cache()
+    # We must have the same number of partitions here as workers the model will
+    # be trained with, or xgboost4j-spark will repartition and make this data
+    # useless
+    assert num_workers == df.rdd.getNumPartitions()
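+    # Illustrative shape of the group data (values made up): with two
+    # partitions, groups could come out as [[12, 7, 30], [9, 9]], meaning
+    # partition 0 holds three queries of 12, 7 and 30 rows and partition 1
+    # holds two queries of 9 rows each.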
+    groups = [[] for _ in range(df.rdd.getNumPartitions())]
+    for index, count in df.rdd.mapPartitionsWithIndex(_calc_groups).collect():
+        groups[index].append(count)
+    params['groupData'] = groups
+    return XGBoostModel.trainWithDataFrame(df, params, num_rounds, num_workers,
+                                           feature_col="features", label_col="relevance")
+
+class XGBoostModel(object):
+    """Thin wrapper around the JVM XGBoostModel object"""
+
+    def __init__(self, j_xgb_model):
+        self._j_xgb_model = j_xgb_model
+
+    @staticmethod
+    def trainWithDataFrame(trainingData, params, num_rounds, num_workers, objective=None,
+                           evalMetric=None, use_external_memory=False, missing=0.,
+                           feature_col="features", label_col="label"):
+        sc = trainingData._sc
+        # TODO: Is there a more efficient way to send this?
+        j_params = sc._jvm.scala.collection.immutable.HashMap()
+        for k, v in params.items():
+            if k == 'groupData':
+                v = _to_seq(sc, [_to_seq(sc, group) for group in v])
+            j_params = j_params.updated(k, v)
+
+        j_xgb_model = sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoost.trainWithDataFrame(
+            trainingData._jdf, j_params, num_rounds, num_workers,
+            objective, evalMetric, use_external_memory, missing, feature_col,
+            label_col)
+        return XGBoostModel(j_xgb_model)
+
+    def transform(self, df_test):
+        jdf = self._j_xgb_model.transform(df_test._jdf)
+        return DataFrame(jdf, df_test.sql_ctx)
+
+    def evaluate(self, df_test, metric='ndcg', k=10):
+        df_pred = (
+            self.transform(df_test)
+            .withColumn('wiki_query', F.concat(F.col('wikiid'), F.col('query')))
+            .withColumn('label', (F.col('relevance') * 10).cast('int'))
+            .cache())
+
+        pred_col = F.col('prediction')
+        query_col = F.col('wiki_query')
+        label_col = F.col('label')
+
+        dcg_df = _dcg_for_df(df_pred, k, query_col, label_col, pred_col)
+        # Ideal DCG orders each query's results by the actual label
+        idcg_df = (
+            _dcg_for_df(df_pred, k, query_col, label_col, None)
+            .withColumnRenamed('dcg', 'idcg'))
+
+        return (
+            dcg_df.join(idcg_df, ['wiki_query'], how='inner')
+            .select((F.col('dcg') / F.col('idcg')).alias('ndcg'))
+            .select(F.mean('ndcg').alias('mean_ndcg'))
+            .collect()[0].mean_ndcg)
+
+
+def _dcg_for_df(df, k, query_col, label_col, order_col):
+    """Per-query DCG@k, ordered by order_col, or by label_col (for the
+    ideal DCG) when order_col is None."""
+    if order_col is None:
+        df = df.select(label_col, query_col)
+        order_col = label_col
+    else:
+        df = df.select(order_col, label_col, query_col)
+    w = Window.partitionBy(query_col).orderBy(order_col.desc())
+    return (
+        df.withColumn('rn', F.row_number().over(w))
+        .where(F.col('rn') <= k)
+        .groupBy(query_col)
+        .agg(F.collect_list(label_col).alias('topAtK'))
+        .select(query_col, _dcg_for_row(F.col('topAtK')).alias('dcg')))
+
+
+def _dcg_for_row(topAtK):
+    """Calculate DCG for the provided labels
+
+    Parameters
+    ----------
+    topAtK : list of int
+
+    Returns
+    -------
+    float
+    """
+    dcg = 0.
+    for i, rel in enumerate(topAtK):
+        dcg += (2**rel - 1) / math.log(i + 2, 2)
+    return dcg
+
+
+# Wrap as a spark udf that returns a double rather than the default string type
+_dcg_for_row = F.udf(_dcg_for_row, DoubleType())
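+
+# Worked example of the DCG formula above (illustrative): labels [3, 1, 0]
+# give (2**3 - 1)/log2(2) + (2**1 - 1)/log2(3) + (2**0 - 1)/log2(4)
+# = 7.0 + ~0.63 + 0.0, or about 7.63.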

-- 
To view, visit https://gerrit.wikimedia.org/r/350332
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia749b85e1626c3aaa3b1c006d06d2d0720443d8f
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>

