This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e87d166a81c [SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference
e87d166a81c is described below

commit e87d166a81c620c15cc94dfb15c17c9cacbbc9b6
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Nov 23 22:32:16 2023 +0900

    [SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference
    
    This PR proposes to sync the flag `isStreaming` from the CTE definition to the CTE reference.
    
    The essential issue is that a CTE reference node cannot determine the flag `isStreaming` by itself: the flag is not a constructor parameter, so a reference can never carry a proper value and always falls back to the default. The sibling flag `resolved` is already handled this way, and we need to do the same for `isStreaming`.
    
    Once we add the parameter to the constructor, we also need to make sure the flag stays in sync with the CTE definition. The rule `ResolveWithCTE` already performs this sync for `resolved`, so we extend it to sync the flag `isStreaming` as well.
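
    A minimal, self-contained sketch of the sync (using hypothetical stand-in types, not the actual Catalyst classes; the real change is in the diff below):

        // Hypothetical stand-ins for CTERelationDef / CTERelationRef.
        object IsStreamingSyncSketch {
          // The definition knows whether its child plan is streaming.
          case class CteDef(id: Long, isStreaming: Boolean)

          // Before the fix, the reference had no isStreaming parameter and
          // silently took the default; now it carries the flag explicitly.
          case class CteRef(cteId: Long, resolved: Boolean, isStreaming: Boolean)

          // Mirrors what ResolveWithCTE does: resolving a reference against
          // its definition copies both the resolved and isStreaming flags.
          def resolve(ref: CteRef, defs: Map[Long, CteDef]): CteRef =
            defs.get(ref.cteId)
              .map(d => CteRef(d.id, resolved = true, isStreaming = d.isStreaming))
              .getOrElse(ref)

          def main(args: Array[String]): Unit = {
            val defs = Map(1L -> CteDef(1L, isStreaming = true))
            val ref = CteRef(1L, resolved = false, isStreaming = false)
            val synced = resolve(ref, defs)
            assert(synced.resolved && synced.isStreaming)
          }
        }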
    
    The bug may impact rules that behave differently depending on the isStreaming flag. It stops being a problem once the CTE reference is replaced with the CTE definition during the optimization phase, but any analyzer or optimizer rule triggered before that replacement may misbehave based on the incorrect isStreaming flag.
    
    Does this PR introduce any user-facing change? No.
    
    How was this patch tested? New UT.
    
    Was this patch authored or co-authored using generative AI tooling? No.
    
    Closes #43966 from HeartSaVioR/SPARK-46062.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 43046631a5d4ac7201361a00473cc87fa52ab5a7)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/catalyst/analysis/CTESubstitution.scala    |  2 +-
 .../sql/catalyst/analysis/ResolveWithCTE.scala     |  2 +-
 .../catalyst/optimizer/MergeScalarSubqueries.scala |  3 +-
 ...ushdownPredicatesAndPruneColumnsForCTEDef.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala      |  1 +
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 15 +++++++
 .../optimizer/MergeScalarSubqueriesSuite.scala     |  3 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 47 +++++++++++++++++++++-
 8 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 77c687843c3..f047483b20f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
             d.child
           } else {
            // Add a `SubqueryAlias` for hint-resolving rules to match relation names.
-            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
+            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
           }
         }.getOrElse(u)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
index 78b776f12f0..f1077378b2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
@@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
 
       case ref: CTERelationRef if !ref.resolved =>
         cteDefMap.get(ref.cteId).map { cteDef =>
-          CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
+          CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
         }.getOrElse {
           ref
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
index 6184160829b..ff0bc5e66d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
@@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
               val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
               GetStructField(
                 ScalarSubquery(
-                  CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output),
+                  CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output,
+                    subqueryCTE.isStreaming),
                   exprId = ssr.exprId),
                 ssr.headerIndex)
             } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
index f351ba0b39a..41859673616 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
         cteDef
       }
 
-    case cteRef @ CTERelationRef(cteId, _, output, _) =>
+    case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
       val (cteDef, _, _, newAttrSet) = cteMap(cteId)
       if (newAttrSet.size < output.size) {
         val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b5a2f097424..d775b72a5da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -837,6 +837,7 @@ case class CTERelationRef(
     cteId: Long,
     _resolved: Boolean,
     override val output: Seq[Attribute],
+    override val isStreaming: Boolean,
     statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
 
   final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 9d51c41a6d8..8d9f9abc00b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     // EventTimeWatermark node is NOT eliminated.
     assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
   }
+
+  test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE 
reference") {
+    val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts"))
+    // Intentionally marking the flag _resolved to false, so that analyzer has 
a chance to sync
+    // the flag isStreaming on syncing the flag _resolved.
+    val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming 
= false)
+    val plan = WithCTE(cteRef, Seq(cteDef)).analyze
+
+    val refs = plan.collect {
+      case r: CTERelationRef => r
+    }
+    assert(refs.length == 1)
+    assert(refs.head.resolved)
+    assert(refs.head.isStreaming)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
index 8af0e02855b..13e13841478 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
@@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest {
   }
 
   private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = {
-    GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+    GetStructField(ScalarSubquery(
+      CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex)
       .as("scalarsubquery()")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b889ac18974..da9b579b397 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("SPARK-46062: streaming query reading from CTE, which refers to temp 
view from " +
+    "streaming source") {
+    val inputStream = MemoryStream[Int]
+    inputStream.toDF().createOrReplaceTempView("tv")
+
+    val df = spark.sql(
+      """
+        |WITH w as (
+        |  SELECT * FROM tv
+        |)
+        |SELECT value from w
+        |""".stripMargin)
+
+    testStream(df)(
+      AddData(inputStream, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      Execute { q =>
+        var isStreamingForCteDef: Option[Boolean] = None
+        var isStreamingForCteRef: Option[Boolean] = None
+
+        q.analyzedPlan.foreach {
+          case d: CTERelationDef =>
+            assert(d.resolved, "The definition node must be resolved after 
analysis.")
+            isStreamingForCteDef = Some(d.isStreaming)
+
+          case d: CTERelationRef =>
+            assert(d.resolved, "The reference node must be marked as resolved 
after analysis.")
+            isStreamingForCteRef = Some(d.isStreaming)
+
+          case _ =>
+        }
+
+        assert(isStreamingForCteDef.isDefined && isStreamingForCteRef.isDefined,
+          "Both definition and reference for CTE should be available in analyzed plan.")
+
+        assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE definition, but " +
+          "isStreaming is set to false.")
+
+        assert(isStreamingForCteDef === isStreamingForCteRef,
+          "isStreaming flag should be carried over from definition to 
reference, " +
+            s"definition: ${isStreamingForCteDef.get}, reference: 
${isStreamingForCteRef.get}.")
+      }
+    )
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>

