[jira] [Created] (SPARK-38093) Set shuffleMergeAllowed to false for a determinate stage after the stage is finalized

2022-02-02 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-38093:
---

 Summary: Set shuffleMergeAllowed to false for a determinate stage 
after the stage is finalized
 Key: SPARK-38093
 URL: https://issues.apache.org/jira/browse/SPARK-38093
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.2.1
Reporter: Venkata krishnan Sowrirajan


Currently we set shuffleMergeAllowed to false before 
prepareShuffleServicesForShuffleMapStage if the shuffle dependency is already 
finalized. Ideally, it is better to do this right after the shuffle dependency 
is finalized for a determinate stage. cc [~mridulm80]
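
A minimal, self-contained sketch of the intended ordering (hedged: this is not the actual DAGScheduler/ShuffleDependency code; the class and member names below are hypothetical stand-ins for the state described above):
{code:java}
// Hypothetical model, only to illustrate the proposed ordering: once a
// determinate stage's shuffle merge is finalized, disallow further merging
// right away instead of waiting for the next stage preparation.
class ShuffleMergeStateStub(val isDeterminate: Boolean) {
  private var shuffleMergeAllowed: Boolean = true
  private var shuffleMergeFinalized: Boolean = false

  def finalizeShuffleMerge(): Unit = {
    shuffleMergeFinalized = true
    // Proposed change: flip the flag immediately after finalization
    // for a determinate stage.
    if (isDeterminate) {
      shuffleMergeAllowed = false
    }
  }

  def mergeAllowed: Boolean = shuffleMergeAllowed
  def mergeFinalized: Boolean = shuffleMergeFinalized
}
{code}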



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38092) Check if shuffleMergeId is the same as the current stage's shuffleMergeId before registering MergeStatus

2022-02-02 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-38092:
---

 Summary: Check if shuffleMergeId is the same as the current 
stage's shuffleMergeId before registering MergeStatus
 Key: SPARK-38092
 URL: https://issues.apache.org/jira/browse/SPARK-38092
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.2.1
Reporter: Venkata krishnan Sowrirajan


Currently this is handled in handleShuffleMergeFinalized during finalization, 
ensuring the finalize request is indeed for the current stage's shuffle 
dependency shuffleMergeId. The same check has to be done before registering 
merge statuses as well.
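
A rough sketch of the kind of guard described above, using hypothetical stand-in types (the real check would live where merge statuses are registered on the driver):
{code:java}
// Illustrative only: register a merge status only when its shuffleMergeId
// matches the current shuffle dependency's shuffleMergeId, mirroring the
// check already performed in handleShuffleMergeFinalized.
case class MergeStatusStub(shuffleId: Int, shuffleMergeId: Int, reduceId: Int)

class MergeStatusRegistry(currentShuffleMergeId: Int) {
  private val registered = scala.collection.mutable.ArrayBuffer[MergeStatusStub]()

  def register(status: MergeStatusStub): Boolean = {
    if (status.shuffleMergeId == currentShuffleMergeId) {
      registered += status
      true
    } else {
      // Stale merge status from an older shuffleMergeId (e.g. a prior
      // stage attempt); ignore it instead of registering it.
      false
    }
  }
}
{code}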



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38064) Push-based shuffle's internal implementation details should not be exposed as API

2022-01-28 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-38064:
---

 Summary: Push-based shuffle's internal implementation details 
should not be exposed as API
 Key: SPARK-38064
 URL: https://issues.apache.org/jira/browse/SPARK-38064
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.2.1
Reporter: Venkata krishnan Sowrirajan


Currently the changes added in Dependency.scala for push-based shuffle are all 
exposed as Developer API, e.g. `newShuffleMergeState`, `setMergerLocs`, 
`shuffleMergeFinalized`, etc. Some of these are internal implementation details 
which should not be leaked. This JIRA is to mark them private as appropriate.

cc [~mridulm80][~mshen][~csingh][~Ngone51][~tgraves]
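
For illustration only, narrowing such a method from Developer API visibility to Spark-internal visibility could look like the sketch below (the enclosing class and the MergerLoc type are placeholders, not the real Dependency.scala code; the exact modifier depends on each method's callers):
{code:java}
package org.apache.spark

// Placeholder type standing in for the real merger location class.
case class MergerLoc(host: String, port: Int)

class ShuffleDependencyLike {
  private var mergerLocs: Seq[MergerLoc] = Nil

  // Before: exposed as a Developer API method.
  // After: visible only within the org.apache.spark package.
  private[spark] def setMergerLocs(locs: Seq[MergerLoc]): Unit = {
    mergerLocs = locs
  }

  private[spark] def getMergerLocs: Seq[MergerLoc] = mergerLocs
}
{code}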



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37964) Replace usages of slaveTracker to workerTracker in MapOutputTrackerSuite

2022-01-19 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-37964:
---

 Summary: Replace usages of slaveTracker to workerTracker in 
MapOutputTrackerSuite
 Key: SPARK-37964
 URL: https://issues.apache.org/jira/browse/SPARK-37964
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424265#comment-17424265
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36926:
-

This looks like a regression, given that other engines like Hive, Presto, and 
Impala, as well as Spark 3.1, all return 7 records.

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Critical
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24]
>  orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n 
>   +- Relation 
> tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13]
>  orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], 
> true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, 
> sum(c_acctbal#11) AS totacctbal#151]\n   +- Project [substring(c_phone#10, 1, 
> 2) AS cntrycode#147, c_acctbal#11]\n  +- Join LeftAnti, (o_custkey#17L = 
> c_custkey#6L)\n :- Project [c_custkey#6L, c_phone#10, c_acctbal#11]\n 
> :  +- Filter ((isnotnull(c_acctbal#11) AND substring(c_phone#10, 1, 
> 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#11 as decimal(16,6)) > 
> scalar-subquery#148 []))\n : :  +- Aggregate [avg(c_acctbal#160) 
> AS avg(c_acctbal)#154]\n 

[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424112#comment-17424112
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 7:44 PM:
---

I tried to get a simpler repro. 

{code:java}
 
import spark.implicits._
val buf = new scala.collection.mutable.ArrayBuffer[Double]
for (i <- 0.toDouble to 9999.toDouble by 0.01) for (j <- 0 to 5) { buf += i }

val df = buf.toDF
df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show

Array[null]

df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show

+---+
| avg(value)|
+---+
|4999.50|
+---+
{code}


was (Author: vsowrirajan):
I tried to get a simpler repro. 

{code:java}
 
import spark.implicits._
val buf = new scala.collection.mutable.ArrayBuffer[Double]
for (i <- 0.toDouble to 9999.toDouble by 0.01) for (j <- 0 to 5) { buf += i }

val df = buf.toDF
df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show

Array[null]

df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show

+---+
| avg(value)|
+---+
|4999.50|
+---+
{code}

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24]
>  orc\n+- SubqueryAlias 

[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424112#comment-17424112
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36926:
-

I tried to get a simpler repro. 

{code:java}
 
import spark.implicits._
val buf = new scala.collection.mutable.ArrayBuffer[Double]
for (i <- 0.toDouble to 9999.toDouble by 0.01) for (j <- 0 to 5) { buf += i }

val df = buf.toDF
df.selectExpr("cast(value as decimal(12, 2))").agg(avg("value")).show

Array[null]

df.selectExpr("cast(value as decimal(16, 2))").agg(avg("value")).show

+---+
| avg(value)|
+---+
|4999.50|
+---+
{code}

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24]
>  orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n 
>   +- Relation 
> tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13]
>  orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], 
> true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, 
> sum(c_acctbal#11) AS totacctbal#151]\n   +- Project [substring(c_phone#10, 1, 
> 2) AS cntrycode#147, c_acctbal#11]\n  +- Join LeftAnti, (o_custkey#17L = 
> c_custkey#6L)\n 

[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:53 PM:
---

The issue seems to be with this subquery part of q22.
{code:java}
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect
Array([null])
{code}
 
{code:java}
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
Array([4998.769056963132322922]){code}
It seems like there is some behavior change wrt 3.2.


was (Author: vsowrirajan):
The issue seems to be with this subquery part of q22.
{code:java}
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect
Array([null])
{code}
 
{code:java}
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
Array([4998.769056963132322922]){code}
It seems like there is some behavior change wrt 3.2 handling nulls.

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> 

[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:05 PM:
---

The issue seems to be with this subquery part of q22.
{code:java}
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect
Array([null])
{code}
 
{code:java}
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
Array([4998.769056963132322922]){code}
It seems like there is some behavior change wrt 3.2 handling nulls.


was (Author: vsowrirajan):
The issue seems to be with this subquery part of q22.
{code:java}
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect
Array([null])
{code}
 
{code:java}
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
 Array([4998.769056963132322922]){code}

 It seems like there is some behavior change wrt 3.2 handling nulls.

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> 

[jira] [Commented] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36926:
-

The issue seems to be with this subquery part of q22.
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect

Array([null])
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
Array([4998.769056963132322922])
It seems like there is some behavior change wrt 3.2 handling nulls.

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24]
>  orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n 
>   +- Relation 
> tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13]
>  orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], 
> true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, 
> sum(c_acctbal#11) AS totacctbal#151]\n   +- Project [substring(c_phone#10, 1, 
> 2) AS cntrycode#147, c_acctbal#11]\n  +- Join LeftAnti, 

[jira] [Comment Edited] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17424080#comment-17424080
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-36926 at 10/4/21, 6:04 PM:
---

The issue seems to be with this subquery part of q22.
{code:java}
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect
Array([null])
{code}
 
{code:java}
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
 Array([4998.769056963132322922]){code}

 It seems like there is some behavior change wrt 3.2 handling nulls.


was (Author: vsowrirajan):
The issue seems to be with this subquery part of q22.
sql("""select avg(c_acctbal) from customer where c_acctbal > 0.00 and 
substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', '17')""").collect

Array([null])
sql("""select sum(c_acctbal)/count(c_acctbal) from customer where c_acctbal > 
0.00 and substr(c_phone, 1, 2) in ('13', '31', '23', '29', '30', '18', 
'17')""").collect
Array([4998.769056963132322922])
It seems like there is some behavior change wrt 3.2 handling nulls.

> Discrepancy in Q22 of TPCH for Spark 3.2
> 
>
> Key: SPARK-36926
> URL: https://issues.apache.org/jira/browse/SPARK-36926
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Aravind Patnam
>Priority: Major
>
> When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
> number of rows returned by the query. This was tested with both AQE on and 
> off. All the other queries were matching in results. Below is the results 
> that we got when testing Q22 on 3.2: 
>  
> {code:java}
>   "results": [
> {
>   "name": "Q22",
>   "mode": "collect",
>   "parameters": {},
>   "joinTypes": [
> "SortMergeJoin"
>   ],
>   "tables": [
> "customer"
>   ],
>   "parsingTime": 0.016522,
>   "analysisTime": 0.004132,
>   "optimizationTime": 39.173868,
>   "planningTime": 23.10939,
>   "executionTime": 13762.183844,
>   "result": 0,
>   "breakDown": [],
>   "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC 
> NULLS FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
> numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
> custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
> 'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
> (13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
> exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
> None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
> 'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- 'UnresolvedRelation [customer], [], false\n:  +- 'Project 
> [*]\n: +- 'Filter ('o_custkey = 'c_custkey)\n:
> +- 'UnresolvedRelation [orders], [], false\n+- 
> 'UnresolvedRelation [customer], [], false\n\n== Analyzed Logical Plan 
> ==\ncntrycode: string, numcust: bigint, totacctbal: decimal(22,2)\nSort 
> [cntrycode#147 ASC NULLS FIRST], true\n+- Aggregate [cntrycode#147], 
> [cntrycode#147, count(1) AS numcust#150L, sum(c_acctbal#11) AS 
> totacctbal#151]\n   +- SubqueryAlias custsale\n  +- Project 
> [substring(c_phone#10, 1, 2) AS cntrycode#147, c_acctbal#11]\n +- 
> Filter ((substring(c_phone#10, 1, 2) IN (13,31,23,29,30,18,17) AND 
> (cast(c_acctbal#11 as decimal(16,6)) > cast(scalar-subquery#148 [] as 
> decimal(16,6 AND NOT exists#149 [c_custkey#6L])\n:  :- 
> Aggregate [avg(c_acctbal#160) AS avg(c_acctbal)#154]\n:  :  +- 
> Filter ((cast(c_acctbal#160 as decimal(12,2)) > cast(0.00 as decimal(12,2))) 
> AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17))\n:  : 
> +- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n:  
> :+- Relation 
> tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
>  orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
> o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
> o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
> (o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
> spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
> 

[jira] [Updated] (SPARK-36926) Discrepancy in Q22 of TPCH for Spark 3.2

2021-10-04 Thread Venkata krishnan Sowrirajan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan updated SPARK-36926:

Description: 
When running TPCH scale 100 against 3.2, Query 22 has a discrepancy in the 
number of rows returned by the query. This was tested with both AQE on and off. 
All the other queries were matching in results. Below is the results that we 
got when testing Q22 on 3.2: 

 
{code:java}
  "results": [
{
  "name": "Q22",
  "mode": "collect",
  "parameters": {},
  "joinTypes": [
"SortMergeJoin"
  ],
  "tables": [
"customer"
  ],
  "parsingTime": 0.016522,
  "analysisTime": 0.004132,
  "optimizationTime": 39.173868,
  "planningTime": 23.10939,
  "executionTime": 13762.183844,
  "result": 0,
  "breakDown": [],
  "queryExecution": "== Parsed Logical Plan ==\n'Sort ['cntrycode ASC NULLS 
FIRST], true\n+- 'Aggregate ['cntrycode], ['cntrycode, 'count(1) AS 
numcust#150, 'sum('c_acctbal) AS totacctbal#151]\n   +- 'SubqueryAlias 
custsale\n  +- 'Project ['substring('c_phone, 1, 2) AS cntrycode#147, 
'c_acctbal]\n +- 'Filter (('substring('c_phone, 1, 2) IN 
(13,31,23,29,30,18,17) AND ('c_acctbal > scalar-subquery#148 [])) AND NOT 
exists#149 [])\n:  :- 'Project [unresolvedalias('avg('c_acctbal), 
None)]\n:  :  +- 'Filter (('c_acctbal > 0.00) AND 
'substring('c_phone, 1, 2) IN (13,31,23,29,30,18,17))\n:  : +- 
'UnresolvedRelation [customer], [], false\n:  +- 'Project [*]\n 
   : +- 'Filter ('o_custkey = 'c_custkey)\n:+- 
'UnresolvedRelation [orders], [], false\n+- 'UnresolvedRelation 
[customer], [], false\n\n== Analyzed Logical Plan ==\ncntrycode: string, 
numcust: bigint, totacctbal: decimal(22,2)\nSort [cntrycode#147 ASC NULLS 
FIRST], true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS 
numcust#150L, sum(c_acctbal#11) AS totacctbal#151]\n   +- SubqueryAlias 
custsale\n  +- Project [substring(c_phone#10, 1, 2) AS cntrycode#147, 
c_acctbal#11]\n +- Filter ((substring(c_phone#10, 1, 2) IN 
(13,31,23,29,30,18,17) AND (cast(c_acctbal#11 as decimal(16,6)) > 
cast(scalar-subquery#148 [] as decimal(16,6 AND NOT exists#149 
[c_custkey#6L])\n:  :- Aggregate [avg(c_acctbal#160) AS 
avg(c_acctbal)#154]\n:  :  +- Filter ((cast(c_acctbal#160 as 
decimal(12,2)) > cast(0.00 as decimal(12,2))) AND substring(c_phone#159, 1, 2) 
IN (13,31,23,29,30,18,17))\n:  : +- SubqueryAlias 
spark_catalog.tpch_data_orc_100.customer\n:  :+- Relation 
tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
 orc\n:  +- Project [o_orderkey#16L, o_custkey#17L, 
o_orderstatus#18, o_totalprice#19, o_orderpriority#20, o_clerk#21, 
o_shippriority#22, o_comment#23, o_orderdate#24]\n: +- Filter 
(o_custkey#17L = outer(c_custkey#6L))\n:+- SubqueryAlias 
spark_catalog.tpch_data_orc_100.orders\n:   +- Relation 
tpch_data_orc_100.orders[o_orderkey#16L,o_custkey#17L,o_orderstatus#18,o_totalprice#19,o_orderpriority#20,o_clerk#21,o_shippriority#22,o_comment#23,o_orderdate#24]
 orc\n+- SubqueryAlias spark_catalog.tpch_data_orc_100.customer\n   
+- Relation 
tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13]
 orc\n\n== Optimized Logical Plan ==\nSort [cntrycode#147 ASC NULLS FIRST], 
true\n+- Aggregate [cntrycode#147], [cntrycode#147, count(1) AS numcust#150L, 
sum(c_acctbal#11) AS totacctbal#151]\n   +- Project [substring(c_phone#10, 1, 
2) AS cntrycode#147, c_acctbal#11]\n  +- Join LeftAnti, (o_custkey#17L = 
c_custkey#6L)\n :- Project [c_custkey#6L, c_phone#10, c_acctbal#11]\n   
  :  +- Filter ((isnotnull(c_acctbal#11) AND substring(c_phone#10, 1, 2) IN 
(13,31,23,29,30,18,17)) AND (cast(c_acctbal#11 as decimal(16,6)) > 
scalar-subquery#148 []))\n : :  +- Aggregate [avg(c_acctbal#160) AS 
avg(c_acctbal)#154]\n : : +- Project [c_acctbal#160]\n 
: :+- Filter (isnotnull(c_acctbal#160) AND ((c_acctbal#160 > 0.00) 
AND substring(c_phone#159, 1, 2) IN (13,31,23,29,30,18,17)))\n : :  
 +- Relation 
tpch_data_orc_100.customer[c_custkey#155L,c_name#156,c_address#157,c_nationkey#158L,c_phone#159,c_acctbal#160,c_comment#161,c_mktsegment#162]
 orc\n : +- Relation 
tpch_data_orc_100.customer[c_custkey#6L,c_name#7,c_address#8,c_nationkey#9L,c_phone#10,c_acctbal#11,c_comment#12,c_mktsegment#13]
 orc\n +- Project [o_custkey#17L]\n+- Relation 

[jira] [Created] (SPARK-36908) Document speculation metrics added as part of SPARK-36038

2021-10-01 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36908:
---

 Summary: Document speculation metrics added as part of SPARK-36038
 Key: SPARK-36908
 URL: https://issues.apache.org/jira/browse/SPARK-36908
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36810) Handle HDFS read inconsistencies on Spark when observer Namenode is used

2021-09-20 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417769#comment-17417769
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36810:
-

I am currently looking into this issue, JFYI.

> Handle HDFS read inconsistencies on Spark when observer Namenode is used
> 
>
> Key: SPARK-36810
> URL: https://issues.apache.org/jira/browse/SPARK-36810
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.2.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> In short, with HDFS HA and with the use of [Observer 
> Namenode|https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html],
>  read-after-write consistency is only available when both the write and the 
> read happen from the same client.
> But if the write happens on an executor and the read happens on the driver, 
> then the reads would be inconsistent, causing application failures. This can 
> be fixed by calling `FileSystem.msync` before making any read calls where the 
> client thinks the write could have happened elsewhere.
> This issue is discussed in greater detail in this 
> [discussion|https://mail-archives.apache.org/mod_mbox/spark-dev/202108.mbox/browser]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36810) Handle HDFS read inconsistencies on Spark when observer Namenode is used

2021-09-20 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36810:
---

 Summary: Handle HDFS read inconsistencies on Spark when observer 
Namenode is used
 Key: SPARK-36810
 URL: https://issues.apache.org/jira/browse/SPARK-36810
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan


In short, with HDFS HA and with the use of [Observer 
Namenode|https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html],
 read-after-write consistency is only available when both the write and the 
read happen from the same client.

But if the write happens on an executor and the read happens on the driver, then 
the reads would be inconsistent, causing application failures. This can be 
fixed by calling `FileSystem.msync` before making any read calls where the 
client thinks the write could have happened elsewhere.

This issue is discussed in greater detail in this 
[discussion|https://mail-archives.apache.org/mod_mbox/spark-dev/202108.mbox/browser]
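
As a rough sketch of the suggested mitigation (assuming Hadoop 3.3+, where FileSystem#msync is available; the path and the surrounding code are illustrative, not Spark's actual driver-side read path):
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MsyncBeforeRead {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // The write may have happened on an executor (a different DFS client),
    // so sync this client's state with the Active NameNode before reading
    // through the Observer NameNode.
    fs.msync()

    // Illustrative path; in Spark this would be whatever file the driver
    // needs to read back after executors wrote it.
    val in = fs.open(new Path("/tmp/illustrative-output/part-00000"))
    try {
      // ... read as usual ...
    } finally {
      in.close()
    }
  }
}
{code}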
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403508#comment-17403508
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36558:
-

One thing I missed here is that I did not set the newly constructed 
`DAGScheduler` on the SparkContext. Let me try that and update here.

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
>  And it breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted. And submitting a stage 
> with no tasks to run would not be able to add its child stage into the 
> waiting stage list, which leads to the job hang in the end.
>  
> The example to reproduce:
> First, change `MyRDD` to allow it to compute:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
> Int)] = {
>    Iterator.single((1, 1))
>  }{code}
>  Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=after wait==")
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403507#comment-17403507
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36558:
-

[~Ngone51] I am not sure you can run the tests that way, because then the 
`DAGSchedulerSuite` custom `DAGScheduler` or `DAGSchedulerEventProcessLoopTester` 
won't be used; the job is then submitted with the default `DAGScheduler` and 
waits in `finalizeShuffleMerge`, which cannot complete because of our mocked-up 
merger locations. Btw, below is the test code with the `MyRDD` changes you 
mentioned above added. Am I missing something?
{code:java}
 test("Job hang") {
initPushBasedShuffleConfs(conf)
conf.set("spark.shuffle.push.mergerLocations.minThreshold", "5")
DAGSchedulerSuite.clearMergerLocs
DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
val latch = new CountDownLatch(1)
scheduler = new MyDAGScheduler(
  sc,
  taskScheduler,
  sc.listenerBus,
  mapOutputTracker,
  blockManagerMaster,
  sc.env) {
  override private[spark] def scheduleShuffleMergeFinalize(
  stage: ShuffleMapStage): Unit = {
// By this, we can mimic a stage with all tasks finished
// but finalization is incomplete.
latch.countDown()
super.scheduleShuffleMergeFinalize(stage)
  }
}

dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
val parts = 5
val shuffleMapRdd = new MyRDD(sc, parts, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
reduceRdd1.count()
latch.await()
// scalastyle:off
println("=after wait==")
// set _shuffleMergedFinalized to true can avoid the hang.
val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
reduceRdd2.count()
  }
{code}

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
>  And it breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted. And submitting a stage 
> with no tasks to run would not be able to add its child stage into the 
> waiting stage list, which leads to the job hang in the end.
>  
> The example to reproduce:
> First, change `MyRDD` to allow it to compute:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
> Int)] = {
>    Iterator.single((1, 1))
>  }{code}
>  Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=after wait==")
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   

[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403404#comment-17403404
 ] 

Venkata krishnan Sowrirajan commented on SPARK-36558:
-

[~Ngone51] I tried the test you shared above, but I needed to make a few 
changes to have shuffle merge enabled properly; without them 
`scheduleShuffleMergeFinalize` won't get invoked, and the test can wait 
indefinitely on the CountDownLatch. Please take a look at the modified test below.


1. Keep `numMergerLocs` equal to `numParts` so that the stage has 
shuffleMergeEnabled.

2. Also use `submit` inside `DAGSchedulerSuite` to run an action on an RDD.

Since the `runningStages` set would contain the stage currently in the merge 
finalization step (with all of its partitions already available), the same 
stage won't get submitted again with empty partitions to compute.

This check 
[here|https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1253]
 would prevent the same stage from being submitted again. Also, below is the 
fixed unit test, and it seems to run fine.
{code:java}
  test("Job hang") {
initPushBasedShuffleConfs(conf)
conf.set("spark.shuffle.push.mergerLocations.minThreshold", "5")
DAGSchedulerSuite.clearMergerLocs
DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
val latch = new CountDownLatch(1)
scheduler = new MyDAGScheduler(
  sc,
  taskScheduler,
  sc.listenerBus,
  mapOutputTracker,
  blockManagerMaster,
  sc.env) {
  override private[spark] def scheduleShuffleMergeFinalize(
  stage: ShuffleMapStage): Unit = {
// By this, we can mimic a stage with all tasks finished
// but finalization is incomplete.
latch.countDown()
// super.scheduleShuffleMergeFinalize(stage)
  }
}

dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
val parts = 5
val shuffleMapRdd = new MyRDD(sc, parts, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
submit(reduceRdd1, (0 until parts).toArray)
completeShuffleMapStageSuccessfully(0, 0, parts)
completeNextResultStageWithSuccess(1, 0)
latch.await()
// set _shuffleMergedFinalized to true can avoid the hang.
val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
submit(reduceRdd2, (0 until parts).toArray)
completeNextResultStageWithSuccess(3, 0)
  }
{code}

Let me know your thoughts. Am I missing something here? cc [~mshen] [~mridulm80]

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
>  And it breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted. And submitting a stage 
> with no tasks to run would not be able to add its child stage into the 
> waiting stage list, which leads to the job hang in the end.
>  
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 

[jira] [Resolved] (SPARK-36484) Handle Stale block fetch failure on the client side by not retrying the requests

2021-08-16 Thread Venkata krishnan Sowrirajan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan resolved SPARK-36484.
-
Resolution: Won't Fix

Thinking more about it, avoiding the retries doesn't seem to add much value, as 
the server doesn't do any processing for these stale fetch requests anyway. 
Since there isn't any meaningful perf improvement, deciding to close this out.

> Handle Stale block fetch failure on the client side by not retrying the 
> requests
> 
>
> Key: SPARK-36484
> URL: https://issues.apache.org/jira/browse/SPARK-36484
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Minor
>
> Similar to SPARK-36378, we need to handle the stale block fetch failures on 
> the client side, although not handling them does not cause any correctness 
> issues. This would help save some server-side bandwidth that is otherwise 
> spent serving stale shuffle fetch requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36484) Handle Stale block fetch failure on the client side by not retrying the requests

2021-08-11 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36484:
---

 Summary: Handle Stale block fetch failure on the client side by 
not retrying the requests
 Key: SPARK-36484
 URL: https://issues.apache.org/jira/browse/SPARK-36484
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan


Similar to SPARK-36378, we need to handle the stale block fetch failures on the 
client side, although not handling them does not cause any correctness issues. 
This would help save some server-side bandwidth that is otherwise spent serving 
stale shuffle fetch requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36460) Pull out NoOpMergedShuffleFileManager inner class outside

2021-08-09 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36460:
---

 Summary: Pull out NoOpMergedShuffleFileManager inner class outside
 Key: SPARK-36460
 URL: https://issues.apache.org/jira/browse/SPARK-36460
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36374) Push-based shuffle documentation

2021-08-01 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36374:
---

 Summary: Push-based shuffle documentation
 Key: SPARK-36374
 URL: https://issues.apache.org/jira/browse/SPARK-36374
 Project: Spark
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36332) Cleanup RemoteBlockPushResolver log messages

2021-07-28 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36332:
---

 Summary: Cleanup RemoteBlockPushResolver log messages
 Key: SPARK-36332
 URL: https://issues.apache.org/jira/browse/SPARK-36332
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan


Minor cleanups to RemoteBlockPushResolver to use AppShufflePartitionsInfo 
toString() for log messages.
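
For illustration only, a minimal sketch of the kind of cleanup meant here, assuming a partition-info class whose toString() carries the identifying fields (the names below are placeholders, not the actual RemoteBlockPushResolver code):

{code:java}
// Hedged sketch; class and field names are illustrative placeholders.
case class AppShufflePartitionsInfo(appId: String, shuffleId: Int, reduceId: Int) {
  override def toString: String =
    s"AppShufflePartitionsInfo(appId=$appId, shuffleId=$shuffleId, reduceId=$reduceId)"
}

object LogMessageSketch {
  def main(args: Array[String]): Unit = {
    val partition = AppShufflePartitionsInfo("app-1", 0, 3)
    // Before: every log statement re-formats the individual fields by hand.
    println(s"Could not merge block for app ${partition.appId}, shuffle ${partition.shuffleId}, reduce ${partition.reduceId}")
    // After: reuse toString() so all log messages stay consistent and concise.
    println(s"Could not merge block for $partition")
  }
}
{code}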



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36317) PruneFileSourcePartitionsSuite tests are failing after the fix to SPARK-36136

2021-07-27 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36317:
---

 Summary: PruneFileSourcePartitionsSuite tests are failing after 
the fix to SPARK-36136
 Key: SPARK-36317
 URL: https://issues.apache.org/jira/browse/SPARK-36317
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan


After the fix for [SPARK-36136][SQL][TESTS] (Refactor 
PruneFileSourcePartitionsSuite etc. to a different package), a couple of tests 
in PruneFileSourcePartitionsSuite are failing now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36038) Basic speculation metrics at stage level

2021-07-07 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-36038:
---

 Summary: Basic speculation metrics at stage level
 Key: SPARK-36038
 URL: https://issues.apache.org/jira/browse/SPARK-36038
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.1.2
Reporter: Venkata krishnan Sowrirajan


Currently there are no speculation metrics available either at the application 
level or at the stage level. Within our platform, we have added speculation 
metrics at the stage level as a summary, similar to the existing stage-level 
metrics, tracking numTotalSpeculated, numCompleted (successful), numFailed, 
numKilled etc. This enables us to effectively understand the speculative 
execution feature at an application level and helps in further tuning the 
speculation configs.

cc [~ron8hu]
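
As a rough illustration of the shape of such a per-stage summary (the case class below is a hypothetical sketch using the counter names mentioned above; it is not Spark's actual implementation):

{code:java}
// Hypothetical sketch of a stage-level speculation summary.
case class SpeculationStageSummary(
    numTotalSpeculated: Int = 0,  // speculative task attempts launched for the stage
    numCompleted: Int = 0,        // speculative attempts that finished successfully
    numFailed: Int = 0,           // speculative attempts that failed
    numKilled: Int = 0) {         // attempts killed, e.g. because the original task won
  // Record the outcome of one finished speculative attempt.
  def record(succeeded: Boolean, killed: Boolean): SpeculationStageSummary =
    copy(
      numTotalSpeculated = numTotalSpeculated + 1,
      numCompleted = numCompleted + (if (succeeded) 1 else 0),
      numFailed = numFailed + (if (!succeeded && !killed) 1 else 0),
      numKilled = numKilled + (if (killed) 1 else 0))
}
{code}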



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35613) Cache commonly occurring strings from SQLMetrics, JsonProtocol and AccumulatorV2

2021-06-02 Thread Venkata krishnan Sowrirajan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan updated SPARK-35613:

Summary: Cache commonly occurring strings from SQLMetrics, JsonProtocol and 
AccumulatorV2  (was: Cache commonly occurring strings from SQLMetrics and in 
JsonProtocol)

> Cache commonly occurring strings from SQLMetrics, JsonProtocol and 
> AccumulatorV2
> 
>
> Key: SPARK-35613
> URL: https://issues.apache.org/jira/browse/SPARK-35613
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.2.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> SQLMetrics allows creation of different metrics like sizing, timing metrics 
> etc. Some of those metric names can be duplicated, and along with that the 
> `Some` wrapper objects add additional memory overhead. In our internal 
> platform, this has caused high memory usage on the driver, causing full GC to 
> kick in every so often.
> cc [~mridulm80]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35613) Cache commonly occurring strings from SQLMetrics and in JsonProtocol

2021-06-02 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-35613:
---

 Summary: Cache commonly occurring strings from SQLMetrics and in 
JsonProtocol
 Key: SPARK-35613
 URL: https://issues.apache.org/jira/browse/SPARK-35613
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 3.2.0
Reporter: Venkata krishnan Sowrirajan


SQLMetrics allows creation of different metrics like sizing, timing metrics 
etc. Some of those metric names can be duplicated, and along with that the 
`Some` wrapper objects add additional memory overhead. In our internal 
platform, this has caused high memory usage on the driver, causing full GC to 
kick in every so often.

cc [~mridulm80]
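
A minimal sketch of the caching idea, assuming a simple interner keyed by the metric name (the object and method names are illustrative, not the actual change): reusing a shared Some(name) avoids allocating a fresh String/Some pair for every metric instance.

{code:java}
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only; not the actual SQLMetrics/JsonProtocol/AccumulatorV2 code.
object MetricNameCache {
  private val cache = new ConcurrentHashMap[String, Option[String]]()

  // Returns a shared Some(name) for commonly occurring metric names, so repeated
  // metrics on the driver do not accumulate duplicate String and Some instances.
  def cachedName(name: String): Option[String] =
    cache.computeIfAbsent(name, n => Some(n))
}
{code}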



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35549) Register merge status even after shuffle dependency is merge finalized

2021-05-27 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-35549:
---

 Summary: Register merge status even after shuffle dependency is 
merge finalized
 Key: SPARK-35549
 URL: https://issues.apache.org/jira/browse/SPARK-35549
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Currently the merge statuses which arrive late from the external shuffle 
services (or shuffle mergers) won't get registered once the shuffle dependency 
merge is finalized.
This needs to be done carefully as there are a lot of corner cases, for example:
a) executor/node loss causing re-computation due to fetch failure; if the merge 
statuses get registered very late, that can cause inconsistencies.
b) other similar cases

cc [~mridulm80] [~mshen]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35547) Support push based shuffle when barrier scheduling is enabled

2021-05-27 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-35547:
---

 Summary: Support push based shuffle when barrier scheduling is 
enabled
 Key: SPARK-35547
 URL: https://issues.apache.org/jira/browse/SPARK-35547
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Currently push-based shuffle is not enabled with barrier scheduling, as it has 
not been tested with barrier scheduling yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35536) Cancel finalizing the shuffle merge if the stage is cancelled while waiting until shuffle merge finalize wait time.

2021-05-26 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-35536:
---

 Summary: Cancel finalizing the shuffle merge if the stage is 
cancelled while waiting until shuffle merge finalize wait time.
 Key: SPARK-35536
 URL: https://issues.apache.org/jira/browse/SPARK-35536
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Currently we are not handling the case where the stage is cancelled while it is 
about to get merge finalized.

Stage completion will wait for the shuffle merge finalize wait time so that all 
the shuffle merge requests are drained, but during this time the stage can be 
cancelled. If this happens, then we need to let all the shuffle services remove 
the merged shuffle data for the corresponding shuffle ID.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition

2021-04-12 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-35036:
---

 Summary: Improve push based shuffle to work with AQE by fetching 
partial map indexes for a reduce partition
 Key: SPARK-35036
 URL: https://issues.apache.org/jira/browse/SPARK-35036
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.1.1
Reporter: Venkata krishnan Sowrirajan


Currently, when both push-based shuffle and AQE are enabled and a partial set 
of map indexes is requested from MapOutputTracker, this is delegated to the 
regular shuffle read path instead of reading the push-merged blocks. This is 
because blocks from mappers in push-based shuffle are merged out of order, due 
to which it's hard to get only the blocks of the reduce partition that match 
the requested start and end map indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34826) Adaptive fetch of shuffle mergers for Push based shuffle

2021-03-22 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-34826:
---

 Summary: Adaptive fetch of shuffle mergers for Push based shuffle
 Key: SPARK-34826
 URL: https://issues.apache.org/jira/browse/SPARK-34826
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Currently the shuffle mergers are set during the creation of the ShuffleMapStage. 
For the initial set of stages, there may not be enough executors added yet, 
which can result in too few shuffle mergers being set when the shuffle map 
stage is created. This task is to handle the issue of a low merge ratio for 
initial stages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33778) Allow typesafe join for LeftSemi and LeftAnti

2020-12-15 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249873#comment-17249873
 ] 

Venkata krishnan Sowrirajan commented on SPARK-33778:
-

Thanks [~angerszhuuu]

> Allow typesafe join for LeftSemi and LeftAnti
> -
>
> Key: SPARK-33778
> URL: https://issues.apache.org/jira/browse/SPARK-33778
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> With the [SPARK-21333|https://issues.apache.org/jira/browse/SPARK-21333] change, 
> LeftSemi and LeftAnti no longer have a typesafe join API. It makes sense not to 
> support LeftSemi and LeftAnti as part of joinWith, as it returns tuples which 
> include values from both datasets, which is not possible for these join types. 
> Nevertheless, it would be nice to have a separate join API, or support in the 
> existing API, for LeftSemi and LeftAnti that returns a Dataset.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33778) Allow typesafe join for LeftSemi and LeftAnti

2020-12-14 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-33778:
---

 Summary: Allow typesafe join for LeftSemi and LeftAnti
 Key: SPARK-33778
 URL: https://issues.apache.org/jira/browse/SPARK-33778
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.1
Reporter: Venkata krishnan Sowrirajan


With the [SPARK-21333|https://issues.apache.org/jira/browse/SPARK-21333] change, 
LeftSemi and LeftAnti no longer have a typesafe join API. It makes sense not to 
support LeftSemi and LeftAnti as part of joinWith, as it returns tuples which 
include values from both datasets, which is not possible for these join types. 
Nevertheless, it would be nice to have a separate join API, or support in the 
existing API, for LeftSemi and LeftAnti that returns a Dataset.
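
To make the gap concrete, here is a hedged sketch: the typed variant shown in the comment is hypothetical (no such API exists), while the untyped left_semi join followed by re-encoding is the workaround available today.

{code:java}
import org.apache.spark.sql.SparkSession

object TypedSemiJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDS()   // Dataset[(Int, String)] with columns _1, _2
    val right = Seq(1, 3).toDS()                // Dataset[Int] with column "value"

    // Hypothetical typed API (does not exist today); it would return Dataset[(Int, String)]:
    // val result = left.semiJoinWith(right, left("_1") === right("value"))

    // Workaround today: untyped left_semi join, then re-encode back to the left type.
    val result = left.join(right, left("_1") === right("value"), "left_semi").as[(Int, String)]
    result.show()

    spark.stop()
  }
}
{code}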



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33481) Better heuristics to compute number of shuffle mergers required for a ShuffleMapStage

2020-11-18 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-33481:
---

 Summary: Better heuristics to compute number of shuffle mergers 
required for a ShuffleMapStage
 Key: SPARK-33481
 URL: https://issues.apache.org/jira/browse/SPARK-33481
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Currently we have a naive way of computing the number of shuffle mergers 
required for a ShuffleMapStage by considering the resulting number of 
partitions. This needs to be further brainstormed and enhanced.
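
For reference, a heuristic in the naive spirit described above could look like the sketch below (the function and its default thresholds are illustrative assumptions, not Spark's actual formula):

{code:java}
// Illustrative only: scale the number of shuffle mergers with the number of
// reduce partitions, bounded below by 1 and above by a configured maximum.
def numShuffleMergersNeeded(
    numReducePartitions: Int,
    partitionsPerMerger: Int = 4,   // assumed ratio, purely for illustration
    maxMergers: Int = 500): Int = {
  val desired = math.ceil(numReducePartitions.toDouble / partitionsPerMerger).toInt
  math.max(1, math.min(maxMergers, desired))
}
{code}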



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle

2020-11-03 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-33329:
---

 Summary: Pluggable API to fetch shuffle merger locations with Push 
based shuffle
 Key: SPARK-33329
 URL: https://issues.apache.org/jira/browse/SPARK-33329
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Venkata krishnan Sowrirajan


Possibly extend ShuffleDriverComponents to add a separate API to fetch shuffle 
merger locations with Push based shuffle. 
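
A rough sketch of what such an extension point could look like (the trait and method below are assumptions for illustration; they are not the actual ShuffleDriverComponents interface):

{code:java}
// Hypothetical extension point, sketched for illustration only.
trait ShuffleMergerLocationsProvider {
  /**
   * Return up to numMergersNeeded merger locations (for example "host:port" of
   * external shuffle services) to use for push-based shuffle of the given shuffle.
   */
  def getMergerLocations(shuffleId: Int, numMergersNeeded: Int): Seq[String]
}
{code}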



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32596) Clear Ivy resolution files as part of the finally block

2020-08-11 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-32596:
---

 Summary: Clear Ivy resolution files as part of the finally block
 Key: SPARK-32596
 URL: https://issues.apache.org/jira/browse/SPARK-32596
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Venkata krishnan Sowrirajan


Clear Ivy resolution files as part of the finally block in SparkSubmit; without 
this, failures while resolving packages can leave resolution files around.
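
The pattern described is essentially a try/finally around the resolution call; a minimal sketch under that assumption (the helper names below are placeholders, not the actual SparkSubmit code):

{code:java}
// Sketch only; resolveWithIvy and clearIvyResolutionFiles are placeholder helpers.
object IvyCleanupSketch {
  def resolveMavenCoordinates(coordinates: String): Seq[String] = {
    try {
      resolveWithIvy(coordinates)   // may throw if a package cannot be resolved
    } finally {
      clearIvyResolutionFiles()     // runs on both the success and failure paths
    }
  }

  private def resolveWithIvy(coordinates: String): Seq[String] = Seq.empty
  private def clearIvyResolutionFiles(): Unit = ()
}
{code}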



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation

2020-04-13 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082731#comment-17082731
 ] 

Venkata krishnan Sowrirajan commented on SPARK-31418:
-

[~tgraves] Currently, I'm thinking we can check if dynamic allocation is 
enabled; if so, we can request one more executor using 
ExecutorAllocationClient#requestExecutors and start the abort timer. But I 
re-read your 
[comment|https://issues.apache.org/jira/browse/SPARK-22148?focusedCommentId=17078278=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17078278]
 again and it seems like you tried to pass the information to 
ExecutorAllocationManager and request the executor through 
ExecutorAllocationManager. Is that right?

Regarding the idea of killing another non-idle blacklisted executor, I don't 
think that would be better, as we might kill tasks from other stages, as 
mentioned in other comments on the PR. Let me know if you have any other 
thoughts on this problem. We are facing this issue quite frequently; although 
retrying the whole job passes, it keeps happening.
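
A hedged sketch of the first option above (the trait below is a stand-in, since Spark's ExecutorAllocationClient is package-private; the logic is only illustrative, not the actual change):

{code:java}
// Stand-in for Spark's ExecutorAllocationClient, which is private[spark].
trait AllocationClient {
  def requestExecutors(numAdditionalExecutors: Int): Boolean
}

// Illustrative only: with dynamic allocation enabled, ask for one more executor
// and rely on the abort timer instead of failing the task set immediately.
def requestExtraExecutorIfPossible(
    dynamicAllocationEnabled: Boolean,
    client: AllocationClient): Boolean = {
  if (dynamicAllocationEnabled) client.requestExecutors(1) else false
}
{code}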

> Blacklisting feature aborts Spark job without retrying for max num retries in 
> case of Dynamic allocation
> 
>
> Key: SPARK-31418
> URL: https://issues.apache.org/jira/browse/SPARK-31418
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.4.5
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> With Spark blacklisting, if a task fails on an executor, the executor gets 
> blacklisted for the task. In order to retry the task, it checks if there are 
> idle blacklisted executor which can be killed and replaced to retry the task 
> if not it aborts the job without doing max retries.
> In the context of dynamic allocation this can be better, instead of killing 
> the blacklisted idle executor (its possible there are no idle blacklisted 
> executor), request an additional executor and retry the task.
> This can be easily reproduced with a simple job like below, although this 
> example should fail eventually just to show that its not retried 
> spark.task.maxFailures times: 
> {code:java}
> def test(a: Int) = { a.asInstanceOf[String] }
> sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
> {code}
> with dynamic allocation enabled and min executors set to 1. But there are 
> various other cases where this can fail as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation

2020-04-10 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080942#comment-17080942
 ] 

Venkata krishnan Sowrirajan commented on SPARK-31418:
-

Currently, I'm working on this issue.

> Blacklisting feature aborts Spark job without retrying for max num retries in 
> case of Dynamic allocation
> 
>
> Key: SPARK-31418
> URL: https://issues.apache.org/jira/browse/SPARK-31418
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.4.5
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> With Spark blacklisting, if a task fails on an executor, the executor gets 
> blacklisted for the task. In order to retry the task, it checks if there are 
> idle blacklisted executor which can be killed and replaced to retry the task 
> if not it aborts the job without doing max retries.
> In the context of dynamic allocation this can be better, instead of killing 
> the blacklisted idle executor (its possible there are no idle blacklisted 
> executor), request an additional executor and retry the task.
> This can be easily reproduced with a simple job like below, although this 
> example should fail eventually just to show that its not retried 
> spark.task.maxFailures times: 
> {code:java}
> def test(a: Int) = { a.asInstanceOf[String] }
> sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
> {code}
> with dynamic allocation enabled and min executors set to 1. But there are 
> various other cases where this can fail as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation

2020-04-10 Thread Venkata krishnan Sowrirajan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan updated SPARK-31418:

Description: 
With Spark blacklisting, if a task fails on an executor, the executor gets 
blacklisted for the task. In order to retry the task, it checks if there are 
idle blacklisted executor which can be killed and replaced to retry the task if 
not it aborts the job without doing max retries.

In the context of dynamic allocation this can be better, instead of killing the 
blacklisted idle executor (its possible there are no idle blacklisted 
executor), request an additional executor and retry the task.

This can be easily reproduced with a simple job like below, although this 
example should fail eventually just to show that its not retried 
spark.task.maxFailures times: 

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}

with dynamic allocation enabled and min executors set to 1. But there are 
various other cases where this can fail as well.

  was:
With Spark blacklisting, if a task fails on an executor, the executor gets 
blacklisted for the task. In order to retry the task, it checks if there are 
idle blacklisted executor which can be killed and replaced to retry the task if 
not it aborts the job without doing max retries.

In the context of dynamic allocation this can be better, instead of killing the 
blacklisted idle executor (its possible there are no idle blacklisted 
executor), request an additional executor and retry the task.

This can be easily reproduced with a simple job like below: 

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}

with dynamic allocation enabled and min executors set to 1. But there are 
various other cases where this can fail as well.


> Blacklisting feature aborts Spark job without retrying for max num retries in 
> case of Dynamic allocation
> 
>
> Key: SPARK-31418
> URL: https://issues.apache.org/jira/browse/SPARK-31418
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.4.5
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> With Spark blacklisting, if a task fails on an executor, the executor gets 
> blacklisted for the task. In order to retry the task, it checks if there are 
> idle blacklisted executor which can be killed and replaced to retry the task 
> if not it aborts the job without doing max retries.
> In the context of dynamic allocation this can be better, instead of killing 
> the blacklisted idle executor (its possible there are no idle blacklisted 
> executor), request an additional executor and retry the task.
> This can be easily reproduced with a simple job like below, although this 
> example should fail eventually just to show that its not retried 
> spark.task.maxFailures times: 
> {code:java}
> def test(a: Int) = { a.asInstanceOf[String] }
> sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
> {code}
> with dynamic allocation enabled and min executors set to 1. But there are 
> various other cases where this can fail as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31418) Blacklisting feature aborts Spark job without retrying for max num retries in case of Dynamic allocation

2020-04-10 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created SPARK-31418:
---

 Summary: Blacklisting feature aborts Spark job without retrying 
for max num retries in case of Dynamic allocation
 Key: SPARK-31418
 URL: https://issues.apache.org/jira/browse/SPARK-31418
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.5, 2.3.0
Reporter: Venkata krishnan Sowrirajan


With Spark blacklisting, if a task fails on an executor, the executor gets 
blacklisted for the task. In order to retry the task, Spark checks if there is 
an idle blacklisted executor which can be killed and replaced to retry the 
task; if not, it aborts the job without doing the max number of retries.

In the context of dynamic allocation this can be handled better: instead of 
killing an idle blacklisted executor (it's possible there are no idle 
blacklisted executors), request an additional executor and retry the task.

This can be easily reproduced with a simple job like below: 

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}

with dynamic allocation enabled and min executors set to 1. But there are 
various other cases where this can fail as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2020-04-08 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078520#comment-17078520
 ] 

Venkata krishnan Sowrirajan commented on SPARK-22148:
-

Thanks for your comments, [~tgraves]. Makes sense. I will think about it more, 
create a new JIRA and share a new proposal based on how we think about it 
internally.

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would had been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2020-04-07 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-22148 at 4/8/20, 12:04 AM:
---

Thanks for responding [~tgraves]. Thats right. 

Lets say all the executors are busy with some task and one of the task fails, 
then we are aborting the stage as there is no idle blacklisted executor 
available to kill and replace. But with dynamic allocation enabled, we could 
have requested for more executors and retried the task.

Infact, I can reproduce this with min executors set to 1 and max to some 
number. In this case, it wouldn't scale up immediately and the first task fails 
the whole stage because the only executor available is blacklisted for the task 
and also busy running other task at that time.

// Though this example would fail as casting an int to string is not valid. 
Just for example purposes.

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}


Although if there are more executors, then its retried. Similar other cases are 
possible 


was (Author: vsowrirajan):
Thanks for responding [~tgraves]. Thats right. 

Lets say all the executors are busy with some task and one of the task fails, 
then we are aborting the stage as there is no idle blacklisted executor 
available to kill and replace. But with dynamic allocation enabled, we could 
have requested for more executors and retried the task.

Infact, I can reproduce this with min executors set to 1 and max to some 
number. In this case, it wouldn't scale up immediately and the first task fails 
the whole stage because the only executor available is blacklisted for the task 
and also busy running other task at that time.

// Though this example would fail as casting an int to string is not valid. 
Just for example purposes.

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}


Although if there are more executors, then its possibly retried. Similar other 
cases are possible 

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would had been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2020-04-07 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674
 ] 

Venkata krishnan Sowrirajan edited comment on SPARK-22148 at 4/8/20, 12:04 AM:
---

Thanks for responding [~tgraves]. Thats right. 

Lets say all the executors are busy with some task and one of the task fails, 
then we are aborting the stage as there is no idle blacklisted executor 
available to kill and replace. But with dynamic allocation enabled, we could 
have requested for more executors and retried the task.

Infact, I can reproduce this with min executors set to 1 and max to some 
number. In this case, it wouldn't scale up immediately and the first task fails 
the whole stage because the only executor available is blacklisted for the task 
and also busy running other task at that time.

// Though this example would fail as casting an int to string is not valid. 
Just for example purposes.

{code:java}
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 
{code}


Although if there are more executors, then its possibly retried. Similar other 
cases are possible 


was (Author: vsowrirajan):
Thanks for responding [~tgraves]. Thats right. 

Lets say all the executors are busy with some task and one of the task fails, 
then we are aborting the stage as there is no idle blacklisted executor 
available to kill and replace. But with dynamic allocation enabled, we could 
have requested for more executors and retried the task.

Infact, I can reproduce this with min executors set to 1 and max to some 
number. In this case, it wouldn't scale up immediately and the first task fails 
the whole stage because the only executor available is blacklisted for the task 
and also busy running other task at that time.

// Though this example would fail as casting an int to string is not valid. 
Just for example purposes.
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 

Although if there are more executors, then its possibly retried. Similar other 
cases are possible 

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would had been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2020-04-07 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077674#comment-17077674
 ] 

Venkata krishnan Sowrirajan commented on SPARK-22148:
-

Thanks for responding [~tgraves]. Thats right. 

Lets say all the executors are busy with some task and one of the task fails, 
then we are aborting the stage as there is no idle blacklisted executor 
available to kill and replace. But with dynamic allocation enabled, we could 
have requested for more executors and retried the task.

Infact, I can reproduce this with min executors set to 1 and max to some 
number. In this case, it wouldn't scale up immediately and the first task fails 
the whole stage because the only executor available is blacklisted for the task 
and also busy running other task at that time.

// Though this example would fail as casting an int to string is not valid. 
Just for example purposes.
def test(a: Int) = { a.asInstanceOf[String] }
sc.parallelize(1 to 10, 10).map(x => test(x)).collect 

Although if there are more executors, then its possibly retried. Similar other 
cases are possible 

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would had been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2020-04-07 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077603#comment-17077603
 ] 

Venkata krishnan Sowrirajan commented on SPARK-22148:
-

[~irashid] [~Dhruve Ashar] Recently we have enabled blacklisting in our platform 
and it works nicely for the most part. We also have this fix for the case where 
there are no executors left to retry on due to blacklisting (mainly with dynamic 
allocation enabled; it happens during the tail end of the stage).

I also went through the fix and the blacklisting code in general. It still 
happens that all the other executors are busy and there is no idle blacklisted 
executor left to kill so that a new executor can be requested, which causes the 
stage, and eventually the job, to be aborted before all the retries are used.

Do you guys also see this behavior or have this issue? Do you think requesting 
a new executor in general would help rather than trying to kill a blacklisted 
idle executor?

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would had been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27338) Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator

2019-04-01 Thread Venkata krishnan Sowrirajan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806946#comment-16806946
 ] 

Venkata krishnan Sowrirajan commented on SPARK-27338:
-

We found this issue in one of our customer workloads. Will file a PR for this 
issue shortly.

> Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator
> -
>
> Key: SPARK-27338
> URL: https://issues.apache.org/jira/browse/SPARK-27338
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> We saw a deadlock, similar to 
> https://issues.apache.org/jira/browse/SPARK-26265, happening between 
> TaskMemoryManager and UnsafeExternalSorter$SpillableIterator.
> Jstack output is as follows:
> {code:java}
> Found one Java-level deadlock:
> =
> "stdout writer for 
> /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python":
>   waiting to lock monitor 0x7fce56409088 (object 0x0005700a2f98, a 
> org.apache.spark.memory.TaskMemoryManager),
>   which is held by "Executor task launch worker for task 2203"
> "Executor task launch worker for task 2203":
>   waiting to lock monitor 0x007cd878 (object 0x0005701a0eb0, a 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator),
>   which is held by "stdout writer for 
> /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python"
> Java stack information for the threads listed above:
> ===
> "stdout writer for 
> /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python":
>   at 
> org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:334)
>   - waiting to lock <0x0005700a2f98> (a 
> org.apache.spark.memory.TaskMemoryManager)
>   at 
> org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.access$1100(UnsafeExternalSorter.java:48)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:583)
>   - locked <0x0005701a0eb0> (a 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:187)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:174)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.findNextInnerJoinRows$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$2.hasNext(WholeStageCodegenExec.scala:638)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at 
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
>   at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
>   at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
>   at 
> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
>   at 
> org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
>   at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2067)
>   at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
> "Executor task 

[jira] [Created] (SPARK-27338) Deadlock between TaskMemoryManager and UnsafeExternalSorter$SpillableIterator

2019-04-01 Thread Venkata krishnan Sowrirajan (JIRA)
Venkata krishnan Sowrirajan created SPARK-27338:
---

 Summary: Deadlock between TaskMemoryManager and 
UnsafeExternalSorter$SpillableIterator
 Key: SPARK-27338
 URL: https://issues.apache.org/jira/browse/SPARK-27338
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Venkata krishnan Sowrirajan


We saw a deadlock, similar to 
https://issues.apache.org/jira/browse/SPARK-26265, happening between 
TaskMemoryManager and UnsafeExternalSorter$SpillableIterator.

Jstack output is as follows:


{code:java}
Found one Java-level deadlock:
==============================
"stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python":
  waiting to lock monitor 0x7fce56409088 (object 0x0005700a2f98, a org.apache.spark.memory.TaskMemoryManager),
  which is held by "Executor task launch worker for task 2203"
"Executor task launch worker for task 2203":
  waiting to lock monitor 0x007cd878 (object 0x0005701a0eb0, a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator),
  which is held by "stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python"

Java stack information for the threads listed above:
===================================================
"stdout writer for /usr/lib/envs/env-1923-ver-1755-a-4.2.9-py-3.5.3/bin/python":
    at org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:334)
    - waiting to lock <0x0005700a2f98> (a org.apache.spark.memory.TaskMemoryManager)
    at org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.access$1100(UnsafeExternalSorter.java:48)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:583)
    - locked <0x0005701a0eb0> (a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:187)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:174)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.findNextInnerJoinRows$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$2.hasNext(WholeStageCodegenExec.scala:638)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2067)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
"Executor task launch worker for task 2203":
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:525)
    - waiting to lock <0x0005701a0eb0> (a org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:200)
    at 
{code}
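
For readers who want the cycle in one place, here is a minimal, self-contained Scala sketch of the same lock-ordering pattern. The class names are hypothetical stand-ins, not the real Spark classes: one thread takes the memory manager's monitor and then asks the iterator to spill, while the other takes the iterator's monitor inside loadNext and then frees a page through the memory manager.

{code:scala}
// Minimal sketch of the lock-ordering cycle shown in the jstack above.
// MemoryManagerLike and SpillableIteratorLike are hypothetical stand-ins,
// not the actual Spark classes.
object DeadlockSketch {

  class MemoryManagerLike {
    // Holds this monitor and then calls back into the iterator,
    // e.g. when asking a consumer to spill to free memory.
    def acquireAndSpill(iter: SpillableIteratorLike): Unit = this.synchronized {
      iter.spill() // needs the iterator's monitor
    }

    // Only needs this monitor (analogous to freeing a page).
    def freePage(): Unit = this.synchronized { }
  }

  class SpillableIteratorLike(mm: MemoryManagerLike) {
    def spill(): Unit = this.synchronized {
      // release spilled pages back to the memory manager
    }

    // Holds this monitor and then calls back into the memory manager.
    def loadNext(): Unit = this.synchronized {
      mm.freePage() // needs the memory manager's monitor
    }
  }

  def main(args: Array[String]): Unit = {
    val mm   = new MemoryManagerLike
    val iter = new SpillableIteratorLike(mm)

    // "Executor task launch worker" path: memory-manager monitor, then iterator monitor.
    val worker = new Thread(new Runnable {
      def run(): Unit = mm.acquireAndSpill(iter)
    })
    // "stdout writer" path: iterator monitor, then memory-manager monitor.
    val writer = new Thread(new Runnable {
      def run(): Unit = iter.loadNext()
    })

    worker.start(); writer.start()
    worker.join(); writer.join() // with unlucky timing this never returns
  }
}
{code}

Such cycles are generally broken by making both paths acquire the monitors in the same order, or by not calling back into the other object while still holding one's own monitor.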

[jira] [Commented] (SPARK-26894) Fix Alias handling in AggregateEstimation

2019-03-21 Thread Venkata krishnan Sowrirajan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798154#comment-16798154
 ] 

Venkata krishnan Sowrirajan commented on SPARK-26894:
-

Thanks for merging the code, Takeshi Yamamuro




> Fix Alias handling in AggregateEstimation
> -
>
> Key: SPARK-26894
> URL: https://issues.apache.org/jira/browse/SPARK-26894
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Venkata krishnan Sowrirajan
>Assignee: Venkata krishnan Sowrirajan
>Priority: Major
> Fix For: 3.0.0
>
>
> Aliases are not handled separately in AggregateEstimation similar to 
> ProjectEstimation due to which stats are not getting propagated when CBO is 
> enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26894) Fix Alias handling in AggregateEstimation

2019-02-15 Thread Venkata krishnan Sowrirajan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769773#comment-16769773
 ] 

Venkata krishnan Sowrirajan commented on SPARK-26894:
-

I need to assign this Jira to myself. How can I do that?

> Fix Alias handling in AggregateEstimation
> -
>
> Key: SPARK-26894
> URL: https://issues.apache.org/jira/browse/SPARK-26894
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Aliases are not handled separately in AggregateEstimation similar to 
> ProjectEstimation due to which stats are not getting propagated when CBO is 
> enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26894) Fix Alias handling in AggregateEstimation

2019-02-15 Thread Venkata krishnan Sowrirajan (JIRA)
Venkata krishnan Sowrirajan created SPARK-26894:
---

 Summary: Fix Alias handling in AggregateEstimation
 Key: SPARK-26894
 URL: https://issues.apache.org/jira/browse/SPARK-26894
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Venkata krishnan Sowrirajan


Aliases are not handled in AggregateEstimation the way they are in ProjectEstimation, 
so column statistics are not propagated through aggregates when CBO is enabled.
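
To make the symptom concrete, below is an illustrative spark-shell sketch (the table and column names are made up, and it assumes an existing SparkSession named spark): with CBO enabled and column statistics collected, the aggregate's estimated statistics end up without an entry for the aliased grouping column, even though statistics exist for the underlying column.

{code:scala}
// Illustrative spark-shell sketch only; table/column names are made up and
// it assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.cbo.enabled", "true")

spark.range(0, 1000)
  .selectExpr("id % 10 AS key", "id AS value")
  .write.mode("overwrite").saveAsTable("t")

// Collect table and column statistics so there is something to propagate.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")

val df = spark.sql("SELECT key AS k, count(value) AS cnt FROM t GROUP BY key")

// With this issue, attributeStats of the optimized plan has no entry for the
// aliased grouping column `k`, even though stats were collected for `key`.
println(df.queryExecution.optimizedPlan.stats.attributeStats)
{code}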



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14307) Alluxio Epic JIRA

2016-03-31 Thread Venkata krishnan Sowrirajan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220423#comment-15220423
 ] 

Venkata krishnan Sowrirajan commented on SPARK-14307:
-

Invalid JIRA.

> Alluxio Epic JIRA 
> --
>
> Key: SPARK-14307
> URL: https://issues.apache.org/jira/browse/SPARK-14307
> Project: Spark
>  Issue Type: Epic
>Reporter: Venkata krishnan Sowrirajan
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-14307) Alluxio Epic JIRA

2016-03-31 Thread Venkata krishnan Sowrirajan (JIRA)
Venkata krishnan Sowrirajan created SPARK-14307:
---

 Summary: Alluxio Epic JIRA 
 Key: SPARK-14307
 URL: https://issues.apache.org/jira/browse/SPARK-14307
 Project: Spark
  Issue Type: Epic
Reporter: Venkata krishnan Sowrirajan






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-14307) Alluxio Epic JIRA

2016-03-31 Thread Venkata krishnan Sowrirajan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan closed SPARK-14307.
---
Resolution: Fixed

> Alluxio Epic JIRA 
> --
>
> Key: SPARK-14307
> URL: https://issues.apache.org/jira/browse/SPARK-14307
> Project: Spark
>  Issue Type: Epic
>Reporter: Venkata krishnan Sowrirajan
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org