GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/15651

    [SPARK-17972][SQL] Add Dataset.checkpoint() to truncate large query plans

    ## What changes were proposed in this pull request?
    
    ### Problem
    
    Iterative ML code can easily create query plans that grow exponentially. We 
found that query planning time also increases exponentially, even when all the 
sub-plan trees are cached.
    
    The following snippet illustrates the problem:
    
    ```scala
    (0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
      println(s"== Iteration $iteration ==")
      val time0 = System.currentTimeMillis()
      val joined = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
      joined.cache()
      println(s"Query planning takes ${System.currentTimeMillis() - time0} ms")
      joined.as[Int]
    }
    
    // == Iteration 0 ==
    // Query planning takes 9 ms
    // == Iteration 1 ==
    // Query planning takes 26 ms
    // == Iteration 2 ==
    // Query planning takes 53 ms
    // == Iteration 3 ==
    // Query planning takes 163 ms
    // == Iteration 4 ==
    // Query planning takes 700 ms
    // == Iteration 5 ==
    // Query planning takes 3418 ms
    ```
    
    This is because, when building a new Dataset, the new plan is always built 
upon `QueryExecution.analyzed`, which doesn't leverage existing cached plans, so 
the plan tree keeps growing from iteration to iteration.
    
    On the other hand, caching every few iterations is usually not the right 
direction for this problem, since caching consumes too much memory (imagine 
computing connected components over a graph with 50 billion nodes). What we 
really need here is to truncate both the query plan (to minimize query planning 
time) and the lineage of the underlying RDD (to avoid stack overflows).
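
    To make the idea concrete, here is a rough, user-level sketch of the same trick written against the existing public API (the `manualCheckpoint` helper is hypothetical and not part of this PR): convert the Dataset to an RDD, checkpoint the RDD to cut its lineage, then rebuild a Dataset so the logical plan starts from a fresh leaf node. Unlike the `checkpoint()` method proposed here, this loses the partitioning and ordering information of the original Dataset.

    ```scala
    import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

    // Hypothetical user-level approximation, for illustration only.
    def manualCheckpoint[T: Encoder](ds: Dataset[T])(implicit spark: SparkSession): Dataset[T] = {
      val rdd = ds.rdd          // drop down to the RDD level
      rdd.checkpoint()          // mark the RDD for checkpointing (requires setCheckpointDir)
      rdd.count()               // force materialization so the checkpoint files are written
      spark.createDataset(rdd)  // new Dataset whose plan is just a scan over the checkpointed RDD
    }
    ```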
    
    ### Changes introduced in this PR
    
    This PR addresses this issue by introducing a `checkpoint()` method on 
`Dataset[T]`, which does exactly what is described above. The micro benchmark 
below runs essentially the same workload as the snippet above, but invokes 
`checkpoint()` instead of `cache()`.
    
    One key point is that the checkpointed Dataset should preserve the 
partitioning and ordering information of the original Dataset, so that we can 
avoid unnecessary shuffling (similar to reading from a pre-bucketed table).
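
    For example (a hypothetical illustration of the intended behavior, assuming a Spark shell with `spark.implicits._` in scope), a Dataset that was hash-partitioned on `value` before checkpointing should be able to feed a subsequent join on `value` without another exchange:

    ```scala
    val ds = Seq(1, 2, 3).toDS()
    val repartitioned = ds.repartition($"value")  // hash-partitioned on "value"
    val cp = repartitioned.checkpoint()           // should preserve the output partitioning
    cp.join(cp, "value").explain()                // ideally no extra Exchange appears before the join
    ```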
    
    ### Micro benchmark
    
    ```scala
    spark.sparkContext.setCheckpointDir("/tmp/cp")
    
    (0 until 100).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
      println(s"== Iteration $iteration ==")
      val time0 = System.currentTimeMillis()
      val cp = plan.checkpoint()
      cp.count()
      println(s"Checkpointing takes ${System.currentTimeMillis() - time0} ms")
    
      val time1 = System.currentTimeMillis()
      val joined = cp.join(cp, "value").join(cp, "value").join(cp, "value").join(cp, "value")
      val result = joined.as[Int]
    
      println(s"Query planning takes ${System.currentTimeMillis() - time1} ms")
      result
    }
    
    // == Iteration 0 ==
    // Checkpointing takes 591 ms
    // Query planning takes 13 ms
    // == Iteration 1 ==
    // Checkpointing takes 1605 ms
    // Query planning takes 16 ms
    // == Iteration 2 ==
    // Checkpointing takes 782 ms
    // Query planning takes 8 ms
    // == Iteration 3 ==
    // Checkpointing takes 729 ms
    // Query planning takes 10 ms
    // == Iteration 4 ==
    // Checkpointing takes 734 ms
    // Query planning takes 9 ms
    // == Iteration 5 ==
    // ...
    // == Iteration 50 ==
    // Checkpointing takes 571 ms
    // Query planning takes 7 ms
    // == Iteration 51 ==
    // Checkpointing takes 548 ms
    // Query planning takes 7 ms
    // == Iteration 52 ==
    // Checkpointing takes 596 ms
    // Query planning takes 8 ms
    // == Iteration 53 ==
    // Checkpointing takes 568 ms
    // Query planning takes 7 ms
    // ...
    ```
    
    You may see that although checkpointing is a heavier-weight operation, the 
combined time spent on checkpointing and query planning stays roughly constant 
across iterations instead of growing exponentially.
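
    For completeness, here is a minimal usage sketch (assuming a running `SparkSession` named `spark` with `spark.implicits._` imported; the HDFS path is a hypothetical placeholder). Since the data is materialized through the RDD checkpointing mechanism, the checkpoint directory should live on a fault-tolerant store such as HDFS in a real cluster, rather than a local `/tmp` path:

    ```scala
    spark.sparkContext.setCheckpointDir("hdfs:///user/alice/checkpoints")  // hypothetical path

    val ds = Seq(1, 2, 3).toDS()
    val cp = ds.checkpoint()        // truncates both the query plan and the RDD lineage
    cp.join(cp, "value").explain()  // planning now starts from the checkpointed data
    ```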
    
    ## How was this patch tested?
    
    Unit test added in `DatasetSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark ds-checkpoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15651
    
----
commit c7503e3e0676fe286d83beccff4b28c5b6a684dc
Author: Cheng Lian <l...@databricks.com>
Date:   2016-10-26T23:32:08Z

    Add Dataset.checkpoint() to truncate large query plans

----

