This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 679f3b1e5e96 [SPARK-47305][SQL] Fix PruneFilters to tag the isStreaming flag of LocalRelation correctly when the plan has both batch and streaming 679f3b1e5e96 is described below commit 679f3b1e5e965a6be12823faf012d0680771a5e2 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Thu Mar 7 15:11:15 2024 +0900 [SPARK-47305][SQL] Fix PruneFilters to tag the isStreaming flag of LocalRelation correctly when the plan has both batch and streaming ### What changes were proposed in this pull request? This PR proposes to fix PruneFilters so that it tags the isStreaming flag of LocalRelation correctly when the plan contains both batch and streaming sources. ### Why are the changes needed? When a filter condition is evaluated to be always false (or null), PruneFilters replaces the filter with an empty LocalRelation, which effectively prunes the filter. The logic takes care to carry over the isStreaming flag, but does so incorrectly in some cases: it picks up the value of the isStreaming flag from the root node rather than from the filter (or its child). A node's isStreaming flag is true if the isStreaming flag of any of its children is true. Conversely, some children might have the isStreaming flag set to "false". If the filter being pruned is a descendant of such a child (in other words, it is not an ancestor of any streaming node), the LocalRelation is incorrectly tagged as streaming when it should be batch. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? A new unit test verifies the fix; it fails without this PR and passes with this PR. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45406 from HeartSaVioR/SPARK-47305. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 8d6bd9bbd29da6023e5740b622e12c7e1f8581ce) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +- .../optimizer/PropagateEmptyRelationSuite.scala | 43 +++++++++++++++++++++- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 04d3eb962ed4..239682ab1f84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1668,9 +1668,9 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { // If the filter condition always evaluate to null or false, // replace the input with an empty relation. 
case Filter(Literal(null, _), child) => - LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) + LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming) case Filter(Literal(false, BooleanType), child) => - LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) + LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming) // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition case f @ Filter(fc, p: LogicalPlan) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index e8d2ca1ff75d..5aeb27f7ee6b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -21,10 +21,10 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Literal, UnspecifiedFrame} +import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, UnspecifiedFrame} import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Expand, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.types.{IntegerType, MetadataBuilder} @@ -222,6 +222,45 @@ class 
PropagateEmptyRelationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("SPARK-47305 correctly tag isStreaming when propagating empty relation " + + "with the plan containing batch and streaming") { + val data = Seq(Row(1)) + + val outputForStream = Seq($"a".int) + val schemaForStream = DataTypeUtils.fromAttributes(outputForStream) + val converterForStream = CatalystTypeConverters.createToCatalystConverter(schemaForStream) + + val outputForBatch = Seq($"b".int) + val schemaForBatch = DataTypeUtils.fromAttributes(outputForBatch) + val converterForBatch = CatalystTypeConverters.createToCatalystConverter(schemaForBatch) + + val streamingRelation = LocalRelation( + outputForStream, + data.map(converterForStream(_).asInstanceOf[InternalRow]), + isStreaming = true) + val batchRelation = LocalRelation( + outputForBatch, + data.map(converterForBatch(_).asInstanceOf[InternalRow]), + isStreaming = false) + + val query = streamingRelation + .join(batchRelation.where(false).select($"b"), LeftOuter, + Some(EqualTo($"a", $"b"))) + + val analyzedQuery = query.analyze + + val optimized = Optimize.execute(analyzedQuery) + // This is to deal with analysis for join condition. We expect the analyzed plan to contain + // filter and projection in batch relation, and know they will go away after optimization. + // The point to check here is that the node is replaced with "empty" LocalRelation, but the + // flag `isStreaming` is properly propagated. + val correctAnswer = analyzedQuery transform { + case Project(_, Filter(_, l: LocalRelation)) => l.copy(data = Seq.empty) + } + + comparePlans(optimized, correctAnswer) + } + test("don't propagate empty streaming relation through agg") { val output = Seq($"a".int) val data = Seq(Row(1)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org