Repository: spark
Updated Branches:
  refs/heads/master 7f96f2d7f -> 27f543b15


[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

## What changes were proposed in this pull request?

Within the same streaming query, when one `StreamingRelation` is referred to 
multiple times, e.g. `df.union(df)`, we should transform it into only one 
`StreamingExecutionRelation`, instead of two or more different 
`StreamingExecutionRelation`s (each of which would have its own source, 
source logs, etc.).
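
To make the intended behavior concrete, here is a minimal, self-contained sketch of the memoization idea; the case classes below are hypothetical stand-ins for illustration, not Spark's actual plan nodes:

```scala
import scala.collection.mutable.{Map => MutableMap}

// Hypothetical stand-ins for Spark's plan nodes, for illustration only.
case class StreamingRelation(name: String)
case class StreamingExecutionRelation(sourceId: Long, name: String)

object DedupSketch extends App {
  val rel = StreamingRelation("fake-source")
  // df.union(df) makes the logical plan reference the same relation twice.
  val planLeaves = Seq(rel, rel)

  var nextSourceId = 0L
  val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()

  // getOrElseUpdate evaluates its default only on the first lookup, so the
  // second occurrence reuses the execution relation created for the first.
  val transformed = planLeaves.map { r =>
    toExecutionRelationMap.getOrElseUpdate(r, {
      val created = StreamingExecutionRelation(nextSourceId, r.name)
      nextSourceId += 1
      created
    })
  }

  assert(transformed.size == 2)          // still two occurrences in the plan
  assert(transformed.distinct.size == 1) // but only one distinct relation
}
```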

## How was this patch tested?

Added two test cases, each of which would fail without this patch.

Author: Liwei Lin <lwl...@gmail.com>

Closes #17735 from lw-lin/SPARK-20441.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27f543b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27f543b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27f543b1

Branch: refs/heads/master
Commit: 27f543b15f2f493f6f8373e46b4c9564b0a1bf81
Parents: 7f96f2d
Author: Liwei Lin <lwl...@gmail.com>
Authored: Wed May 3 08:55:02 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Wed May 3 08:55:02 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 20 ++++----
 .../spark/sql/streaming/StreamSuite.scala       | 48 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index affc201..b6ddf74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
+import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -148,15 +149,18 @@ class StreamExecution(
       "logicalPlan must be initialized in StreamExecutionThread " +
         s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val _logicalPlan = analyzedPlan.transform {
-      case StreamingRelation(dataSource, _, output) =>
-        // Materialize source to avoid creating it in every batch
-        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-        val source = dataSource.createSource(metadataPath)
-        nextSourceId += 1
-        // We still need to use the previous `output` instead of `source.schema` as attributes in
-        // "df.logicalPlan" has already used attributes of the previous `output`.
-        StreamingExecutionRelation(source, output)
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)
+        })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
     uniqueSources = sources.distinct
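
Note: after this change, `sources` may legitimately contain the same source more than once (one entry per occurrence of the relation in the plan), which is exactly what `uniqueSources = sources.distinct` collapses. A hedged sketch of that relationship follows; the `Source` case class is a stand-in for illustration, not Spark's actual `Source` trait:

```scala
object SourcesSketch extends App {
  case class Source(id: Long) // hypothetical stand-in for a streaming source

  // With df.union(df), collecting from the transformed plan yields the
  // same shared source twice, once per occurrence of the relation.
  val shared = Source(0L)
  val sources = Seq(shared, shared)
  val uniqueSources = sources.distinct // each source handled only once

  assert(sources.size == 2)
  assert(uniqueSources.size == 1)
}
```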

http://git-wip-us.apache.org/repos/asf/spark/blob/27f543b1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 01ea62a..1fc0629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -71,6 +71,27 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("SPARK-20432: union one stream with itself") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+    val unioned = df.union(df)
+    withTempDir { outputDir =>
+      withTempDir { checkpointDir =>
+        val query =
+          unioned
+            .writeStream.format("parquet")
+            .option("checkpointLocation", checkpointDir.getAbsolutePath)
+            .start(outputDir.getAbsolutePath)
+        try {
+          query.processAllAvailable()
+          val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
+          checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*)
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
   test("union two streams") {
     val inputData1 = MemoryStream[Int]
     val inputData2 = MemoryStream[Int]
@@ -122,6 +143,33 @@ class StreamSuite extends StreamTest {
     assertDF(df)
   }
 
+  test("Within the same streaming query, one StreamingRelation should only be 
transformed to one " +
+    "StreamingExecutionRelation") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    var query: StreamExecution = null
+    try {
+      query =
+        df.union(df)
+          .writeStream
+          .format("memory")
+          .queryName("memory")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+      query.awaitInitialization(streamingTimeout.toMillis)
+      val executionRelations =
+        query
+          .logicalPlan
+          .collect { case ser: StreamingExecutionRelation => ser }
+      assert(executionRelations.size === 2)
+      assert(executionRelations.distinct.size === 1)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
   test("unsupported queries") {
     val streamInput = MemoryStream[Int]
     val batchInput = Seq(1, 2, 3).toDS()

