Asif created SPARK-55185:
----------------------------

             Summary: Adding rule InferFiltersFromConstraints to the Batch 
"Operator Optimization after Inferring Filters" causes idempotency break
                 Key: SPARK-55185
                 URL: https://issues.apache.org/jira/browse/SPARK-55185
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 4.1.1, 4.2.0
            Reporter: Asif


The fix for bug SPARK-55072 requires the *InferFiltersFromConstraints* rule to 
run as part of the batch 
Batch("Operator Optimization after Inferring Filters", fixedPoint, 
operatorOptimizationRuleSet: _*).
 
But if *InferFiltersFromConstraints* runs as part of fixed-point iteration, 
idempotency is not achieved in some special cases (as seen in the test 
below).
 
{quote}
test("SPARK-55072: Bug fix needs this test to pass as that is dependent change tested here - 2") {
  val rel1 = LocalRelation(
    Seq($"a".int, $"b".int),
    InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(7, 8) ::
      InternalRow(5, 6) :: Nil)

  // A single all-null row, so that one leg of the Union carries only nulls.
  val nullRel = Project(
    Seq(
      Alias(Literal(null, IntegerType), "a")(),
      Alias(Literal(null, IntegerType), "b")()),
    OneRowRelation())

  val distinct = Aggregate(Seq($"a", $"b"), Seq($"a"), nullRel.union(rel1))
  val agg = Aggregate(Seq($"a"), Seq(sum($"a").as("aggFunctionAlias"), $"a"),
    distinct).analyze
  val rel2 = LocalRelation(
    Seq($"c".int, $"d".int),
    InternalRow(1, 1) :: InternalRow(2, 1) :: InternalRow(3, 3) :: InternalRow(6, 6) ::
      InternalRow(7, 7) :: InternalRow(9, 9) :: Nil).analyze

  val join = rel2.join(agg, condition =
    Some(Cast($"d", LongType) === $"aggFunctionAlias" && $"a" === $"c")).analyze

  val optimizer = new SimpleTestOptimizer()
  val batches = optimizer.defaultBatches
  val indexBeforeNewFilterInfer =
    batches.indexWhere(_.name == "Operator Optimization before Inferring Filters")
  val indexAfterNewFilterInfer =
    batches.indexWhere(_.name == "Operator Optimization after Inferring Filters")
  assert(indexAfterNewFilterInfer != -1 && indexBeforeNewFilterInfer != -1)
  // Ensure that the InferFiltersFromConstraints rule is present in the batch
  // "Operator Optimization after Inferring Filters"; add it if it is missing.
  val batchOfInterest = batches(indexAfterNewFilterInfer)
  val optimizerToUse = if (!batchOfInterest.rules.exists(
      _.ruleName == InferFiltersFromConstraints.ruleName)) {
    new SimpleTestOptimizer() {
      override def defaultBatches: Seq[Batch] = {
        val mutableBatches = super.defaultBatches.toBuffer
        val afterInferBatch = mutableBatches(indexAfterNewFilterInfer)
        val newRules = afterInferBatch.rules :+ InferFiltersFromConstraints
        val newAfterInferBatch = new Batch(afterInferBatch.name,
          afterInferBatch.strategy,
          newRules: _*)
        mutableBatches(indexAfterNewFilterInfer) = newAfterInferBatch
        mutableBatches.toSeq
      }
    }
  } else {
    optimizer
  }
  optimizerToUse.execute(join)
}
{quote}
The above test is also added as a bug test.

The issue is a complex interaction between the *PushDownPredicates*, 
*InferFiltersFromConstraints* and *PruneFilters* rules, involving a Union node.

Ideally, the fix should be such that adding or removing any single rule does 
not, by itself, prevent idempotency from being achieved.
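
To make the property being discussed concrete, here is a minimal, self-contained sketch of a fixed-point batch over a toy plan. It does not use Spark's RuleExecutor; the names ToyBatch, runOnce, runToFixedPoint and the three toy rules are hypothetical and exist only for illustration.

```scala
object ToyBatch {
  // Toy plan: just a list of operator names, outermost first.
  type Plan = List[String]
  type Rule = Plan => Plan

  /** One pass of a batch: apply every rule once, in order. */
  def runOnce(rules: Seq[Rule])(plan: Plan): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  /** Repeat passes until the plan stops changing.
    * Right(plan) means a fixed point was reached; Left(plan) means the
    * iteration budget ran out while the plan was still changing. */
  def runToFixedPoint(rules: Seq[Rule], maxIterations: Int)(plan: Plan): Either[Plan, Plan] = {
    @annotation.tailrec
    def loop(current: Plan, remaining: Int): Either[Plan, Plan] =
      if (remaining == 0) Left(current)
      else {
        val next = runOnce(rules)(current)
        if (next == current) Right(current) else loop(next, remaining - 1)
      }
    loop(plan, maxIterations)
  }

  // Toy rule: collapse adjacent duplicate operators.
  val dedupe: Rule = p => p.foldRight(List.empty[String]) {
    case (op, acc) if acc.headOption.contains(op) => acc
    case (op, acc) => op :: acc
  }

  // Toy rule: remove operators that do nothing.
  val dropNoop: Rule = _.filterNot(_ == "Noop")

  // Misbehaving toy rule: always adds an operator, so no fixed point exists.
  val alwaysAdd: Rule = "Filter" :: _

  val plan: Plan = List("Filter", "Noop", "Filter", "Scan")
  val wellBehaved: Either[Plan, Plan] = runToFixedPoint(Seq(dedupe, dropNoop), 10)(plan)
  val broken: Either[Plan, Plan] = runToFixedPoint(Seq(dedupe, dropNoop, alwaysAdd), 10)(plan)
}
```

In this toy, the batch without alwaysAdd settles after a few passes, while adding that single rule keeps the plan changing until the budget is exhausted. That is the kind of sensitivity to the rule set that the fix should avoid.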

But as seen in this case:

The PruneFilters rule modifies the plan such that an empty relation is 
created within one leg of the Union.

As a result, only one leg of the Union matters, so the Union can be 
replaced directly with its non-empty child leg.

This task is usually done by the PropagateEmptyRelation rule.

But since PropagateEmptyRelation is not invoked after PruneFilters, 
the empty relation remains. This causes a behaviour where PushDownPredicates 
pushes a filter down, while InferFiltersFromConstraints materializes the 
new filter again.

I feel that invoking PropagateEmptyRelation after PruneFilters is the safe 
approach, rather than tweaking the Union node's constraint logic, which can 
become ugly.
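
The interaction and the proposed remedy can be sketched with a toy plan model. This is not Catalyst code: the Plan ADT, the constraints function and the four rule stand-ins below are hypothetical simplifications, and the assumption that a Union keeps only the constraints shared by all of its legs is a modelling choice for this sketch. The real plan shapes in Spark differ.

```scala
object UnionEmptyLegToy {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case object Empty extends Plan
  case class Filter(cond: String, child: Plan) extends Plan
  case class Union(children: List[Plan]) extends Plan

  type Rule = Plan => Plan

  // Toy constraint propagation: a Union keeps only constraints common to all legs.
  def constraints(plan: Plan): Set[String] = plan match {
    case Filter(c, child) => constraints(child) + c
    case Union(children) =>
      children.map(constraints).reduceOption(_ intersect _).getOrElse(Set.empty)
    case _ => Set.empty
  }

  // Stand-in for InferFiltersFromConstraints: add `required` at the root
  // unless the root's constraints already imply it.
  def inferFilter(required: String): Rule =
    p => if (constraints(p).contains(required)) p else Filter(required, p)

  // Stand-in for PushDownPredicates: push a root filter into every Union leg.
  val pushThroughUnion: Rule = {
    case Filter(c, Union(children)) => Union(children.map(Filter(c, _)))
    case other => other
  }

  // Stand-in for PruneFilters: an always-false filter, or any filter over an
  // empty input, becomes an empty relation.
  def pruneFilters(plan: Plan): Plan = plan match {
    case Filter("false", _) => Empty
    case Filter(c, child) => pruneFilters(child) match {
      case Empty => Empty
      case pruned => Filter(c, pruned)
    }
    case Union(children) => Union(children.map(pruneFilters))
    case other => other
  }

  // Stand-in for PropagateEmptyRelation: drop empty Union legs and unwrap a
  // Union that is left with a single leg.
  def propagateEmpty(plan: Plan): Plan = plan match {
    case Union(children) =>
      children.map(propagateEmpty).filterNot(_ == Empty) match {
        case Nil => Empty
        case single :: Nil => single
        case many => Union(many)
      }
    case Filter(c, child) => Filter(c, propagateEmpty(child))
    case other => other
  }

  // True if repeated passes over `rules` reach a fixed point within the budget.
  def converges(rules: Seq[Rule], plan: Plan, maxIterations: Int): Boolean = {
    @annotation.tailrec
    def loop(current: Plan, remaining: Int): Boolean =
      if (remaining == 0) false
      else {
        val next = rules.foldLeft(current)((p, r) => r(p))
        if (next == current) true else loop(next, remaining - 1)
      }
    loop(plan, maxIterations)
  }

  // One Union leg is filtered by an always-false predicate, the other is live.
  val start: Plan = Union(List(Filter("false", Relation("nullRel")), Relation("rel1")))

  val withoutFix: Seq[Rule] = Seq(inferFilter("c"), pushThroughUnion, pruneFilters(_))
  val withFix: Seq[Rule] = withoutFix :+ (propagateEmpty(_))
}
```

In this model the empty leg wipes out the Union's constraints, so the inferred filter is re-added on every pass and pushed down again, and the plan never settles. Once the empty leg is removed right after pruning, the surviving leg's filter is visible at the root and the second pass changes nothing.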



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
