This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7074e4fee7e [SPARK-41040][SS] Fix self-union streaming query failure 
when using readStream.table
7074e4fee7e is described below

commit 7074e4fee7e6944013cfaa3c0c2a1458cce8a72d
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Tue Nov 8 08:31:24 2022 -0800

    [SPARK-41040][SS] Fix self-union streaming query failure when using 
readStream.table
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/36963 added a check to disallow any 
source setting `CatalogTable` in the batch plan. However, this check is not 
safe to enforce:
    
    - In a self-union query, the batch plan created by the source will be 
shared by multiple nodes in the plan. When we transform the plan, the batch 
plan will be visited multiple times. Hence, the first visit will set the 
`CatalogTable` and the second visit will try to set it again and fail the query.
    - A source built by arbitrary developers can set `CatalogTable` in the 
batch plan. We should not fail the query, as doing so would break existing sources.
    
    This PR fixes the issue by removing the check and setting `CatalogTable` 
only if the batch plan doesn't already have one.
    
    ### Why are the changes needed?
    
    Fix a bug in master.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    The newly added unit test
    
    Closes #38553 from zsxwing/SPARK-41040.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../sql/execution/streaming/MicroBatchExecution.scala  | 18 +++++++++++++++---
 .../sql/streaming/test/DataStreamTableAPISuite.scala   | 13 +++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7ed19b35114..051e45c71e6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -621,10 +621,22 @@ class MicroBatchExecution(
               if (hasFileMetadata) {
                 newRelation = newRelation.withMetadataColumns()
               }
-              catalogTable.foreach { table =>
-                assert(newRelation.catalogTable.isEmpty,
+              // If the catalog table is not set in the batch plan generated 
by the source, we will
+              // pick up the one from `StreamingExecutionRelation`. Otherwise, 
we will skip this
+              // step. The skipping can happen in the following cases:
+              // - We re-visit the same `StreamingExecutionRelation`. For 
example, self-union will
+              //   share the same `StreamingExecutionRelation` and `transform` 
will visit it twice.
+              //   This is safe to skip.
+              // - A source that sets the catalog table explicitly. We will 
pick up the one provided
+              //   by the source directly to maintain the same behavior.
+              if (newRelation.catalogTable.isEmpty) {
+                catalogTable.foreach { table =>
+                  newRelation = newRelation.copy(catalogTable = Some(table))
+                }
+              } else if (catalogTable.exists(_ ne 
newRelation.catalogTable.get)) {
+                // Output a warning if `catalogTable` is provided by the 
source rather than engine
+                logWarning(
                   s"Source $source should not produce the information of 
catalog table by its own.")
-                newRelation = newRelation.copy(catalogTable = Some(table))
               }
               newRelation
           }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 0d1242fbb19..6bbf2239dbf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -484,6 +484,19 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
+  test("SPARK-41040: self-union using readStream.table should not fail") {
+    withTable("self_union_table") {
+      spark.range(10).write.format("parquet").saveAsTable("self_union_table")
+      val df = spark.readStream.format("parquet").table("self_union_table")
+      val q = df.union(df).writeStream.format("noop").start()
+      try {
+        q.processAllAvailable()
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
   private def checkForStreamTable(dir: Option[File], tableName: String): Unit 
= {
     val memory = MemoryStream[Int]
     val dsw = memory.toDS().writeStream.format("parquet")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to