GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/16785

    [SPARK-19443][SQL] The function to generate constraints takes too long when 
the query plan grows continuously

    ## What changes were proposed in this pull request?
    
    This issue was originally reported and discussed at 
http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-tc20803.html#a20821
    
    When running an ML `Pipeline` with many stages that iteratively updates a 
`Dataset`, it is observed that fitting and transforming take progressively 
longer as the query plan grows.
    
    The example code is shown in the benchmark section below.
    
    Specifically, the time spent preparing the optimized plan on the current 
branch (74294 ms) is much higher than on 1.6 (292 ms). Most of that time is 
spent generating the query plan's constraints in a few optimization rules.
    
    `getAliasedConstraints` is found to be the function consuming most of the 
running time.
    
    This patch rewrites `getAliasedConstraints`. After this patch, the time to 
prepare the optimized plan is reduced significantly, from 74294 ms to 2573 ms.
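    
    To illustrate the kind of cost involved (this is a simplified sketch, not 
the actual patch; the types `Constraint`, `naive`, and `singlePass` are made 
up for illustration): rewriting every constraint once per alias does a full 
pass over the constraint set for each alias, whereas building one substitution 
map up front rewrites each constraint only once.

        // Sketch only: constraints modeled as sets of attribute names.
        object AliasedConstraintsSketch {
          type Constraint = Set[String]

          // Naive: one full pass over all constraints per alias.
          def naive(constraints: Set[Constraint],
                    aliases: Seq[(String, String)]): Set[Constraint] =
            aliases.foldLeft(constraints) { case (cs, (from, to)) =>
              cs.map(c => c.map(a => if (a == from) to else a))
            }

          // Improved: build one substitution map, rewrite each constraint once.
          def singlePass(constraints: Set[Constraint],
                         aliases: Seq[(String, String)]): Set[Constraint] = {
            val subst = aliases.toMap
            constraints.map(c => c.map(a => subst.getOrElse(a, a)))
          }

          def main(args: Array[String]): Unit = {
            val constraints: Set[Constraint] = Set(Set("x0", "id"), Set("x1"))
            val aliases = Seq("x0" -> "x0_indexed", "x1" -> "x1_indexed")
            // With non-chained aliases the two approaches agree.
            assert(naive(constraints, aliases) == singlePass(constraints, aliases))
            println(singlePass(constraints, aliases))
          }
        }

    With hundreds of aliases and constraints, the naive version's cost grows 
multiplicatively, which matches the slowdown observed as the plan grows.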
    
    ### Benchmark
    
    Run the following code locally.
    
        import org.apache.spark.ml.{Pipeline, PipelineStage}
        import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

        val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))
    
        val indexers = df.columns.tail.map(c => new StringIndexer()
          .setInputCol(c)
          .setOutputCol(s"${c}_indexed")
          .setHandleInvalid("skip"))
    
        val encoders = indexers.map(indexer => new OneHotEncoder()
          .setInputCol(indexer.getOutputCol)
          .setOutputCol(s"${indexer.getOutputCol}_encoded")
          .setDropLast(true))
    
        val stages: Array[PipelineStage] = indexers ++ encoders
        val pipeline = new Pipeline().setStages(stages)
    
        pipeline.fit(df).transform(df).show
    
    
    ## How was this patch tested?
    
    Jenkins tests.
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 improve-constraints-generation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16785.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16785
    
----
commit b4e514ade7ea478055db448bbf66f7a88caf3a86
Author: Liang-Chi Hsieh <vii...@gmail.com>
Date:   2017-02-03T07:08:47Z

    Improve the code to generate constraints.

----

