[ https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307147#comment-16307147 ]
Apache Spark commented on SPARK-22126: -------------------------------------- User 'BryanCutler' has created a pull request for this issue: https://github.com/apache/spark/pull/20124 > Fix model-specific optimization support for ML tuning > ----------------------------------------------------- > > Key: SPARK-22126 > URL: https://issues.apache.org/jira/browse/SPARK-22126 > Project: Spark > Issue Type: Improvement > Components: ML > Affects Versions: 2.3.0 > Reporter: Weichen Xu > > Fix model-specific optimization support for ML tuning. This is discussed in > SPARK-19357 > more discussion is here > https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0 > Anyone who's following might want to scan the design doc (in the links > above), the latest api proposal is: > {code} > def fitMultiple( > dataset: Dataset[_], > paramMaps: Array[ParamMap] > ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]] > {code} > Old discussion: > I copy discussion from gist to here: > I propose to design API as: > {code} > def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): > Array[Callable[Map[Int, M]]] > {code} > Let me use an example to explain the API: > {quote} > It could be possible to still use the current parallelism and still allow > for model-specific optimizations. For example, if we doing cross validation > and have a param map with regParam = (0.1, 0.3) and maxIter = (5, 10). Lets > say that the cross validator could know that maxIter is optimized for the > model being evaluated (e.g. a new method in Estimator that return such > params). It would then be straightforward for the cross validator to remove > maxIter from the param map that will be parallelized over and use it to > create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1, > maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)). > {quote} > In this example, we can see that, models computed from ((regParam=0.1, > maxIter=5), (regParam=0.1, maxIter=10)) can only be computed in one thread > code, models computed from ((regParam=0.3, maxIter=5), (regParam=0.3, > maxIter=10)) in another thread. In this example, there're 4 paramMaps, but > we can at most generate two threads to compute the models for them. > The API above allow "callable.call()" to return multiple models, and return > type is {code}Map[Int, M]{code}, key is integer, used to mark the paramMap > index for corresponding model. Use the example above, there're 4 paramMaps, > but only return 2 callable objects, one callable object for ((regParam=0.1, > maxIter=5), (regParam=0.1, maxIter=10)), another one for ((regParam=0.3, > maxIter=5), (regParam=0.3, maxIter=10)). > and the default "fitCallables/fit with paramMaps" can be implemented as > following: > {code} > def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): > Array[Callable[Map[Int, M]]] = { > paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) => > new Callable[Map[Int, M]] { > override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap)) > } > } > } > def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = { > fitCallables(dataset, paramMaps).map { _.call().toSeq } > .flatMap(_).sortBy(_._1).map(_._2) > } > {code} > If use the API I proposed above, the code in > [CrossValidation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159] > can be changed to: > {code} > val trainingDataset = sparkSession.createDataFrame(training, > schema).cache() > val validationDataset = sparkSession.createDataFrame(validation, > schema).cache() > // Fit models in a Future for training in parallel > val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { > callable => > Future[Map[Int, Model[_]]] { > val modelMap = callable.call() > if (collectSubModelsParam) { > ... > } > modelMap > } (executionContext) > } > // Unpersist training data only when all models have trained > Future.sequence[Model[_], Iterable](modelMapFutures)(implicitly, > executionContext) > .onComplete { _ => trainingDataset.unpersist() } (executionContext) > // Evaluate models in a Future that will calulate a metric and allow > model to be cleaned up > val foldMetricMapFutures = modelMapFutures.map { modelMapFuture => > modelMapFuture.map { modelMap => > modelMap.map { case (index: Int, model: Model[_]) => > val metric = eval.evaluate(model.transform(validationDataset, > paramMaps(index))) > (index, metric) > } > } (executionContext) > } > // Wait for metrics to be calculated before unpersisting validation > dataset > val foldMetrics = foldMetricMapFutures.map(ThreadUtils.awaitResult(_, > Duration.Inf)) > .map(_.toSeq).sortBy(_._1).map(_._2) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org