[ https://issues.apache.org/jira/browse/SPARK-26569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Fan resolved SPARK-26569.
------------------------------
    Resolution: Duplicate

> Fixed point for batch Operator Optimizations never reached when optimize logicalPlan
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-26569
>                 URL: https://issues.apache.org/jira/browse/SPARK-26569
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: 
>            Reporter: Chen Fan
>            Priority: Major
>
> There is a somewhat complicated Spark app using the Dataset API that runs once a day, and I noticed that it hangs once in a while.
> I added some logging and compared two driver logs, one from a successful run and one from a failed run. Here are the results of the investigation:
> 1. Usually the app runs correctly, but sometimes it hangs after finishing job 1.
> !image-2019-01-08-19-53-20-509.png!
> 2. According to the logging I added, a successful run always reaches the fixed point at iteration 7 of the batch Operator Optimizations, but a failed run never reaches this fixed point.
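[Editor's note] The "iteration is N/100" lines below come from the optimizer running each rule batch repeatedly until the plan stops changing, under an iteration budget of 100. A toy Python model of that loop (an illustration only, not Spark's actual `RuleExecutor`) shows why a batch whose rules keep rewriting the plan never terminates early:

```python
# Toy model of a fixed-point rule batch (illustration, not Spark code).
# Rules are applied in order, repeatedly, until the plan stops changing
# or the iteration budget (the "/100" in the logs) is exhausted.
def execute_batch(plan, rules, max_iterations=100):
    for i in range(1, max_iterations + 1):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:
            # fixed point reached: a full pass produced no change
            return plan, i
        plan = new_plan
    # budget exhausted: the rules never stopped rewriting the plan
    return plan, max_iterations

# A rule that stops producing changes converges immediately:
plan, iters = execute_batch("a&b", [str.lower])
# A rule (or pair of rules) that keeps flipping the plan between two
# equivalent forms burns the entire budget instead:
plan2, iters2 = execute_batch("a&b", [lambda p: p.swapcase()])
```

If two rules rewrite a predicate back and forth between logically equivalent but syntactically different forms, this loop runs all 100 iterations, which matches the failed run's log below.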
> {code:java}
> // successful app: fixed point reached on iteration 7
> 2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer: === Result of Batch Operator Optimizations ===
> 2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 6/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> 2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
> 2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> 2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
> 2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
> 2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 7/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: Fixed point reached for batch Operator Optimizations after 7 iterations.
> // failed app: still iterating far past iteration 7
> 2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 45/100, for batch Operator Optimizations
> 2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> 2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
> 2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> 2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
> 2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 46/100, for batch Operator Optimizations
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
> 2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> 2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
> 2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 47/100, for batch Operator Optimizations
> {code}
> 3.
> The difference between the two logical plans first appears with BooleanSimplification in one iteration; before this rule, the two logical plans are identical:
> {code:java}
> // just the head part of the plan
> Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, startdate#2360, enddate#2361, status#2362]
> +- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2358 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (date#30 >= startdate#2360)) && (date#30 <= enddate#2361)) && (model#2486 = model#2358)) && (version#12 = version#2359)))
>    :- Project [device#11, version#12, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, UDF(model#10, device#11) AS model#2486]
>    :  +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
>    :     +- Join Inner, ((((model#10 = model#1583) && (device#11 = device#1584)) && (version#12 = version#1585)) && (date#30 = date#1592))
> {code}
> 4. After BooleanSimplification there is only one difference: the Filter's constraints have the form (A || B) && (A || C) in the successful app but A || (B && C) in the failed app.
> {code:java}
> // successful app's logical plan
> Filter ((((isnotnull(UDF(model#1583, device#11)) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#10, device#1584))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> // failed app's plan
> Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> {code}
> 5. Thereafter, the divergence shows up in the rule InferFiltersFromConstraints:
> {code:java}
> // the code that produced the logs below (logInfo added for debugging)
> case join @ Join(left, right, joinType, conditionOpt) =>
>   // Only consider constraints that can be pushed down completely to
>   // either the left or the right child
>   val constraints = join.constraints.filter { c =>
>     c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
>   }
>   // Remove those constraints that are already enforced by either the
>   // left or the right child
>   val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
>   logInfo(
>     s"""
>        | After remove constraints additional constraints is ${additionalConstraints.toList.toString}
>        | left constraints is ${left.constraints.toList.toString()}
>        | right constraints is ${right.constraints.toList.toString()}
>      """.stripMargin)
> {code}
> {code:java}
> // successful app's log
> 2019-01-08,16:44:48,911 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  left constraints is List(isnotnull(model#2486), isnotnull(device#11), isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12))
>  right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(model#2358), isnotnull(version#2359))
> // failed app's log
> 2019-01-08,16:55:11,614 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is List()
>  left constraints is List(isnotnull(date#30), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12), isnotnull(device#11), isnotnull(model#2486), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(version#2359), isnotnull(model#2358))
> {code}
> The failed app plan's left child has an additional constraint:
> {code:java}
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
> {code}
> 6. From here on, the gap between the two apps' plans keeps growing; one run succeeds while the other hangs. There seem to be two possible reasons:
> 1. BooleanSimplification is not idempotent.
> 2. InferFiltersFromConstraints does not behave correctly when a child's constraints contain A || (B && C) instead of (A || B) && (A || C).
> I'm not sure which is the root cause; could someone look into this question?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
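[Editor's note] Both hypotheses come down to the same mismatch: A || (B && C) and (A || B) && (A || C) are logically equivalent, but the `constraints -- (left.constraints ++ right.constraints)` set difference in the quoted snippet evidently does not identify the two forms with each other (compare the non-empty vs. empty `additional constraints` lists in the logs above). A small Python model of that effect, using a hypothetical tuple encoding rather than Catalyst's expression trees:

```python
from itertools import product

# Hypothetical leaf predicates, named after the constraints in the report.
A = 'cast(imeiCount#1586) > 10000'
B = 'model#2486 INSET (first list)'
C = 'model#2486 INSET (second list)'

def OR(x, y):  return ('||', x, y)
def AND(x, y): return ('&&', x, y)

factored    = OR(A, AND(B, C))           # form in the failed app's constraints
distributed = AND(OR(A, B), OR(A, C))    # form in the successful app's constraints

def evaluate(expr, env):
    """Evaluate a predicate tree under a truth assignment for the leaves."""
    if isinstance(expr, str):
        return env[expr]
    op, lhs, rhs = expr
    l, r = evaluate(lhs, env), evaluate(rhs, env)
    return (l or r) if op == '||' else (l and r)

# The two forms agree under every truth assignment (distributive law)...
equivalent = all(
    evaluate(factored, env) == evaluate(distributed, env)
    for env in ({A: a, B: b, C: c} for a, b, c in product([False, True], repeat=3))
)

# ...yet a set difference that compares structure, which is what the logs
# suggest is happening here, still treats the rewritten form as brand new,
# so a Filter is re-inferred, BooleanSimplification rewrites it again, and
# the batch never reaches its fixed point.
child_constraints = {factored}
inferred          = {distributed}
still_new = inferred - child_constraints   # non-empty despite equivalence
```

This is only a sketch of the suspected interaction; the leaf names and the tuple encoding are invented for illustration.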