Chen Fan created SPARK-26569:
--------------------------------

             Summary: Fixed point for batch Operator Optimizations never 
reached when optimize logicalPlan
                 Key: SPARK-26569
                 URL: https://issues.apache.org/jira/browse/SPARK-26569
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
         Environment:  

 
            Reporter: Chen Fan


We have a fairly complicated Spark app that uses the Dataset API and runs once 
a day, and I noticed that it hangs once in a while. 
 I added some logging and compared two driver logs, one from a successful run 
and one from a failed run. Here are the results of the investigation:

1. Usually the app runs correctly, but sometimes it hangs after finishing 
job 1.

2. According to the logging I added, the successful app always reaches the 
fixed point at iteration 7 of batch Operator Optimizations, but the failed app 
never reaches this fixed point.
{code:java}
2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer: === Result of Batch Operator Optimizations ===

2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 6/100, for batch Operator Optimizations
2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===

2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===

2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===

2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===

2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===

2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 7/100, for batch Operator Optimizations
2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: Fixed point reached for batch Operator Optimizations after 7 iterations.

2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 45/100, for batch Operator Optimizations
2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===

2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===

2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===

2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===

2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 46/100, for batch Operator Optimizations
2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===

2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===

2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===

2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===

2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 47/100, for batch Operator Optimizations
{code}
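For context on what "reaching the fixed point" means in the log above, here is 
a minimal sketch of a fixed-point batch: all rules are applied in order, pass 
after pass, until a pass leaves the plan unchanged or an iteration cap is hit. 
This is not Spark's RuleExecutor. The names runBatch, halveIfEven and negate 
are hypothetical, plans are plain values compared with ==, and the iteration 
counting may not line up exactly with the numbers Spark logs.
{code:scala}
import scala.annotation.tailrec

object FixedPointSketch {
  /** Applies every rule in order, then repeats until one full pass leaves the
    * plan unchanged or maxIterations passes have run.
    * Returns the final plan and the number of the last pass. */
  @tailrec
  def runBatch[P](plan: P, rules: Seq[P => P], maxIterations: Int,
                  iteration: Int = 1): (P, Int) = {
    val next = rules.foldLeft(plan)((p, rule) => rule(p))
    if (next == plan || iteration >= maxIterations) (next, iteration)
    else runBatch(next, rules, maxIterations, iteration + 1)
  }

  // A rule that converges: once the number is odd, nothing changes any more.
  val halveIfEven: Int => Int = n => if (n % 2 == 0) n / 2 else n

  // A rule that never converges: each pass undoes the previous one.
  val negate: Int => Int = n => -n

  val converging: (Int, Int) = runBatch(40, Seq(halveIfEven), 100)   // (5, 4)
  val oscillating: (Int, Int) = runBatch(1, Seq(negate), 100)        // stops only at the cap
}
{code}
The second example only stops because of the cap of 100 passes, which is the 
same kind of behaviour as a batch whose rules keep rewriting each other's 
output.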
3. The difference between the two logical plans appears at BooleanSimplification 
during an iteration; before this rule runs, the two logical plans are the same:
{code:java}
// only the top part of the plan
Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, 
releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, 
startdate#2360, enddate#2361, status#2362]
+- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
10000) || (model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
10000) || (model#2358 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && (date#30 >= startdate#2360)) && (date#30 <= 
enddate#2361)) && (model#2486 = model#2358)) && (version#12 = 
version#2359)))
   :- Project [device#11, version#12, date#30, imei#13, pType#14, 
releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, 
UDF(model#10, device#11) AS model#2486]
   :  +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
10000) || UDF(model#10, device#1584) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
 && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
UDF(model#1583, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && isnotnull(UDF(model#1583, device#11))) && 
isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 
as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
 && isnotnull(UDF(model#10, device#11))))
   :     +- Join Inner, ((((model#10 = model#1583) && (device#11 = 
device#1584)) && (version#12 = version#1585)) && (date#30 = 
date#1592))
{code}
4. After BooleanSimplification there is only one difference: the Filter's 
condition has the form (A || B) && (A || C) in the successful app but 
A || (B && C) in the failed app.
{code:java}
//successful app's logical plan
Filter ((((isnotnull(UDF(model#1583, device#11)) && 
((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
UDF(model#1583, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && isnotnull(UDF(model#10, device#1584))) && 
((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
UDF(model#10, device#1584) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) 
|| UDF(model#10, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
 && isnotnull(UDF(model#10, device#11))))

// failed app's plan
Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
(UDF(model#10, device#1584) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && UDF(model#1583, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
 && isnotnull(UDF(model#1583, device#11))) && 
isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 
as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
 && isnotnull(UDF(model#10, device#11))))
{code}
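The two Filter conditions above look different but should accept the same 
rows, because (A || B) && (A || C) and A || (B && C) are related by the 
distributive law. A small sketch, using a made-up expression type (Atom, And, 
Or are hypothetical and not Catalyst classes) and plain two-valued logic 
without SQL NULLs, shows that the two forms have the same truth table while 
being structurally unequal:
{code:scala}
object BoolForms {
  sealed trait Expr
  case class Atom(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr

  // Evaluates an expression under an assignment of atoms to booleans.
  def eval(e: Expr, env: Map[String, Boolean]): Boolean = e match {
    case Atom(n)   => env(n)
    case And(l, r) => eval(l, env) && eval(r, env)
    case Or(l, r)  => eval(l, env) || eval(r, env)
  }

  val a: Expr = Atom("A")
  val b: Expr = Atom("B")
  val c: Expr = Atom("C")

  val distributed: Expr = And(Or(a, b), Or(a, c)) // (A || B) && (A || C)
  val factored: Expr    = Or(a, And(b, c))        // A || (B && C)

  private val bools = Seq(true, false)

  // True when both forms agree on all 8 assignments of A, B, C.
  val sameTruthTable: Boolean =
    (for (x <- bools; y <- bools; z <- bools) yield {
      val env = Map("A" -> x, "B" -> y, "C" -> z)
      eval(distributed, env) == eval(factored, env)
    }).forall(identity)

  // Structural (case-class) equality of the two trees.
  val sameStructure: Boolean = distributed == factored
}
{code}
Here sameTruthTable is true and sameStructure is false, so any later step that 
compares expressions structurally can treat the two plans differently.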
5. After that, the problem shows up in rule InferFiltersFromConstraints:
{code:java}
// code that produces the log below
case join @ Join(left, right, joinType, conditionOpt) =>
      // Only consider constraints that can be pushed down completely to either the left or the
      // right child
      val constraints = join.constraints.filter { c =>
        c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
      }

      // Remove those constraints that are already enforced by either the left or the right child
      val additionalConstraints = constraints -- (left.constraints ++ right.constraints)

      logInfo(
        s"""
           | After remove constraints additional constraints is  ${additionalConstraints.toList.toString}
           | left constraints is ${left.constraints.toList.toString()}
           | right constraints is ${right.constraints.toList.toString()}
        """.stripMargin)
{code}
{code:java}
// successful app's log
2019-01-08,16:44:48,911 INFO 
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
 After remove constraints additional constraints is  
List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
(model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
 left constraints is List(isnotnull(model#2486), isnotnull(device#11), 
isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
10000) || model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))),
 ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 
<=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> 
date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || 
(20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 
<=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> 
date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || 
(20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 
<=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> 
date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || 
(20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 
<=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> 
date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || 
(20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 
<=> date#30)), isnotnull(version#12))
 right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), 
isnotnull(model#2358), isnotnull(version#2359))

//failed app's log
2019-01-08,16:55:11,614 INFO 
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
 After remove constraints additional constraints is  List()
 left constraints is List(isnotnull(date#30), 
((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 
<=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> 
date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || 
(20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 
<=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> 
date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || 
(20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 
<=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> 
date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || 
(20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 
<=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> 
date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || 
(20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 
<=> date#30)), isnotnull(version#12), isnotnull(device#11), 
isnotnull(model#2486), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) 
> 10000) || model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))),
 ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
(model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
 right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), 
isnotnull(version#2359), isnotnull(model#2358))
{code}
The left child of the failed app's plan has an additional constraint:
{code:java}
((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 
INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
 && model#2486 INSET 
(grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
{code}
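To make the effect of that extra constraint concrete, here is a rough sketch 
of the set difference constraints -- (left.constraints ++ right.constraints) 
from the snippet in step 5. Constraints are modelled as plain strings with 
textual equality, which is only a stand-in for how Spark compares expressions, 
and all names and values below are hypothetical, loosely mirroring the two 
logs:
{code:scala}
object ConstraintDiff {
  // Constraint of the join that could be pushed down to one child.
  val joinConstraints: Set[String] = Set("A || (B && C)")

  // Successful run: the left child only knows the simpler form.
  val leftSuccessful: Set[String] = Set("isnotnull(model)", "A || B")

  // Failed run: the left child already carries the combined form as well.
  val leftFailed: Set[String] = Set("isnotnull(model)", "A || B", "A || (B && C)")

  val right: Set[String] = Set("isnotnull(version)")

  // Only entries equal to one already on a child are removed.
  val additionalSuccessful: Set[String] = joinConstraints -- (leftSuccessful ++ right)
  val additionalFailed: Set[String] = joinConstraints -- (leftFailed ++ right)
}
{code}
In this sketch additionalSuccessful still contains "A || (B && C)" while 
additionalFailed is empty, matching the shape of the two logs: the rule infers 
a new filter in one run and nothing in the other.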
6. Soon afterwards the gap between the two apps' plans keeps growing; one 
succeeds and the other hangs. There seem to be two possible causes:
 1. BooleanSimplification is not idempotent
 2. InferFiltersFromConstraints does not behave correctly when a child's 
constraints contain A || (B && C) instead of (A || B) && (A || C)
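
One way to test the first hypothesis would be to apply a rule twice to the 
same input and compare the results. The helper below is a generic sketch with 
hypothetical names (isIdempotentOn, dedupe, rotate); it works on any value 
with a meaningful ==, and says nothing about how BooleanSimplification 
actually behaves:
{code:scala}
object IdempotenceCheck {
  /** True if applying the rule a second time changes nothing for this input. */
  def isIdempotentOn[P](rule: P => P, plan: P): Boolean = {
    val once = rule(plan)
    rule(once) == once
  }

  // Idempotent: removing duplicates twice is the same as removing them once.
  val dedupe: List[Int] => List[Int] = _.distinct

  // Not idempotent on most inputs: every application moves the head to the end.
  val rotate: List[Int] => List[Int] =
    xs => if (xs.isEmpty) xs else xs.tail :+ xs.head

  val dedupeOk: Boolean = isIdempotentOn(dedupe, List(1, 2, 2, 3)) // true
  val rotateOk: Boolean = isIdempotentOn(rotate, List(1, 2, 3))    // false
}
{code}
A check like this only covers the inputs it is given, so a passing result on 
one plan would not prove the rule is idempotent in general.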

I'm not sure which one is the root cause. Could someone look into this?


