[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2018-01-05 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313958#comment-16313958
 ] 

Bago Amirbekian commented on SPARK-22126:
-

> Do you think it's possible to put this kind of execution in fitMultiple and 
> allow CV to parallelize work for the stages?

Yes, absolutely. The iterator can maintain a queue of tasks. Each call to 
`next` will pick a task off the queue, optionally add more tasks to the queue, 
and return a single model instance. Since models can be returned in any order, 
the tasks can be organized in whatever way finishes the work fastest. If 
the queue is empty, `next` can simply wait for a previous task to finish and 
put more tasks on the queue. The iterator approach is very flexible.
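
A minimal sketch of such an iterator, under simplifying assumptions: all tasks 
are submitted up front rather than added incrementally, and `ToyModel`, 
`CompletionOrderIterator` and `fitOne` are hypothetical names used only for 
illustration, not Spark APIs. It relies on the JDK's 
`ExecutorCompletionService` so that `next` returns whichever model finishes 
first, tagged with its index.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

// Hypothetical stand-in for a fitted model; not a Spark class.
final case class ToyModel(description: String)

object CompletionOrderIterator {
  /** Submits one task per index and yields (index, model) pairs as tasks finish. */
  def fitAll(numTasks: Int, parallelism: Int)(fitOne: Int => ToyModel): Iterator[(Int, ToyModel)] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    val completed = new ExecutorCompletionService[(Int, ToyModel)](pool)
    (0 until numTasks).foreach { i =>
      completed.submit(new Callable[(Int, ToyModel)] {
        override def call(): (Int, ToyModel) = (i, fitOne(i))
      })
    }
    pool.shutdown() // already-submitted tasks still run to completion

    new Iterator[(Int, ToyModel)] {
      private var remaining = numTasks
      override def hasNext: Boolean = remaining > 0
      override def next(): (Int, ToyModel) = {
        if (!hasNext) throw new NoSuchElementException("all models consumed")
        remaining -= 1
        completed.take().get() // blocks until some task has finished
      }
    }
  }
}
```

The order of the returned pairs depends on scheduling, which is why each 
model carries its index.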

> With my PR, I would do this by having the Pipeline estimator return all 
> params in getOptimizedParams.

The issue here is that when you call `fit(dataset, paramMaps)` you've now fixed 
the order in which the models are returned. For my purposes I don't see much 
of a difference between `Seq[(Int, Model)]` and `Iterator[(Integer, Model)]`. 
The key difference for me between `fitMultiple(..., paramMaps): Lazy[(Int, 
Model)]` and `fit(..., paramMaps): Lazy[Model]` is the flexibility to produce 
the models in arbitrary order.
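
To illustrate why the index matters, here is a small sketch of a consumer 
that accepts results in any order. It assumes each model has already been 
reduced to a metric; `IndexedResults` and `collectMetrics` are hypothetical 
names, not part of any proposed API.

```scala
object IndexedResults {
  /** Places each metric at the slot named by its paramMap index, whatever the arrival order. */
  def collectMetrics(numParamMaps: Int, results: Iterator[(Int, Double)]): Array[Double] = {
    val metrics = Array.fill(numParamMaps)(Double.NaN)
    results.foreach { case (index, metric) => metrics(index) = metric }
    metrics
  }
}
```

With an un-indexed `Lazy[Model]` the consumer would have to assume that the 
n-th result belongs to the n-th paramMap, which is exactly the ordering 
constraint described above.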

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357.
> More discussion is here:
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> Anyone who's following might want to scan the design doc (in the links 
> above). The latest API proposal is:
> {code}
> def fitMultiple(
>     dataset: Dataset[_],
>     paramMaps: Array[ParamMap]
>   ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
> {code}
> Old discussion:
> I copied the discussion from the gist to here:
> I propose designing the API as:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): 
>     Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
>  It could be possible to still use the current parallelism and still allow 
> for model-specific optimizations. For example, suppose we are doing cross 
> validation and have a param map with regParam = (0.1, 0.3) and maxIter = (5, 
> 10). Let's say that the cross validator could know that maxIter is optimized 
> for the model being evaluated (e.g. via a new method in Estimator that returns 
> such params). It would then be straightforward for the cross validator to 
> remove maxIter from the param map that will be parallelized over and use it to 
> create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1, 
> maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
> {quote}
> In this example, the models computed from ((regParam=0.1, maxIter=5), 
> (regParam=0.1, maxIter=10)) can only be computed in one thread, and the models 
> computed from ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in 
> another thread. There are 4 paramMaps, but we can generate at most two 
> threads to compute the models for them.
> The API above allows "callable.call()" to return multiple models. The return 
> type is {code}Map[Int, M]{code}, where the integer key marks the paramMap 
> index of the corresponding model. In the example above there are 4 paramMaps 
> but only 2 callable objects are returned: one for ((regParam=0.1, 
> maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3, 
> maxIter=5), (regParam=0.3, maxIter=10)).
> The default "fitCallables/fit with paramMaps" can be implemented as 
> follows:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) =>
>     new Callable[Map[Int, M]] {
>       override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
>     }
>   }
> }
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   // Run every callable, then restore the original paramMaps order by index
>   fitCallables(dataset, paramMaps).toSeq.flatMap(_.call().toSeq)
>     .sortBy(_._1).map(_._2)
> }
> {code}
> If we use the API I proposed above, the code in 
> [CrossValidation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
>   val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
>   val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
>   /

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2018-01-05 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313833#comment-16313833
 ] 

Bryan Cutler commented on SPARK-22126:
--

Hi [~bago.amirbekian], I was looking into similar pipeline optimizations in 
SPARK-19071.  My thinking for this is that to do a proper optimization on a 
pipeline, you really need to create a dependency graph for the different 
inputs/outputs of the stages.  With that you can determine an evaluation order 
and what can be run in parallel, but this is a lot more complicated than the 
current CV parallelism.  Do you think it's possible to put this kind of 
execution in {{fitMultiple}} and allow CV to parallelize work for the stages?

So I think regardless of using the current API or {{fitMultiple}}, the pipeline 
estimator will have to implement its own form of parallelism.  With my PR, I 
would do this by having the {{Pipeline}} estimator return all params in 
{{getOptimizedParams}} except {{Pipeline.stages}}, so when CV calls this it 
will not do any parallelization (except if the user is fitting multiple 
pipelines) and hand over execution to the pipeline estimator so it can do all 
the above optimizations and return a lazy sequence of {{PipelineModel}}.  
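
As a rough sketch of how a tuner could act on the set of optimized params 
(an assumption for illustration, not the code in the PR): param maps that 
differ only in optimized params fall into the same group, and only the 
groups are parallelized over. `ToyParamMap` is a hypothetical stand-in for 
{{ParamMap}}, and `groupForParallelism` is an invented name.

```scala
object ParamGrouping {
  // Hypothetical stand-in for Spark's ParamMap: parameter name -> value.
  type ToyParamMap = Map[String, Any]

  /** Groups param maps that differ only in params the estimator optimizes itself. */
  def groupForParallelism(
      paramMaps: Seq[ToyParamMap],
      optimizedParams: Set[String]): Seq[Seq[ToyParamMap]] =
    paramMaps
      // The grouping key is the param map with the optimized params removed.
      .groupBy(pm => pm.filter { case (name, _) => !optimizedParams.contains(name) })
      .values
      .toSeq
}
```

If every param in the grid is reported as optimized, all param maps land in a 
single group, so the tuner has nothing left to parallelize and the estimator 
handles the whole grid itself.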

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2018-01-04 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312264#comment-16312264
 ] 

Bago Amirbekian commented on SPARK-22126:
-

[~bryanc] thanks for taking the time to put together the PR and share thoughts. 
I like the idea of being able to preserve the existing APIs and not needing to 
add a new fitMultiple API but I'm concerned the existing APIs aren't quite 
flexible enough.

One of the use cases that motivated the {{ fitMultiple }} API was optimizing 
the Pipeline Estimator. The Pipeline Estimator seems like an important one to 
optimize because I believe it's required in order for CrossValidator to be able 
to exploit optimized implementations of the {{ fit }}/{{ fitMultiple }} methods 
of Pipeline stages.

The way one would optimize the Pipeline Estimator is to group the paramMaps 
into a tree structure where each level represents a stage with a param that can 
take multiple values. One would then traverse the tree in depth first order. 
Notice that in this case the params need not be estimator params, but could 
actually be transformer params as well since we can avoid applying expensive 
transformers multiple times.

With this approach, all the params for a pipeline estimator after the top level 
of the tree are "optimizable", so simply grouping on optimizable params isn't 
sufficient; we need to actually order the paramMaps to match the depth-first 
traversal of the stages tree.
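
A minimal sketch of the depth-first idea, assuming a hypothetical two-stage 
pipeline in which each param setting is a pair (stage-1 value, stage-2 value). 
`PrefixSharing`, `stage1` and `stage2` are invented for illustration; the 
point is only that the expensive first stage runs once per branch and the 
results come back in traversal order, tagged with their original index.

```scala
object PrefixSharing {
  /** Visits settings depth first: stage 1 runs once per distinct value,
    * then every stage-2 setting under that branch reuses its result. */
  def fitAll[A, B, S, M](settings: Seq[(A, B)])(stage1: A => S)(stage2: (S, B) => M): Seq[(Int, M)] =
    settings.zipWithIndex
      .groupBy { case ((a, _), _) => a } // one branch per stage-1 value
      .toSeq
      .flatMap { case (a, members) =>
        val shared = stage1(a) // expensive work done once per branch
        members.map { case ((_, b), index) => (index, stage2(shared, b)) }
      }
}
```

Because the output order follows the tree rather than the input order, the 
caller needs the index to match each model back to its paramMap.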

I'm still thinking through all this in my head so let me know if any of it is 
off base or not clear, but I think the advantage of the {{ fitMultiple }} 
approach is that it gives us full flexibility to do these kinds of optimizations.

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2018-01-02 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308868#comment-16308868
 ] 

Bryan Cutler commented on SPARK-22126:
--

Thanks for taking a look [~josephkb]!  I believe it's possible to still use the 
current fit() API that returns a {{Seq[Model[_]]}} and avoid materializing all 
models in memory at once.  If an estimator has model-specific optimizations and 
is creating multiple models, it could return a lazy sequence (such as a SeqView 
or Stream).  Then if the CrossValidator just converts the sequence of Models to 
an iterator, it will only hold a reference to the model currently being 
evaluated and previous models can be GC'd.  Does that sound like it would be 
worth a shot here?
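
A small sketch of the laziness involved, using a plain Scala `Iterator` as a 
stand-in for the SeqView or Stream mentioned above (an assumption made to keep 
the example independent of the Scala version). `LazyFits` and `fitOne` are 
hypothetical names.

```scala
object LazyFits {
  /** Returns a lazy iterator: fitOne runs only when the consumer asks for that element. */
  def fitLazily[P, M](paramMaps: Seq[P])(fitOne: P => M): Iterator[M] =
    paramMaps.iterator.map(fitOne)
}
```

A consumer that evaluates each model and keeps only the metric never holds 
more than one model at a time, so earlier models become eligible for garbage 
collection.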

As for the issue of parallelism in model-specific optimizations, it's true 
there might be some benefit in the Estimator being able to allow the 
CrossValidator to handle the parallelism under certain cases.  But until there 
are more examples of this to look at, it's hard to know if making a new API for 
it is worth that benefit.  I saw a reference to a Keras model in another JIRA, 
is that an example that could have a model-specific optimization while still 
allowing the CrossValidator to parallelize over it?

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-31 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307147#comment-16307147
 ] 

Apache Spark commented on SPARK-22126:
--

User 'BryanCutler' has created a pull request for this issue:
https://github.com/apache/spark/pull/20124

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-31 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307145#comment-16307145
 ] 

Bryan Cutler commented on SPARK-22126:
--

Hi All, I've been following the discussions here and the proposed solution 
seems flexible enough to do all that is required.  If it's ok with 
you all, I'd still like to submit a PR with an alternate implementation which I 
brought up way back when this issue came up in SPARK-19357.  It is a bit 
simpler and only adds a basic method to the Estimator API, but still brings back 
support for model-specific optimization to where it was before any of the 
parallelism was introduced.  Apologies if I am missing something from all the 
previous discussion that requires more involved API changes, but I just 
thought I would bring this up since it is a little simpler and seems to 
meet our needs, from what I can tell.


[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-27 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16304765#comment-16304765
 ] 

Apache Spark commented on SPARK-22126:
--

User 'MrBago' has created a pull request for this issue:
https://github.com/apache/spark/pull/20095

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357.
> More discussion is here:
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> Anyone who's following might want to scan the design doc (in the links 
> above). The latest API proposal is:
> {code}
> def fitMultiple(
>     dataset: Dataset[_],
>     paramMaps: Array[ParamMap]
>   ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
> {code}
> Old discussion:
> I copied the discussion from the gist to here:
> I propose designing the API as:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): 
>     Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
>  It could be possible to still use the current parallelism and still allow 
> for model-specific optimizations. For example, suppose we are doing cross 
> validation and have a param map with regParam = (0.1, 0.3) and maxIter = (5, 
> 10). Let's say that the cross validator could know that maxIter is optimized 
> for the model being evaluated (e.g. via a new method in Estimator that returns 
> such params). It would then be straightforward for the cross validator to 
> remove maxIter from the param map that will be parallelized over and use it to 
> create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), (regParam=0.1, 
> maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)).
> {quote}
> In this example, the models computed from ((regParam=0.1, maxIter=5), 
> (regParam=0.1, maxIter=10)) can only be computed in one thread, and the models 
> computed from ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in 
> another thread. There are 4 paramMaps, but we can generate at most two 
> threads to compute the models for them.
> The API above allows "callable.call()" to return multiple models. The return 
> type is {code}Map[Int, M]{code}, where the integer key marks the paramMap 
> index of the corresponding model. In the example above there are 4 paramMaps 
> but only 2 callable objects are returned: one for ((regParam=0.1, 
> maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3, 
> maxIter=5), (regParam=0.3, maxIter=10)).
> The default "fitCallables/fit with paramMaps" can be implemented as 
> follows:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
>     Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) =>
>     new Callable[Map[Int, M]] {
>       override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
>     }
>   }
> }
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   // Run every callable, then order the models by their paramMap index.
>   fitCallables(dataset, paramMaps).toSeq
>     .flatMap(_.call().toSeq).sortBy(_._1).map(_._2)
> }
> {code}
> With the API proposed above, the code in 
> [CrossValidator|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
>   val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
>   val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
>   // Fit models in a Future for training in parallel
>   val modelMapFutures = fitCallables(trainingDataset, paramMaps).map { callable =>
>     Future[Map[Int, Model[_]]] {
>       val modelMap = callable.call()
>       if (collectSubModelsParam) {
>         ...
>       }
>       modelMap
>     } (executionContext)
>   }
>   // Unpersist training data only when all models have trained
>   Future.sequence[Map[Int, Model[_]], Iterable](modelMapFutures)(implicitly, executionContext)
>     .onComplete { _ => trainingDataset.unpersist() } (executionContext)
>   // Evaluate models in a Future that will calculate a metric and allow the
>   // model to be cleaned up
>   val foldMetricMapFutures = modelMapFutures.map { modelMapFuture =>
>     modelMapFuture.map { modelMap =>
>       modelMap.map { case (index: Int, model: Model[_]) =>
>         val metric = eval.evaluate(model.transform(validationDataset, paramMaps(index)))
>         (index, metric)

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-22 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301986#comment-16301986
 ] 

Apache Spark commented on SPARK-22126:
--

User 'MrBago' has created a pull request for this issue:
https://github.com/apache/spark/pull/20058

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-18 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295964#comment-16295964
 ] 

Bago Amirbekian commented on SPARK-22126:
-

Anyone who's following might want to scan the design doc. The latest API 
proposal is:

{code}
def fitMultiple(
    dataset: Dataset[_],
    paramMaps: Array[ParamMap]
  ): java.util.Iterator[scala.Tuple2[java.lang.Integer, Model]]
{code}
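
To make the contract concrete, here is a minimal sketch of how a default fitMultiple could wrap single-model fitting, and how a caller could restore the original order from the returned indices. ParamMap, Model, and fitOne below are simplified stand-ins invented for illustration (they are not Spark's classes), and Scala's Iterator[(Int, Model)] is used in place of the Java types in the proposal.

```scala
// Hypothetical stand-ins for Spark's ParamMap and Model, for illustration only.
final case class ParamMap(regParam: Double, maxIter: Int)
final case class Model(params: ParamMap)

object FitMultipleSketch {
  // Placeholder for a real single-model fit.
  def fitOne(data: Seq[Double], pm: ParamMap): Model = Model(pm)

  // Default behaviour: fit lazily, one model per call to next(),
  // tagging each model with the index of its ParamMap.
  def fitMultiple(data: Seq[Double], paramMaps: Array[ParamMap]): Iterator[(Int, Model)] =
    paramMaps.indices.iterator.map(i => (i, fitOne(data, paramMaps(i))))

  // A caller that needs the models in paramMaps order can place each one by
  // its index, whatever order the iterator yields them in.
  def fitInOrder(data: Seq[Double], paramMaps: Array[ParamMap]): Seq[Model] = {
    val out = new Array[Model](paramMaps.length)
    fitMultiple(data, paramMaps).foreach { case (i, m) => out(i) = m }
    out.toSeq
  }
}
```

Because every model carries its index, an optimized implementation would be free to yield models in whatever order is cheapest to compute.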

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-12 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288305#comment-16288305
 ] 

Joseph K. Bradley commented on SPARK-22126:
---

Hi all, thanks for the comments.  I realized my use of "Spark job" was 
imprecise above, so I extended that explanation within the linked design doc.  
Check out the new sections:
* "Use cases for model-specific optimizations & parallelism"
* "Other Q&A"

This should answer both [~WeichenXu123] and [~bago.amirbekian].

[~tomas.nykodym], with the current state of Spark master (2.3), we already let 
users shoot themselves in the foot by running big jobs in parallel.  We try to 
make it less likely by marking the "parallelism" Param as an expert-only API.  
I think it will be hard to prevent this without completely removing the 
"parallelism" Param, but please say if you have ideas for a better user-facing 
API.

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-08 Thread Tomas Nykodym (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283982#comment-16283982
 ] 

Tomas Nykodym commented on SPARK-22126:
---

I think Weichen's API is more flexible and, at least to me, simpler to use, 
since it has a simple contract. I like the fact that the callables you get 
back are (or at least should be) independent of each other, so there is no 
need to worry about deadlocks or CPU utilization when there is a large number 
of small jobs. 

As for not running big jobs in parallel: I don't like that this would be left 
to the user. In my experience, it is going to be misused. I also think it is 
quite limiting for the future. I understand we don't want to over-optimize, 
but changing an API later on is hard, and working around an API that is not 
flexible enough is equally hard.
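
The independence contract is what lets a caller hand the callables to any executor. A rough sketch follows, using only java.util.concurrent and a placeholder String in place of a fitted model; the runAll helper and the toy callables are made up for illustration.

```scala
import java.util.concurrent.{Callable, Executors}

object IndependentCallablesSketch {
  // Runs every callable on a fixed-size pool and merges the (index -> model) maps.
  // Because the callables do not wait on each other, any pool size >= 1 completes.
  def runAll[M](callables: Seq[Callable[Map[Int, M]]], parallelism: Int): Map[Int, M] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
      // Submit everything first, then block on the results.
      val futures = callables.toList.map(c => pool.submit(c))
      futures.map(_.get()).foldLeft(Map.empty[Int, M])(_ ++ _)
    } finally {
      pool.shutdown()
    }
  }

  // Toy callable standing in for "fit the models at these paramMap indices".
  def toyCallable(indices: Seq[Int]): Callable[Map[Int, String]] =
    new Callable[Map[Int, String]] {
      override def call(): Map[Int, String] = indices.map(i => i -> s"model-$i").toMap
    }
}
```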







  

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-07 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283145#comment-16283145
 ] 

Bago Amirbekian commented on SPARK-22126:
-

Joseph, the way I read your comment, we should support parallelism and 
optimized models, but not parallelism together with optimized models. I think 
that would cover our current use cases, but I'm wondering if we want to leave 
open the possibility of optimizing parameters like maxIter & maxDepth and 
having those optimized implementations play nicely with parallelism in 
CrossValidator.

I normally believe in doing the simple thing first and then changing it if 
needed, but here that would require adding another public API later.
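
As a rough illustration of how an estimator with an optimized maxIter could still cooperate with a parallel CrossValidator, the grid can be grouped by every parameter except the optimized one, with each entry keeping its original index. GridPoint and groupForSharedFit are hypothetical names, not part of Spark.

```scala
// Hypothetical, simplified parameter setting; not Spark's ParamMap.
final case class GridPoint(regParam: Double, maxIter: Int)

object GroupingSketch {
  // Groups grid points that differ only in maxIter, so that one training run
  // per group could emit a model at each requested iteration count. Each point
  // keeps its original index so results can be matched back to the input grid.
  def groupForSharedFit(grid: Seq[GridPoint]): Seq[Seq[(Int, GridPoint)]] =
    grid.zipWithIndex
      .map { case (p, i) => (i, p) }
      .groupBy { case (_, p) => p.regParam }
      .values
      .map(_.sortBy { case (_, p) => p.maxIter })
      .toSeq
      .sortBy(_.head._1) // stable group order: by first original index
}
```

With the grid from the issue description (regParam in (0.1, 0.3), maxIter in (5, 10)), this gives two groups of two, so the groups can run in parallel while each group shares one training run.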

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-07 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283038#comment-16283038
 ] 

Weichen Xu commented on SPARK-22126:


[~josephkb] For the 2nd type: suppose there is a single large Spark job in 
which each task fits a model. If the API returns multiple callables, would 
each callable fetch one model from the Spark job's result? Is that what you 
mean? That usage looks a little awkward.
Or could we directly return the following type:
{code}
RDD[Model[_]]
{code}
Since each task of this Spark job fits a model, directly returning an RDD of 
models would be easier to use (and it avoids collecting all the models to the 
driver, which could cause an OOM). What do you think?

[~bago.amirbekian] [~tomas.nykodym] Any thoughts?

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-07 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282699#comment-16282699
 ] 

Joseph K. Bradley commented on SPARK-22126:
---

Continuing the discussion from the gist: Perhaps we're overthinking things when 
worrying about handling multiple interdependent Callables.  I figure there are 
2 main use cases for optimizing model fitting:
* Parallel Spark jobs for fitting multiple models at once
** This is what has been introduced into CrossValidator, TrainValidationSplit 
and OneVsRest for Spark 2.3 already.
** This use case is primarily for _small_ Spark jobs.  E.g., fitting a bunch of 
small models requires a bunch of small jobs.  Each job needs to be 
lightweight/fast in order to get much benefit from running parallel jobs.
* Single Spark jobs for fitting multiple models in a clever, model-specific way
** This is what is used by Deep Learning Pipelines and what we'd like to do 
more of in the future.
** This use case is primarily for _large_ Spark jobs.  E.g., for DLP, the Spark 
job includes a bunch of tasks, and each task is sizable since it fits a model 
in Keras.

Assuming we can reasonably divide the use cases of this fitMultiple() API into 
these 2 types, then we don't need to worry about dependencies between 
Callables.  We only need to worry about dependencies when users use 
parallelism > 1 with the 2nd type of use case, which we can advise against in 
the documentation for the parallelism Param.

What do you think?

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357
> more discussion is here
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0
> I am copying the discussion from the gist here:
> I propose designing the API as:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]): 
> Array[Callable[Map[Int, M]]]
> {code}
> Let me use an example to explain the API:
> {quote}
>  It could be possible to still use the current parallelism and still allow 
> for model-specific optimizations. For example, suppose we are doing cross 
> validation and have a param map with regParam = (0.1, 0.3) and maxIter = (5, 
> 10). Let's say that the cross validator could know that maxIter is optimized 
> for the model being evaluated (e.g. via a new method in Estimator that 
> returns such params). It would then be straightforward for the cross 
> validator to remove maxIter from the param map that will be parallelized over 
> and use it to create 2 arrays of paramMaps: ((regParam=0.1, maxIter=5), 
> (regParam=0.1, maxIter=10)) and ((regParam=0.3, maxIter=5), (regParam=0.3, 
> maxIter=10)).
> {quote}
> In this example, the models for ((regParam=0.1, maxIter=5), (regParam=0.1, 
> maxIter=10)) can only be computed in one thread, and the models for 
> ((regParam=0.3, maxIter=5), (regParam=0.3, maxIter=10)) in another thread. 
> There are 4 paramMaps, but we can use at most two threads to compute the 
> models for them.
> The API above allows "callable.call()" to return multiple models. Its return 
> type is {code}Map[Int, M]{code}, where the integer key marks the paramMap 
> index of the corresponding model. In the example above there are 4 paramMaps 
> but only 2 callable objects are returned: one for ((regParam=0.1, 
> maxIter=5), (regParam=0.1, maxIter=10)) and another for ((regParam=0.3, 
> maxIter=5), (regParam=0.3, maxIter=10)).
> The default "fitCallables/fit with paramMaps" can be implemented as 
> follows:
> {code}
> def fitCallables(dataset: Dataset[_], paramMaps: Array[ParamMap]):
> Array[Callable[Map[Int, M]]] = {
>   paramMaps.zipWithIndex.map { case (paramMap: ParamMap, index: Int) =>
> new Callable[Map[Int, M]] {
>   override def call(): Map[Int, M] = Map(index -> fit(dataset, paramMap))
> }
>   }
> }
> def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
>   // Run every callable, then restore the original paramMaps order by index
>   fitCallables(dataset, paramMaps).toSeq.flatMap(_.call().toSeq)
>     .sortBy(_._1).map(_._2)
> }
> {code}
> If we use the API I proposed above, the code in 
> [CrossValidation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L149-L159]
> can be changed to:
> {code}
>   val trainingDataset = sparkSession.createDataFrame(training, 
> schema).cache()
>   val validationDataset = sparkSession.createDataFrame(validation, 
> schema).cache()
>   // Fit models in a Future for training in parallel
>   val modelMapFutures = fitCallab

[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-05 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279549#comment-16279549
 ] 

Weichen Xu commented on SPARK-22126:


[~bago.amirbekian] Hmm, using Java's Callable directly looks fine too. That way 
we need to define fewer traits.

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357
> more discussion is here
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-05 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279537#comment-16279537
 ] 

Bago Amirbekian commented on SPARK-22126:
-

[~WeichenXu123] Sorry, I misunderstood; I thought you wanted to use 
java.util.concurrent.Callable.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Callable.html

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357
> more discussion is here
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0






[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-05 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279529#comment-16279529
 ] 

Weichen Xu commented on SPARK-22126:


[~bago.amirbekian] Small correction: the callable definition can be
{code}
trait Callable[M] {
   def call(): Map[Int, M]
}
{code}
so the "fitMultiple" return type can still be `Array[Callable[M]]`. I think we do 
not need to make the generic type too complicated.
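
To make the shape of this definition concrete, here is a minimal, self-contained 
sketch with no Spark dependency. The type parameter `P` and the function 
`fitOne` are hypothetical stand-ins for `ParamMap` and for 
`Estimator.fit(dataset, paramMap)`; the enclosing object and both method bodies 
are assumptions made for illustration, not code from the pull request.

```scala
object CallableTraitSketch {
  // The trait as written above: Map[Int, M] is part of the trait itself,
  // so method signatures only need to mention Callable[M].
  trait Callable[M] {
    def call(): Map[Int, M]
  }

  // Hypothetical default: one callable per param map, keyed by its index.
  // P stands in for ParamMap, fitOne for fitting a single model.
  def fitMultiple[P, M](paramMaps: Array[P])(fitOne: P => M): Array[Callable[M]] =
    paramMaps.zipWithIndex.map { case (p, i) =>
      val task: Callable[M] = new Callable[M] {
        override def call(): Map[Int, M] = Map(i -> fitOne(p))
      }
      task
    }

  // Run every callable and put the models back in param-map order.
  def fit[P, M](paramMaps: Array[P])(fitOne: P => M): Seq[M] =
    fitMultiple(paramMaps)(fitOne).toSeq
      .flatMap(_.call().toSeq)
      .sortBy(_._1)
      .map(_._2)
}
```

The trade-off in this sketch is that the custom trait shadows the name of 
java.util.concurrent.Callable, so code that needs both would have to 
qualify one of them.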

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357
> more discussion is here
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0






[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-12-05 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279218#comment-16279218
 ] 

Bago Amirbekian commented on SPARK-22126:
-

I started a discussion about potential solutions to this issue on this gist. I'm 
going to summarize the gist here and encourage further discussion to take place 
on this JIRA to increase its visibility.

I proposed that we add a new method to the `Estimator` interface, 
`fitMultiple(dataset, paramMaps): Array[Callable[Model]]`. The purpose of this 
method is to allow estimators to implement model-specific optimizations when 
fitting models for multiple paramMaps. This API will also be used by 
`CrossValidator` and other meta transformers when fitting multiple models in 
parallel.

[~WeichenXu123] suggested modifying the API to `fitMultiple(dataset: 
Dataset[_], paramMaps: Array[ParamMap]): Array[Callable[Map[Int, M]]]`. The 
reasoning is that allowing each callable to return multiple models will make it 
easier to efficiently schedule these tasks in parallel (e.g. we will avoid 
scheduling A and B where B simply waits on A).
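
As a rough illustration of the second signature, the sketch below groups param 
maps that share a regParam into a single callable and then runs the callables 
on a thread pool. It is self-contained and does not use Spark: `Params`, 
`FittedModel`, `groupTask` and `fitAll` are hypothetical names, "fitting" is 
simulated by wrapping the params, and grouping by regParam is only an assumed 
example of a model-specific optimization.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GroupedFitSketch {
  // Hypothetical stand-ins for Spark's ParamMap and Model.
  final case class Params(regParam: Double, maxIter: Int)
  final case class FittedModel(params: Params)

  // One task per group; it returns every model in the group, keyed by the
  // index of the param map in the original array. A real estimator would
  // share work across the group here instead of "fitting" independently.
  private def groupTask(group: Array[(Params, Int)]): Callable[Map[Int, FittedModel]] =
    new Callable[Map[Int, FittedModel]] {
      override def call(): Map[Int, FittedModel] =
        group.map { case (p, i) => i -> FittedModel(p) }.toMap
    }

  // Assumed optimization: param maps with the same regParam go together, so
  // no callable ever has to wait on another one.
  def fitMultiple(paramMaps: Array[Params]): Array[Callable[Map[Int, FittedModel]]] =
    paramMaps.zipWithIndex
      .groupBy { case (p, _) => p.regParam }
      .values
      .map(groupTask)
      .toArray

  // Run the callables in parallel and restore the original order by index.
  def fitAll(paramMaps: Array[Params], parallelism: Int): Seq[FittedModel] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = fitMultiple(paramMaps).toSeq.map(c => Future(c.call()))
      val byIndex = Await.result(Future.sequence(futures), 1.minute).flatten.toMap
      paramMaps.indices.map(byIndex)
    } finally pool.shutdown()
  }
}
```

With the four param maps from the regParam/maxIter example, this sketch yields 
two callables, so a parallelism above 2 would not help; the integer keys are 
what let the caller reassemble results regardless of completion order.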

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357
> more discussion is here
>  https://gist.github.com/MrBago/f501b9e7712dc6a67dc9fea24e309bf0






[jira] [Commented] (SPARK-22126) Fix model-specific optimization support for ML tuning

2017-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16180583#comment-16180583
 ] 

Apache Spark commented on SPARK-22126:
--

User 'WeichenXu123' has created a pull request for this issue:
https://github.com/apache/spark/pull/19350

> Fix model-specific optimization support for ML tuning
> -
>
> Key: SPARK-22126
> URL: https://issues.apache.org/jira/browse/SPARK-22126
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Weichen Xu
>
> Fix model-specific optimization support for ML tuning. This is discussed in 
> SPARK-19357


