[jira] [Created] (SPARK-38093) Set shuffleMergeAllowed to false for a determinate stage after the stage is finalized
Venkata krishnan Sowrirajan created SPARK-38093: --- Summary: Set shuffleMergeAllowed to false for a determinate stage after the stage is finalized Key: SPARK-38093 URL: https://issues.apache.org/jira/browse/SPARK-38093 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.2.1 Reporter: Venkata krishnan Sowrirajan Currently, shuffleMergeAllowed is set to false before prepareShuffleServicesForShuffleMapStage if the shuffle dependency is already finalized. Ideally, it should be set right after the shuffle dependency is finalized for a determinate stage. cc [~mridulm80]
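For illustration, here is a self-contained sketch of the proposed ordering; the class and method names are simplified assumptions, not Spark's actual internals:
{code:java}
// Minimal sketch, assuming a simplified ShuffleDependency-like class: for a
// determinate stage, disallow further merges immediately after finalization
// instead of waiting until prepareShuffleServicesForShuffleMapStage runs.
class SimpleShuffleDep(val isDeterminate: Boolean) {
  private var mergeAllowed = true
  private var mergeFinalized = false

  def shuffleMergeAllowed: Boolean = mergeAllowed
  def isMergeFinalized: Boolean = mergeFinalized

  def markMergeFinalized(): Unit = {
    mergeFinalized = true
    // The proposed change: flip the flag right here for determinate stages.
    if (isDeterminate) mergeAllowed = false
  }
}
{code}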
[jira] [Created] (SPARK-38092) Check if shuffleMergeId is the same as the current stage's shuffleMergeId before registering MergeStatus
Venkata krishnan Sowrirajan created SPARK-38092: --- Summary: Check if shuffleMergeId is the same as the current stage's shuffleMergeId before registering MergeStatus Key: SPARK-38092 URL: https://issues.apache.org/jira/browse/SPARK-38092 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.2.1 Reporter: Venkata krishnan Sowrirajan Currently this is handled in handleShuffleMergeFinalized during finalization, ensuring the finalize request is indeed for the current stage's shuffle dependency shuffleMergeId. The same check has to be done before registering merge statuses as well.
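As a rough illustration of the proposed guard (all names below are simplified assumptions), registration would drop any MergeStatus whose shuffleMergeId does not match the current attempt, mirroring the existing check in handleShuffleMergeFinalized:
{code:java}
import scala.collection.mutable

// Minimal sketch: ignore MergeStatus updates that arrive from stale merge attempts.
class MergeStatusRegistry {
  private val currentMergeId = mutable.Map[Int, Int]()     // shuffleId -> active shuffleMergeId
  private val statuses = mutable.Map[(Int, Int), String]() // (shuffleId, reduceId) -> status

  def startMergeAttempt(shuffleId: Int, shuffleMergeId: Int): Unit =
    currentMergeId(shuffleId) = shuffleMergeId

  def registerMergeStatus(
      shuffleId: Int, shuffleMergeId: Int, reduceId: Int, status: String): Boolean = {
    // Same check as during finalization: the status must belong to the current attempt.
    if (!currentMergeId.get(shuffleId).contains(shuffleMergeId)) {
      false // stale merge attempt; drop instead of registering
    } else {
      statuses((shuffleId, reduceId)) = status
      true
    }
  }
}
{code}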
[jira] [Created] (SPARK-38064) Push-based shuffle's internal implementation details should not be exposed as API
Venkata krishnan Sowrirajan created SPARK-38064: --- Summary: Push-based shuffle's internal implementation details should not be exposed as API Key: SPARK-38064 URL: https://issues.apache.org/jira/browse/SPARK-38064 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.2.1 Reporter: Venkata krishnan Sowrirajan Currently the changes added in Dependency.scala for push-based shuffle are all exposed as Developer API, for example `newShuffleMergeState`, `setMergerLocs`, `shuffleMergeFinalized`, etc. Some of these are internal implementation details that should not be leaked. This JIRA is to mark them private as appropriate. cc [~mridulm80][~mshen][~csingh][~Ngone51][~tgraves]
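A sketch of what the visibility change could look like; the method names come from the issue, while the surrounding class and exact modifiers are assumptions:
{code:java}
package org.apache.spark

// Hypothetical before/after: internal hooks become package-private rather than
// public @DeveloperApi members, so they cannot leak into the user-facing API.
class DependencyLike {
  // was: @DeveloperApi def newShuffleMergeState(): Unit
  private[spark] def newShuffleMergeState(): Unit = { /* reset merge-related state */ }

  // was: @DeveloperApi def setMergerLocs(locs: Seq[String]): Unit
  private[spark] def setMergerLocs(locs: Seq[String]): Unit = { /* record merger locations */ }
}
{code}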
[jira] [Created] (SPARK-37964) Replace usages of slaveTracker with workerTracker in MapOutputTrackerSuite
Venkata krishnan Sowrirajan created SPARK-37964: --- Summary: Replace usages of slaveTracker with workerTracker in MapOutputTrackerSuite Key: SPARK-37964 URL: https://issues.apache.org/jira/browse/SPARK-37964 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan
[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424265#comment-17424265 ] Venkata krishnan Sowrirajan commented on SPARK-36926: - This looks like a regression, given that other engines like Hive, Presto, and Impala, and even Spark 3.1, all return 7 records. > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Critical > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation > tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24] > orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n > +- Relation > 
tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13] > orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], > true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, > sum(c_acctbal#11) AS totacctbal#151]\n +- Project [substring(c_phone#10, 1, > 2) AS cntrycode#147, c_acctbal#11]\n +- Join LeftAnti, (o_custkey#17L = > c_custkey#6L)\n :- Project [c_custkey#6L, c_phone#10, c_acctbal#11]\n > : +- Filter ((isnotnull(c_acctbal#11) AND substring(c_phone#10, 1, > 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#11 as decimal(16,6)) > > scalar-subquery#148 []))\n : : +- Aggregate [avg(c_acctbal#160) > AS avg(c_acctbal)#154]\n
[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424112#comment-17424112 ] Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 7:44 PM: --- I tried to get a simpler repro. {code:java} import spark.implicits._ val buf = new scala.collection.mutable.ArrayBuffer[Double] for (i <- 0.toDouble to .toDouble by 0.01) for (j <- 0 to 5) { buf += i } val df = buf.toDF df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show Array[null] df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show +---+ | avg(value)| +---+ |4999.50| +---+ {code} was (Author: vsowrirajan): I tried to get a simpler repro. {code:java} import spark.implicits._ val buf = new scala.collection.mutable.ArrayBuffer[Double] for (i <- 0.toDouble to .toDouble by 0.01) for (j <- 0 to 5) \{ buf += i } val df = buf.toDF df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show Array[null] df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show +---+ | avg(value)| +---+ |4999.50| +---+ {code} > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 
SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation > tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24] > orc\n+- SubqueryAlias
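For context, a self-contained way to see Spark's null-on-overflow convention for decimals, which is what the avg-returning-null symptom above resembles (an illustration of the behavior, not necessarily this ticket's exact root cause): with spark.sql.ansi.enabled=false, the default, a decimal value that no longer fits after a cast becomes null instead of raising an error.
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// decimal(3,2) holds at most 9.99; 9.99 + 9.99 = 19.98 no longer fits, so the
// cast back to decimal(3,2) yields null under non-ANSI mode.
Seq("9.99").toDF("v")
  .selectExpr("cast(v as decimal(3,2)) as v")
  .selectExpr("cast(v + v as decimal(3,2)) as overflowed")
  .show()
// +----------+
// |overflowed|
// +----------+
// |      null|
// +----------+
{code}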
[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424112#comment-17424112 ] Venkata krishnan Sowrirajan commented on SPARK-36926: - I tried to get a simpler repro. {code:java} import spark.implicits._ val buf = new scala.collection.mutable.ArrayBuffer[Double] for (i <- 0.toDouble to .toDouble by 0.01) for (j <- 0 to 5) \{ buf += i } val df = buf.toDF df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show Array[null] df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show +---+ | avg(value)| +---+ |4999.50| +---+ {code} > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > 
spark_catalog.tpch_data_orc_100.orders\n: +- Relation > tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24] > orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n > +- Relation > tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13] > orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], > true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, > sum(c_acctbal#11) AS totacctbal#151]\n +- Project [substring(c_phone#10, 1, > 2) AS cntrycode#147, c_acctbal#11]\n +- Join LeftAnti, (o_custkey#17L = > c_custkey#6L)\n
[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080 ] Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:53 PM: --- The issue seems to be with this subquery part of q22. {code:java} sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) {code} {code:java} sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]){code} It seems like there is some behavior change wrt 3.2. was (Author: vsowrirajan): The issue seems to be with this subquery part of q22. {code:java} sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) {code} {code:java} sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]){code} It seems like there is some behavior change wrt 3.2 handling nulls. > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) 
AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation >
[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080 ] Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:05 PM: --- The issue seems to be with this subquery part of q22. {code:java} sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) {code} {code:java} sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]){code} It seems like there is some behavior change wrt 3.2 handling nulls. was (Author: vsowrirajan): The issue seems to be with this subquery part of q22. {code:java} sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) {code} {code:java} sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]){code} It seems like there is some behavior change wrt 3.2 handling nulls. > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate 
[avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation >
[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080 ] Venkata krishnan Sowrirajan commented on SPARK-36926: - The issue seems to be with this subquery part of q22. sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]) It seems like there is some behavior change wrt 3.2 handling nulls. > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L 
= outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation > tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24] > orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n > +- Relation > tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13] > orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], > true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, > sum(c_acctbal#11) AS totacctbal#151]\n +- Project [substring(c_phone#10, 1, > 2) AS cntrycode#147, c_acctbal#11]\n +- Join LeftAnti,
[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080 ] Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:04 PM: --- The issue seems to be with this subquery part of q22. {code:java} sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) {code} {code:java} sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]){code} It seems like there is some behavior change wrt 3.2 handling nulls. was (Author: vsowrirajan): The issue seems to be with this subquery part of q22. sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([null]) sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect Array([4998.769056963132322922]) It seems like there is some behavior change wrt 3.2 handling nulls. > Discrepancy in Q22 of TPCH for Spark 3.2 > > > Key: SPARK-36926 > URL: https://issues.apache.org/jira/browse/SPARK-36926 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Aravind Patnam >Priority: Major > > When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the > number of rows returned by the query. This was tested with both AQE on and > off. All the other queries were matching in results. Below is the results > that we got when testing Q22 on 3.2: > > {code:java} > "results": [ > { > "name": "Q22", > "mode": "collect", > "parameters": {}, > "joinTypes": [ > "SortMergeJoin" > ], > "tables": [ > "customer" > ], > "parsingTime": 0.016522, > "analysisTime": 0.004132, > "optimizationTime": 39.173868, > "planningTime": 23.10939, > "executionTime": 13762.183844, > "result": 0, > "breakDown": [], > "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC > NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS > numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias > custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, > 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN > (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT > exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), > None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND > 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- 'UnresolvedRelation [customer], [], false\n: +- 'Project > [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n: > +- 'UnresolvedRelation [orders], [], false\n+- > 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan > ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort > [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], > [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS > totacctbal#151]\n +- SubqueryAlias custsale\n +- Project > [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- > Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND > (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as > decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- > Aggregate [avg(c_acctbal#160) AS 
avg(c_acctbal)#154]\n: : +- > Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) > AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : > +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: > :+- Relation > tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] > orc\n: +- Project [o_orderkey#16L, o_custkey#17L, > o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, > o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter > (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias > spark_catalog.tpch_data_orc_100.orders\n: +- Relation >
[jira] [Updated] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2
[ https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venkata krishnan Sowrirajan updated SPARK-36926: Description: When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the number of rows returned by the query. This was tested with both AQE on and off. All the other queries were matching in results. Below is the results that we got when testing Q22 on 3.2: {code:java} "results": [ { "name": "Q22", "mode": "collect", "parameters": {}, "joinTypes": [ "SortMergeJoin" ], "tables": [ "customer" ], "parsingTime": 0.016522, "analysisTime": 0.004132, "optimizationTime": 39.173868, "planningTime": 23.10939, "executionTime": 13762.183844, "result": 0, "breakDown": [], "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n +- 'SubqueryAlias custsale\n +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT exists#149 [])\n: :- 'Project [unresolvedalias('avg('c_acctbal), None)]\n: : +- 'Filter (('c_acctbal > 0.00) AND 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n: : +- 'UnresolvedRelation [customer], [], false\n: +- 'Project [*]\n : +- 'Filter ('o_custkey = 'c_custkey)\n:+- 'UnresolvedRelation [orders], [], false\n+- 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS totacctbal#151]\n +- SubqueryAlias custsale\n +- Project [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n: :- Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n: : +- Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n: : +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n: :+- Relation tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] orc\n: +- Project [o_orderkey#16L, o_custkey#17L, o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias spark_catalog.tpch_data_orc_100.orders\n: +- Relation tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24] orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n +- Relation tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13] orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS totacctbal#151]\n +- Project [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- Join LeftAnti, (o_custkey#17L = c_custkey#6L)\n :- Project [c_custkey#6L, c_phone#10, c_acctbal#11]\n : +- Filter 
((isnotnull(c_acctbal#11) AND substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#11 as decimal(16,6)) > scalar-subquery#148 []))\n : : +- Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n : : +- Project [c_acctbal#160]\n : :+- Filter (isnotnull(c_acctbal#160) AND ((c_acctbal#160 > 0.00) AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17)))\n : : +- Relation tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162] orc\n : +- Relation tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13] orc\n +- Project [o_custkey#17L]\n+- Relation
[jira] [Created] (SPARK-36908) Document speculation metrics added as part of SPARK-36038
Venkata krishnan Sowrirajan created SPARK-36908: --- Summary: Document speculation metrics added as part of SPARK-36038 Key: SPARK-36908 URL: https://issues.apache.org/jira/browse/SPARK-36908 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan
[jira] [Commented] (SPARK-36810) Handle HDFS read inconsistencies on Spark when observer Namenode is used
[ https://issues.apache.org/jira/browse/SPARK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417769#comment-17417769 ] Venkata krishnan Sowrirajan commented on SPARK-36810: - I am currently looking into this issue, JFYI. > Handle HDFS read inconsistencies on Spark when observer Namenode is used > > > Key: SPARK-36810 > URL: https://issues.apache.org/jira/browse/SPARK-36810 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 3.2.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > In short, with HDFS HA and with the use of the [Observer Namenode|https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html], > read-after-write consistency is only available when both the write and > the read happen from the same client. > But if the write happens on an executor and the read happens on the driver, > then the reads would be inconsistent, causing application failures. This can > be fixed by calling `FileSystem.msync` before making any read calls where the > client thinks the write could have possibly happened elsewhere. > This issue is discussed in greater detail in this > [discussion|https://mail-archives.apache.org/mod_mbox/spark-dev/202108.mbox/browser]
[jira] [Created] (SPARK-36810) Handle HDFS read inconsistencies on Spark when observer Namenode is used
Venkata krishnan Sowrirajan created SPARK-36810: --- Summary: Handle HDFS read inconsistencies on Spark when observer Namenode is used Key: SPARK-36810 URL: https://issues.apache.org/jira/browse/SPARK-36810 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan In short, with HDFS HA and with the use of the [Observer Namenode|https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html], read-after-write consistency is only available when both the write and the read happen from the same client. But if the write happens on an executor and the read happens on the driver, then the reads would be inconsistent, causing application failures. This can be fixed by calling `FileSystem.msync` before making any read calls where the client thinks the write could have possibly happened elsewhere. This issue is discussed in greater detail in this [discussion|https://mail-archives.apache.org/mod_mbox/spark-dev/202108.mbox/browser]
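A minimal sketch of the proposed mitigation, assuming Hadoop 3.3+ where FileSystem.msync() is available; the path and call site are illustrative only:
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Before reading state that may have been written by a different client (e.g.,
// an executor wrote the file, the driver now reads it), sync this client's view
// with the Active NameNode so a subsequent read through the Observer is fresh.
val fs = FileSystem.get(new Configuration())
fs.msync() // note: file systems that don't support msync throw UnsupportedOperationException
val status = fs.getFileStatus(new Path("/warehouse/output/_SUCCESS")) // illustrative path
{code}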
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403508#comment-17403508 ] Venkata krishnan Sowrirajan commented on SPARK-36558: - One thing I missed here is setting the newly constructed `DAGScheduler` on the SparkContext. Let me try that and update here. > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is considered as a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] > And it breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks have > finished in stage A, stage A will be skipped directly when submitting stage > B. However, with this bug, stage A will be submitted. And submitting a stage > with no tasks to run would not be able to add its child stage into the > waiting stage list, which leads to the job hang in the end. > > The example to reproduce: > First, change `MyRDD` to allow it to compute: > {code:java} > override def compute(split: Partition, context: TaskContext): Iterator[(Int, > Int)] = { > Iterator.single((1, 1)) > }{code} > Then run this test: > {code:java} > test("Job hang") { > initPushBasedShuffleConfs(conf) > conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) > DAGSchedulerSuite.clearMergerLocs > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", > "host5")) > val latch = new CountDownLatch(1) > val myDAGScheduler = new MyDAGScheduler( > sc, > sc.dagScheduler.taskScheduler, > sc.listenerBus, > sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], > sc.env.blockManager.master, > sc.env) { > override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = > { > // By this, we can mimic a stage with all tasks finished > // but finalization is incomplete. > latch.countDown() > } > } > sc.dagScheduler = myDAGScheduler > sc.taskScheduler.setDAGScheduler(myDAGScheduler) > val parts = 20 > val shuffleMapRdd = new MyRDD(sc, parts, Nil) > val shuffleDep = new ShuffleDependency(shuffleMapRdd, new > HashPartitioner(parts)) > val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = > mapOutputTracker) > reduceRdd1.countAsync() > latch.await() > // scalastyle:off > println("=after wait==") > // set _shuffleMergedFinalized to true can avoid the hang. > // shuffleDep._shuffleMergedFinalized = true > val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) > reduceRdd2.count() > } > {code}
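Concretely, the missing step would be the two lines that already appear in the ticket's repro, wiring the custom scheduler into the SparkContext:
{code:java}
// Install the custom DAGScheduler so the job actually runs through it,
// instead of the default scheduler constructed by the SparkContext.
sc.dagScheduler = myDAGScheduler
sc.taskScheduler.setDAGScheduler(myDAGScheduler)
{code}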
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403507#comment-17403507 ] Venkata krishnan Sowrirajan commented on SPARK-36558: - [~Ngone51] I am not sure you can run the test that way: the `DAGSchedulerSuite` custom `DAGScheduler` and `DAGSchedulerEventLoopTester` won't be used, so the job is submitted with the default `DAGScheduler` and waits in `finalizeShuffleMerge`, which cannot complete because of our mocked-up merger locations. Btw, below is the test code, along with the changes you mentioned added to `MyRDD`. Am I missing something? {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set("spark.shuffle.push.mergerLocations.minThreshold", "5") DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) scheduler = new MyDAGScheduler( sc, taskScheduler, sc.listenerBus, mapOutputTracker, blockManagerMaster, sc.env) { override private[spark] def scheduleShuffleMergeFinalize( stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() super.scheduleShuffleMergeFinalize(stage) } } dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) val parts = 5 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.count() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is considered as a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] > And it breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks have > finished in stage A, stage A will be skipped directly when submitting stage > B. However, with this bug, stage A will be submitted. And submitting a stage > with no tasks to run would not be able to add its child stage into the > waiting stage list, which leads to the job hang in the end. 
> > The example to reproduce: > First, change `MyRDD` to allow it to compute: > {code:java} > override def compute(split: Partition, context: TaskContext): Iterator[(Int, > Int)] = { > Iterator.single((1, 1)) > }{code} > Then run this test: > {code:java} > test("Job hang") { > initPushBasedShuffleConfs(conf) > conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) > DAGSchedulerSuite.clearMergerLocs > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", > "host5")) > val latch = new CountDownLatch(1) > val myDAGScheduler = new MyDAGScheduler( > sc, > sc.dagScheduler.taskScheduler, > sc.listenerBus, > sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], > sc.env.blockManager.master, > sc.env) { > override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = > { > // By this, we can mimic a stage with all tasks finished > // but finalization is incomplete. > latch.countDown() > } > } > sc.dagScheduler = myDAGScheduler > sc.taskScheduler.setDAGScheduler(myDAGScheduler) > val parts = 20 > val shuffleMapRdd = new MyRDD(sc, parts, Nil) > val shuffleDep = new ShuffleDependency(shuffleMapRdd, new > HashPartitioner(parts)) > val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = > mapOutputTracker) > reduceRdd1.countAsync() > latch.await() > // scalastyle:off > println("=after wait==") > // set _shuffleMergedFinalized to true can avoid the hang. > // shuffleDep._shuffleMergedFinalized = true > val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) >
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403404#comment-17403404 ] Venkata krishnan Sowrirajan commented on SPARK-36558: - [~Ngone51] I tried the test you shared above, but I needed to make a few changes to have shuffle merge enabled properly; without them, `scheduleShuffleMergeFinalize` won't get invoked and the test can wait indefinitely on the CountDownLatch. Please take a look at the modified test below. 1. Keep `numMergerLocs` equal to `numParts` so the stage has shuffle merge enabled. 2. Also use `submit` inside `DAGSchedulerSuite` to run an action on an RDD. Since the `runningStages` set would contain the stage currently in the merge finalization step (but with all of its partitions available), the same stage won't get submitted again with empty partitions to compute. The check [here|https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1253] would prevent the same stage from getting submitted again. Below is the fixed unit test, which seems to run fine. {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set("spark.shuffle.push.mergerLocations.minThreshold", "5") DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) scheduler = new MyDAGScheduler( sc, taskScheduler, sc.listenerBus, mapOutputTracker, blockManagerMaster, sc.env) { override private[spark] def scheduleShuffleMergeFinalize( stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() // super.scheduleShuffleMergeFinalize(stage) } } dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) val parts = 5 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd1, (0 until parts).toArray) completeShuffleMapStageSuccessfully(0, 0, parts) completeNextResultStageWithSuccess(1, 0) latch.await() // set _shuffleMergedFinalized to true can avoid the hang. val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) submit(reduceRdd2, (0 until parts).toArray) completeNextResultStageWithSuccess(3, 0) } {code} Let me know your thoughts. Am I missing something here? cc [~mshen] [~mridulm80] > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is considered as a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] > And it breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks have > finished in stage A, stage A will be skipped directly when submitting stage > B. However, with this bug, stage A will be submitted. 
And submitting a stage > with no tasks to run would not be able to add its child stage into the > waiting stage list, which leads to the job hang in the end. > > The example to reproduce: > {code:java} > test("Job hang") { > initPushBasedShuffleConfs(conf) > conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) > DAGSchedulerSuite.clearMergerLocs > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", > "host5")) > val latch = new CountDownLatch(1) > val myDAGScheduler = new MyDAGScheduler( > sc, > sc.dagScheduler.taskScheduler, > sc.listenerBus, > sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], > sc.env.blockManager.master, > sc.env) { > override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = > { > // By this, we can mimic a stage with all tasks finished > // but finalization is incomplete. > latch.countDown() > } > } > sc.dagScheduler = myDAGScheduler > sc.taskScheduler.setDAGScheduler(myDAGScheduler) > val parts =
[jira] [Resolved] (SPARK-36484) Handle Stale block fetch failure on the client side by not retrying the requests
[ https://issues.apache.org/jira/browse/SPARK-36484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venkata krishnan Sowrirajan resolved SPARK-36484. - Resolution: Won't Fix Thinking more about it, this doesn't seem to add much value in terms of avoiding retries, as the server doesn't do any processing for these stale fetch requests anyway. Since there isn't any meaningful perf improvement, deciding to close this out. > Handle Stale block fetch failure on the client side by not retrying the > requests > > > Key: SPARK-36484 > URL: https://issues.apache.org/jira/browse/SPARK-36484 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Minor > > Similar to SPARK-36378, we need to handle the stale block fetch failures on > the client side, although without handling it there won't be any correctness > issues. This would help save some server-side bandwidth otherwise spent > serving stale shuffle fetch requests.
[jira] [Created] (SPARK-36484) Handle Stale block fetch failure on the client side by not retrying the requests
Venkata krishnan Sowrirajan created SPARK-36484: --- Summary: Handle Stale block fetch failure on the client side by not retrying the requests Key: SPARK-36484 URL: https://issues.apache.org/jira/browse/SPARK-36484 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan Similar to SPARK-36378, we need to handle the stale block fetch failures on the client side, although without handling it there won't be any correctness issues. This would help save some server-side bandwidth otherwise spent serving stale shuffle fetch requests.
[jira] [Created] (SPARK-36460) Pull out NoOpMergedShuffleFileManager inner class outside
Venkata krishnan Sowrirajan created SPARK-36460: --- Summary: Pull out NoOpMergedShuffleFileManager inner class outside Key: SPARK-36460 URL: https://issues.apache.org/jira/browse/SPARK-36460 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan
[jira] [Created] (SPARK-36374) Push-based shuffle documentation
Venkata krishnan Sowrirajan created SPARK-36374: --- Summary: Push-based shuffle documentation Key: SPARK-36374 URL: https://issues.apache.org/jira/browse/SPARK-36374 Project: Spark Issue Type: Sub-task Components: Documentation Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36332) Cleanup RemoteBlockPushResolver log messages
Venkata krishnan Sowrirajan created SPARK-36332: --- Summary: Cleanup RemoteBlockPushResolver log messages Key: SPARK-36332 URL: https://issues.apache.org/jira/browse/SPARK-36332 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan Minor cleanups to RemoteBlockPushResolver to use AppShufflePartitionsInfo toString() for log messages. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36317) PruneFileSourcePartitionsSuite tests are failing after the fix to SPARK-36136
Venkata krishnan Sowrirajan created SPARK-36317: --- Summary: PruneFileSourcePartitionsSuite tests are failing after the fix to SPARK-36136 Key: SPARK-36317 URL: https://issues.apache.org/jira/browse/SPARK-36317 Project: Spark Issue Type: Test Components: SQL Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan After the fix for [SPARK-36136][SQL][TESTS], which refactored PruneFileSourcePartitionsSuite etc. to a different package, a couple of tests in PruneFileSourcePartitionsSuite are now failing. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36038) Basic speculation metrics at stage level
Venkata krishnan Sowrirajan created SPARK-36038: --- Summary: Basic speculation metrics at stage level Key: SPARK-36038 URL: https://issues.apache.org/jira/browse/SPARK-36038 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.1.2 Reporter: Venkata krishnan Sowrirajan Currently there are no speculation metrics available, either at the application level or at the stage level. Within our platform, we have added speculation metrics at the stage level as a summary, similar to the existing stage-level metrics, tracking numTotalSpeculated, numCompleted (successful), numFailed, numKilled, etc. This enables us to effectively understand the speculative execution feature at an application level and helps in further tuning the speculation configs. cc [~ron8hu] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
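A small sketch of the kind of per-stage summary described above; the class and method names are illustrative, not the actual Spark listener API:

{code:java}
import scala.collection.mutable

// Illustrative per-stage speculation summary, mirroring the counters
// named in the description (numTotalSpeculated, numCompleted, etc.).
case class SpeculationStageSummary(
    numTotalSpeculated: Int = 0,
    numCompleted: Int = 0, // speculative tasks that finished successfully
    numFailed: Int = 0,
    numKilled: Int = 0)

class SpeculationMetricsTracker {
  private val byStage = mutable.Map[Int, SpeculationStageSummary]()

  def onSpeculativeTaskSubmitted(stageId: Int): Unit =
    update(stageId)(s => s.copy(numTotalSpeculated = s.numTotalSpeculated + 1))

  def onSpeculativeTaskEnd(stageId: Int, succeeded: Boolean, killed: Boolean): Unit =
    update(stageId) { s =>
      if (succeeded) s.copy(numCompleted = s.numCompleted + 1)
      else if (killed) s.copy(numKilled = s.numKilled + 1)
      else s.copy(numFailed = s.numFailed + 1)
    }

  def summary(stageId: Int): SpeculationStageSummary =
    byStage.getOrElse(stageId, SpeculationStageSummary())

  private def update(stageId: Int)(f: SpeculationStageSummary => SpeculationStageSummary): Unit =
    byStage(stageId) = f(summary(stageId))
}
{code}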
[jira] [Updated] (SPARK-35613) Cache commonly occurring strings from SQLMetrics, JsonProtocol and AccumulatorV2
[ https://issues.apache.org/jira/browse/SPARK-35613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venkata krishnan Sowrirajan updated SPARK-35613: Summary: Cache commonly occurring strings from SQLMetrics, JsonProtocol and AccumulatorV2 (was: Cache commonly occurring strings from SQLMetrics and in JsonProtocol) > Cache commonly occurring strings from SQLMetrics, JsonProtocol and > AccumulatorV2 > > > Key: SPARK-35613 > URL: https://issues.apache.org/jira/browse/SPARK-35613 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 3.2.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > SQLMetrics allows creation of different metrics, like sizing and timing > metrics; some of the metric names can be duplicated, and on top of that the > `Some` wrapper objects add additional memory overhead. In our internal > platform, this has caused huge memory usage on the driver, causing full GC > to kick in every so often. > cc [~mridulm80] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35613) Cache commonly occurring strings from SQLMetrics and in JsonProtocol
Venkata krishnan Sowrirajan created SPARK-35613: --- Summary: Cache commonly occurring strings from SQLMetrics and in JsonProtocol Key: SPARK-35613 URL: https://issues.apache.org/jira/browse/SPARK-35613 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 3.2.0 Reporter: Venkata krishnan Sowrirajan SQLMetrics allows creation of different metrics, like sizing and timing metrics; some of the metric names can be duplicated, and on top of that the `Some` wrapper objects add additional memory overhead. In our internal platform, this has caused huge memory usage on the driver, causing full GC to kick in every so often. cc [~mridulm80] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
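A minimal sketch of the caching idea, assuming a simple process-wide intern map (NameCache is a hypothetical name, not the class the patch adds): deduplicating both the string and its Option wrapper means thousands of accumulators can share one canonical object.

{code:java}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical intern cache: one canonical Option[String] per metric name.
object NameCache {
  private val cache = new ConcurrentHashMap[String, Option[String]]()

  def get(name: String): Option[String] =
    cache.computeIfAbsent(name, n => Some(n))
}

// Every accumulator named "number of output rows" now shares a single
// cached Option[String] instance instead of allocating its own Some(...).
val a = NameCache.get("number of output rows")
val b = NameCache.get("number of output rows")
assert(a eq b)
{code}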
[jira] [Created] (SPARK-35549) Register merge status even after shuffle dependency is merge finalized
Venkata krishnan Sowrirajan created SPARK-35549: --- Summary: Register merge status even after shuffle dependency is merge finalized Key: SPARK-35549 URL: https://issues.apache.org/jira/browse/SPARK-35549 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Currently the merge statuses which arrive late from the external shuffle services (or shuffle mergers) won't get registered once the shuffle dependency merge is finalized. This needs to be done carefully, as there are a lot of corner cases, like: a) executor/node loss causing re-computation due to fetch failure; if the merge statuses get registered very late, that can cause inconsistencies. b) other similar cases cc [~mridulm80] [~mshen] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35547) Support push based shuffle when barrier scheduling is enabled
Venkata krishnan Sowrirajan created SPARK-35547: --- Summary: Support push based shuffle when barrier scheduling is enabled Key: SPARK-35547 URL: https://issues.apache.org/jira/browse/SPARK-35547 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Currently push-based shuffle is not enabled with barrier scheduling, as it has not yet been tested in that combination. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
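For context, a barrier stage is expressed through the public RDD barrier API as below; any shuffle written by or after such a stage is what would need to interoperate with push-based merging. A minimal, runnable sketch:

{code:java}
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("barrier-demo").getOrCreate()
val sc = spark.sparkContext

// All tasks of a barrier stage launch together and can synchronize.
val doubled = sc.parallelize(1 to 100, 4)
  .barrier()
  .mapPartitions { iter =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier() // every task in the stage waits here before proceeding
    iter.map(_ * 2)
  }

// A subsequent reduceByKey introduces the shuffle that push-based shuffle
// would need to support in this mode.
println(doubled.map(x => (x % 10, 1)).reduceByKey(_ + _).count())
{code}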
[jira] [Created] (SPARK-35536) Cancel finalizing the shuffle merge if the stage is cancelled while waiting until shuffle merge finalize wait time.
Venkata krishnan Sowrirajan created SPARK-35536: --- Summary: Cancel finalizing the shuffle merge if the stage is cancelled while waiting until shuffle merge finalize wait time. Key: SPARK-35536 URL: https://issues.apache.org/jira/browse/SPARK-35536 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Currently we are not handling the case where the stage is cancelled while it is about to be merge finalized. Stage completion waits for the shuffle merge finalize wait time so that all the shuffle merge requests are drained, but during this window the stage can be cancelled; if this happens, we need to have all the shuffle services remove the merged shuffle data for the corresponding shuffle ID. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition
Venkata krishnan Sowrirajan created SPARK-35036: --- Summary: Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition Key: SPARK-35036 URL: https://issues.apache.org/jira/browse/SPARK-35036 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 3.1.1 Reporter: Venkata krishnan Sowrirajan Currently, when both push-based shuffle and AQE are enabled and a partial set of map indexes is requested from MapOutputTracker, the request is delegated to regular shuffle map-block reading instead of push-based shuffle. This is because blocks from mappers in push-based shuffle are merged out of order, which makes it hard to fetch only the blocks of a reduce partition that match the requested start and end map indexes. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
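A simplified sketch of the delegation decision described above; the function names are illustrative, not the actual MapOutputTracker internals:

{code:java}
// Merged blocks interleave data from all mappers, so they can only serve a
// request that covers the full map-index range; any sub-range (e.g. AQE
// skew-join splitting) must fall back to the original per-mapper blocks.
def canUseMergedBlocks(startMapIndex: Int, endMapIndex: Int, numMaps: Int): Boolean =
  startMapIndex == 0 && endMapIndex == numMaps

def chooseReadPath(startMapIndex: Int, endMapIndex: Int, numMaps: Int): String =
  if (canUseMergedBlocks(startMapIndex, endMapIndex, numMaps))
    "read merged (push-based) shuffle blocks"
  else
    "read original map output blocks (regular shuffle)"

// Example: AQE asks for mappers [0, 3) out of 10 -> regular shuffle path.
println(chooseReadPath(0, 3, 10))
{code}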
[jira] [Created] (SPARK-34826) Adaptive fetch of shuffle mergers for Push based shuffle
Venkata krishnan Sowrirajan created SPARK-34826: --- Summary: Adaptive fetch of shuffle mergers for Push based shuffle Key: SPARK-34826 URL: https://issues.apache.org/jira/browse/SPARK-34826 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Currently the shuffle mergers are set during the creation of the ShuffleMapStage. For the initial set of stages, enough executors may not have been added yet, which can leave too few shuffle mergers set when the shuffle map stage is created. This task is to address the resulting low merge ratio for initial stages. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33778) Allow typesafe join for LeftSemi and LeftAnti
[ https://issues.apache.org/jira/browse/SPARK-33778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249873#comment-17249873 ] Venkata krishnan Sowrirajan commented on SPARK-33778: - Thanks [~angerszhuuu] > Allow typesafe join for LeftSemi and LeftAnti > - > > Key: SPARK-33778 > URL: https://issues.apache.org/jira/browse/SPARK-33778 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.1 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > With the [SPARK-21333|https://issues.apache.org/jira/browse/SPARK-21333] change, > LeftSemi and LeftAnti no longer have a typesafe join API. It makes sense not > to support LeftSemi and LeftAnti as part of joinWith, as joinWith returns > tuples that include values from both datasets, which is not possible for > these joins. Nevertheless, it would be nice to have a separate join API, or > support in the existing API, for LeftSemi and LeftAnti that returns Dataset. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33778) Allow typesafe join for LeftSemi and LeftAnti
Venkata krishnan Sowrirajan created SPARK-33778: --- Summary: Allow typesafe join for LeftSemi and LeftAnti Key: SPARK-33778 URL: https://issues.apache.org/jira/browse/SPARK-33778 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.1 Reporter: Venkata krishnan Sowrirajan With the [SPARK-21333|https://issues.apache.org/jira/browse/SPARK-21333] change, LeftSemi and LeftAnti no longer have a typesafe join API. It makes sense not to support LeftSemi and LeftAnti as part of joinWith, as joinWith returns tuples that include values from both datasets, which is not possible for these joins. Nevertheless, it would be nice to have a separate join API, or support in the existing API, for LeftSemi and LeftAnti that returns Dataset. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33481) Better heuristics to compute number of shuffle mergers required for a ShuffleMapStage
Venkata krishnan Sowrirajan created SPARK-33481: --- Summary: Better heuristics to compute number of shuffle mergers required for a ShuffleMapStage Key: SPARK-33481 URL: https://issues.apache.org/jira/browse/SPARK-33481 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Currently we have a naive way of computing the number of shuffle mergers required for a ShuffleMapStage, based only on the resulting number of partitions. This heuristic needs further analysis and enhancement. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
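A sketch of the kind of naive, partition-count-based heuristic being referred to; the ratio and bounds are illustrative defaults, not Spark's configured values:

{code:java}
// Derive the number of shuffle mergers from the number of reduce
// partitions, clamped to configured minimum/maximum thresholds.
def numShuffleMergersFor(
    numReducePartitions: Int,
    partitionsPerMerger: Int = 2, // illustrative ratio
    minMergers: Int = 5,
    maxMergers: Int = 500): Int = {
  val desired = math.ceil(numReducePartitions.toDouble / partitionsPerMerger).toInt
  math.min(maxMergers, math.max(minMergers, desired))
}

// Example: a 1000-partition shuffle asks for 500 mergers; a 4-partition
// shuffle is still rounded up to the 5-merger minimum.
println(numShuffleMergersFor(1000)) // 500
println(numShuffleMergersFor(4))    // 5
{code}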
[jira] [Created] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle
Venkata krishnan Sowrirajan created SPARK-33329: --- Summary: Pluggable API to fetch shuffle merger locations with Push based shuffle Key: SPARK-33329 URL: https://issues.apache.org/jira/browse/SPARK-33329 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Venkata krishnan Sowrirajan Possibly extend ShuffleDriverComponents to add a separate API to fetch shuffle merger locations with Push based shuffle. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32596) Clear Ivy resolution files as part of the finally block
Venkata krishnan Sowrirajan created SPARK-32596: --- Summary: Clear Ivy resolution files as part of the finally block Key: SPARK-32596 URL: https://issues.apache.org/jira/browse/SPARK-32596 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: Venkata krishnan Sowrirajan Clear Ivy resolution files as part of a finally block in SparkSubmit; without this, failures while resolving packages can leave resolution files behind. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
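A sketch of the try/finally shape of the fix; the helper name and the ivy-* file filter are illustrative, not SparkSubmit's actual private methods:

{code:java}
import java.io.File

// Run dependency resolution and always clean up intermediate Ivy
// resolution files, even when resolution throws.
def resolveWithCleanup[T](resolutionDir: File)(resolve: => T): T = {
  try {
    resolve
  } finally {
    Option(resolutionDir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => f.getName.startsWith("ivy-") && f.getName.endsWith(".xml"))
      .foreach(_.delete())
  }
}
{code}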
[jira] [Commented] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082731#comment-17082731 ] Venkata krishnan Sowrirajan commented on SPARK-31418: - [~tgraves] Currently I'm thinking we can check whether dynamic allocation is enabled; if so, we can request one more executor using ExecutorAllocationClient#requestExecutors and start the abort timer. But I re-read your [comment|https://issues.apache.org/jira/browse/SPARK-22148?focusedCommentId=17078278=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17078278] again, and it seems like you tried to pass the information to ExecutorAllocationManager and request the executor through it. Is that right? Regarding the idea of killing another non-idle blacklisted executor, I don't think that would be better, as we might kill tasks from other stages, as mentioned in other comments on the PR. Let me know if you have any other thoughts on this problem. We are facing this issue quite frequently; retrying the whole job does pass, but the failure keeps recurring. > Blacklisting feature aborts Spark job without retrying for max num retries in > case of Dynamic allocation > > > Key: SPARK-31418 > URL: https://issues.apache.org/jira/browse/SPARK-31418 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.4.5 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > With Spark blacklisting, if a task fails on an executor, the executor gets > blacklisted for the task. In order to retry the task, it checks if there are > idle blacklisted executors which can be killed and replaced to retry the > task; if not, it aborts the job without attempting the max number of retries. > In the context of dynamic allocation this can be handled better: instead of > killing a blacklisted idle executor (it's possible there are no idle > blacklisted executors), request an additional executor and retry the task. > This can be easily reproduced with a simple job like below, although this > example should fail eventually just to show that it's not retried > spark.task.maxFailures times: > {code:java} > def test(a: Int) = { a.asInstanceOf[String] } > sc.parallelize(1 to 10, 10).map(x => test(x)).collect > {code} > with dynamic allocation enabled and min executors set to 1. But there are > various other cases where this can fail as well. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
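A rough sketch of the idea floated in the comment above. ExecutorAllocationClient#requestExecutors is a real (internal, private[spark]) Spark interface method; the surrounding wiring, parameter names, and callbacks here are hypothetical:

{code:java}
// When a task set is completely blacklisted and dynamic allocation is on,
// ask the cluster manager for one more executor and arm the abort timer
// instead of aborting the job immediately.
def handleCompletelyBlacklisted(
    dynamicAllocationEnabled: Boolean,
    requestOneExecutor: () => Boolean, // e.g. client.requestExecutors(1)
    startAbortTimer: () => Unit,
    abortNow: () => Unit): Unit = {
  if (dynamicAllocationEnabled && requestOneExecutor()) {
    // Give the newly requested executor a chance to pick up the task;
    // the timer aborts the job if nothing usable arrives in time.
    startAbortTimer()
  } else {
    abortNow()
  }
}
{code}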
[jira] [Commented] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080942#comment-17080942 ] Venkata krishnan Sowrirajan commented on SPARK-31418: - Currently, I'm working on this issue. > Blacklisting feature aborts Spark job without retrying for max num retries in > case of Dynamic allocation > > > Key: SPARK-31418 > URL: https://issues.apache.org/jira/browse/SPARK-31418 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.4.5 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > With Spark blacklisting, if a task fails on an executor, the executor gets > blacklisted for the task. In order to retry the task, it checks if there are > idle blacklisted executors which can be killed and replaced to retry the > task; if not, it aborts the job without attempting the max number of retries. > In the context of dynamic allocation this can be handled better: instead of > killing a blacklisted idle executor (it's possible there are no idle > blacklisted executors), request an additional executor and retry the task. > This can be easily reproduced with a simple job like below, although this > example should fail eventually just to show that it's not retried > spark.task.maxFailures times: > {code:java} > def test(a: Int) = { a.asInstanceOf[String] } > sc.parallelize(1 to 10, 10).map(x => test(x)).collect > {code} > with dynamic allocation enabled and min executors set to 1. But there are > various other cases where this can fail as well. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venkata krishnan Sowrirajan updated SPARK-31418: Description: With Spark blacklisting, if a task fails on an executor, the executor gets blacklisted for the task. In order to retry the task, it checks if there are idle blacklisted executors which can be killed and replaced to retry the task; if not, it aborts the job without attempting the max number of retries. In the context of dynamic allocation this can be handled better: instead of killing a blacklisted idle executor (it's possible there are no idle blacklisted executors), request an additional executor and retry the task. This can be easily reproduced with a simple job like below, although this example should fail eventually just to show that it's not retried spark.task.maxFailures times: {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} with dynamic allocation enabled and min executors set to 1. But there are various other cases where this can fail as well. was: With Spark blacklisting, if a task fails on an executor, the executor gets blacklisted for the task. In order to retry the task, it checks if there are idle blacklisted executor which can be killed and replaced to retry the task if not it aborts the job without doing max retries. In the context of dynamic allocation this can be better, instead of killing the blacklisted idle executor (its possible there are no idle blacklisted executor), request an additional executor and retry the task. This can be easily reproduced with a simple job like below: {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} with dynamic allocation enabled and min executors set to 1. But there are various other cases where this can fail as well. > Blacklisting feature aborts Spark job without retrying for max num retries in > case of Dynamic allocation > > > Key: SPARK-31418 > URL: https://issues.apache.org/jira/browse/SPARK-31418 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.4.5 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > With Spark blacklisting, if a task fails on an executor, the executor gets > blacklisted for the task. In order to retry the task, it checks if there are > idle blacklisted executors which can be killed and replaced to retry the > task; if not, it aborts the job without attempting the max number of retries. > In the context of dynamic allocation this can be handled better: instead of > killing a blacklisted idle executor (it's possible there are no idle > blacklisted executors), request an additional executor and retry the task. > This can be easily reproduced with a simple job like below, although this > example should fail eventually just to show that it's not retried > spark.task.maxFailures times: > {code:java} > def test(a: Int) = { a.asInstanceOf[String] } > sc.parallelize(1 to 10, 10).map(x => test(x)).collect > {code} > with dynamic allocation enabled and min executors set to 1. But there are > various other cases where this can fail as well. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation
Venkata krishnan Sowrirajan created SPARK-31418: --- Summary: Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation Key: SPARK-31418 URL: https://issues.apache.org/jira/browse/SPARK-31418 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.5, 2.3.0 Reporter: Venkata krishnan Sowrirajan With Spark blacklisting, if a task fails on an executor, the executor gets blacklisted for the task. In order to retry the task, it checks if there are idle blacklisted executors which can be killed and replaced to retry the task; if not, it aborts the job without attempting the max number of retries. In the context of dynamic allocation this can be handled better: instead of killing a blacklisted idle executor (it's possible there are no idle blacklisted executors), request an additional executor and retry the task. This can be easily reproduced with a simple job like below: {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} with dynamic allocation enabled and min executors set to 1. But there are various other cases where this can fail as well. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078520#comment-17078520 ] Venkata krishnan Sowrirajan commented on SPARK-22148: - Thanks for your comments, [~tgraves]. Makes sense. I will think about it more, create a new JIRA, and share a new proposal based on how we think about it internally. > TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current > executors are blacklisted but dynamic allocation is enabled > - > > Key: SPARK-22148 > URL: https://issues.apache.org/jira/browse/SPARK-22148 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Dhruve Ashar >Priority: Major > Fix For: 2.4.1, 3.0.0 > > Attachments: SPARK-22148_WIP.diff > > > Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and > the whole Spark job with `task X (partition Y) cannot run anywhere due to > node and executor blacklist. Blacklisting behavior can be configured via > spark.blacklist.*.` when all the available executors are blacklisted for a > pending Task or TaskSet. This makes sense for static allocation, where the > set of executors is fixed for the duration of the application, but this might > lead to unnecessary job failures when dynamic allocation is enabled. For > example, in a Spark application with a single job at a time, when a node > fails at the end of a stage attempt, all other executors will complete their > tasks, but the tasks running in the executors of the failing node will be > pending. Spark will keep waiting for those tasks for 2 minutes by default > (spark.network.timeout) until the heartbeat timeout is triggered, and then it > will blacklist those executors for that stage. At that point in time, other > executors would have been released after being idle for 1 minute by default > (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't > started yet and so there are no more tasks available (assuming the default of > spark.speculation = false). So Spark will fail because the only executors > available are blacklisted for that stage. > An alternative is requesting more executors from the cluster manager in this > situation. This could be retried a configurable number of times after a > configurable wait time between request attempts, so if the cluster manager > fails to provide a suitable executor then the job is aborted like in the > previous case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674 ] Venkata krishnan Sowrirajan edited comment on SPARK-22148 at 4/8/20, 12:04 AM: --- Thanks for responding, [~tgraves]. That's right. Let's say all the executors are busy with some task and one of the tasks fails; then we abort the stage, as there is no idle blacklisted executor available to kill and replace. But with dynamic allocation enabled, we could have requested more executors and retried the task. In fact, I can reproduce this with min executors set to 1 and max set to some number. In this case, it wouldn't scale up immediately, and the first task failure fails the whole stage because the only executor available is blacklisted for the task and also busy running another task at that time. // Though this example would fail as casting an int to a string is not valid. Just for example purposes. {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} Although if there are more executors, then it's retried. Other similar cases are possible was (Author: vsowrirajan): Thanks for responding [~tgraves]. Thats right. Lets say all the executors are busy with some task and one of the task fails, then we are aborting the stage as there is no idle blacklisted executor available to kill and replace. But with dynamic allocation enabled, we could have requested for more executors and retried the task. Infact, I can reproduce this with min executors set to 1 and max to some number. In this case, it wouldn't scale up immediately and the first task fails the whole stage because the only executor available is blacklisted for the task and also busy running other task at that time. // Though this example would fail as casting an int to string is not valid. Just for example purposes. {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} Although if there are more executors, then its possibly retried. Similar other cases are possible > TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current > executors are blacklisted but dynamic allocation is enabled > - > > Key: SPARK-22148 > URL: https://issues.apache.org/jira/browse/SPARK-22148 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Dhruve Ashar >Priority: Major > Fix For: 2.4.1, 3.0.0 > > Attachments: SPARK-22148_WIP.diff > > > Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and > the whole Spark job with `task X (partition Y) cannot run anywhere due to > node and executor blacklist. Blacklisting behavior can be configured via > spark.blacklist.*.` when all the available executors are blacklisted for a > pending Task or TaskSet. This makes sense for static allocation, where the > set of executors is fixed for the duration of the application, but this might > lead to unnecessary job failures when dynamic allocation is enabled. For > example, in a Spark application with a single job at a time, when a node > fails at the end of a stage attempt, all other executors will complete their > tasks, but the tasks running in the executors of the failing node will be > pending. Spark will keep waiting for those tasks for 2 minutes by default > (spark.network.timeout) until the heartbeat timeout is triggered, and then it > will blacklist those executors for that stage. 
At that point in time, other > executors would have been released after being idle for 1 minute by default > (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't > started yet and so there are no more tasks available (assuming the default of > spark.speculation = false). So Spark will fail because the only executors > available are blacklisted for that stage. > An alternative is requesting more executors from the cluster manager in this > situation. This could be retried a configurable number of times after a > configurable wait time between request attempts, so if the cluster manager > fails to provide a suitable executor then the job is aborted like in the > previous case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674 ] Venkata krishnan Sowrirajan edited comment on SPARK-22148 at 4/8/20, 12:04 AM: --- Thanks for responding, [~tgraves]. That's right. Let's say all the executors are busy with some task and one of the tasks fails; then we abort the stage, as there is no idle blacklisted executor available to kill and replace. But with dynamic allocation enabled, we could have requested more executors and retried the task. In fact, I can reproduce this with min executors set to 1 and max set to some number. In this case, it wouldn't scale up immediately, and the first task failure fails the whole stage because the only executor available is blacklisted for the task and also busy running another task at that time. // Though this example would fail as casting an int to a string is not valid. Just for example purposes. {code:java} def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect {code} Although if there are more executors, then it's possibly retried. Other similar cases are possible was (Author: vsowrirajan): Thanks for responding [~tgraves]. Thats right. Lets say all the executors are busy with some task and one of the task fails, then we are aborting the stage as there is no idle blacklisted executor available to kill and replace. But with dynamic allocation enabled, we could have requested for more executors and retried the task. Infact, I can reproduce this with min executors set to 1 and max to some number. In this case, it wouldn't scale up immediately and the first task fails the whole stage because the only executor available is blacklisted for the task and also busy running other task at that time. // Though this example would fail as casting an int to string is not valid. Just for example purposes. def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect Although if there are more executors, then its possibly retried. Similar other cases are possible > TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current > executors are blacklisted but dynamic allocation is enabled > - > > Key: SPARK-22148 > URL: https://issues.apache.org/jira/browse/SPARK-22148 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Dhruve Ashar >Priority: Major > Fix For: 2.4.1, 3.0.0 > > Attachments: SPARK-22148_WIP.diff > > > Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and > the whole Spark job with `task X (partition Y) cannot run anywhere due to > node and executor blacklist. Blacklisting behavior can be configured via > spark.blacklist.*.` when all the available executors are blacklisted for a > pending Task or TaskSet. This makes sense for static allocation, where the > set of executors is fixed for the duration of the application, but this might > lead to unnecessary job failures when dynamic allocation is enabled. For > example, in a Spark application with a single job at a time, when a node > fails at the end of a stage attempt, all other executors will complete their > tasks, but the tasks running in the executors of the failing node will be > pending. Spark will keep waiting for those tasks for 2 minutes by default > (spark.network.timeout) until the heartbeat timeout is triggered, and then it > will blacklist those executors for that stage. 
At that point in time, other > executors would have been released after being idle for 1 minute by default > (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't > started yet and so there are no more tasks available (assuming the default of > spark.speculation = false). So Spark will fail because the only executors > available are blacklisted for that stage. > An alternative is requesting more executors from the cluster manager in this > situation. This could be retried a configurable number of times after a > configurable wait time between request attempts, so if the cluster manager > fails to provide a suitable executor then the job is aborted like in the > previous case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674 ] Venkata krishnan Sowrirajan commented on SPARK-22148: - Thanks for responding, [~tgraves]. That's right. Let's say all the executors are busy with some task and one of the tasks fails; then we abort the stage, as there is no idle blacklisted executor available to kill and replace. But with dynamic allocation enabled, we could have requested more executors and retried the task. In fact, I can reproduce this with min executors set to 1 and max set to some number. In this case, it wouldn't scale up immediately, and the first task failure fails the whole stage because the only executor available is blacklisted for the task and also busy running another task at that time. // Though this example would fail as casting an int to a string is not valid. Just for example purposes. def test(a: Int) = { a.asInstanceOf[String] } sc.parallelize(1 to 10, 10).map(x => test(x)).collect Although if there are more executors, then it's possibly retried. Other similar cases are possible > TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current > executors are blacklisted but dynamic allocation is enabled > - > > Key: SPARK-22148 > URL: https://issues.apache.org/jira/browse/SPARK-22148 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Dhruve Ashar >Priority: Major > Fix For: 2.4.1, 3.0.0 > > Attachments: SPARK-22148_WIP.diff > > > Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and > the whole Spark job with `task X (partition Y) cannot run anywhere due to > node and executor blacklist. Blacklisting behavior can be configured via > spark.blacklist.*.` when all the available executors are blacklisted for a > pending Task or TaskSet. This makes sense for static allocation, where the > set of executors is fixed for the duration of the application, but this might > lead to unnecessary job failures when dynamic allocation is enabled. For > example, in a Spark application with a single job at a time, when a node > fails at the end of a stage attempt, all other executors will complete their > tasks, but the tasks running in the executors of the failing node will be > pending. Spark will keep waiting for those tasks for 2 minutes by default > (spark.network.timeout) until the heartbeat timeout is triggered, and then it > will blacklist those executors for that stage. At that point in time, other > executors would have been released after being idle for 1 minute by default > (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't > started yet and so there are no more tasks available (assuming the default of > spark.speculation = false). So Spark will fail because the only executors > available are blacklisted for that stage. > An alternative is requesting more executors from the cluster manager in this > situation. This could be retried a configurable number of times after a > configurable wait time between request attempts, so if the cluster manager > fails to provide a suitable executor then the job is aborted like in the > previous case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077603#comment-17077603 ] Venkata krishnan Sowrirajan commented on SPARK-22148: - [~irashid][~Dhruve Ashar] Recently we enabled blacklisting on our platform and it mostly works nicely. We also have this fix for the case where there are no executors left to retry on due to blacklisting (mainly with dynamic allocation enabled; it happens during the tail end of a stage). I also went through the fix and the blacklisting code in general. It still happens, though, that all the other executors are busy and there is no idle blacklisted executor left to kill and replace with a newly requested one, which causes the stage, and eventually the job, to be aborted before all the retries. Do you also see this behavior or have this issue? Do you think requesting a new executor in general would help, rather than trying to kill a blacklisted idle executor? > TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current > executors are blacklisted but dynamic allocation is enabled > - > > Key: SPARK-22148 > URL: https://issues.apache.org/jira/browse/SPARK-22148 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Dhruve Ashar >Priority: Major > Fix For: 2.4.1, 3.0.0 > > Attachments: SPARK-22148_WIP.diff > > > Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and > the whole Spark job with `task X (partition Y) cannot run anywhere due to > node and executor blacklist. Blacklisting behavior can be configured via > spark.blacklist.*.` when all the available executors are blacklisted for a > pending Task or TaskSet. This makes sense for static allocation, where the > set of executors is fixed for the duration of the application, but this might > lead to unnecessary job failures when dynamic allocation is enabled. For > example, in a Spark application with a single job at a time, when a node > fails at the end of a stage attempt, all other executors will complete their > tasks, but the tasks running in the executors of the failing node will be > pending. Spark will keep waiting for those tasks for 2 minutes by default > (spark.network.timeout) until the heartbeat timeout is triggered, and then it > will blacklist those executors for that stage. At that point in time, other > executors would have been released after being idle for 1 minute by default > (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't > started yet and so there are no more tasks available (assuming the default of > spark.speculation = false). So Spark will fail because the only executors > available are blacklisted for that stage. > An alternative is requesting more executors from the cluster manager in this > situation. This could be retried a configurable number of times after a > configurable wait time between request attempts, so if the cluster manager > fails to provide a suitable executor then the job is aborted like in the > previous case. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27338) Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator
[ https://issues.apache.org/jira/browse/SPARK-27338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806946#comment-16806946 ] Venkata krishnan Sowrirajan commented on SPARK-27338: - We found this issue in one of our customer workloads. Will file a PR for this issue shortly. > Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator > - > > Key: SPARK-27338 > URL: https://issues.apache.org/jira/browse/SPARK-27338 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > We saw a deadlock similar to > https://issues.apache.org/jira/browse/SPARK-26265 happening between > TaskMemoryManager and UnsafeExternalSorter$SpillableIterator. > jstack output: > {code:java} > Found one Java-level deadlock: > = > "stdout writer for > /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python": > waiting to lock monitor 0x7fce56409088 (object 0x0005700a2f98, a > org.apache.spark.memory.TaskMemoryManager), > which is held by "Executor task launch worker for task 2203" > "Executor task launch worker for task 2203": > waiting to lock monitor 0x007cd878 (object 0x0005701a0eb0, a > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator), > which is held by "stdout writer for > /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python" > Java stack information for the threads listed above: > === > "stdout writer for > /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python": > at > org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:334) > - waiting to lock <0x0005700a2f98> (a > org.apache.spark.memory.TaskMemoryManager) > at > org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.access$1100(UnsafeExternalSorter.java:48) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:583) > - locked <0x0005701a0eb0> (a > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:187) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:174) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.findNextInnerJoinRows$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$2.hasNext(WholeStageCodegenExec.scala:638) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at > scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073) > at 
scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089) > at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127) > at > scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2067) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194) > "Executor task
[jira] [Created] (SPARK-27338) Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator
Venkata krishnan Sowrirajan created SPARK-27338: --- Summary: Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator Key: SPARK-27338 URL: https://issues.apache.org/jira/browse/SPARK-27338 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Reporter: Venkata krishnan Sowrirajan We saw a deadlock similar to https://issues.apache.org/jira/browse/SPARK-26265 happening between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator. jstack output: {code:java} Found one Java-level deadlock: = "stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python": waiting to lock monitor 0x7fce56409088 (object 0x0005700a2f98, a org.apache.spark.memory.TaskMemoryManager), which is held by "Executor task launch worker for task 2203" "Executor task launch worker for task 2203": waiting to lock monitor 0x007cd878 (object 0x0005701a0eb0, a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator), which is held by "stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python" Java stack information for the threads listed above: === "stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python": at org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:334) - waiting to lock <0x0005700a2f98> (a org.apache.spark.memory.TaskMemoryManager) at org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.access$1100(UnsafeExternalSorter.java:48) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:583) - locked <0x0005701a0eb0> (a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator) at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:187) at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:174) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.findNextInnerJoinRows$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$2.hasNext(WholeStageCodegenExec.scala:638) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50) at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2067) at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194) "Executor task launch worker for task 2203": at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:525) - waiting to lock <0x0005701a0eb0> (a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:200) at
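The stacks above show a classic two-lock inversion: one thread holds the TaskMemoryManager monitor and waits on the SpillableIterator monitor, while the other does the reverse. A standalone illustration of the fix pattern (consistent lock ordering), not the actual Spark patch:

{code:java}
// Two monitors standing in for TaskMemoryManager and SpillableIterator.
object LockOrderingDemo {
  private val memoryManagerLock = new Object
  private val iteratorLock = new Object

  // Deadlock-free: every path acquires memoryManagerLock before
  // iteratorLock, so no cycle in the lock graph is possible.
  def safeSpill(doSpill: () => Unit): Unit =
    memoryManagerLock.synchronized {
      iteratorLock.synchronized {
        doSpill()
      }
    }

  def safeFreePage(doFree: () => Unit): Unit =
    memoryManagerLock.synchronized { // same first lock as safeSpill
      iteratorLock.synchronized {
        doFree()
      }
    }
}
{code}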
[jira] [Commented] (SPARK-26894) Fix Alias handling in AggregateEstimation
[ https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798154#comment-16798154 ] Venkata krishnan Sowrirajan commented on SPARK-26894: - Thanks for merging the code, Takeshi Yamamuro. > Fix Alias handling in AggregateEstimation > - > > Key: SPARK-26894 > URL: https://issues.apache.org/jira/browse/SPARK-26894 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Venkata krishnan Sowrirajan >Assignee: Venkata krishnan Sowrirajan >Priority: Major > Fix For: 3.0.0 > > > Aliases are not handled separately in AggregateEstimation similar to > ProjectEstimation due to which stats are not getting propagated when CBO is > enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26894) Fix Alias handling in AggregateEstimation
[ https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769773#comment-16769773 ] Venkata krishnan Sowrirajan commented on SPARK-26894: - Need to assign this Jira to myself. How can I do that? > Fix Alias handling in AggregateEstimation > - > > Key: SPARK-26894 > URL: https://issues.apache.org/jira/browse/SPARK-26894 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Aliases are not handled separately in AggregateEstimation similar to > ProjectEstimation due to which stats are not getting propagated when CBO is > enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26894) Fix Alias handling in AggregateEstimation
Venkata krishnan Sowrirajan created SPARK-26894: --- Summary: Fix Alias handling in AggregateEstimation Key: SPARK-26894 URL: https://issues.apache.org/jira/browse/SPARK-26894 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Venkata krishnan Sowrirajan Aliases are not handled in AggregateEstimation the way they are in ProjectEstimation, due to which stats are not getting propagated when CBO is enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
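A small sketch of the kind of query affected: with CBO enabled and column statistics collected, the aggregate output cnt is an Alias over count(value), and AggregateEstimation has to resolve the alias the way ProjectEstimation does for stats to keep propagating. The table name and setup below are illustrative:

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().appName("cbo-alias-demo").getOrCreate()
spark.conf.set("spark.sql.cbo.enabled", "true")
import spark.implicits._

Seq((1, "a"), (1, "b"), (2, "c")).toDF("key", "value").write.saveAsTable("t")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")

// `cnt` is an Alias over count(value); without alias handling in
// AggregateEstimation, the column stats behind the alias are lost and
// estimation above this aggregate degrades.
val agg = spark.table("t").groupBy($"key").agg(count($"value").as("cnt"))
println(agg.queryExecution.optimizedPlan.stats) // estimated statistics
{code}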
[jira] [Commented] (SPARK-14307) Alluxio Epic JIRA
[ https://issues.apache.org/jira/browse/SPARK-14307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220423#comment-15220423 ] Venkata krishnan Sowrirajan commented on SPARK-14307: - Invalid JIRA. > Alluxio Epic JIRA > -- > > Key: SPARK-14307 > URL: https://issues.apache.org/jira/browse/SPARK-14307 > Project: Spark > Issue Type: Epic >Reporter: Venkata krishnan Sowrirajan > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-14307) Alluxio Epic JIRA
Venkata krishnan Sowrirajan created SPARK-14307: --- Summary: Alluxio Epic JIRA Key: SPARK-14307 URL: https://issues.apache.org/jira/browse/SPARK-14307 Project: Spark Issue Type: Epic Reporter: Venkata krishnan Sowrirajan -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-14307) Alluxio Epic JIRA
[ https://issues.apache.org/jira/browse/SPARK-14307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venkata krishnan Sowrirajan closed SPARK-14307. --- Resolution: Fixed > Alluxio Epic JIRA > -- > > Key: SPARK-14307 > URL: https://issues.apache.org/jira/browse/SPARK-14307 > Project: Spark > Issue Type: Epic >Reporter: Venkata krishnan Sowrirajan > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org