[GitHub] spark issue #17332: [SPARK-10764][ML] Add optional caching to Pipelines

2017-03-20 Thread sachintyagi22
Github user sachintyagi22 commented on the issue:

https://github.com/apache/spark/pull/17332
  
cc @mengxr @jkbradley -- Hi, any thoughts on this? Thanks!




[GitHub] spark issue #17332: [SPARK-10764][ML] Add optional caching to Pipelines

2017-03-18 Thread sachintyagi22
Github user sachintyagi22 commented on the issue:

https://github.com/apache/spark/pull/17332
  
cc @jkbradley Please take a look. 




[GitHub] spark pull request #17332: [SPARK-10764][ML] Add optional caching to Pipelines

2017-03-17 Thread sachintyagi22
GitHub user sachintyagi22 opened a pull request:

https://github.com/apache/spark/pull/17332

[SPARK-10764][ML] Add optional caching to Pipelines

This PR allows users to persist the output of a particular `PipelineStage` in a 
`Pipeline`. It also lets users persist only the subset of the output that is 
actually needed by the next stage, instead of the whole data frame.

For example, an LDA pipeline may look something like 
`Tokenizer` -> `StopWordsRemover` -> `CountVectorizer` -> `LDA`. Here a user 
might want to `persist` the data frame coming out of `CountVectorizer` so that 
the LDA iterations work on this persisted data. We also need the flexibility 
to persist only the subset of columns (say, docId and features) that are needed 
by `LDA`, rather than all the extra columns that may have been added by the 
previous stages.
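
For concreteness, a minimal sketch of such a pipeline with the existing Spark ML 
API (the input column names and `setK(10)` are illustrative, not part of this PR):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, Tokenizer}

// Assumes a corpus DataFrame with columns (docId, text); names are illustrative.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val remover   = new StopWordsRemover().setInputCol("tokens").setOutputCol("filtered")
val cv        = new CountVectorizer().setInputCol("filtered").setOutputCol("features")
val lda       = new LDA().setK(10).setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, remover, cv, lda))
```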

Another issue is that once the pipeline has been fit on a dataset, calling 
`transform` on that same data passes it through all the previous stages again. 
In many cases this can be avoided if we already have access to the data cached 
at an intermediate stage. In the example above, it would be useful if the data 
previously persisted in `CountVectorizer` could be reused to get 
`topicDistributions` from `LDA` on the training data set itself. Otherwise, 
the data needs to pass through all the stages *again*.

For this, `PipelineStage` exposes the following methods (a usage sketch follows the list):
* `persist(level: StorageLevel, colExprs: String*)` - lets the user specify the 
storage level and the subset of the stage's output dataframe to be persisted.
* `persist(colExprs: String*)` - same as above, but uses the default storage level.
* `getPersistedDf(): Option[DataFrame]` - retrieves the cached data frame 
for a stage. This is useful when we only need to run the pipeline from a 
specific stage onward. For instance, 
`pipelineModel.transform(countVectorizer.getPersistedDf.get)` should only run 
the final `LDA` stage of the pipeline. The persisted dataframe is only available 
once `Pipeline.fit` has been called.
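
Continuing the pipeline sketch above, usage of the proposed API could look like 
this (`corpus` is an illustrative input DataFrame; the `persist`/`getPersistedDf` 
calls are the API proposed in this PR, so the exact signatures may change):

```scala
import org.apache.spark.storage.StorageLevel

// Mark the CountVectorizer stage so that only the columns LDA needs are cached
// (storage level and column subset as described above; proposed API).
cv.persist(StorageLevel.MEMORY_AND_DISK, "docId", "features")

// During fit, the pruned output of cv is persisted before the LDA iterations run.
val pipelineModel = pipeline.fit(corpus)   // `corpus` is the illustrative input DataFrame

// After fit, the cached frame can be retrieved from the stage.
val cachedFeatures = cv.getPersistedDf.get
```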

Changes in `Pipeline`
* The `fit` method checks whether a particular stage is marked to persist 
its output. If so, it persists the stage's output data frame according to 
the specified storage level and column subset. For `Estimator` stages, it 
associates the persisted dataframe with both the `Estimator` and the 
resultant `Transformer`.
* The `transformSchema` method is changed so that the column pruning resulting 
from specifying a subset of columns to persist is taken into account.

Changes in `PipelineModel`
* The `transform(dataset)` method now checks whether the dataset argument was 
persisted by any of the stages during `Pipeline.fit`. If so, only the stages 
after that stage are run (otherwise all the stages are run). Both cases are 
handled for `transformSchema` as well. See the sketch after this list.
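
For example, under the proposed API, resuming from the cached `CountVectorizer` 
output would look like this:

```scala
// Transforming from the cached CountVectorizer output should run only the
// remaining LDA stage instead of passing the corpus through all stages again.
val withTopics = pipelineModel.transform(cv.getPersistedDf.get)
```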

Testing
* Existing tests.
* Added a new test in `PipelineSuite` to cover the above scenarios.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachintyagi22/spark pipeline-cache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17332.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17332


commit 9a617928316df36e0e8d23db44c4ea01d6d1bca0
Author: Sachin Tyagi <sachin.ty...@gmail.com>
Date:   2017-03-17T08:11:01Z

[SPARK-10764][ML] Add optional caching to Pipelines
