Github user sethah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19904#discussion_r155710913

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -146,25 +147,18 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
     val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
     logDebug(s"Train split $splitIndex with multiple sets of parameters.")
+    val completeFitCount = new AtomicInteger(0)
--- End diff --

My understanding of Scala futures may be off here, but this seems to change the behavior: the unpersist operation will now happen in one of the training threads instead of asynchronously in its own thread. I'm not sure how much of an effect that would have. Why can't you just put all the logic in one map statement, like below?

````scala
// Fit models in a Future for training in parallel
val metricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    if (collectSubModelsParam) {
      subModels.get(splitIndex)(paramIndex) = model
    }
    // TODO: duplicate evaluator to take extra params from input
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}

// Unpersist training data only when all models have trained
Future.sequence[Double, Iterable](metricFutures)(implicitly, executionContext)
  .onComplete { _ => trainingDataset.unpersist() } (executionContext)
````
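The pattern suggested above — sequencing a collection of futures and attaching the cleanup as an `onComplete` callback so it runs only after every future has finished — can be sketched outside of Spark as follows. This is a minimal standalone illustration, not Spark code: the integer "metrics" and the `cleanedUp` flag are hypothetical stand-ins for the per-model metrics and for `trainingDataset.unpersist()`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureSequenceSketch {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Stand-in for trainingDataset.cache()/unpersist(): the flag flips
    // only after every "training" future has completed.
    @volatile var cleanedUp = false

    // Each future stands in for fitting one model and returning its metric.
    val metricFutures = (1 to 4).map { i =>
      Future { i } // hypothetical per-model "metric"
    }

    // Collapse all futures into one, then attach cleanup as a callback;
    // it runs asynchronously once the last future finishes, rather than
    // inside any particular training thread's body.
    val allMetrics = Future.sequence(metricFutures)
    allMetrics.onComplete { _ => cleanedUp = true }

    val metrics = Await.result(allMetrics, 10.seconds)
    Thread.sleep(200) // allow the onComplete callback to run
    println(s"metrics=${metrics.sum}")
    println(s"cleanedUp=$cleanedUp")
  }
}
```

Note that `onComplete` is scheduled on the supplied `ExecutionContext`, so the cleanup is not guaranteed to run on any specific thread — only to run after all sequenced futures have completed, which is the property the unpersist call relies on.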