spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24933355c -> cbfd94eac

[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where `complete` output mode does not output updated aggregated values; for details please refer to [SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that `ForeachSink.addBatch()` does `data.as[T].foreachPartition { iter => ... }`, and `foreachPartition()` does not support incremental planning for now.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`, by making a special version of `Dataset` whose `rdd()` method supports incremental planning.

## How was this patch tested?

Added a unit test which failed before the change.

Author: Liwei Lin

Closes #14030 from lw-lin/fix-foreach-complete.

(cherry picked from commit 0f7175def985a7f1e37198680f893e749612ab76)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbfd94ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbfd94ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbfd94ea

Branch: refs/heads/branch-2.0
Commit: cbfd94eacf46b61011f1bd8d30f0c134cab37b09
Parents: 2493335
Author: Liwei Lin
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Jul 7 10:40:52 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala        |  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++-
 3 files changed, 117 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/cbfd94ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.streaming

 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde

 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {

   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    data.as[T].foreachPartition { iter =>
+    // TODO: Refine this method when SPARK-16264 is resolved; see comments below.
+
+    // This logic should've been as simple as:
+    // ```
+    //   data.as[T].foreachPartition { iter => ... }
+    // ```
+    //
+    // Unfortunately, doing that would just break the incremental planning. The reason is,
+    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just
+    // does not support `IncrementalExecution`.
+    //
+    // So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()`
+    // method supporting incremental planning. But in the long run, we should generally make newly
+    // created Datasets use `IncrementalExecution` where necessary (which is what SPARK-16264
+    // tries to resolve).
+
+    val datasetWithIncrementalExecution =
+      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+        override lazy val rdd: RDD[T] = {
+          val objectType = exprEnc.deserializer.dataType
+          val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+          // was originally: sparkSession.sessionState.executePlan(deserialized) ...
+          val incrementalExecution = new IncrementalExecution(
+            this.sparkSession,
+            deserialized,
+            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+          incrementalExecution.toRdd.mapPartitions { rows =>
+            rows.map(_.get(0, objectType))
+          }.asInstanceOf[RDD[T]]
+        }
+      }
+    datasetWithIncrementalExecution.foreachPartition { iter =>
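For context, the user-facing path this change fixes looks roughly like the sketch below: a streaming aggregation consumed in `complete` output mode through `writeStream.foreach()`. This is an illustrative repro only, not code from the commit; the socket source, host/port, and writer class are hypothetical stand-ins.

```scala
import org.apache.spark.sql.{ForeachWriter, SparkSession}

// Hypothetical writer: prints every row the sink hands it.
class PrintCountsWriter extends ForeachWriter[(String, Long)] {
  def open(partitionId: Long, version: Long): Boolean = true  // accept every partition/batch
  def process(value: (String, Long)): Unit = println(value)
  def close(errorOrNull: Throwable): Unit = ()
}

object ForeachCompleteRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreach-complete-repro")
      .getOrCreate()
    import spark.implicits._

    // Any streaming source works; a socket source keeps the sketch short.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]

    // A running aggregation: in `complete` mode, every batch should emit the
    // full, updated counts. Per SPARK-16350, the values reaching the writer
    // could be stale because ForeachSink's foreachPartition() did not plan
    // each batch through IncrementalExecution.
    val counts = lines.groupByKey(identity).count()

    val query = counts.writeStream
      .outputMode("complete")
      .foreach(new PrintCountsWriter)
      .start()

    query.awaitTermination()
  }
}
```

The `open(partitionId, version)` / `process` / `close` calls follow the `ForeachWriter` contract that `ForeachSink` drives; per SPARK-16350, before this patch the counts handed to `process()` in `complete` mode could fail to reflect the latest batch.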
spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()
Repository: spark
Updated Branches:
  refs/heads/master a04cab8f1 -> 0f7175def

[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where `complete` output mode does not output updated aggregated values; for details please refer to [SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that `ForeachSink.addBatch()` does `data.as[T].foreachPartition { iter => ... }`, and `foreachPartition()` does not support incremental planning for now.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`, by making a special version of `Dataset` whose `rdd()` method supports incremental planning.

## How was this patch tested?

Added a unit test which failed before the change.

Author: Liwei Lin

Closes #14030 from lw-lin/fix-foreach-complete.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f7175de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f7175de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f7175de

Branch: refs/heads/master
Commit: 0f7175def985a7f1e37198680f893e749612ab76
Parents: a04cab8
Author: Liwei Lin
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Jul 7 10:40:42 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala        |  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++-
 3 files changed, 117 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0f7175de/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.streaming

 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde

 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {

   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    data.as[T].foreachPartition { iter =>
+    // TODO: Refine this method when SPARK-16264 is resolved; see comments below.
+
+    // This logic should've been as simple as:
+    // ```
+    //   data.as[T].foreachPartition { iter => ... }
+    // ```
+    //
+    // Unfortunately, doing that would just break the incremental planning. The reason is,
+    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just
+    // does not support `IncrementalExecution`.
+    //
+    // So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()`
+    // method supporting incremental planning. But in the long run, we should generally make newly
+    // created Datasets use `IncrementalExecution` where necessary (which is what SPARK-16264
+    // tries to resolve).
+
+    val datasetWithIncrementalExecution =
+      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+        override lazy val rdd: RDD[T] = {
+          val objectType = exprEnc.deserializer.dataType
+          val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+          // was originally: sparkSession.sessionState.executePlan(deserialized) ...
+          val incrementalExecution = new IncrementalExecution(
+            this.sparkSession,
+            deserialized,
+            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+          incrementalExecution.toRdd.mapPartitions { rows =>
+            rows.map(_.get(0, objectType))
+          }.asInstanceOf[RDD[T]]
+        }
+      }
+    datasetWithIncrementalExecution.foreachPartition { iter =>
       if (writer.open(TaskContext.getPartitionId(), batchId)) {
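The "How was this patch tested?" section mentions a unit test that failed before the change. Below is a minimal sketch of what such a test could look like, loosely modeled on the `ForeachSinkSuite` style; the suite name, test name, `SeenCounts` collector, and exact assertions are hypothetical, not the commit's actual test.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.SharedSQLContext

// Results are collected through a JVM-global object: the writer instance is
// serialized out to tasks, but in local-mode tests the tasks run in the same
// JVM as the test, so this object is shared.
object SeenCounts {
  val values = new ConcurrentLinkedQueue[Long]()
}

class ForeachCompleteModeSuite extends StreamTest with SharedSQLContext {
  import testImplicits._

  test("foreach() observes updated aggregates in complete mode") {
    withTempDir { checkpointDir =>
      val input = MemoryStream[Int]
      val query = input.toDS().groupBy().count().as[Long]
        .writeStream
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .outputMode("complete")
        .foreach(new ForeachWriter[Long] {
          def open(partitionId: Long, version: Long): Boolean = true
          def process(value: Long): Unit = SeenCounts.values.add(value)
          def close(errorOrNull: Throwable): Unit = ()
        })
        .start()

      input.addData(1, 2, 3)
      query.processAllAvailable()
      input.addData(4, 5)
      query.processAllAvailable()
      query.stop()

      // Before the fix, the second batch could re-emit a stale count; after
      // it, both the 3-row and the 5-row running totals should be observed.
      assert(SeenCounts.values.contains(3L))
      assert(SeenCounts.values.contains(5L))
    }
  }
}
```

Collecting through a JVM-global object works here only because local-mode tasks share the test JVM; the serialized `ForeachWriter` instance itself cannot carry results back to the driver.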