This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7e5d59289870 [SPARK-47305][SQL] Fix PruneFilters to tag the 
isStreaming flag of LocalRelation correctly when the plan has both batch and 
streaming
7e5d59289870 is described below

commit 7e5d5928987069e255da94f8dd8b0cd7696a773b
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Mar 7 15:11:15 2024 +0900

    [SPARK-47305][SQL] Fix PruneFilters to tag the isStreaming flag of 
LocalRelation correctly when the plan has both batch and streaming
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix PruneFilters so that it tags the isStreaming flag of 
the replacement LocalRelation correctly when the plan contains both batch and streaming sources.
    
    ### Why are the changes needed?
    
    When a filter condition always evaluates to false (or null), PruneFilters replaces the 
filter with an empty LocalRelation, which effectively prunes the filter. The logic 
takes care to carry over the isStreaming flag, but in some cases it carries over the wrong 
value, because it reads the isStreaming flag from the root node of the plan rather than 
from the filter (i.e., its child).
    
    A node's isStreaming flag is true if the isStreaming flag of any of its 
children is true. Conversely, some children may have their isStreaming flag set 
to false even when the root's flag is true. If the filter being pruned is a descendant of such a 
batch child (in other words, no streaming node lies beneath it), the LocalRelation is incorrectly tagged as 
streaming when it should be batch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a new unit test verifying the fix. The test fails without this PR and passes 
with it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45406 from HeartSaVioR/SPARK-47305.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 8d6bd9bbd29da6023e5740b622e12c7e1f8581ce)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../optimizer/PropagateEmptyRelationSuite.scala    | 43 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7aebf7c28f11..3d774af1ce33 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1636,9 +1636,9 @@ object PruneFilters extends Rule[LogicalPlan] with 
PredicateHelper {
     // If the filter condition always evaluate to null or false,
     // replace the input with an empty relation.
     case Filter(Literal(null, _), child) =>
-      LocalRelation(child.output, data = Seq.empty, isStreaming = 
plan.isStreaming)
+      LocalRelation(child.output, data = Seq.empty, isStreaming = 
child.isStreaming)
     case Filter(Literal(false, BooleanType), child) =>
-      LocalRelation(child.output, data = Seq.empty, isStreaming = 
plan.isStreaming)
+      LocalRelation(child.output, data = Seq.empty, isStreaming = 
child.isStreaming)
     // If any deterministic condition is guaranteed to be true given the 
constraints on the child's
     // output, remove the condition
     case f @ Filter(fc, p: LogicalPlan) =>
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index fe45e02c67fa..a1132eadcc6f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Literal, UnspecifiedFrame}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, 
UnspecifiedFrame}
 import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, LocalRelation, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}
 
@@ -221,6 +221,45 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-47305 correctly tag isStreaming when propagating empty relation 
" +
+    "with the plan containing batch and streaming") {
+    val data = Seq(Row(1))
+
+    val outputForStream = Seq($"a".int)
+    val schemaForStream = DataTypeUtils.fromAttributes(outputForStream)
+    val converterForStream = 
CatalystTypeConverters.createToCatalystConverter(schemaForStream)
+
+    val outputForBatch = Seq($"b".int)
+    val schemaForBatch = DataTypeUtils.fromAttributes(outputForBatch)
+    val converterForBatch = 
CatalystTypeConverters.createToCatalystConverter(schemaForBatch)
+
+    val streamingRelation = LocalRelation(
+      outputForStream,
+      data.map(converterForStream(_).asInstanceOf[InternalRow]),
+      isStreaming = true)
+    val batchRelation = LocalRelation(
+      outputForBatch,
+      data.map(converterForBatch(_).asInstanceOf[InternalRow]),
+      isStreaming = false)
+
+    val query = streamingRelation
+      .join(batchRelation.where(false).select($"b"), LeftOuter,
+        Some(EqualTo($"a", $"b")))
+
+    val analyzedQuery = query.analyze
+
+    val optimized = Optimize.execute(analyzedQuery)
+    // This is to deal with analysis for join condition. We expect the 
analyzed plan to contain
+    // filter and projection in batch relation, and know they will go away 
after optimization.
+    // The point to check here is that the node is replaced with "empty" 
LocalRelation, but the
+    // flag `isStreaming` is properly propagated.
+    val correctAnswer = analyzedQuery transform {
+      case Project(_, Filter(_, l: LocalRelation)) => l.copy(data = Seq.empty)
+    }
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("don't propagate empty streaming relation through agg") {
     val output = Seq($"a".int)
     val data = Seq(Row(1))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to