This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7074e4fee7e [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table 7074e4fee7e is described below commit 7074e4fee7e6944013cfaa3c0c2a1458cce8a72d Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Tue Nov 8 08:31:24 2022 -0800 [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/36963 added a check to disallow any source setting `CatalogTable` in the batch plan. However, this check is not safe to enforce: - In a self-union query, the batch plan created by the source will be shared by multiple nodes in the plan. When we transform the plan, the batch plan will be visited multiple times. Hence, the first visit will set the `CatalogTable` and the second visit will try to set it again and fail the query. - A source built by arbitrary developers can set `CatalogTable` in the batch plan. We should not fail as it would break an existing source. This PR fixes the issue by removing the check and setting `CatalogTable` only if the batch plan doesn't have one. ### Why are the changes needed? Fix a bug in master. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The newly added unit test Closes #38553 from zsxwing/SPARK-41040. 
Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../sql/execution/streaming/MicroBatchExecution.scala | 18 +++++++++++++++--- .../sql/streaming/test/DataStreamTableAPISuite.scala | 13 +++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 7ed19b35114..051e45c71e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -621,10 +621,22 @@ class MicroBatchExecution( if (hasFileMetadata) { newRelation = newRelation.withMetadataColumns() } - catalogTable.foreach { table => - assert(newRelation.catalogTable.isEmpty, + // If the catalog table is not set in the batch plan generated by the source, we will + // pick up the one from `StreamingExecutionRelation`. Otherwise, we will skip this + // step. The skipping can happen in the following cases: + // - We re-visit the same `StreamingExecutionRelation`. For example, self-union will + // share the same `StreamingExecutionRelation` and `transform` will visit it twice. + // This is safe to skip. + // - A source that sets the catalog table explicitly. We will pick up the one provided + // by the source directly to maintain the same behavior. 
+ if (newRelation.catalogTable.isEmpty) { + catalogTable.foreach { table => + newRelation = newRelation.copy(catalogTable = Some(table)) + } + } else if (catalogTable.exists(_ ne newRelation.catalogTable.get)) { + // Output a warning if `catalogTable` is provided by the source rather than engine + logWarning( s"Source $source should not produce the information of catalog table by its own.") - newRelation = newRelation.copy(catalogTable = Some(table)) } newRelation } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index 0d1242fbb19..6bbf2239dbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -484,6 +484,19 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { } } + test("SPARK-41040: self-union using readStream.table should not fail") { + withTable("self_union_table") { + spark.range(10).write.format("parquet").saveAsTable("self_union_table") + val df = spark.readStream.format("parquet").table("self_union_table") + val q = df.union(df).writeStream.format("noop").start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + } + private def checkForStreamTable(dir: Option[File], tableName: String): Unit = { val memory = MemoryStream[Int] val dsw = memory.toDS().writeStream.format("parquet") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org