gatorsmile commented on a change in pull request #17735: 
[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one 
StreamingRelation should only be transformed to one StreamingExecutionRelation
URL: https://github.com/apache/spark/pull/17735#discussion_r243867264
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ##########
 @@ -148,15 +149,18 @@ class StreamExecution(
       "logicalPlan must be initialized in StreamExecutionThread " +
         s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
     val _logicalPlan = analyzedPlan.transform {
-      case StreamingRelation(dataSource, _, output) =>
-        // Materialize source to avoid creating it in every batch
-        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-        val source = dataSource.createSource(metadataPath)
-        nextSourceId += 1
-        // We still need to use the previous `output` instead of 
`source.schema` as attributes in
-        // "df.logicalPlan" has already used attributes of the previous 
`output`.
-        StreamingExecutionRelation(source, output)
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
 
 Review comment:
   Why not using `QueryPlan.sameResult`? Our dedup could break it, right? cc 
@zsxwing @brkyvz 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to