This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new e87d166a81c [SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference e87d166a81c is described below commit e87d166a81c620c15cc94dfb15c17c9cacbbc9b6 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Thu Nov 23 22:32:16 2023 +0900 [SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference This PR proposes to sync the flag `isStreaming` from CTE definition to CTE reference. The essential issue is that CTE reference node cannot determine the flag `isStreaming` by itself, and is never able to have a proper value and always takes the default as it does not have a parameter in constructor. The other flag `resolved` is handled, and we need to do the same for `isStreaming`. Once we add the parameter to the constructor, we will also need to make sure the flag is in sync with CTE definition. We have a rule `ResolveWithCTE` doing the sync, hence we add the logic to sync the flag `isStreaming` as well. The bug may impact some rules which behave differently depending on isStreaming flag. It would no longer be a problem once CTE reference is replaced with CTE definition at some point in "optimization phase", but all rules in analyzer and optimizer being triggered before the rule takes effect may misbehave based on incorrect isStreaming flag. No. New UT. No. Closes #43966 from HeartSaVioR/SPARK-46062. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 43046631a5d4ac7201361a00473cc87fa52ab5a7) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/catalyst/analysis/CTESubstitution.scala | 2 +- .../sql/catalyst/analysis/ResolveWithCTE.scala | 2 +- .../catalyst/optimizer/MergeScalarSubqueries.scala | 3 +- ...ushdownPredicatesAndPruneColumnsForCTEDef.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 1 + .../sql/catalyst/analysis/AnalysisSuite.scala | 15 +++++++ .../optimizer/MergeScalarSubqueriesSuite.scala | 3 +- .../spark/sql/streaming/StreamingQuerySuite.scala | 47 +++++++++++++++++++++- 8 files changed, 69 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 77c687843c3..f047483b20f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. 
- SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output)) + SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming)) } }.getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index 78b776f12f0..f1077378b2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] { case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => - CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output) + CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming) }.getOrElse { ref } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 6184160829b..ff0bc5e66d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { val subqueryCTE = header.plan.asInstanceOf[CTERelationDef] GetStructField( ScalarSubquery( - CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output), + CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output, + subqueryCTE.isStreaming), exprId = ssr.exprId), ssr.headerIndex) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala index 
f351ba0b39a..41859673616 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala @@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] { cteDef } - case cteRef @ CTERelationRef(cteId, _, output, _) => + case cteRef @ CTERelationRef(cteId, _, output, _, _) => val (cteDef, _, _, newAttrSet) = cteMap(cteId) if (newAttrSet.size < output.size) { val indices = newAttrSet.toSeq.map(cteDef.output.indexOf) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b5a2f097424..d775b72a5da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -837,6 +837,7 @@ case class CTERelationRef( cteId: Long, _resolved: Boolean, override val output: Seq[Attribute], + override val isStreaming: Boolean, statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation { final override val nodePatterns: Seq[TreePattern] = Seq(CTE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9d51c41a6d8..8d9f9abc00b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers { // EventTimeWatermark node is NOT eliminated. 
assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark])) } + + test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") { + val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts")) + // Intentionally marking the flag _resolved to false, so that analyzer has a chance to sync + // the flag isStreaming on syncing the flag _resolved. + val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false) + val plan = WithCTE(cteRef, Seq(cteDef)).analyze + + val refs = plan.collect { + case r: CTERelationRef => r + } + assert(refs.length == 1) + assert(refs.head.resolved) + assert(refs.head.isStreaming) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala index 8af0e02855b..13e13841478 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala @@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest { } private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = { - GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex) + GetStructField(ScalarSubquery( + CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex) .as("scalarsubquery()") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b889ac18974..da9b579b397 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging import 
org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} @@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("SPARK-46062: streaming query reading from CTE, which refers to temp view from " + + "streaming source") { + val inputStream = MemoryStream[Int] + inputStream.toDF().createOrReplaceTempView("tv") + + val df = spark.sql( + """ + |WITH w as ( + | SELECT * FROM tv + |) + |SELECT value from w + |""".stripMargin) + + testStream(df)( + AddData(inputStream, 1, 2, 3), + CheckAnswer(1, 2, 3), + Execute { q => + var isStreamingForCteDef: Option[Boolean] = None + var isStreamingForCteRef: Option[Boolean] = None + + q.analyzedPlan.foreach { + case d: CTERelationDef => + assert(d.resolved, "The definition node must be resolved after analysis.") + isStreamingForCteDef = Some(d.isStreaming) + + case d: CTERelationRef => + assert(d.resolved, "The reference node must be marked as resolved after analysis.") + isStreamingForCteRef = Some(d.isStreaming) + + case _ => + } + + assert(isStreamingForCteDef.isDefined && isStreamingForCteRef.isDefined, + "Both definition and reference for CTE should be available in analyzed plan.") + + assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE definition, but " + + "isStreaming is set to false.") + + assert(isStreamingForCteDef === isStreamingForCteRef, + "isStreaming flag should be carried over from definition to reference, " + + 
s"definition: ${isStreamingForCteDef.get}, reference: ${isStreamingForCteRef.get}.") + } + ) + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org