spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

2016-07-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24933355c -> cbfd94eac


[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where the `complete` output mode does not emit updated aggregated
values; for details please refer to
[SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that `ForeachSink.addBatch()` calls `data.as[T].foreachPartition { iter => ... }`,
and `foreachPartition()` does not currently support incremental planning.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`
by building a special version of `Dataset` whose `rdd()` method supports incremental
planning.
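
For illustration only (not part of the patch), here is a minimal sketch of the kind of
query that hits this: a streaming aggregation written out through `writeStream.foreach()`
in `complete` mode. The socket source, schema, and writer below are made-up examples,
not code from this change.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("foreach-complete-sketch").getOrCreate()
import spark.implicits._

// Running word counts over a socket stream; any streaming aggregation would do.
val counts = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999)
  .load()
  .as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

val query = counts.writeStream
  .outputMode("complete")            // the output mode reported in SPARK-16350
  .foreach(new ForeachWriter[Row] {  // internally routed through ForeachSink.addBatch()
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row)  // before this fix, later batches could show stale counts
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```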

## How was this patch tested?

Added a unit test which failed before the change

Author: Liwei Lin 

Closes #14030 from lw-lin/fix-foreach-complete.

(cherry picked from commit 0f7175def985a7f1e37198680f893e749612ab76)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbfd94ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbfd94ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbfd94ea

Branch: refs/heads/branch-2.0
Commit: cbfd94eacf46b61011f1bd8d30f0c134cab37b09
Parents: 2493335
Author: Liwei Lin 
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 7 10:40:52 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala        |  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++--
 3 files changed, 117 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cbfd94ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
 
 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    data.as[T].foreachPartition { iter =>
+    // TODO: Refine this method when SPARK-16264 is resolved; see comments below.
+
+    // This logic should've been as simple as:
+    // ```
+    //   data.as[T].foreachPartition { iter => ... }
+    // ```
+    //
+    // Unfortunately, doing that would just break the incremental planning. The reason is,
+    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just
+    // does not support `IncrementalExecution`.
+    //
+    // So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()`
+    // method supporting incremental planning. But in the long run, we should generally make newly
+    // created Datasets use `IncrementalExecution` where necessary (which is what SPARK-16264 tries
+    // to resolve).
+
+    val datasetWithIncrementalExecution =
+      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+        override lazy val rdd: RDD[T] = {
+          val objectType = exprEnc.deserializer.dataType
+          val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+          // was originally: sparkSession.sessionState.executePlan(deserialized) ...
+          val incrementalExecution = new IncrementalExecution(
+            this.sparkSession,
+            deserialized,
+            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+          incrementalExecution.toRdd.mapPartitions { rows =>
+            rows.map(_.get(0, objectType))
+  

spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

2016-07-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master a04cab8f1 -> 0f7175def


[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where the `complete` output mode does not emit updated aggregated
values; for details please refer to
[SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that `ForeachSink.addBatch()` calls `data.as[T].foreachPartition { iter => ... }`,
and `foreachPartition()` does not currently support incremental planning.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`
by building a special version of `Dataset` whose `rdd()` method supports incremental
planning.

## How was this patch tested?

Added a unit test which failed before the change
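
For context, a hedged sketch of what such a test could look like (this is not the actual
test added to `ForeachSinkSuite`; the writer and assertions are simplified): feed a
`MemoryStream`, aggregate in `complete` mode, write through a `ForeachWriter`, and process
two batches so the second batch exercises incremental re-planning.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext  // MemoryStream needs an implicit SQLContext

val input = MemoryStream[Int]
val query = input.toDS()
  .groupBy("value").count()
  .writeStream
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(s"count: $row")  // the real suite records events and asserts on them
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()

input.addData(1, 1, 2)
query.processAllAvailable()  // batch 0
input.addData(1, 2, 3)
query.processAllAvailable()  // batch 1: before the fix, complete-mode output here could miss updated counts
query.stop()
```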

Author: Liwei Lin 

Closes #14030 from lw-lin/fix-foreach-complete.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f7175de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f7175de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f7175de

Branch: refs/heads/master
Commit: 0f7175def985a7f1e37198680f893e749612ab76
Parents: a04cab8
Author: Liwei Lin 
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 7 10:40:42 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala        |  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++--
 3 files changed, 117 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f7175de/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
 
 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    data.as[T].foreachPartition { iter =>
+    // TODO: Refine this method when SPARK-16264 is resolved; see comments below.
+
+    // This logic should've been as simple as:
+    // ```
+    //   data.as[T].foreachPartition { iter => ... }
+    // ```
+    //
+    // Unfortunately, doing that would just break the incremental planning. The reason is,
+    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just
+    // does not support `IncrementalExecution`.
+    //
+    // So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()`
+    // method supporting incremental planning. But in the long run, we should generally make newly
+    // created Datasets use `IncrementalExecution` where necessary (which is what SPARK-16264 tries
+    // to resolve).
+
+    val datasetWithIncrementalExecution =
+      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+        override lazy val rdd: RDD[T] = {
+          val objectType = exprEnc.deserializer.dataType
+          val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+          // was originally: sparkSession.sessionState.executePlan(deserialized) ...
+          val incrementalExecution = new IncrementalExecution(
+            this.sparkSession,
+            deserialized,
+            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+          incrementalExecution.toRdd.mapPartitions { rows =>
+            rows.map(_.get(0, objectType))
+          }.asInstanceOf[RDD[T]]
+        }
+      }
+    datasetWithIncrementalExecution.foreachPartition { iter =>
   if