EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/350332 )
Change subject: [WIP] train/evaluate models with xgboost4j-spark ...................................................................... [WIP] train/evaluate models with xgboost4j-spark This is far from perfect: it sidesteps the integration of xgboost4j-spark with spark.ml, as that doesn't (currently) support ranking tasks. A PR is being worked on for spark 2.3 but it still needs some work. This accomplishes the following: * Convert a dataframe resulting from feature collection into the format expected by xgboost * Train a model with arbitrary parameters and return the model * Evaluate the model against a dataframe using NDCG@k TODO: * Dump the resulting model, parse it into a tree and offer it for export * Set up cross-validation * Set up test/train splits Not doable (yet): * xgboost4j-spark doesn't support weights on rows. This is supported in xgboost4j, so it might be possible to work up an upstream patch if someone feels like diving into scala. Change-Id: Ia749b85e1626c3aaa3b1c006d06d2d0720443d8f --- M bootstrap-vm.sh A mjolnir/training/xgboost.py 2 files changed, 207 insertions(+), 1 deletion(-) git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/32/350332/1 diff --git a/bootstrap-vm.sh b/bootstrap-vm.sh index dd8f202..6453983 100644 --- a/bootstrap-vm.sh +++ b/bootstrap-vm.sh @@ -21,7 +21,12 @@ apt-get install -q -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" \ spark-python \ openjdk-7-jre-headless \ - python-virtualenv + python-virtualenv \ + git-core \ + build-essential \ + libopenblas-dev \ + gfortran + # findspark needs a SPARK_HOME to setup pyspark cat >/etc/profile.d/spark.sh <<EOD @@ -51,3 +56,14 @@ virtualenv venv venv/bin/pip install tox fi + +# Grab and compile xgboost. 
Using git because there are multiple +# dependencies to grab and this is just easier +git clone https://github.com/dmlc/xgboost.git /tmp/xgboost +cd /tmp/xgboost +git checkout v0.60 +git submodule update --init --recursive +cd jvm-packages +mvn package +# Copy the library into our virtualenv +cp -rf xgboost4j-spark/target/xgboost4j-spark-0.7-jar-with-dependencies.jar /vagrant diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py new file mode 100644 index 0000000..f17dc8e --- /dev/null +++ b/mjolnir/training/xgboost.py @@ -0,0 +1,190 @@ +from functools import reduce +import math +from pyspark.ml.linalg import Vectors +from pyspark.sql.column import _to_seq +from pyspark.sql import DataFrame, Window +from pyspark.sql import functions as F + +# Command line: +# PYSPARK_PYTHON=/usr/bin/python3.4 SPARK_CONF_DIR=/etc/spark/conf bin/pyspark --jars ~/xgboost4j-spark-0.7-jar-with-dependencies.jar --driver-class-path ~/xgboost4j-spark-0.7-jar-with-dependencies.jar --master yarn --files /usr/lib/libhdfs.so.0.0.0,xgboost.py --executor-cores 4 --num-executors 40 --conf spark.task.cpus=4 + +def _example(sqlContext, num_workers=40, num_rounds=40): + df = sqlContext.read.parquet('hdfs://analytics-hadoop/user/ebernhardson/mjolnir/features') + df_vec = cols_to_vector(df, num_workers).cache() + model = train(df_vec, num_rounds, num_workers, {'eta': 0.3, 'objective': 'rank:pairwise'}) + return model.evaluate(df_vec) + + +def _calc_groups(index, rows): + """Calculate per-partition group information + + Emits the number of sequential rows with equal (wikiid, query) columns. The + partitions must be pre-sorted for this to result in valid xgboost group + data. + + Parameters + ---------- + index : int + Partition index + rows : iter + Rows in the partition, sorted by (wikiid, query) + + Yields + ------ + index : int + Partition index + count : int + Number of sequential rows with same (wikiid, query) columns. 
+ """ + group = None + count = 0 + for row in rows: + row_group = (row.wikiid, row.query) + if row_group == group: + count += 1 + else: + group = row_group + if count > 0: + yield index, count + count = 1 + yield index, count + + +def cols_to_vector(df, num_partitions): + """Preprocessing for dataframes passed into xgboost + + Prepares a dataframe for use in xgboost training or evaluation. + + TODO: It seems like Vectors.dense should be handled by feature collection + phase, but then we lose the name of the features. It's possible to attach + metadata to a schema, but that is lost if the column is modified in any + way. Might be reasonable though? + + Parameters + ---------- + df : pyspark.sql.DataFrame + num_partitions : int + + Returns: + pyspark.sql.DataFrame + """ + features = [f for f in df.columns if f[:8] == 'feature_'] + features_notnan = [~F.isnan(F.col(f)) for f in features] + nan_condition = reduce(lambda x, y: x & y, features_notnan) + return ( + df + .where(nan_condition) + .rdd.repartition(num_partitions) + # TODO: Can we use DataFrame.repartition and DataFrame.sort? Or will that shuffle? 
+ # Emit a pair with our partitioning key + .map(lambda r: (r.wikiid+r.query, (r.wikiid, r.query, r.relevance, Vectors.dense([r[f] for f in features])))) + .repartitionAndSortWithinPartitions() + # drop partitioning key + .map(lambda pair: pair[1]) + .toDF(['wikiid', 'query', 'relevance', 'features'])) + +def train(df, num_rounds, num_workers, params): + """Train a single xgboost model + + + df : pyspark.sql.DataFrame + Training data + num_workers : int + number of workers to utilize + params : dict + parameters to pass on to xgboost + + Returns + ------- + XGBoostModel + """ + df.cache() + # We must have the same number of partitions here as workers the model will + # be trained with, or xgboost4j-spark will repartition and make this data + # useless + assert num_workers == df.rdd.getNumPartitions() + groups = [[] for _ in range(df.rdd.getNumPartitions())] + for index, count in df.rdd.mapPartitionsWithIndex(_calc_groups).collect(): + groups[index].append(count) + params['groupData'] = groups + return XGBoostModel.trainWithDataFrame(df, params, num_rounds, num_workers, + feature_col="features", label_col="relevance") + +class XGBoostModel(object): + def __init__(self, j_xgb_model): + self._j_xgb_model = j_xgb_model + + @staticmethod + def trainWithDataFrame(trainingData, params, num_rounds, num_workers, objective = None, + evalMetric = None, use_external_memory = False, missing = 0., + feature_col = "features", label_col = "label"): + sc = trainingData._sc + # TODO: Is there a more efficient way to send this? 
+ j_params = sc._jvm.scala.collection.immutable.HashMap() + for k, v in params.items(): + if k == 'groupData': + v = _to_seq(sc, map(lambda group: _to_seq(sc, group), v)) + j_params = j_params.updated(k, v) + + j_xgb_model = sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoost.trainWithDataFrame( + trainingData._jdf, j_params, nRounds, nWorkers, + obj, evalMetric, useExternalMemory, missing, feature_col, + label_col) + return XGBoostModel(j_xgb_model) + + def transform(self, df_test): + jdf = self._j_xgb_model.transform(df_test._jdf) + return DataFrame(jdf, df_test.sql_ctx) + + def evaluate(self, df_test, metric='ndcg', k=10): + df_pred = ( + self.transform(df_test) + .withColumn('wiki_query', F.concat(F.col('wikiid'), F.col('query'))) + .withColumn('label', (F.col('relevance') * 10).cast('int')) + .cache()) + + pred_col = F.col('prediction') + query_col = F.col('wiki_query') + label_col = F.col('label') + + dcg_df = _dcg_for_df(df, query_col, label_col, pred_col) + idcg_df = _dcg_for_df(df_pred, query_col, label_col, None) + + return ( + pred_dcg.join(ideal_dcg, ['wiki_query'], how='inner') + .select((F.col('dcg') / F.col('idcg')).alias('ndcg')) + .select(F.mean('ndcg').alias('mean_ndcg')) + .collect()[0].mean_ndcg) + + +def _dcg_for_df(df, k, query_col, label_col, order_col): + if order_col is None: + df = df.select(label_col, query_col) + order_col = label_col + else: + df = df.select(order_col, label_col, query_col) + w = Window.partitionBy(query_col).orderBy(order_col.desc()) + return ( + df.withColumn('rn', F.row_number().over(w)) + .where(F.col('rn') <= k) + .groupBy(query_col) + .agg(F.collect_list(label_col).alias('topAtK')) + .select(query_col, _dcg_for_row(F.col('topAtK')).alias('dcg'))) + + +@F.udf +def _dcg_for_row(topAtK): + """Calculated DCG for the provided labels + + Parameters + ---------- + topAtK : list of int + + Returns + ------- + int + """ + dcg = 0 + for i, rel in enumerate(topAtK): + dcg += (2**rel - 1) / math.log(i + 2, 2) + return dcg -- To 
view, visit https://gerrit.wikimedia.org/r/350332 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia749b85e1626c3aaa3b1c006d06d2d0720443d8f Gerrit-PatchSet: 1 Gerrit-Project: search/MjoLniR Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits