[ https://issues.apache.org/jira/browse/SPARK-26569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16751801#comment-16751801 ]

Chen Fan commented on SPARK-26569:
----------------------------------

I believe the PR under https://issues.apache.org/jira/browse/SPARK-21652 
already fixes this in Spark 2.3.

> Fixed point for batch Operator Optimizations never reached when optimizing 
> logical plan
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-26569
>                 URL: https://issues.apache.org/jira/browse/SPARK-26569
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment:  
>  
>            Reporter: Chen Fan
>            Priority: Major
>
> There is a somewhat complicated Spark app using the Dataset API that runs 
> once a day, and I noticed that it hangs once in a while.
>  I added some logging and compared two driver logs, one from a successful 
> run and one from a failed run. Here are some results of the investigation:
> 1. Usually the app runs correctly, but sometimes it hangs after finishing 
> job 1.
> !image-2019-01-08-19-53-20-509.png!
> 2. According to the logging I added, the successful app always reaches the 
> fixed point at iteration 7 of batch Operator Optimizations, but the failed 
> app never reaches it.
> {code:java}
> 2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer: 
> === Result of Batch Operator Optimizations ===
> 2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> iteration is 6/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate 
> ===
>  
> 2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
>  
> 2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> iteration is 7/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> Fixed point reached for batch Operator Optimizations after 7 iterations.
> 2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> iteration is 45/100, for batch Operator Optimizations
> 2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate 
> ===
>  
> 2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> iteration is 46/100, for batch Operator Optimizations
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate 
> ===
>  
> 2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> iteration is 47/100, for batch Operator Optimizations
> {code}
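> For context, the optimizer runs each batch repeatedly until the plan stops 
> changing or a maximum iteration count (100 here) is hit. Below is a minimal 
> sketch of that fixed-point loop, simplified from how Catalyst's RuleExecutor 
> behaves (illustrative only, not the actual Spark source):
> {code:java}
> // Apply all rules in sequence, repeating until the plan no longer changes
> // (fixed point) or the iteration budget is exhausted.
> def runToFixedPoint[P](plan: P, rules: Seq[P => P], maxIterations: Int): P = {
>   var current = plan
>   var iteration = 1
>   var done = false
>   while (!done) {
>     val next = rules.foldLeft(current)((p, rule) => rule(p))
>     if (next == current) {
>       done = true                    // fixed point reached
>     } else if (iteration >= maxIterations) {
>       done = true                    // give up; the plan never stabilized
>     } else {
>       current = next
>       iteration += 1
>     }
>   }
>   current
> }
> {code}
> In the failed run each iteration keeps producing a new plan, so the loop can 
> only stop at the iteration cap; and since each iteration gets slower as the 
> plan grows (compare the ~4 ms per iteration at iteration 6 with the ~45 s 
> per iteration around iteration 45), the app appears to hang.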
> 3. The difference between the two logical plans first appears at 
> BooleanSimplification within an iteration; before this rule the two plans 
> are identical:
> {code:java}
> // just a head part of plan
> Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, 
> releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, 
> startdate#2360, enddate#2361, status#2362]
> +- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
> 10000) || (model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
> 10000) || (model#2358 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && (date#30 >= startdate#2360)) && (date#30 <= 
> enddate#2361)) && (model#2486 = model#2358)) && (version#12 = 
> version#2359)))
>    :- Project [device#11, version#12, date#30, imei#13, pType#14, 
> releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, 
> UDF(model#10, device#11) AS model#2486]
>    :  +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
> 10000) || UDF(model#10, device#1584) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
>  && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) 
> || UDF(model#1583, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && isnotnull(UDF(model#1583, device#11))) && 
> isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 
> as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
>  && isnotnull(UDF(model#10, device#11))))
>    :     +- Join Inner, ((((model#10 = model#1583) && (device#11 = 
> device#1584)) && (version#12 = version#1585)) && (date#30 = 
> date#1592))
> {code}
> 4. After BooleanSimplification there is exactly one difference: the 
> Filter's constraints have the form (A || B) && (A || C) in the successful 
> app but A || (B && C) in the failed app.
> {code:java}
> //successful app's logical plan
> Filter ((((isnotnull(UDF(model#1583, device#11)) && 
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> UDF(model#1583, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && isnotnull(UDF(model#10, device#1584))) && 
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> UDF(model#10, device#1584) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) 
> || UDF(model#10, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
>  && isnotnull(UDF(model#10, device#11))))
> // failed app's plan
> Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> (UDF(model#10, device#1584) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && UDF(model#1583, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
>  && isnotnull(UDF(model#1583, device#11))) && 
> isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 
> as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))
>  && isnotnull(UDF(model#10, device#11))))
> {code}
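> Note that the two forms are logically equivalent by the distributive law; 
> the divergence is purely in the syntactic shape of the expression. A tiny 
> plain-Scala truth-table check (illustrative only):
> {code:java}
> // Exhaustive check that the two constraint shapes are logically
> // equivalent: (A || B) && (A || C)  ==  A || (B && C).
> val bools = Seq(true, false)
> for (a <- bools; b <- bools; c <- bools) {
>   assert(((a || b) && (a || c)) == (a || (b && c)))
> }
> {code}
> So any rule that compares constraints by structure rather than by logical 
> meaning can treat these as different predicates.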
> 5. Thereafter, the problem shows up in the InferFiltersFromConstraints rule:
> {code:java}
> // the rule's code that produced the log output below (logging added by me)
> case join @ Join(left, right, joinType, conditionOpt) =>
>       // Only consider constraints that can be pushed down completely to 
> either the left or the
>       // right child
>       val constraints = join.constraints.filter { c =>
>         c.references.subsetOf(left.outputSet) || 
> c.references.subsetOf(right.outputSet)
>       }
>       // Remove those constraints that are already enforced by either the 
> left or the right child
>       val additionalConstraints = constraints -- (left.constraints ++ 
> right.constraints)
>       logInfo(
>         s"""
>            | After remove constraints additional constraints is  
> ${additionalConstraints.toList.toString}
>            | left constraints is ${left.constraints.toList.toString()}
>            | right constraints is ${right.constraints.toList.toString()}
>         """.stripMargin)
> {code}
> {code:java}
> // successful app's log
> 2019-01-08,16:44:48,911 INFO 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is  
> List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> (model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  left constraints is List(isnotnull(model#2486), isnotnull(device#11), 
> isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 
> 10000) || model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))),
>  ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 
> <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> 
> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) 
> || (20190101 <=> date#30)) || (20181231 <=> date#30)) || 
> (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 
> <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> 
> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) 
> || (20181223 <=> date#30)) || (20181222 <=> date#30)) || 
> (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 
> <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> 
> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) 
> || (20181214 <=> date#30)) || (20181213 <=> date#30)) || 
> (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 
> <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12))
>  right constraints is List(isnotnull(enddate#2361), 
> isnotnull(startdate#2360), isnotnull(model#2358), isnotnull(version#2359))
> //failed app's log
> 2019-01-08,16:55:11,614 INFO 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is  List()
>  left constraints is List(isnotnull(date#30), 
> ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 
> <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> 
> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) 
> || (20190101 <=> date#30)) || (20181231 <=> date#30)) || 
> (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 
> <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> 
> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) 
> || (20181223 <=> date#30)) || (20181222 <=> date#30)) || 
> (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 
> <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> 
> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) 
> || (20181214 <=> date#30)) || (20181213 <=> date#30)) || 
> (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 
> <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12), 
> isnotnull(device#11), isnotnull(model#2486), ((cast(cast(imeiCount#1586 as 
> decimal(20,0)) as int) > 10000) || model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))),
>  ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> (model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  right constraints is List(isnotnull(enddate#2361), 
> isnotnull(startdate#2360), isnotnull(version#2359), isnotnull(model#2358))
> {code}
> The failed app's plan differs in that its left child already carries this 
> additional constraint:
> {code:java}
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || 
> (model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))
>  && model#2486 INSET 
> (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
> {code}
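> If I read the rule's code above correctly, the subtraction 
> `constraints -- (left.constraints ++ right.constraints)` is a set 
> difference over expressions, and it only removes constraints whose 
> structure matches; it does not recognize that A || (B && C) and 
> (A || B) && (A || C) are the same predicate. Here is a self-contained 
> sketch of that failure mode, using a hypothetical mini expression ADT 
> standing in for Catalyst's classes:
> {code:java}
> // Hypothetical mini expression tree (not Catalyst's actual classes).
> sealed trait Expr
> case class Var(name: String) extends Expr
> case class And(l: Expr, r: Expr) extends Expr
> case class Or(l: Expr, r: Expr) extends Expr
> 
> val a = Var("A"); val b = Var("B"); val c = Var("C")
> val childForm = Or(a, And(b, c))          // form the child already carries
> val inferred  = And(Or(a, b), Or(a, c))   // equivalent form a rule derives
> 
> // Set difference uses structural equality, so the equivalent constraint
> // survives the subtraction and can be inferred again next iteration.
> assert((Set[Expr](inferred) -- Set[Expr](childForm)).nonEmpty)
> {code}
> This is how InferFiltersFromConstraints and BooleanSimplification can feed 
> each other: one keeps re-inferring a predicate in one shape, the other 
> keeps rewriting it into the equivalent shape.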
> 6. Soon after, the gap between the two apps' plans grows; one run succeeds 
> while the other hangs. There seem to be two possible reasons:
>  1. BooleanSimplification is not idempotent.
>  2. InferFiltersFromConstraints does not behave correctly when a child's 
> constraints contain A || (B && C) instead of (A || B) && (A || C).
> I'm not sure which is the root cause; could someone look into this question?
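> As a quick probe of reason 1, one could apply the rule twice to the same 
> plan and check that the second application is a no-op. A sketch using 
> Catalyst's test DSL as I recall it (treat the exact imports and helpers as 
> assumptions, not verified against 2.1.0):
> {code:java}
> import org.apache.spark.sql.catalyst.dsl.expressions._
> import org.apache.spark.sql.catalyst.dsl.plans._
> import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
> import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
> 
> val rel = LocalRelation('a.boolean, 'b.boolean, 'c.boolean)
> // Start from the (A || B) && (A || C) shape seen in the successful plan.
> val plan = rel.where(('a || 'b) && ('a || 'c)).analyze
> 
> val once  = BooleanSimplification(plan)
> val twice = BooleanSimplification(once)
> // An idempotent rule must leave its own output unchanged.
> assert(once == twice, "BooleanSimplification is not idempotent here")
> {code}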


