[ 
https://issues.apache.org/jira/browse/SPARK-55185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-55185:
-----------------------------------
    Labels: pull-request-available sql  (was: sql)

> Adding rule InferFiltersFromConstraints to the Batch "Operator Optimization 
> after Inferring Filters" causes idempotency break
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55185
>                 URL: https://issues.apache.org/jira/browse/SPARK-55185
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.2.0, 4.1.1
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available, sql
>
> The fix for SPARK-55072 requires the *InferFiltersFromConstraints* rule to
> run as part of the batch
> Batch("Operator Optimization after Inferring Filters", fixedPoint, operatorOptimizationRuleSet: _*).
>  
> However, when *InferFiltersFromConstraints* runs as part of fixed-point
> iteration, idempotency is not achieved in some special cases (as seen in the
> test below).
>  
> {quote}
> test("SPARK-55072: Bug fix needs this test to pass as that is dependent change tested here - 2") {
>   val rel1 = LocalRelation(
>     Seq($"a".int, $"b".int),
>     InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(7, 8) ::
>       InternalRow(5, 6) :: Nil)
>   val nullRel = Project(
>     Seq(
>       Alias(Literal(null, IntegerType), "a")(),
>       Alias(Literal(null, IntegerType), "b")()),
>     OneRowRelation())
>   val distinct = Aggregate(Seq($"a", $"b"), Seq($"a"), nullRel.union(rel1))
>   val agg = Aggregate(Seq($"a"), Seq(sum($"a").as("aggFunctionAlias"), $"a"),
>     distinct).analyze
>   val rel2 = LocalRelation(
>     Seq($"c".int, $"d".int),
>     InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(6, 6) ::
>       InternalRow(7, 7) :: InternalRow(9, 9) :: Nil).analyze
>   val join = rel2.join(agg, condition =
>     Some(Cast($"d", LongType) === $"aggFunctionAlias" && $"a" === $"c")).analyze
>   val optimizer = new SimpleTestOptimizer()
>   val batches = optimizer.defaultBatches
>   val indexBeforeNewFilterInfer =
>     batches.indexWhere(_.name == "Operator Optimization before Inferring Filters")
>   val indexAfterNewFilterInfer =
>     batches.indexWhere(_.name == "Operator Optimization after Inferring Filters")
>   assert(indexAfterNewFilterInfer != -1 && indexBeforeNewFilterInfer != -1)
>   // ensure that InferFiltersFromConstraints rule is present in the batch
>   // Operator Optimization after Inferring Filters
>   val batchOfInterest = batches(indexAfterNewFilterInfer)
>   val optimizerToUse = if (!batchOfInterest.rules.exists(
>       _.ruleName == InferFiltersFromConstraints.ruleName)) {
>     new SimpleTestOptimizer() {
>       override def defaultBatches: Seq[Batch] = {
>         val mutableBatches = super.defaultBatches.toBuffer
>         val afterInferBatch = mutableBatches(indexAfterNewFilterInfer)
>         val mutableRules = afterInferBatch.rules.toBuffer
>         val newRules = mutableRules.append(InferFiltersFromConstraints).toSeq
>         val newAfterInferBatch = new Batch(afterInferBatch.name, afterInferBatch.strategy,
>           newRules: _*)
>         mutableBatches(indexAfterNewFilterInfer) = newAfterInferBatch
>         mutableBatches.toSeq
>       }
>     }
>   } else {
>     optimizer
>   }
>   optimizerToUse.execute(join)
> }
> {quote}
> The above test is also added as a bug test.
> The issue is a complex interaction between the *PushDownPredicates*,
> *InferFiltersFromConstraints* and *PruneFilters* rules, involving a Union node.
> Ideally, the fix should be such that adding or removing any single rule does
> not, by itself, prevent idempotency from being achieved.
> But as seen in this case, the PruneFilters rule modifies the plan such that
> an empty relation is created within one leg of the Union.
> As a result, only one leg of the Union matters, so the Union can be
> replaced directly with its non-empty child leg.
> This task is usually done by the PropagateEmptyRelation rule.
> But since PropagateEmptyRelation is not invoked after PruneFilters, the
> empty relation remains, which causes a behaviour where PushDownPredicates
> pushes a filter down, while InferFiltersFromConstraints materializes the
> new filter again.
> I feel that invoking PropagateEmptyRelation after PruneFilters is the safe
> approach, instead of tweaking the constraints code related to Union, which
> can become ugly, especially since it touches the Union node's constraint
> logic.
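
The proposed remedy (collapse the Union once one of its legs has become empty) can be illustrated with a minimal, self-contained sketch. This is a toy model under stated assumptions, not Spark's implementation: the Plan types, ToyOptimizer, pruneFilters, propagateEmpty and fixedPoint below are hypothetical stand-ins that only mimic the roles of PruneFilters, PropagateEmptyRelation and a fixed-point batch. It does not reproduce the PushDownPredicates / InferFiltersFromConstraints interaction itself.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case object Empty extends Plan
case class Filter(alwaysFalse: Boolean, child: Plan) extends Plan
case class Union(children: Seq[Plan]) extends Plan

object ToyOptimizer {
  type Rule = Plan => Plan

  // Apply f to every node of the plan, children first.
  def transformUp(plan: Plan)(f: Plan => Plan): Plan = plan match {
    case Filter(cond, child) => f(Filter(cond, transformUp(child)(f)))
    case Union(children)     => f(Union(children.map(c => transformUp(c)(f))))
    case leaf                => f(leaf)
  }

  // Stand-in for PruneFilters: a filter that can never pass becomes Empty.
  val pruneFilters: Rule = plan => transformUp(plan) {
    case Filter(true, _) => Empty
    case other           => other
  }

  // Stand-in for PropagateEmptyRelation: drop empty Union legs and
  // replace a Union that has a single remaining leg with that leg.
  val propagateEmpty: Rule = plan => transformUp(plan) {
    case Union(children) =>
      children.filterNot(_ == Empty) match {
        case Seq()       => Empty
        case Seq(single) => single
        case remaining   => Union(remaining)
      }
    case Filter(_, Empty) => Empty
    case other            => other
  }

  // Apply the rules in order, repeatedly, until the plan stops changing
  // or the iteration budget is exhausted.
  def fixedPoint(rules: Seq[Rule], maxIterations: Int)(plan: Plan): Plan = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}
```

In this toy model, running only pruneFilters on Union(Seq(Filter(alwaysFalse = true, Relation("nullRel")), Relation("rel1"))) leaves a Union with an Empty leg, whereas running pruneFilters followed by propagateEmpty yields just Relation("rel1"). That mirrors the argument above: once the empty leg is propagated away, there is no Union left for later rules to reason about.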


