Repository: spark Updated Branches: refs/heads/master 515910e9b -> 720c94fe7
[SPARK-21027][ML][PYTHON] Added tunable parallelism to one vs. rest in both Scala mllib and Pyspark # What changes were proposed in this pull request? Added tunable parallelism to the pyspark implementation of one vs. rest classification. Added a parallelism parameter to the Scala implementation of one vs. rest along with functionality for using the parameter to tune the level of parallelism. I am taking over PR #18281 because the original author is busy, but we need to merge this PR soon. After this has been merged, we can close #18281. ## How was this patch tested? Test suite added. Author: Ajay Saini <ajays...@gmail.com> Author: WeichenXu <weichen...@databricks.com> Closes #19110 from WeichenXu123/spark-21027. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/720c94fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/720c94fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/720c94fe Branch: refs/heads/master Commit: 720c94fe774431a8a40215757ded3dae9f267c7f Parents: 515910e Author: Ajay Saini <ajays...@gmail.com> Authored: Tue Sep 12 10:02:27 2017 -0700 Committer: Joseph K.
Bradley <jos...@databricks.com> Committed: Tue Sep 12 10:02:27 2017 -0700 ---------------------------------------------------------------------- docs/ml-guide.md | 18 ++++++++ .../spark/ml/classification/OneVsRest.scala | 45 ++++++++++++++------ .../ml/classification/OneVsRestSuite.scala | 42 +++++++++++++++++- python/pyspark/ml/classification.py | 25 ++++++----- .../pyspark/ml/param/_shared_params_code_gen.py | 4 +- python/pyspark/ml/param/shared.py | 24 +++++++++++ python/pyspark/ml/tests.py | 16 ++++++- 7 files changed, 146 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/docs/ml-guide.md ---------------------------------------------------------------------- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 7aec6a4..f6288e7 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -105,6 +105,24 @@ MLlib is under active development. The APIs marked `Experimental`/`DeveloperApi` may change in future releases, and the migration guide below will explain all changes between releases. +## From 2.2 to 2.3 + +### Breaking changes + +There are no breaking changes. + +### Deprecations and changes of behavior + +**Deprecations** + +There are no deprecations. + +**Changes of behavior** + +* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027): + We are now setting the default parallelism used in `OneVsRest` to be 1 (i.e. serial), in 2.2 and earlier version, + the `OneVsRest` parallelism would be parallelism of the default threadpool in scala. 
+ ## From 2.1 to 2.2 ### Breaking changes http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 05b8c3a..99bb234 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -17,10 +17,10 @@ package org.apache.spark.ml.classification -import java.util.{List => JList} import java.util.UUID -import scala.collection.JavaConverters._ +import scala.concurrent.Future +import scala.concurrent.duration.Duration import scala.language.existentials import org.apache.hadoop.fs.Path @@ -34,12 +34,13 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} -import org.apache.spark.ml.param.shared.HasWeightCol +import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.ThreadUtils private[ml] trait ClassifierTypeTrait { // scalastyle:off structural.type @@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] { @Since("1.4.0") final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") override val uid: String) - extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable { + extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("oneVsRest")) @@ -297,6 +298,16 @@ final class 
OneVsRest @Since("1.4.0") ( def setPredictionCol(value: String): this.type = set(predictionCol, value) /** + * The implementation of parallel one vs. rest runs the classification for + * each class in a separate threads. + * + * @group expertSetParam + */ + def setParallelism(value: Int): this.type = { + set(parallelism, value) + } + + /** * Sets the value of param [[weightCol]]. * * This is ignored if weight is not supported by [[classifier]]. @@ -318,7 +329,7 @@ final class OneVsRest @Since("1.4.0") ( transformSchema(dataset.schema) val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol) + instr.logParams(labelCol, featuresCol, predictionCol, parallelism) instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName) // determine number of classes either from metadata if provided, or via computation. @@ -352,8 +363,10 @@ final class OneVsRest @Since("1.4.0") ( multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) } + val executionContext = getExecutionContext + // create k columns, one for each binary classifier. - val models = Range(0, numClasses).par.map { index => + val modelFutures = Range(0, numClasses).map { index => // generate new label metadata for the binary problem. 
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() val labelColName = "mc2b$" + index @@ -364,14 +377,18 @@ final class OneVsRest @Since("1.4.0") ( paramMap.put(classifier.labelCol -> labelColName) paramMap.put(classifier.featuresCol -> getFeaturesCol) paramMap.put(classifier.predictionCol -> getPredictionCol) - if (weightColIsUsed) { - val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol] - paramMap.put(classifier_.weightCol -> getWeightCol) - classifier_.fit(trainingDataset, paramMap) - } else { - classifier.fit(trainingDataset, paramMap) - } - }.toArray[ClassificationModel[_, _]] + Future { + if (weightColIsUsed) { + val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol] + paramMap.put(classifier_.weightCol -> getWeightCol) + classifier_.fit(trainingDataset, paramMap) + } else { + classifier.fit(trainingDataset, paramMap) + } + }(executionContext) + } + val models = modelFutures + .map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]] instr.logNumFeatures(models.head.numFeatures) if (handlePersistence) { http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 17f8282..25bad59 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -25,12 +25,12 @@ import org.apache.spark.ml.feature.StringIndexer import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{ParamMap, ParamsSuite} import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTestingUtils} +import org.apache.spark.ml.util.TestingUtils._ 
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset import org.apache.spark.sql.functions._ @@ -98,7 +98,45 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau // bound how much error we allow compared to multinomial logistic regression. val expectedMetrics = new MulticlassMetrics(results) val ovaMetrics = new MulticlassMetrics(ovaResults) - assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400) + assert(expectedMetrics.confusionMatrix.asML ~== ovaMetrics.confusionMatrix.asML absTol 400) + } + + test("one-vs-rest: tuning parallelism does not change output") { + val ovaPar1 = new OneVsRest() + .setClassifier(new LogisticRegression) + + val ovaModelPar1 = ovaPar1.fit(dataset) + + val transformedDatasetPar1 = ovaModelPar1.transform(dataset) + + val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map { + row => (row.getDouble(0), row.getDouble(1)) + } + + val ovaPar2 = new OneVsRest() + .setClassifier(new LogisticRegression) + .setParallelism(2) + + val ovaModelPar2 = ovaPar2.fit(dataset) + + val transformedDatasetPar2 = ovaModelPar2.transform(dataset) + + val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map { + row => (row.getDouble(0), row.getDouble(1)) + } + + val metricsPar1 = new MulticlassMetrics(ovaResultsPar1) + val metricsPar2 = new MulticlassMetrics(ovaResultsPar2) + assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix) + + ovaModelPar1.models.zip(ovaModelPar2.models).foreach { + case (lrModel1: LogisticRegressionModel, lrModel2: 
LogisticRegressionModel) => + assert(lrModel1.coefficients ~== lrModel2.coefficients relTol 1E-3) + assert(lrModel1.intercept ~== lrModel2.intercept relTol 1E-3) + case other => + throw new AssertionError(s"Loaded OneVsRestModel expected model of type" + + s" LogisticRegressionModel but found ${other.getClass.getName}") + } } test("one-vs-rest: pass label metadata correctly during train") { http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/classification.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index aa747f3..fbb9e7f 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -16,6 +16,7 @@ # import operator +from multiprocessing.pool import ThreadPool from pyspark import since, keyword_only from pyspark.ml import Estimator, Model @@ -1567,7 +1568,7 @@ class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasWeightCol, HasPredictionCo @inherit_doc -class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): +class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable): """ .. 
note:: Experimental @@ -1612,22 +1613,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - classifier=None, weightCol=None): + classifier=None, weightCol=None, parallelism=1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ - classifier=None, weightCol=None) + classifier=None, weightCol=None, parallelism=1): """ super(OneVsRest, self).__init__() + self._setDefault(parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) @keyword_only @since("2.0.0") - def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, - classifier=None, weightCol=None): + def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", + classifier=None, weightCol=None, parallelism=1): """ - setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \ - classifier=None, weightCol=None): + setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + classifier=None, weightCol=None, parallelism=1): Sets params for OneVsRest. """ kwargs = self._input_kwargs @@ -1674,8 +1676,9 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): paramMap[classifier.weightCol] = weightCol return classifier.fit(trainingDataset, paramMap) - # TODO: Parallel training for all classes. 
- models = [trainSingleClass(i) for i in range(numClasses)] + pool = ThreadPool(processes=min(self.getParallelism(), numClasses)) + + models = pool.map(trainSingleClass, range(numClasses)) if handlePersistence: multiclassLabeled.unpersist() @@ -1709,8 +1712,9 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): labelCol = java_stage.getLabelCol() predictionCol = java_stage.getPredictionCol() classifier = JavaParams._from_java(java_stage.getClassifier()) + parallelism = java_stage.getParallelism() py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol, - classifier=classifier) + classifier=classifier, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -1723,6 +1727,7 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) _java_obj.setClassifier(self.getClassifier()._to_java()) + _java_obj.setParallelism(self.getParallelism()) _java_obj.setFeaturesCol(self.getFeaturesCol()) _java_obj.setLabelCol(self.getLabelCol()) _java_obj.setPredictionCol(self.getPredictionCol()) http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/param/_shared_params_code_gen.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 51d49b5..130d1a0 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -152,7 +152,9 @@ if __name__ == "__main__": ("varianceCol", "column name for the biased sample variance of prediction.", None, "TypeConverters.toString"), ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2", - "TypeConverters.toInt")] + "TypeConverters.toInt"), + ("parallelism", "the number of threads to use when running parallel algorithms (>= 
1).", + "1", "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/param/shared.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 163a0e2..4041d9c 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -608,6 +608,30 @@ class HasAggregationDepth(Params): return self.getOrDefault(self.aggregationDepth) +class HasParallelism(Params): + """ + Mixin for param parallelism: the number of threads to use when running parallel algorithms (>= 1). + """ + + parallelism = Param(Params._dummy(), "parallelism", "the number of threads to use when running parallel algorithms (>= 1).", typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasParallelism, self).__init__() + self._setDefault(parallelism=1) + + def setParallelism(self, value): + """ + Sets the value of :py:attr:`parallelism`. + """ + return self._set(parallelism=value) + + def getParallelism(self): + """ + Gets the value of parallelism or its default value. + """ + return self.getOrDefault(self.parallelism) + + class DecisionTreeParams(Params): """ Mixin for Decision Tree parameters. 
http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 15d6c76..c66cd76 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1548,11 +1548,25 @@ class OneVsRestTests(SparkSessionTestCase): (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr) + ovr = OneVsRest(classifier=lr, parallelism=1) model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) + def test_parallelism_doesnt_change_output(self): + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1) + modelPar1 = ovrPar1.fit(df) + ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2) + modelPar2 = ovrPar2.fit(df) + for i, model in enumerate(modelPar1.models): + self.assertTrue(np.allclose(model.coefficients.toArray(), + modelPar2.models[i].coefficients.toArray(), atol=1E-4)) + self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4)) + def test_support_for_weightCol(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0), (1.0, Vectors.sparse(2, [], []), 1.0), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org