[jira] [Commented] (SPARK-23722) Add support for inserting empty map or array to table

2018-03-19 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405181#comment-16405181
 ] 

Sital Kedia commented on SPARK-23722:
-

Actually, this is a duplicate of SPARK-21281, and the issue has been resolved in 
2.3. Closing.

> Add support for inserting empty map or array to table
> -
>
> Key: SPARK-23722
> URL: https://issues.apache.org/jira/browse/SPARK-23722
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Sital Kedia
>Priority: Minor
>
> A query like the following currently fails in Spark:
> ```
> CREATE TABLE arrayTypeTable (data array)
> INSERT OVERWRITE TABLE arrayTypeTable select array()
> ```
> This is because Spark does not allow casting an empty map/array to 
> array. This use case is currently supported by both Hive and Presto, 
> and we need to support it in order to make Spark compatible with existing 
> Hive queries. 






[jira] [Resolved] (SPARK-23722) Add support for inserting empty map or array to table

2018-03-19 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia resolved SPARK-23722.
-
Resolution: Duplicate

> Add support for inserting empty map or array to table
> -
>
> Key: SPARK-23722
> URL: https://issues.apache.org/jira/browse/SPARK-23722
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Sital Kedia
>Priority: Minor
>
> A query like the following currently fails in Spark:
> ```
> CREATE TABLE arrayTypeTable (data array)
> INSERT OVERWRITE TABLE arrayTypeTable select array()
> ```
> This is because Spark does not allow casting an empty map/array to 
> array. This use case is currently supported by both Hive and Presto, 
> and we need to support it in order to make Spark compatible with existing 
> Hive queries. 






[jira] [Created] (SPARK-23722) Add support for inserting empty map or array to table

2018-03-16 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-23722:
---

 Summary: Add support for inserting empty map or array to table
 Key: SPARK-23722
 URL: https://issues.apache.org/jira/browse/SPARK-23722
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.0.2
Reporter: Sital Kedia


A query like the following currently fails in Spark:

```

CREATE TABLE arrayTypeTable (data array)
INSERT OVERWRITE TABLE arrayTypeTable select array()

```

This is because Spark does not allow casting an empty map/array to 
array. This use case is currently supported by both Hive and Presto, 
and we need to support it in order to make Spark compatible with existing 
Hive queries. 
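A minimal sketch of the failing pattern and a possible workaround via an explicit cast, assuming `array<string>` as the element type (the concrete element type is not preserved in the text above) and a Hive-enabled session; whether the explicit CAST helps on every affected version is not confirmed here:

```scala
import org.apache.spark.sql.SparkSession

object EmptyArrayInsertExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("empty-array-insert")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Assumed element type: array<string>.
    spark.sql("CREATE TABLE IF NOT EXISTS arrayTypeTable (data array<string>)")

    // Fails on affected versions: array() is inferred as array<null>, which
    // Spark refuses to cast implicitly to array<string>.
    // spark.sql("INSERT OVERWRITE TABLE arrayTypeTable SELECT array()")

    // Possible workaround: cast the empty array to the target type explicitly.
    spark.sql("INSERT OVERWRITE TABLE arrayTypeTable SELECT CAST(array() AS array<string>)")

    spark.stop()
  }
}
```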






[jira] [Commented] (SPARK-18134) SQL: MapType in Group BY and Joins not working

2018-03-11 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394571#comment-16394571
 ] 

Sital Kedia commented on SPARK-18134:
-

[~hvanhovell] - What is the state of this JIRA? Do we expect to make progress on 
this any time soon? This is a very critical feature that is missing in Spark, 
and it is blocking us from auto-migrating HiveQL to Spark. Please let us know if 
there is anything we can do to help resolve this issue.

 

cc - [~sameerag], [~tejasp]

> SQL: MapType in Group BY and Joins not working
> --
>
> Key: SPARK-18134
> URL: https://issues.apache.org/jira/browse/SPARK-18134
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0, 2.0.1, 
> 2.1.0
>Reporter: Christian Zorneck
>Priority: Major
>
> Since version 1.5 and issue SPARK-9415, MapTypes can no longer be used in 
> GROUP BY and join clauses. This makes it incompatible with HiveQL: a Hive 
> feature was removed from Spark, making Spark incompatible with various 
> HiveQL statements.
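For context, a minimal sketch of the failing pattern and one possible workaround that groups on the map's exploded key/value pairs instead (not a drop-in equivalent of grouping on the whole map); the names and data are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object MapTypeGroupByExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("maptype-groupby").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, Map("a" -> 1)),
      (2, Map("a" -> 1))
    ).toDF("id", "m")

    // Rejected since Spark 1.5: map columns cannot be used as grouping expressions.
    // df.groupBy($"m").count().show()

    // Workaround sketch: explode the map into (key, value) rows and group on those.
    df.select($"id", explode($"m").as(Seq("k", "v")))
      .groupBy($"k", $"v")
      .count()
      .show()

    spark.stop()
  }
}
```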






[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-28 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380942#comment-16380942
 ] 

Sital Kedia commented on SPARK-23502:
-

>> what happens when you operate on {{sc}} before it's initialized?

We will wait for the future to complete before triggering any action based on 
user input, so that should be fine.

>> Is it surprising if Spark shell starts but errors out 20 seconds later? 

Yes, that might be one of the side effects. Another major side effect, as I 
mentioned, is that we would not be able to print messages like the UI link and 
application id when spark-shell starts.

 

I just wanted to get some opinions on whether this is something useful for the 
community. If we do not think so, we can close this.

 

 

 

> Support async init of spark context during spark-shell startup
> --
>
> Key: SPARK-23502
> URL: https://issues.apache.org/jira/browse/SPARK-23502
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, whenever a user starts the spark shell, we initialize the Spark 
> context before returning the prompt to the user. In environments where Spark 
> context initialization takes several seconds, making the user wait for the 
> prompt is not a good experience. Instead of waiting for the initialization of 
> the Spark context, we can initialize it in the background and return the 
> prompt to the user as soon as possible. Note that even if we return the 
> prompt to the user early, we still need to make sure the Spark context 
> initialization has completed before any query is executed. 
> Note that the Scala interpreter already does very similar async 
> initialization in order to return the prompt to the user faster - 
> https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
> We will be emulating that behavior for Spark. 






[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-23 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375051#comment-16375051
 ] 

Sital Kedia commented on SPARK-23502:
-

I realized that we are printing the web UI link and the application id during 
spark-shell startup 
(https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414).
If we do asynchronous initialization of the Spark context, that information will 
not be available during startup, so we won't be able to print it. 

[~r...@databricks.com], [~srowen] - What do you think about async 
initialization of the Spark context and getting rid of the web UI link and 
application id that are printed during startup?

 

 

> Support async init of spark context during spark-shell startup
> --
>
> Key: SPARK-23502
> URL: https://issues.apache.org/jira/browse/SPARK-23502
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, whenever a user starts the spark shell, we initialize the Spark 
> context before returning the prompt to the user. In environments where Spark 
> context initialization takes several seconds, making the user wait for the 
> prompt is not a good experience. Instead of waiting for the initialization of 
> the Spark context, we can initialize it in the background and return the 
> prompt to the user as soon as possible. Note that even if we return the 
> prompt to the user early, we still need to make sure the Spark context 
> initialization has completed before any query is executed. 
> Note that the Scala interpreter already does very similar async 
> initialization in order to return the prompt to the user faster - 
> https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
> We will be emulating that behavior for Spark. 






[jira] [Created] (SPARK-23502) Support async init of spark context during spark-shell startup

2018-02-23 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-23502:
---

 Summary: Support async init of spark context during spark-shell 
startup
 Key: SPARK-23502
 URL: https://issues.apache.org/jira/browse/SPARK-23502
 Project: Spark
  Issue Type: Improvement
  Components: Spark Shell
Affects Versions: 2.0.0
Reporter: Sital Kedia


Currently, whenever a user starts the spark shell, we initialize the Spark 
context before returning the prompt to the user. In environments where Spark 
context initialization takes several seconds, making the user wait for the 
prompt is not a good experience. Instead of waiting for the initialization of 
the Spark context, we can initialize it in the background and return the prompt 
to the user as soon as possible. Note that even if we return the prompt to the 
user early, we still need to make sure the Spark context initialization has 
completed before any query is executed. 

Note that the Scala interpreter already does very similar async initialization 
in order to return the prompt to the user faster - 
https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414.
We will be emulating that behavior for Spark. 
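A minimal, self-contained sketch of the idea (not the actual spark-shell change): kick off SparkContext creation in a Future and block only when the context is first needed. The object name and the local master setting are illustrative assumptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object AsyncContextInitSketch {
  // Start initializing the context in the background so the caller (e.g. a REPL
  // loop) can show its prompt immediately.
  private val scFuture: Future[SparkContext] = Future {
    new SparkContext(new SparkConf().setAppName("async-init-sketch").setMaster("local[*]"))
  }

  // Anything that actually touches the context blocks until initialization completes.
  def sc: SparkContext = Await.result(scFuture, Duration.Inf)

  def main(args: Array[String]): Unit = {
    println("prompt available while the context initializes in the background...")
    println(s"default parallelism: ${sc.defaultParallelism}") // blocks here if still initializing
    sc.stop()
  }
}
```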






[jira] [Commented] (SPARK-23310) Perf regression introduced by SPARK-21113

2018-02-02 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351097#comment-16351097
 ] 

Sital Kedia commented on SPARK-23310:
-

https://github.com/apache/spark/pull/20492

> Perf regression introduced by SPARK-21113
> -
>
> Key: SPARK-23310
> URL: https://issues.apache.org/jira/browse/SPARK-23310
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yin Huai
>Priority: Blocker
>
> While running all TPC-DS queries with SF set to 1000, we noticed that Q95 
> (https://github.com/databricks/spark-sql-perf/blob/master/src/main/resources/tpcds_2_4/q95.sql)
> has a noticeable regression (11%). After looking into it, we found that the 
> regression was introduced by SPARK-21113. Specifically, ReadAheadInputStream 
> suffers from lock contention. After setting 
> spark.unsafe.sorter.spill.read.ahead.enabled to false, the regression 
> disappears and the overall performance of all TPC-DS queries improves.
>  
> I am proposing that we set spark.unsafe.sorter.spill.read.ahead.enabled to 
> false by default for Spark 2.3 and re-enable it after addressing the lock 
> contention issue. 
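The proposed mitigation is a single configuration change. A minimal sketch of setting it programmatically (the same flag can be passed via --conf on spark-submit); the app name and local master are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DisableSpillReadAhead {
  def main(args: Array[String]): Unit = {
    // Turn off the spill read-ahead path that shows lock contention,
    // until the contention itself is fixed.
    val conf = new SparkConf()
      .setAppName("disable-spill-read-ahead")
      .setMaster("local[*]")
      .set("spark.unsafe.sorter.spill.read.ahead.enabled", "false")

    val sc = new SparkContext(conf)
    // ... run the job as usual ...
    sc.stop()
  }
}
```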






[jira] [Commented] (SPARK-23310) Perf regression introduced by SPARK-21113

2018-02-02 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350875#comment-16350875
 ] 

Sital Kedia commented on SPARK-23310:
-

[~yhuai] - Sorry about introducing the regression for the TPC-DS queries. Let's 
set spark.unsafe.sorter.spill.read.ahead.enabled to false to unblock the 
release. Let me know if you want me to make a PR, or you can do it yourself.

 

>> ReadAheadInputStream suffers from lock contention

BTW, do you have an idea where the lock contention is occurring? I will file a 
separate Jira and work on fixing it.

> Perf regression introduced by SPARK-21113
> -
>
> Key: SPARK-23310
> URL: https://issues.apache.org/jira/browse/SPARK-23310
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yin Huai
>Priority: Blocker
>
> While running all TPC-DS queries with SF set to 1000, we noticed that Q95 
> (https://github.com/databricks/spark-sql-perf/blob/master/src/main/resources/tpcds_2_4/q95.sql)
> has a noticeable regression (11%). After looking into it, we found that the 
> regression was introduced by SPARK-21113. Specifically, ReadAheadInputStream 
> suffers from lock contention. After setting 
> spark.unsafe.sorter.spill.read.ahead.enabled to false, the regression 
> disappears and the overall performance of all TPC-DS queries improves.
>  
> I am proposing that we set spark.unsafe.sorter.spill.read.ahead.enabled to 
> false by default for Spark 2.3 and re-enable it after addressing the lock 
> contention issue. 






[jira] [Created] (SPARK-22827) Avoid throwing OutOfMemoryError in case of exception in spill

2017-12-18 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-22827:
---

 Summary: Avoid throwing OutOfMemoryError in case of exception in 
spill
 Key: SPARK-22827
 URL: https://issues.apache.org/jira/browse/SPARK-22827
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Sital Kedia


Currently, the task memory manager throws an OutOfMemoryError when an IO 
exception happens in spill() - 
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194.
Similarly, there are many other places in the code where, if a task is not able 
to acquire memory due to an exception, we throw an OutOfMemoryError, which kills 
the entire executor and hence fails all the tasks running on that executor 
instead of just failing the one task. 
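A rough illustration of the direction the issue suggests (not the actual TaskMemoryManager code): surface an IO failure during spill as a regular, task-level exception instead of an OutOfMemoryError that the executor treats as fatal. All names here are made up for the sketch.

```scala
import java.io.IOException

object SpillErrorHandlingSketch {
  // Task-level failure: the task can be retried without killing the executor.
  final class SpillFailedException(cause: Throwable)
    extends RuntimeException("Spill failed while trying to free memory", cause)

  def spillAndAcquire(spill: () => Long): Long = {
    try {
      spill() // returns the number of bytes freed
    } catch {
      case e: IOException =>
        // Previously this path threw an OutOfMemoryError, killing the whole executor.
        throw new SpillFailedException(e)
    }
  }

  def main(args: Array[String]): Unit = {
    val freed = spillAndAcquire(() => 1024L * 1024) // pretend we spilled 1 MB
    println(s"freed $freed bytes")
  }
}
```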






[jira] [Created] (SPARK-22312) Spark job stuck with no executor due to bug in Executor Allocation Manager

2017-10-18 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-22312:
---

 Summary: Spark job stuck with no executor due to bug in Executor 
Allocation Manager
 Key: SPARK-22312
 URL: https://issues.apache.org/jira/browse/SPARK-22312
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.2.0
Reporter: Sital Kedia


We often see Spark jobs get stuck, with dynamic allocation turned on, because 
the Executor Allocation Manager does not ask for any executors even though 
there are pending tasks. Looking at the logic in the EAM that calculates the 
running tasks, the calculation can go wrong and the number of running tasks can 
become negative. 
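A toy illustration of why a negative running-task count can suppress executor requests, with a defensive clamp; the formula and names are simplified stand-ins, not the real ExecutorAllocationManager logic.

```scala
object AllocationTargetSketch {
  def maxNeededExecutors(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int = {
    val running = math.max(0, runningTasks) // guard against a corrupted, negative count
    math.ceil((pendingTasks + running).toDouble / tasksPerExecutor).toInt
  }

  def main(args: Array[String]): Unit = {
    // With a corrupted running count of -120 and no clamp, 100 pending tasks would
    // yield a non-positive target and no executors would be requested; with the
    // clamp we still ask for 25.
    println(maxNeededExecutors(pendingTasks = 100, runningTasks = -120, tasksPerExecutor = 4))
  }
}
```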






[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter

2017-09-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163705#comment-16163705
 ] 

Sital Kedia commented on SPARK-21867:
-

[~rxin] - You are right, it is very tricky to get it right. Working on the PR 
now; I will have it in a few days. 

> Support async spilling in UnsafeShuffleWriter
> -
>
> Key: SPARK-21867
> URL: https://issues.apache.org/jira/browse/SPARK-21867
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, Spark tasks are single-threaded, but we see that we could greatly 
> improve job performance if we could multi-thread some parts of them. For 
> example, profiling our map tasks, which read a large amount of data from HDFS 
> and spill to disk, we see that we are blocked on HDFS reads and spilling for 
> the majority of the time. Since both of these operations are IO intensive, the 
> average CPU consumption during the map phase is significantly low. In theory, 
> both the HDFS read and the spilling could be done in parallel if we had 
> additional memory to store data read from HDFS while we are spilling the last 
> batch read.
> Let's say we have 1G of shuffle memory available per task. Currently, a map 
> task reads from HDFS and the records are stored in the available memory 
> buffer. Once we hit the memory limit and there is no more space to store the 
> records, we sort and spill the content to disk. While we are spilling to disk, 
> since we do not have any available memory, we cannot read from HDFS 
> concurrently. 
> Here we propose supporting async spilling for UnsafeShuffleWriter, so that we 
> can keep reading from HDFS while the sort and spill happen asynchronously. 
> Let's say the total 1G of shuffle memory is split into two regions - an active 
> region and a spilling region - each of size 500 MB. We start by reading from 
> HDFS and filling the active region. Once we hit the limit of the active 
> region, we issue an asynchronous spill while flipping the active and spilling 
> regions. While the spill is happening asynchronously, we still have 500 MB of 
> memory available to read data from HDFS. This way we can amortize the high 
> disk/network IO cost during spilling.
> We made a prototype hack to implement this feature and saw our map tasks run 
> as much as 40% faster. 






[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter

2017-08-29 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146060#comment-16146060
 ] 

Sital Kedia commented on SPARK-21867:
-

cc - [~rxin], [~joshrosen], [~sameer] - What do you think of the idea?

> Support async spilling in UnsafeShuffleWriter
> -
>
> Key: SPARK-21867
> URL: https://issues.apache.org/jira/browse/SPARK-21867
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently, Spark tasks are single-threaded, but we see that we could greatly 
> improve job performance if we could multi-thread some parts of them. For 
> example, profiling our map tasks, which read a large amount of data from HDFS 
> and spill to disk, we see that we are blocked on HDFS reads and spilling for 
> the majority of the time. Since both of these operations are IO intensive, the 
> average CPU consumption during the map phase is significantly low. In theory, 
> both the HDFS read and the spilling could be done in parallel if we had 
> additional memory to store data read from HDFS while we are spilling the last 
> batch read.
> Let's say we have 1G of shuffle memory available per task. Currently, a map 
> task reads from HDFS and the records are stored in the available memory 
> buffer. Once we hit the memory limit and there is no more space to store the 
> records, we sort and spill the content to disk. While we are spilling to disk, 
> since we do not have any available memory, we cannot read from HDFS 
> concurrently. 
> Here we propose supporting async spilling for UnsafeShuffleWriter, so that we 
> can keep reading from HDFS while the sort and spill happen asynchronously. 
> Let's say the total 1G of shuffle memory is split into two regions - an active 
> region and a spilling region - each of size 500 MB. We start by reading from 
> HDFS and filling the active region. Once we hit the limit of the active 
> region, we issue an asynchronous spill while flipping the active and spilling 
> regions. While the spill is happening asynchronously, we still have 500 MB of 
> memory available to read data from HDFS. This way we can amortize the high 
> disk/network IO cost during spilling.
> We made a prototype hack to implement this feature and saw our map tasks run 
> as much as 40% faster. 






[jira] [Created] (SPARK-21867) Support async spilling in UnsafeShuffleWriter

2017-08-29 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-21867:
---

 Summary: Support async spilling in UnsafeShuffleWriter
 Key: SPARK-21867
 URL: https://issues.apache.org/jira/browse/SPARK-21867
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Sital Kedia
Priority: Minor


Currently, Spark tasks are single-threaded, but we see that we could greatly 
improve job performance if we could multi-thread some parts of them. For 
example, profiling our map tasks, which read a large amount of data from HDFS 
and spill to disk, we see that we are blocked on HDFS reads and spilling for 
the majority of the time. Since both of these operations are IO intensive, the 
average CPU consumption during the map phase is significantly low. In theory, 
both the HDFS read and the spilling could be done in parallel if we had 
additional memory to store data read from HDFS while we are spilling the last 
batch read.

Let's say we have 1G of shuffle memory available per task. Currently, a map 
task reads from HDFS and the records are stored in the available memory buffer. 
Once we hit the memory limit and there is no more space to store the records, 
we sort and spill the content to disk. While we are spilling to disk, since we 
do not have any available memory, we cannot read from HDFS concurrently. 

Here we propose supporting async spilling for UnsafeShuffleWriter, so that we 
can keep reading from HDFS while the sort and spill happen asynchronously. 
Let's say the total 1G of shuffle memory is split into two regions - an active 
region and a spilling region - each of size 500 MB. We start by reading from 
HDFS and filling the active region. Once we hit the limit of the active region, 
we issue an asynchronous spill while flipping the active and spilling regions. 
While the spill is happening asynchronously, we still have 500 MB of memory 
available to read data from HDFS. This way we can amortize the high 
disk/network IO cost during spilling.

We made a prototype hack to implement this feature and saw our map tasks run as 
much as 40% faster. 
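A simplified, self-contained sketch of the double-buffering idea described above (two fixed-size regions, spill one asynchronously while the other fills, then flip); it is not the UnsafeShuffleWriter code and ignores memory-manager integration. All names are illustrative.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Records accumulate in the active region; when it is full, the regions are
// flipped and the filled one is spilled on a background thread, so inserts
// (e.g. records read from HDFS) are not blocked on the spill.
final class DoubleBufferedSpiller(regionSizeBytes: Long, spillFn: Iterable[Array[Byte]] => Unit) {
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  private var active = new ArrayBuffer[Array[Byte]]()
  private var activeBytes = 0L
  private var pendingSpill: Future[Unit] = Future.successful(())

  def insert(record: Array[Byte]): Unit = {
    if (activeBytes + record.length > regionSizeBytes) {
      // The other region can only be reused once its previous spill has finished.
      Await.result(pendingSpill, Duration.Inf)
      val toSpill = active
      active = new ArrayBuffer[Array[Byte]]()
      activeBytes = 0L
      pendingSpill = Future(spillFn(toSpill)) // spill happens off the insert path
    }
    active += record
    activeBytes += record.length
  }

  def close(): Unit = {
    Await.result(pendingSpill, Duration.Inf)
    if (active.nonEmpty) spillFn(active)
  }
}

object DoubleBufferedSpillerDemo {
  def main(args: Array[String]): Unit = {
    val spiller = new DoubleBufferedSpiller(
      regionSizeBytes = 1024 * 1024,
      spillFn = batch => println(s"spilling ${batch.map(_.length).sum} bytes"))
    (1 to 10000).foreach(_ => spiller.insert(new Array[Byte](512)))
    spiller.close()
  }
}
```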






[jira] [Created] (SPARK-21834) Incorrect executor request in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-21834:
---

 Summary: Incorrect executor request in case of dynamic allocation
 Key: SPARK-21834
 URL: https://issues.apache.org/jira/browse/SPARK-21834
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.2.0
Reporter: Sital Kedia


The killExecutor api currently does not allow killing an executor without 
updating the total number of executors needed. When dynamic allocation is 
turned on and the allocator tries to kill an executor, the scheduler reduces 
the total number of executors needed (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635),
which is incorrect because the allocator already takes care of setting the 
required number of executors itself. 






[jira] [Closed] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia closed SPARK-21833.
---
Resolution: Duplicate

Duplicate of SPARK-20540

> CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
> ---
>
> Key: SPARK-21833
> URL: https://issues.apache.org/jira/browse/SPARK-21833
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We have seen an issue in the coarse grained scheduler where, when dynamic 
> executor allocation is turned on, the scheduler asks for more executors than 
> needed. Consider the situation where the executor allocation manager is 
> ramping down the number of executors. It lowers the executor target number by 
> calling the requestTotalExecutors api. 
> Later, when the allocation manager finds some executors to be idle, it calls 
> the killExecutor api. The coarse grained scheduler, in the killExecutor 
> function, resets the total number of executors needed to current + pending, 
> which overrides the earlier target set by the allocation manager. 
> This results in the scheduler spawning more executors than actually needed. 






[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-21833:

Description: 
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api. 

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api. The coarse grained scheduler, in the killExecutor 
function, resets the total number of executors needed to current + pending, 
which overrides the earlier target set by the allocation manager. 

This results in the scheduler spawning more executors than actually needed. 

  was:
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 


> CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
> ---
>
> Key: SPARK-21833
> URL: https://issues.apache.org/jira/browse/SPARK-21833
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We have seen an issue in the coarse grained scheduler where, when dynamic 
> executor allocation is turned on, the scheduler asks for more executors than 
> needed. Consider the situation where the executor allocation manager is 
> ramping down the number of executors. It lowers the executor target number by 
> calling the requestTotalExecutors api. 
> Later, when the allocation manager finds some executors to be idle, it calls 
> the killExecutor api. The coarse grained scheduler, in the killExecutor 
> function, resets the total number of executors needed to current + pending, 
> which overrides the earlier target set by the allocation manager. 
> This results in the scheduler spawning more executors than actually needed. 






[jira] [Commented] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140533#comment-16140533
 ] 

Sital Kedia commented on SPARK-21833:
-

Actually, SPARK-20540 already addressed this issue on the latest trunk. Will 
close this issue. 

> CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
> ---
>
> Key: SPARK-21833
> URL: https://issues.apache.org/jira/browse/SPARK-21833
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We have seen an issue in the coarse grained scheduler where, when dynamic 
> executor allocation is turned on, the scheduler asks for more executors than 
> needed. Consider the situation where the executor allocation manager is 
> ramping down the number of executors. It lowers the executor target number by 
> calling the requestTotalExecutors api. 
> Later, when the allocation manager finds some executors to be idle, it calls 
> the killExecutor api. The coarse grained scheduler, in the killExecutor 
> function, resets the total number of executors needed to current + pending, 
> which overrides the earlier target set by the allocation manager. 
> This results in the scheduler spawning more executors than actually needed. 






[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-21833:

Description: 
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 

  was:
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 


> CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
> ---
>
> Key: SPARK-21833
> URL: https://issues.apache.org/jira/browse/SPARK-21833
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We have seen an issue in the coarse grained scheduler where, when dynamic 
> executor allocation is turned on, the scheduler asks for more executors than 
> needed. Consider the situation where the executor allocation manager is 
> ramping down the number of executors. It lowers the executor target number by 
> calling the requestTotalExecutors api (see 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).
> Later, when the allocation manager finds some executors to be idle, it calls 
> the killExecutor api 
> (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447).
> The coarse grained scheduler, in the killExecutor function, resets the total 
> number of executors needed to current + pending, which overrides the earlier 
> target set by the allocation manager 
> (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).
> This results in the scheduler spawning more executors than actually needed. 






[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-21833:

Description: 
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 

  was:
We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L321).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 


> CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
> ---
>
> Key: SPARK-21833
> URL: https://issues.apache.org/jira/browse/SPARK-21833
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We have seen an issue in the coarse grained scheduler where, when dynamic 
> executor allocation is turned on, the scheduler asks for more executors than 
> needed. Consider the situation where the executor allocation manager is 
> ramping down the number of executors. It lowers the executor target number by 
> calling the requestTotalExecutors api (see 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326).
> Later, when the allocation manager finds some executors to be idle, it calls 
> the killExecutor api 
> (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421).
> The coarse grained scheduler, in the killExecutor function, resets the total 
> number of executors needed to current + pending, which overrides the earlier 
> target set by the allocation manager 
> (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).
> This results in the scheduler spawning more executors than actually needed. 






[jira] [Created] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation

2017-08-24 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-21833:
---

 Summary: CoarseGrainedSchedulerBackend leaks executors in case of 
dynamic allocation
 Key: SPARK-21833
 URL: https://issues.apache.org/jira/browse/SPARK-21833
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.2.0
Reporter: Sital Kedia


We have seen an issue in the coarse grained scheduler where, when dynamic 
executor allocation is turned on, the scheduler asks for more executors than 
needed. Consider the situation where the executor allocation manager is ramping 
down the number of executors. It lowers the executor target number by calling 
the requestTotalExecutors api (see 
https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L321).

Later, when the allocation manager finds some executors to be idle, it calls 
the killExecutor api 
(https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421).
The coarse grained scheduler, in the killExecutor function, resets the total 
number of executors needed to current + pending, which overrides the earlier 
target set by the allocation manager 
(https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523).

This results in the scheduler spawning more executors than actually needed. 
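A hypothetical interface, only to illustrate the semantics the report argues for: the allocation manager owns the executor target, so killing an idle executor should not let the scheduler recompute the target as current + pending. None of these names correspond to the real Spark API.

```scala
trait ExecutorManager {
  def requestTotalExecutors(total: Int): Boolean
  def killExecutor(id: String, adjustTarget: Boolean): Boolean
}

object DynamicAllocationSketch {
  def rampDownIdle(mgr: ExecutorManager, newTarget: Int, idle: Seq[String]): Unit = {
    mgr.requestTotalExecutors(newTarget)                            // allocator lowers its own target
    idle.foreach(id => mgr.killExecutor(id, adjustTarget = false))  // kill without touching the target
  }

  def main(args: Array[String]): Unit = {
    // Trivial stub backend that records what it was asked to do.
    val mgr = new ExecutorManager {
      private var target = 10
      def requestTotalExecutors(total: Int): Boolean = { target = total; true }
      def killExecutor(id: String, adjustTarget: Boolean): Boolean = {
        if (adjustTarget) target -= 1 // stand-in for the scheduler recomputing the target
        println(s"killed $id, target stays at $target")
        true
      }
    }
    rampDownIdle(mgr, newTarget = 5, idle = Seq("exec-7", "exec-9"))
  }
}
```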






[jira] [Commented] (SPARK-19112) add codec for ZStandard

2017-08-01 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109784#comment-16109784
 ] 

Sital Kedia commented on SPARK-19112:
-

[~sowen], [~tgraves] - Using zstd compression for our Spark jobs spilling 100s 
of TBs of data, we could reduce the amount of data written to disk by as much 
as 50%. This translates to a significant latency gain because of reduced disk 
IO operations. There is a 2-5% degradation in CPU time because of the zstd 
compression overhead, but for jobs that are bottlenecked by disk IO, this hit 
can be taken. We are going to enable zstd as the default compression for all of 
our jobs. We should support zstd in open source Spark as well. I will reopen 
the PR with some minor changes. 
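For reference, switching the codec is a one-line configuration change once a zstd codec is available; this sketch assumes it is registered under the short name "zstd" (as it eventually was in Spark 2.3). The app name and local master are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ZstdCodecExample {
  def main(args: Array[String]): Unit = {
    // spark.io.compression.codec controls the codec used for shuffle outputs,
    // spills and other internal data; "zstd" assumes the codec proposed here.
    val conf = new SparkConf()
      .setAppName("zstd-codec-example")
      .setMaster("local[*]")
      .set("spark.io.compression.codec", "zstd")

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 1000000).map(_.toLong).sum())
    sc.stop()
  }
}
```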

> add codec for ZStandard
> ---
>
> Key: SPARK-19112
> URL: https://issues.apache.org/jira/browse/SPARK-19112
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Thomas Graves
>Priority: Minor
>
> ZStandard: https://github.com/facebook/zstd and 
> http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was 
> recently released. Hadoop 
> (https://issues.apache.org/jira/browse/HADOOP-13578) and others 
> (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it.
> Zstd seems to give great results => Gzip level Compression with Lz4 level CPU.






[jira] [Commented] (SPARK-21444) Fetch failure due to node reboot causes job failure

2017-07-17 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090800#comment-16090800
 ] 

Sital Kedia commented on SPARK-21444:
-

Any idea how to fix this issue?

> Fetch failure due to node reboot causes job failure
> ---
>
> Key: SPARK-21444
> URL: https://issues.apache.org/jira/browse/SPARK-21444
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.0.2
>Reporter: Sital Kedia
>
> We started seeing this issue after merging the PR - 
> https://github.com/apache/spark/pull/17955. 
> This PR introduced a change to keep the map-output statuses only in the map 
> output tracker, and whenever there is a fetch failure, the scheduler tries to 
> invalidate the map-output statuses by talking to all the block manager slave 
> endpoints. However, when the fetch failure is caused by a node reboot, the 
> block manager slave endpoint is not reachable and so the driver fails the job 
> as a result. See the exception below -
> {code}
> 2017-07-15 05:20:50,255 WARN  storage.BlockManagerMaster 
> (Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with 
> removeFromMaster = true - Connection reset by peer
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
> 2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop 
> (Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting 
> down SparkContext
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165)
> at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111)
> at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
> at 
> org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197)
> at 
> org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105)
> at 
> org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> a

[jira] [Commented] (SPARK-21444) Fetch failure due to node reboot causes job failure

2017-07-17 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090799#comment-16090799
 ] 

Sital Kedia commented on SPARK-21444:
-

cc - [~joshrosen]

> Fetch failure due to node reboot causes job failure
> ---
>
> Key: SPARK-21444
> URL: https://issues.apache.org/jira/browse/SPARK-21444
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.0.2
>Reporter: Sital Kedia
>
> We started seeing this issue after merging the PR - 
> https://github.com/apache/spark/pull/17955. 
> This PR introduced a change to keep the map-output statuses only in the map 
> output tracker, and whenever there is a fetch failure, the scheduler tries to 
> invalidate the map-output statuses by talking to all the block manager slave 
> endpoints. However, when the fetch failure is caused by a node reboot, the 
> block manager slave endpoint is not reachable and so the driver fails the job 
> as a result. See the exception below -
> {code}
> 2017-07-15 05:20:50,255 WARN  storage.BlockManagerMaster 
> (Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with 
> removeFromMaster = true - Connection reset by peer
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
> 2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop 
> (Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting 
> down SparkContext
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165)
> at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111)
> at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
> at 
> org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197)
> at 
> org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105)
> at 
> org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.S

[jira] [Created] (SPARK-21444) Fetch failure due to node reboot causes job failure

2017-07-17 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-21444:
---

 Summary: Fetch failure due to node reboot causes job failure
 Key: SPARK-21444
 URL: https://issues.apache.org/jira/browse/SPARK-21444
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.0.2
Reporter: Sital Kedia


We started seeing this issue after merging the PR - 
https://github.com/apache/spark/pull/17955. 

This PR introduced a change to keep the map-output statuses only in the map 
output tracker, and whenever there is a fetch failure, the scheduler tries to 
invalidate the map-output statuses by talking to all the block manager slave 
endpoints. However, when the fetch failure is caused by a node reboot, the 
block manager slave endpoint is not reachable and so the driver fails the job 
as a result. See the exception below -


{code}
2017-07-15 05:20:50,255 WARN  storage.BlockManagerMaster 
(Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with 
removeFromMaster = true - Connection reset by peer
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at 
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop 
(Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting 
down SparkContext
org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271)
at 
org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111)
at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
at 
org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197)
at 
org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105)
at 
org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(Abstract

[jira] [Created] (SPARK-21113) Support for read ahead input stream to amortize disk IO cost in the Spill reader

2017-06-15 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-21113:
---

 Summary: Support for read ahead input stream to amortize disk IO 
cost in the Spill reader
 Key: SPARK-21113
 URL: https://issues.apache.org/jira/browse/SPARK-21113
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.2
Reporter: Sital Kedia
Priority: Minor


Profiling some of our big jobs, we see that around 30% of the time is being 
spent reading the spill files from disk. In order to amortize the disk IO cost, 
the idea is to implement a read-ahead input stream which asynchronously reads 
ahead from the underlying input stream once a specified amount of data has been 
read from the current buffer. It does this by maintaining two buffers - an 
active buffer and a read-ahead buffer. The active buffer contains the data that 
should be returned when a read() call is issued. The read-ahead buffer is used 
to asynchronously read from the underlying input stream, and once the active 
buffer is exhausted, we flip the two buffers so that reads continue from the 
read-ahead buffer without blocking on disk I/O.
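
To make the double-buffering idea concrete, here is a minimal Scala sketch. It is 
illustrative only - not the actual Spark patch - and the class, field, and method 
names are made up; error handling and partial reads are simplified.

{code}
import java.io.InputStream
import java.util.concurrent.{Callable, Executors, Future}

// Sketch of the double-buffering idea: while the caller drains the active buffer,
// a background thread prefetches the next chunk into the read-ahead buffer.
class ReadAheadSketch(underlying: InputStream, bufferSize: Int = 1 << 20)
  extends InputStream {

  private val executor = Executors.newSingleThreadExecutor()
  private var active = new Array[Byte](bufferSize)     // bytes served to read()
  private var readAhead = new Array[Byte](bufferSize)  // bytes being prefetched
  private var activeLen = 0                            // valid bytes in the active buffer
  private var pos = 0                                  // next byte to return
  private var pending: Future[Integer] = null          // in-flight prefetch, if any

  private def startPrefetch(): Unit = {
    pending = executor.submit(new Callable[Integer] {
      override def call(): Integer = Integer.valueOf(underlying.read(readAhead))
    })
  }

  // Wait for the prefetch to finish, then swap ("flip") the two buffers.
  private def flip(): Boolean = {
    if (pending == null) startPrefetch()
    val n = pending.get().intValue()
    pending = null
    if (n <= 0) return false          // underlying stream exhausted
    val tmp = active; active = readAhead; readAhead = tmp
    activeLen = n
    pos = 0
    startPrefetch()                   // immediately start filling the next chunk
    true
  }

  override def read(): Int = {
    if (pos >= activeLen && !flip()) return -1
    val b = active(pos) & 0xff
    pos += 1
    b
  }

  override def close(): Unit = {
    executor.shutdownNow()
    underlying.close()
  }
}
{code}

The caller only blocks on disk when the prefetch has not finished by the time the 
active buffer is exhausted, which is exactly the amortization described above.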



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2017-06-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046957#comment-16046957
 ] 

Sital Kedia commented on SPARK-18838:
-

[~joshrosen] - The PR for my change to multi-thread the event processor was 
inadvertently closed as a stale PR. Can you take a look at the approach and let 
me know what you think? Please note that this change is very critical for 
running large jobs in our environment. 
https://github.com/apache/spark/pull/16291

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
> Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx
>
>
> Currently we are observing very high event processing delay in the driver's 
> `ListenerBus` for large jobs with many tasks. Many critical components of the 
> scheduler, like `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
> on the `ListenerBus` events, and this delay might hurt the job performance 
> significantly or even fail the job.  For example, a significant delay in 
> receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single-threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listeners is very slow, all the listeners will pay the 
> price of the delay incurred by the slow listener. In addition, a slow 
> listener can cause events to be dropped from the event queue, which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single-threaded event processor. Instead, each listener will have its own 
> dedicated single-threaded executor service. Whenever an event is posted, it 
> will be submitted to the executor service of every listener. The 
> single-threaded executor service will guarantee in-order processing of the 
> events per listener.  The queue used for the executor service will be bounded 
> to guarantee we do not grow memory indefinitely. The downside of this 
> approach is that a separate event queue per listener will increase the driver 
> memory footprint. 
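
A minimal Scala sketch of the per-listener, bounded-queue dispatch proposed above. 
The names and structure are illustrative only and are not Spark's actual 
LiveListenerBus code; the drop-oldest rejection policy is just one possible choice 
for a full queue.

{code}
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}

// Every listener gets its own single-threaded executor with a bounded queue, so a
// slow listener cannot delay the others; per-listener ordering is preserved because
// each executor has exactly one worker thread.
trait Listener[E] { def onEvent(event: E): Unit }

class PerListenerBus[E](listeners: Seq[Listener[E]], queueCapacity: Int = 10000) {

  // One single-threaded executor per listener; the bounded queue caps memory use.
  private val executors: Map[Listener[E], ThreadPoolExecutor] =
    listeners.map { l =>
      l -> new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue[Runnable](queueCapacity),
        new ThreadPoolExecutor.DiscardOldestPolicy()) // drop the oldest event if the queue is full
    }.toMap

  // Posting is cheap: hand the event to each listener's own executor.
  def post(event: E): Unit = {
    executors.foreach { case (listener, exec) =>
      exec.execute(new Runnable {
        override def run(): Unit = listener.onEvent(event)
      })
    }
  }

  def stop(): Unit = executors.values.foreach(_.shutdown())
}
{code}

With this layout a slow listener only fills (and drops from) its own queue, so 
internal listeners such as ExecutorAllocationManager and HeartbeatReceiver keep 
receiving events promptly; the cost is one thread and one bounded queue per listener.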



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures

2017-05-30 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030284#comment-16030284
 ] 

Sital Kedia commented on SPARK-20178:
-

https://github.com/apache/spark/pull/18150

> Improve Scheduler fetch failures
> 
>
> Key: SPARK-20178
> URL: https://issues.apache.org/jira/browse/SPARK-20178
> Project: Spark
>  Issue Type: Epic
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> We have been having a lot of discussions around improving the handling of 
> fetch failures.  There are 4 jira currently related to this.  
> We should try to get a list of things we want to improve and come up with one 
> cohesive design.
> SPARK-20163,  SPARK-20091,  SPARK-14649 , and SPARK-19753
> I will put my initial thoughts in a follow on comment.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures

2017-05-26 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027036#comment-16027036
 ] 

Sital Kedia commented on SPARK-20178:
-

>> So to get the robustness for now I'm fine with just invalidating it 
>> immediately and see how that works.

[~tgraves] - Let me know if you want me to resurrect 
https://github.com/apache/spark/pull/17088, which does exactly that. It was 
inadvertently closed as a stale PR. 

> Improve Scheduler fetch failures
> 
>
> Key: SPARK-20178
> URL: https://issues.apache.org/jira/browse/SPARK-20178
> Project: Spark
>  Issue Type: Epic
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> We have been having a lot of discussions around improving the handling of 
> fetch failures.  There are 4 jira currently related to this.  
> We should try to get a list of things we want to improve and come up with one 
> cohesive design.
> SPARK-20163,  SPARK-20091,  SPARK-14649 , and SPARK-19753
> I will put my initial thoughts in a follow on comment.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2017-05-18 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016217#comment-16016217
 ] 

Sital Kedia commented on SPARK-18838:
-

[~joshrosen] - >> Alternatively, we could use two queues, one for internal 
listeners and another for external ones. This wouldn't be as fine-grained as 
thread-per-listener but might buy us a lot of the benefits with perhaps less 
code needed.

Actually that is exactly what my PR is doing. 
https://github.com/apache/spark/pull/16291. I have not been able to work on it 
recently, but you can take a look and let me know how it looks. I can 
prioritize working on it. 

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might 
> hurt the job performance significantly or even fail the job.  For example, a 
> significant delay in receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` manager to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listener is very slow, all the listeners will pay the 
> price of delay incurred by the slow listener. In addition to that a slow 
> listener can cause events to be dropped from the event queue which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single threaded event processor. Instead each listener will have its own 
> dedicate single threaded executor service . When ever an event is posted, it 
> will be submitted to executor service of all the listeners. The Single 
> threaded executor service will guarantee in order processing of the events 
> per listener.  The queue used for the executor service will be bounded to 
> guarantee we do not grow the memory indefinitely. The downside of this 
> approach is separate event queue per listener will increase the driver memory 
> footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20640) Make rpc timeout and retry for shuffle registration configurable

2017-05-08 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-20640:
---

 Summary: Make rpc timeout and retry for shuffle registration 
configurable
 Key: SPARK-20640
 URL: https://issues.apache.org/jira/browse/SPARK-20640
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 2.0.2
Reporter: Sital Kedia


Currently the shuffle service registration timeout and retry count are hardcoded 
(see 
https://github.com/sitalkedia/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java#L144
 and 
https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L197).
This works well for small workloads, but under heavy workloads, when the shuffle 
service is busy transferring a large amount of data, we see a significant delay 
in responding to the registration request; as a result, executors often fail to 
register with the shuffle service, eventually failing the job. We need to make 
these two parameters configurable.
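
A minimal sketch of how such values could be read from SparkConf instead of being 
hardcoded. The config key names and defaults below are made up for illustration 
and are not necessarily what the eventual patch used.

{code}
import org.apache.spark.SparkConf

// Illustrative only: pull the registration timeout and retry count from SparkConf,
// falling back to defaults similar to the previously hardcoded values.
// The key names here are hypothetical.
object ShuffleRegistrationSettings {
  def fromConf(conf: SparkConf): (Long, Int) = {
    val timeoutMs = conf.getTimeAsMs("spark.shuffle.registration.timeout", "5000ms")
    val maxAttempts = conf.getInt("spark.shuffle.registration.maxAttempts", 3)
    (timeoutMs, maxAttempts)
  }
}
{code}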



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures

2017-04-05 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957341#comment-15957341
 ] 

Sital Kedia commented on SPARK-20178:
-

[~tgraves] Thanks for creating the JIRA and driving the discussion.

[~tgraves], [~imranr], [~markhamstra], [~kayousterhout] - As discussed over 
the PR, I have created a doc consolidating the issues related to fetch failure 
and the high-level design goals for the scheduler: 
https://docs.google.com/document/d/1D3b_ishMfm5sXmRS494JrOJmL9V_TRVZUB4TgK1l1fY/edit?usp=sharing.
 Please take a look and let me know what you think. Once we reach a consensus 
about the desired behavior and identify the changes that need to be made, I 
can work on a more detailed design doc and also split the change into 
multiple small logical PRs.



> Improve Scheduler fetch failures
> 
>
> Key: SPARK-20178
> URL: https://issues.apache.org/jira/browse/SPARK-20178
> Project: Spark
>  Issue Type: Epic
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> We have been having a lot of discussions around improving the handling of 
> fetch failures.  There are 4 jira currently related to this.  
> We should try to get a list of things we want to improve and come up with one 
> cohesive design.
> SPARK-20163,  SPARK-20091,  SPARK-14649 , and SPARK-19753
> I will put my initial thoughts in a follow on comment.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure

2017-03-31 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia closed SPARK-20163.
---
Resolution: Duplicate

> Kill all running tasks in a stage in case of fetch failure
> --
>
> Key: SPARK-20163
> URL: https://issues.apache.org/jira/browse/SPARK-20163
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.0.1
>Reporter: Sital Kedia
>
> Currently, the scheduler does not kill the running tasks in a stage when it 
> encounters a fetch failure; as a result, we might end up running many 
> duplicate tasks in the cluster. There is already a TODO in TaskSetManager to 
> kill all running tasks, which has not been implemented.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure

2017-03-31 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951738#comment-15951738
 ] 

Sital Kedia commented on SPARK-20163:
-

Thanks [~imranr], closing this as it is a duplicate of SPARK-2666. 

> Kill all running tasks in a stage in case of fetch failure
> --
>
> Key: SPARK-20163
> URL: https://issues.apache.org/jira/browse/SPARK-20163
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.0.1
>Reporter: Sital Kedia
>
> Currently, the scheduler does not kill the running tasks in a stage when it 
> encounters a fetch failure; as a result, we might end up running many 
> duplicate tasks in the cluster. There is already a TODO in TaskSetManager to 
> kill all running tasks, which has not been implemented.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure

2017-03-30 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-20163:
---

 Summary: Kill all running tasks in a stage in case of fetch failure
 Key: SPARK-20163
 URL: https://issues.apache.org/jira/browse/SPARK-20163
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.0.1
Reporter: Sital Kedia


Currently, the scheduler does not kill the running tasks in a stage when it 
encounters a fetch failure; as a result, we might end up running many duplicate 
tasks in the cluster. There is already a TODO in TaskSetManager to kill all 
running tasks, which has not been implemented.  
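
A rough Scala sketch of what acting on that TODO could look like. The Backend and 
RunningTask types below are stand-ins defined for the example; the real 
TaskSetManager/SchedulerBackend wiring in Spark differs.

{code}
// Illustrative sketch only: when a fetch failure marks a stage as failed, ask the
// backend to kill that stage's still-running task attempts so they do not keep
// occupying executors while the stage is resubmitted.
trait Backend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
}

case class RunningTask(taskId: Long, executorId: String)

class FetchFailureHandler(backend: Backend) {
  def killRunningTasks(running: Iterable[RunningTask]): Unit = {
    running.foreach { t =>
      // Interrupting is optional; some workloads do not handle interrupts cleanly.
      backend.killTask(t.taskId, t.executorId, interruptThread = false,
        reason = "Stage failed due to fetch failure; cancelling remaining attempts")
    }
  }
}
{code}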



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3577) Add task metric to report spill time

2017-03-29 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947669#comment-15947669
 ] 

Sital Kedia commented on SPARK-3577:


I am making a change to report the correct spill data size on disk. 

> Add task metric to report spill time
> 
>
> Key: SPARK-3577
> URL: https://issues.apache.org/jira/browse/SPARK-3577
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.1.0
>Reporter: Kay Ousterhout
>Priority: Minor
> Attachments: spill_size.jpg
>
>
> The {{ExternalSorter}} passes its own {{ShuffleWriteMetrics}} into 
> {{ExternalSorter}}.  The write time recorded in those metrics is never used.  
> We should probably add task metrics to report this spill time, since for 
> shuffles, this would have previously been reported as part of shuffle write 
> time (with the original hash-based sorter).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20091) DagScheduler should allow running concurrent attempts of a stage in case of multiple fetch failure

2017-03-24 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-20091:
---

 Summary: DagScheduler should allow running concurrent attempts of 
a stage in case of multiple fetch failure
 Key: SPARK-20091
 URL: https://issues.apache.org/jira/browse/SPARK-20091
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 2.0.1
Reporter: Sital Kedia


Currently, the DAG scheduler does not allow running concurrent attempts of a 
stage in case of multiple fetch failures. As a result, when multiple fetch 
failures are detected, the serial execution of the map stage delays the job run 
significantly.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20074) Make buffer size in unsafe external sorter configurable

2017-03-23 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-20074:
---

 Summary: Make buffer size in unsafe external sorter configurable
 Key: SPARK-20074
 URL: https://issues.apache.org/jira/browse/SPARK-20074
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.1
Reporter: Sital Kedia


Currently, the buffer size is hardcoded to 32 KB; see - 
https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L123



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20014) Optimize mergeSpillsWithFileStream method

2017-03-18 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-20014:

Description: 
When the individual partition size in a spill is small, the 
mergeSpillsWithTransferTo method does many small disk IOs, which is really 
inefficient. One way to improve the performance is to use the 
mergeSpillsWithFileStream method by turning off transferTo and using buffered 
file read/write to improve the IO throughput. 
However, the current implementation of mergeSpillsWithFileStream does not do 
buffered read/write of the files, and in addition it unnecessarily flushes 
the output file for each partition.  

  was:
When the individual partition size in a spill is small, 
mergeSpillsWithTransferTo method does many small disk ios which is really 
inefficient. One way to improve the performance will be to use 
mergeSpillsWithFileStream method by turning off transfer to and using buffered 
file read/write to improve the io throughput. 
However, the current implementation of mergeSpillsWithFileStream does not do a 
buffer read/write of the files and in addition to that it unnecessarily 
opens/close the output files for each partitions.  


> Optimize mergeSpillsWithFileStream method
> -
>
> Key: SPARK-20014
> URL: https://issues.apache.org/jira/browse/SPARK-20014
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> When the individual partition size in a spill is small, 
> mergeSpillsWithTransferTo method does many small disk ios which is really 
> inefficient. One way to improve the performance will be to use 
> mergeSpillsWithFileStream method by turning off transfer to and using 
> buffered file read/write to improve the io throughput. 
> However, the current implementation of mergeSpillsWithFileStream does not do 
> a buffer read/write of the files and in addition to that it unnecessarily 
> flushes the output files for each partitions.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20014) Optimize mergeSpillsWithFileStream method

2017-03-18 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-20014:
---

 Summary: Optimize mergeSpillsWithFileStream method
 Key: SPARK-20014
 URL: https://issues.apache.org/jira/browse/SPARK-20014
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.0
Reporter: Sital Kedia


When the individual partition size in a spill is small, the 
mergeSpillsWithTransferTo method does many small disk IOs, which is really 
inefficient. One way to improve the performance is to use the 
mergeSpillsWithFileStream method by turning off transferTo and using buffered 
file read/write to improve the IO throughput. 
However, the current implementation of mergeSpillsWithFileStream does not do 
buffered read/write of the files, and in addition it unnecessarily 
opens/closes the output files for each partition.  
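
For illustration, a simplified Scala sketch of merging spill files with buffered 
streams and a single flush at the end. The real UnsafeShuffleWriter merge path also 
deals with compression, encryption, and transferTo; the file layout assumption here 
(each spill stores partitions 0..N-1 contiguously) and all names are mine.

{code}
import java.io.{BufferedInputStream, BufferedOutputStream, File, FileInputStream, FileOutputStream}

// Concatenate the partition slices of several spill files into one output file using
// buffered streams, flushing once at the end instead of per partition.
object BufferedSpillMerge {
  def merge(spills: Seq[File], partitionLengths: Seq[Array[Long]], out: File): Unit = {
    val output = new BufferedOutputStream(new FileOutputStream(out), 1 << 20)
    try {
      val inputs = spills.map(f => new BufferedInputStream(new FileInputStream(f), 1 << 20))
      try {
        val buf = new Array[Byte](64 * 1024)
        val numPartitions = partitionLengths.head.length
        for (p <- 0 until numPartitions; (in, lengths) <- inputs.zip(partitionLengths)) {
          var remaining = lengths(p)           // bytes of partition p in this spill file
          while (remaining > 0) {
            val n = in.read(buf, 0, math.min(buf.length, remaining).toInt)
            if (n < 0) throw new java.io.EOFException("spill file ended early")
            output.write(buf, 0, n)
            remaining -= n
          }
        }
      } finally inputs.foreach(_.close())
    } finally output.close()                   // single flush and close at the end
  }
}
{code}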



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter

2017-03-06 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897961#comment-15897961
 ] 

Sital Kedia commented on SPARK-19837:
-

`SparkHiveDynamicPartitionWriterContainer` has been refactored in the latest 
master, so I am not sure the issue still exists. I will close this JIRA and 
reopen it if we still see the issue with the latest code. 

> Fetch failure throws a SparkException in SparkHiveWriter
> 
>
> Key: SPARK-19837
> URL: https://issues.apache.org/jira/browse/SPARK-19837
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.1
>Reporter: Sital Kedia
>
> Currently a fetch failure in SparkHiveWriter fails the job with the 
> following exception
> {code}
> 0_0): org.apache.spark.SparkException: Task failed while writing rows.
> at 
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385)
> at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by 
> peer
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887)
> at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Closed] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter

2017-03-06 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia closed SPARK-19837.
---
Resolution: Fixed

> Fetch failure throws a SparkException in SparkHiveWriter
> 
>
> Key: SPARK-19837
> URL: https://issues.apache.org/jira/browse/SPARK-19837
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.1
>Reporter: Sital Kedia
>
> Currently a fetch failure in SparkHiveWriter fails the job with the 
> following exception
> {code}
> 0_0): org.apache.spark.SparkException: Task failed while writing rows.
> at 
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385)
> at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by 
> peer
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887)
> at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter

2017-03-06 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-19837:
---

 Summary: Fetch failure throws a SparkException in SparkHiveWriter
 Key: SPARK-19837
 URL: https://issues.apache.org/jira/browse/SPARK-19837
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.0.1
Reporter: Sital Kedia


Currently a fetch failure in SparkHiveWriter fails the job with the following exception

{code}
0_0): org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by 
peer
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692)
at 
org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854)
at 
org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887)
at 
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343)
{code}
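
The root cause appears to be that the writer catches the FetchFailedException raised 
while reading shuffle input and rethrows it wrapped in a generic SparkException, so 
the scheduler no longer recognizes the task failure as a fetch failure. A minimal 
Scala sketch of the kind of handling one could add (illustrative only; this is not 
the actual hiveWriterContainers.scala change, and writeAll is a stand-in for the 
per-row write loop):

{code}
import org.apache.spark.SparkException
import org.apache.spark.shuffle.FetchFailedException

// Let FetchFailedException propagate unchanged so the scheduler can resubmit the map
// stage; wrap only other failures in the generic "failed while writing rows" exception.
object WriteErrorHandling {
  def writeRowsSafely(writeAll: () => Unit): Unit = {
    try {
      writeAll()
    } catch {
      case ffe: FetchFailedException => throw ffe   // preserve fetch-failure semantics
      case t: Throwable =>
        throw new SparkException("Task failed while writing rows.", t)
    }
  }
}
{code}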



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests

2017-03-02 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-19803:
---

 Summary: Flaky BlockManagerProactiveReplicationSuite tests
 Key: SPARK-19803
 URL: https://issues.apache.org/jira/browse/SPARK-19803
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Sital Kedia


The tests added in BlockManagerProactiveReplicationSuite have made the Jenkins 
build flaky. Please refer to the build for more details - 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19753) Remove all shuffle files on a host in case of slave lost of fetch failure

2017-02-27 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-19753:
---

 Summary: Remove all shuffle files on a host in case of slave lost 
of fetch failure
 Key: SPARK-19753
 URL: https://issues.apache.org/jira/browse/SPARK-19753
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.0.1
Reporter: Sital Kedia


Currently, when we detect a fetch failure, we only remove the shuffle files 
produced by the failed executor, even though the host itself might be down and 
none of its shuffle files accessible. When we are running multiple executors on 
a host, any host going down currently results in multiple fetch failures and 
multiple retries of the stage, which is very inefficient. If we instead remove 
all the shuffle files on that host on the first fetch failure, we can rerun all 
the tasks that ran on that host in a single stage retry. 
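
A minimal Scala sketch of the host-level cleanup idea. The names are illustrative 
only; the actual change would touch the DAGScheduler and MapOutputTrackerMaster 
rather than a standalone registry like this.

{code}
// When a fetch failure is attributed to a lost host, unregister every map output
// located on that host, not just the outputs of the failing executor, so a single
// stage retry recomputes all of them at once.
case class MapOutput(mapId: Int, host: String, executorId: String)

class OutputRegistry(private var outputs: Vector[MapOutput]) {

  // Current behavior: drop only the failing executor's outputs.
  def unregisterOutputsOnExecutor(execId: String): Unit =
    outputs = outputs.filterNot(_.executorId == execId)

  // Proposed behavior: drop everything served from the lost host.
  def unregisterOutputsOnHost(host: String): Unit =
    outputs = outputs.filterNot(_.host == host)

  def remaining: Vector[MapOutput] = outputs
}
{code}

Unregistering at host granularity on the first failure avoids one fetch failure 
(and one stage retry) per executor that was running on the lost host.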





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2016-12-15 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15753535#comment-15753535
 ] 

Sital Kedia commented on SPARK-18838:
-

cc - [~kayousterhout]

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might 
> hurt the job performance significantly or even fail the job.  For example, a 
> significant delay in receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` manager to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listener is very slow, all the listeners will pay the 
> price of delay incurred by the slow listener. In addition to that a slow 
> listener can cause events to be dropped from the event queue which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single threaded event processor. Instead each listener will have its own 
> dedicate single threaded executor service . When ever an event is posted, it 
> will be submitted to executor service of all the listeners. The Single 
> threaded executor service will guarantee in order processing of the events 
> per listener.  The queue used for the executor service will be bounded to 
> guarantee we do not grow the memory indefinitely. The downside of this 
> approach is separate event queue per listener will increase the driver memory 
> footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs

2016-12-15 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-18838:

Description: 
Currently we are observing very high event processing delay in the driver's 
`ListenerBus` for large jobs with many tasks. Many critical components of the 
scheduler, like `ExecutorAllocationManager` and `HeartbeatReceiver`, depend on 
the `ListenerBus` events, and this delay might hurt the job performance 
significantly or even fail the job.  For example, a significant delay in 
receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` 
to mistakenly remove an executor which is not idle.  

The problem is that the event processor in `ListenerBus` is a single thread 
which loops through all the Listeners for each event and processes each event 
synchronously 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
 This single-threaded processor often becomes the bottleneck for large jobs. 
Also, if one of the Listeners is very slow, all the listeners will pay the 
price of the delay incurred by the slow listener. In addition, a slow listener 
can cause events to be dropped from the event queue, which might be fatal to 
the job.

To solve the above problems, we propose to get rid of the event queue and the 
single-threaded event processor. Instead, each listener will have its own 
dedicated single-threaded executor service. Whenever an event is posted, it 
will be submitted to the executor service of every listener. The 
single-threaded executor service will guarantee in-order processing of the 
events per listener.  The queue used for the executor service will be bounded 
to guarantee we do not grow memory indefinitely. The downside of this approach 
is that a separate event queue per listener will increase the driver memory 
footprint. 




  was:
Currently we are observing the issue of very high event processing delay in 
driver's `ListenerBus` for large jobs with many tasks. Many critical component 
of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend 
on the `ListenerBus` events and these delay is causing job failure. For 
example, a significant delay in receiving the `SparkListenerTaskStart` might 
cause `ExecutorAllocationManager` manager to remove an executor which is not 
idle.  The event processor in `ListenerBus` is a single thread which loops 
through all the Listeners for each event and processes each event synchronously 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
 
The single threaded processor often becomes the bottleneck for large jobs.  In 
addition to that, if one of the Listener is very slow, all the listeners will 
pay the price of delay incurred by the slow listener. 

To solve the above problems, we plan to have a per listener single threaded 
executor service and separate event queue. That way we are not bottlenecked by 
the single threaded event processor and also critical listeners will not be 
penalized by the slow listeners. The downside of this approach is separate 
event queue per listener will increase the driver memory footprint. 





> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might 
> hurt the job performance significantly or even fail the job.  For example, a 
> significant delay in receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` manager to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listener is very slow, all the listeners will pay the 
> price of delay incurred by the slow listener. In addition to that a slow 
> listener can cause events to be dropped from the event queue which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single threaded event processor. Instead each listener will have its own 
> dedic

[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2016-12-14 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750217#comment-15750217
 ] 

Sital Kedia commented on SPARK-18838:
-

[~zsxwing] - It's not only the ExecutorAllocationManager; other critical 
listeners like HeartbeatReceiver also depend on it.  In addition, there might 
be some latency-sensitive user-added listeners. Making the event processing 
faster by multi-threading will fix all these issues. I have an initial version 
of the PR for this; I would appreciate it if you could take a look and give 
feedback on the overall design. 

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is 
> causing job failure. For example, a significant delay in receiving the 
> `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to 
> remove an executor which is not idle.  The event processor in `ListenerBus` 
> is a single thread which loops through all the Listeners for each event and 
> processes each event synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  
> The single threaded processor often becomes the bottleneck for large jobs.  
> In addition to that, if one of the Listener is very slow, all the listeners 
> will pay the price of delay incurred by the slow listener. 
> To solve the above problems, we plan to have a per listener single threaded 
> executor service and separate event queue. That way we are not bottlenecked 
> by the single threaded event processor and also critical listeners will not 
> be penalized by the slow listeners. The downside of this approach is separate 
> event queue per listener will increase the driver memory footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs

2016-12-12 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-18838:

Description: 
Currently we are observing very high event processing delay in the driver's 
`ListenerBus` for large jobs with many tasks. Many critical components of the 
scheduler, like `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
on the `ListenerBus` events, and this delay is causing job failures. For 
example, a significant delay in receiving the `SparkListenerTaskStart` might 
cause `ExecutorAllocationManager` to remove an executor which is not 
idle.  The event processor in `ListenerBus` is a single thread which loops 
through all the Listeners for each event and processes each event synchronously 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
 
The single-threaded processor often becomes the bottleneck for large jobs.  In 
addition, if one of the Listeners is very slow, all the listeners will 
pay the price of the delay incurred by the slow listener. 

To solve the above problems, we plan to have a per-listener single-threaded 
executor service and a separate event queue per listener. That way we are not 
bottlenecked by the single-threaded event processor, and critical listeners 
will not be penalized by slow listeners. The downside of this approach is that 
a separate event queue per listener will increase the driver memory footprint. 




  was:
Currently we are observing the issue of very high event processing delay in 
driver's `ListenerBus` for large jobs with many tasks. Many critical component 
of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend 
on the `ListenerBus` events and these delay is causing job failure. For 
example, a significant delay in receiving the `SparkListenerTaskStart` might 
cause `ExecutorAllocationManager` manager to remove an executor which is not 
idle.  The event processor in `ListenerBus` is a single thread which loops 
through all the Listeners for each event and processes each event synchronously 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
 
The single threaded processor often becomes the bottleneck for large jobs.  In 
addition to that, if one of the Listener is very slow, all the listeners will 
pay the price of delay incurred by the slow listener. 

To solve the above problems, we plan to have a single threaded executor service 
and separate event queue per listener. That way we are not bottlenecked by the 
single threaded processor and also critical listeners will not be penalized by 
the slow listeners. The downside of this approach is separate event queue per 
listener will increase the driver memory footprint. 





> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is 
> causing job failure. For example, a significant delay in receiving the 
> `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to 
> remove an executor which is not idle.  The event processor in `ListenerBus` 
> is a single thread which loops through all the Listeners for each event and 
> processes each event synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  
> The single threaded processor often becomes the bottleneck for large jobs.  
> In addition to that, if one of the Listener is very slow, all the listeners 
> will pay the price of delay incurred by the slow listener. 
> To solve the above problems, we plan to have a per listener single threaded 
> executor service and separate event queue. That way we are not bottlenecked 
> by the single threaded event processor and also critical listeners will not 
> be penalized by the slow listeners. The downside of this approach is separate 
> event queue per listener will increase the driver memory footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2016-12-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744086#comment-15744086
 ] 

Sital Kedia commented on SPARK-18838:
-

[~rxin], [~zsxwing] - Any thoughts on this? 

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is 
> causing job failure. For example, a significant delay in receiving the 
> `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to 
> remove an executor which is not idle.  The event processor in `ListenerBus` 
> is a single thread which loops through all the Listeners for each event and 
> processes each event synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  
> The single threaded processor often becomes the bottleneck for large jobs.  
> In addition to that, if one of the Listener is very slow, all the listeners 
> will pay the price of delay incurred by the slow listener. 
> To solve the above problems, we plan to have a single threaded executor 
> service and separate event queue per listener. That way we are not 
> bottlenecked by the single threaded processor and also critical listeners 
> will not be penalized by the slow listeners. The downside of this approach is 
> separate event queue per listener will increase the driver memory footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs

2016-12-12 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-18838:

Description: 
Currently we are observing very high event processing delay in the driver's 
`ListenerBus` for large jobs with many tasks. Many critical components of the 
scheduler, like `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
on the `ListenerBus` events, and this delay is causing job failures. For 
example, a significant delay in receiving the `SparkListenerTaskStart` might 
cause `ExecutorAllocationManager` to remove an executor which is not 
idle.  The event processor in `ListenerBus` is a single thread which loops 
through all the Listeners for each event and processes each event synchronously 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
 
The single-threaded processor often becomes the bottleneck for large jobs.  In 
addition, if one of the Listeners is very slow, all the listeners will 
pay the price of the delay incurred by the slow listener. 

To solve the above problems, we plan to have a single-threaded executor service 
and a separate event queue per listener. That way we are not bottlenecked by 
the single-threaded processor, and critical listeners will not be penalized by 
slow listeners. The downside of this approach is that a separate event queue 
per listener will increase the driver memory footprint. 




  was:
Currently we are observing very high event processing delays in the driver's 
`ListenerBus` for large jobs with many tasks. Many critical components of the 
scheduler, such as `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
on `ListenerBus` events, and this delay is causing job failures. For example, a 
significant delay in receiving `SparkListenerTaskStart` might cause 
`ExecutorAllocationManager` to remove an executor that is not idle.  The event 
processor in `ListenerBus` is a single thread which loops through all the 
listeners for each event and processes each event synchronously. The 
single-threaded processor often becomes the bottleneck for large jobs.  In 
addition, if one of the listeners is very slow, all the listeners pay the price 
of the delay incurred by the slow listener. 

To solve these problems, we plan to have a single-threaded executor service and 
a separate event queue per listener. That way we are not bottlenecked by the 
single-threaded processor, and critical listeners are not penalized by slow 
listeners.





> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Currently we are observing very high event processing delays in the driver's 
> `ListenerBus` for large jobs with many tasks. Many critical components of the 
> scheduler, such as `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
> on `ListenerBus` events, and this delay is causing job failures. For example, 
> a significant delay in receiving `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` to remove an executor that is not idle.  The event 
> processor in `ListenerBus` is a single thread which loops through all the 
> listeners for each event and processes each event synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  
> The single-threaded processor often becomes the bottleneck for large jobs.  
> In addition, if one of the listeners is very slow, all the listeners pay the 
> price of the delay incurred by the slow listener. 
> To solve these problems, we plan to have a single-threaded executor service 
> and a separate event queue per listener. That way we are not bottlenecked by 
> the single-threaded processor, and critical listeners are not penalized by 
> slow listeners. The downside of this approach is that a separate event queue 
> per listener will increase the driver memory footprint. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18838) High latency of event processing for large jobs

2016-12-12 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-18838:
---

 Summary: High latency of event processing for large jobs
 Key: SPARK-18838
 URL: https://issues.apache.org/jira/browse/SPARK-18838
 Project: Spark
  Issue Type: Improvement
Affects Versions: 2.0.0
Reporter: Sital Kedia


Currently we are observing very high event processing delays in the driver's 
`ListenerBus` for large jobs with many tasks. Many critical components of the 
scheduler, such as `ExecutorAllocationManager` and `HeartbeatReceiver`, depend 
on `ListenerBus` events, and this delay is causing job failures. For example, a 
significant delay in receiving `SparkListenerTaskStart` might cause 
`ExecutorAllocationManager` to remove an executor that is not idle.  The event 
processor in `ListenerBus` is a single thread which loops through all the 
listeners for each event and processes each event synchronously. The 
single-threaded processor often becomes the bottleneck for large jobs.  In 
addition, if one of the listeners is very slow, all the listeners pay the price 
of the delay incurred by the slow listener. 

To solve these problems, we plan to have a single-threaded executor service and 
a separate event queue per listener. That way we are not bottlenecked by the 
single-threaded processor, and critical listeners are not penalized by slow 
listeners.






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13510) Shuffle may throw FetchFailedException: Direct buffer memory

2016-11-16 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671415#comment-15671415
 ] 

Sital Kedia commented on SPARK-13510:
-

[~shenhong] - We are seeing the same issue on our side. Do you have a PR for 
this yet? 

> Shuffle may throw FetchFailedException: Direct buffer memory
> 
>
> Key: SPARK-13510
> URL: https://issues.apache.org/jira/browse/SPARK-13510
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Hong Shen
> Attachments: spark-13510.diff
>
>
> In our cluster, when I test spark-1.6.0 with a SQL query, it throws an 
> exception and fails.
> {code}
> 16/02/17 15:36:03 INFO storage.ShuffleBlockFetcherIterator: Sending request 
> for 1 blocks (915.4 MB) from 10.196.134.220:7337
> 16/02/17 15:36:03 INFO shuffle.ExternalShuffleClient: External shuffle fetch 
> from 10.196.134.220:7337 (executor id 122)
> 16/02/17 15:36:03 INFO client.TransportClient: Sending fetch chunk request 0 
> to /10.196.134.220:7337
> 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in 
> connection from /10.196.134.220:7337
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658)
>   at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>   at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
>   at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
>   at 
> io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
>   at 
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:744)
> 16/02/17 15:36:36 ERROR client.TransportResponseHandler: Still have 1 
> requests outstanding when connection from /10.196.134.220:7337 is closed
> 16/02/17 15:36:36 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block 
> shuffle_3_81_2, and will not retry (0 retries)
> {code}
>   The reason is that when shuffling a big block (like 1 GB), the task will 
> allocate the same amount of memory, so it will easily throw 
> "FetchFailedException: Direct buffer memory".
>   If I add -Dio.netty.noUnsafe=true to spark.executor.extraJavaOptions, it 
> will throw 
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at 
> io.netty.buffer.PoolArena$HeapArena.newUnpooledChunk(PoolArena.java:607)
> at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
> {code}
>   
>   In MapReduce shuffle, it first checks whether the block can be cached in 
> memory, but Spark doesn't. 
>   If the block is larger than what we can cache in memory, we should write it to disk.
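To make the fetch-to-disk idea above concrete, here is a minimal Scala sketch. 
The helper name and the 64 MB threshold are assumptions for illustration, not 
the actual ShuffleBlockFetcherIterator code.

{code}
import java.io.{EOFException, File, FileOutputStream, InputStream}

// Sketch of the "write big blocks to disk" idea: blocks above the threshold
// are streamed to a local file in small chunks instead of being materialized
// in (direct) memory.
object FetchToDiskSketch {
  val maxInMemoryBlockSize: Long = 64L * 1024 * 1024 // assumed threshold

  // Returns either an in-memory byte array (small block) or a local file
  // (big block that was streamed to disk).
  def handleFetchedBlock(blockId: String, size: Long, in: InputStream): Either[Array[Byte], File] = {
    if (size <= maxInMemoryBlockSize) {
      val buf = new Array[Byte](size.toInt)
      var read = 0
      while (read < size.toInt) {
        val n = in.read(buf, read, size.toInt - read)
        if (n == -1) throw new EOFException(s"Unexpected end of block $blockId")
        read += n
      }
      Left(buf)
    } else {
      // A ~1 GB block never needs a ~1 GB buffer: copy it with a 64 KB chunk.
      val file = File.createTempFile(s"fetched-$blockId-", ".data")
      val out = new FileOutputStream(file)
      try {
        val chunk = new Array[Byte](64 * 1024)
        var n = in.read(chunk)
        while (n != -1) {
          out.write(chunk, 0, n)
          n = in.read(chunk)
        }
      } finally {
        out.close()
      }
      Right(file)
    }
  }
}
{code}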



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16827) Stop reporting spill metrics as shuffle metrics

2016-10-12 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16827:

Summary: Stop reporting spill metrics as shuffle metrics  (was: Query with 
Join produces excessive amount of shuffle data)

> Stop reporting spill metrics as shuffle metrics
> ---
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our Hive jobs, which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrading to Spark 2.0, the job is significantly slower.  Digging a 
> little into it, we found out that one of the stages produces an excessive 
> amount of shuffle data.  Please note that this is a regression from Spark 1.6. 
> Stage 2 of the job, which used to produce 32 KB of shuffle data with 1.6, now 
> produces more than 400 GB with Spark 2.0. We also tried turning off whole 
> stage code generation, but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces correct output.
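The retitle above suggests the excess was spill data being reported under the 
shuffle write metrics rather than real shuffle output. A minimal Scala sketch of 
keeping the two counters separate follows; the names are hypothetical, not 
Spark's actual TaskMetrics fields.

{code}
// Sketch only: bytes written while spilling sort/aggregation buffers to local
// disk should be tracked separately from bytes written as shuffle output,
// otherwise spill inflates the reported shuffle size.
class WriteMetricsSketch {
  var shuffleBytesWritten: Long = 0L // real shuffle output for reducers
  var diskBytesSpilled: Long = 0L    // temporary spill files during sort/agg

  def recordShuffleWrite(bytes: Long): Unit = shuffleBytesWritten += bytes

  // The bug pattern: calling recordShuffleWrite() from the spill path makes a
  // 32 KB shuffle look like hundreds of GB. Spill bytes belong here instead:
  def recordSpill(bytes: Long): Unit = diskBytesSpilled += bytes
}
{code}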



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17839) Use Nio's directbuffer instead of BufferedInputStream in order to avoid additional copy from os buffer cache to user buffer

2016-10-10 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-17839:

Summary: Use Nio's directbuffer instead of BufferedInputStream in order to 
avoid additional copy from os buffer cache to user buffer   (was: 
UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files 
in order to avoid additional copy)

> Use Nio's directbuffer instead of BufferedInputStream in order to avoid 
> additional copy from os buffer cache to user buffer 
> 
>
> Key: SPARK-17839
> URL: https://issues.apache.org/jira/browse/SPARK-17839
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.0.1
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently we use a BufferedInputStream to read the shuffle file, which copies 
> the file content from the OS buffer cache to the user buffer. This adds 
> additional latency in reading the spill files. We made a change to use Java 
> NIO's direct buffer to read the spill files, and for certain jobs spilling a 
> significant amount of data we see a 5-7% speedup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17839) UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files in order to avoid additional copy

2016-10-09 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-17839:
---

 Summary: UnsafeSorterSpillReader should use Nio's directbuffer to 
read the spill files in order to avoid additional copy
 Key: SPARK-17839
 URL: https://issues.apache.org/jira/browse/SPARK-17839
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.0.1
Reporter: Sital Kedia
Priority: Minor


Currently we use a BufferedInputStream to read the shuffle file, which copies 
the file content from the OS buffer cache to the user buffer. This adds 
additional latency in reading the spill files. We made a change to use Java 
NIO's direct buffer to read the spill files, and for certain jobs spilling a 
significant amount of data we see a 5-7% speedup.
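A rough Scala sketch of the NIO read path described above (not the actual 
UnsafeSorterSpillReader change; `DirectBufferSpillReader` and its methods are 
names invented for this example):

{code}
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Read the spill file through a FileChannel into a reusable direct ByteBuffer
// instead of a BufferedInputStream, so bytes move from the OS buffer cache
// into the direct buffer without the extra on-heap copy.
class DirectBufferSpillReader(file: File, bufferSizeBytes: Int = 1024 * 1024) {
  private val channel: FileChannel =
    FileChannel.open(file.toPath, StandardOpenOption.READ)
  private val buffer: ByteBuffer = ByteBuffer.allocateDirect(bufferSizeBytes)

  /** Fills the direct buffer with the next chunk; returns false at end of file. */
  def readNextChunk(): Boolean = {
    buffer.clear()
    val bytesRead = channel.read(buffer)
    if (bytesRead <= 0) {
      false
    } else {
      buffer.flip() // buffer is now ready to be consumed via buffer.get(...)
      true
    }
  }

  def close(): Unit = channel.close()
}
{code}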



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17509) When wrapping catalyst datatype to Hive data type avoid pattern matching

2016-09-12 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-17509:

Affects Version/s: 2.0.0
  Description: Profiling a job, we saw that pattern matching in the wrap 
function of HiveInspector is consuming around 10% of the time, which can be 
avoided.  A similar change in the unwrap function was made in SPARK-15956. 
  Component/s: SQL
   Issue Type: Improvement  (was: Bug)

> When wrapping catalyst datatype to Hive data type avoid pattern matching
> 
>
> Key: SPARK-17509
> URL: https://issues.apache.org/jira/browse/SPARK-17509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Profiling a job, we saw that pattern matching in the wrap function of 
> HiveInspector is consuming around 10% of the time, which can be avoided.  A 
> similar change in the unwrap function was made in SPARK-15956. 
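For illustration, a simplified Scala sketch of this kind of optimization (not 
the actual HiveInspector code): match on the data type once and return a 
reusable wrapper closure, instead of pattern matching again for every row.

{code}
import org.apache.spark.sql.types._

object WrapperSketch {
  def wrapperFor(dataType: DataType): Any => Any = dataType match {
    case StringType  => (v: Any) => if (v == null) null else v.toString
    case IntegerType => (v: Any) => v // boxed Integer passes through unchanged
    case DoubleType  => (v: Any) => v
    case _           => (v: Any) => v // fallback; real code handles all types
  }

  // Hot loop pays only the closure call, not the per-row pattern match:
  //   val wrap = wrapperFor(field.dataType)
  //   rows.foreach(row => out += wrap(row(ordinal)))
}
{code}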



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17509) When wrapping catalyst datatype to Hive data type avoid pattern matching

2016-09-12 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-17509:
---

 Summary: When wrapping catalyst datatype to Hive data type avoid 
pattern matching
 Key: SPARK-17509
 URL: https://issues.apache.org/jira/browse/SPARK-17509
 Project: Spark
  Issue Type: Bug
Reporter: Sital Kedia






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-09-06 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469092#comment-15469092
 ] 

Sital Kedia commented on SPARK-16922:
-

I observed no noticeable performance gain compared to BytesToBytesMap. Part of 
the reason might be that BroadcastHashJoin was consuming only a small 
percentage of the total job time. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Assignee: Davies Liu
> Fix For: 2.0.1, 2.1.0
>
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-09-06 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468339#comment-15468339
 ] 

Sital Kedia commented on SPARK-16922:
-

[~davies] -  Thanks for looking into this. I tested the failing job with the 
fix and it works fine now. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Assignee: Davies Liu
> Fix For: 2.0.1, 2.1.0
>
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-09-01 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456744#comment-15456744
 ] 

Sital Kedia commented on SPARK-16922:
-

Thanks for the fix [~davies]. I will test this change with our job to see if 
that fixes the issue. 

PS - I am away this week. Will get to it some time next week. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>Assignee: Davies Liu
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-17164) Query with colon in the table name fails to parse in 2.0

2016-08-22 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia closed SPARK-17164.
---
Resolution: Won't Fix

> Query with colon in the table name fails to parse in 2.0
> 
>
> Key: SPARK-17164
> URL: https://issues.apache.org/jira/browse/SPARK-17164
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Running a simple query with a colon in the table name fails to parse in 2.0
> {code}
> == SQL ==
> SELECT * FROM a:b
> ---^^^
>   at 
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
>   at 
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
>   ... 48 elided
> {code}
> Please note that this is a regression from Spark 1.6 as the query runs fine 
> in 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17164) Query with colon in the table name fails to parse in 2.0

2016-08-22 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431031#comment-15431031
 ] 

Sital Kedia commented on SPARK-17164:
-

Thanks [~rxin], [~hvanhovell], that makes sense. The issue is on our side. For 
Spark 1.6 we were using our internal version of the Hive parser, which supports 
table names with colons. Since Spark 2.0 no longer uses the Hive parser, this 
broke on our side. We will use backticks for such tables to work around this 
issue.
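For illustration, the backtick workaround looks like this in Scala (the table 
name a:b is just an example):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("backtick-workaround")
  .enableHiveSupport()
  .getOrCreate()

// Fails to parse in 2.0:
//   spark.sql("SELECT * FROM a:b")
// Works, because backticks quote the whole identifier:
spark.sql("SELECT * FROM `a:b`").show()
{code}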

> Query with colon in the table name fails to parse in 2.0
> 
>
> Key: SPARK-17164
> URL: https://issues.apache.org/jira/browse/SPARK-17164
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Running a simple query with a colon in the table name fails to parse in 2.0
> {code}
> == SQL ==
> SELECT * FROM a:b
> ---^^^
>   at 
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
>   at 
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
>   ... 48 elided
> {code}
> Please note that this is a regression from Spark 1.6 as the query runs fine 
> in 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17164) Query with colon in the table name fails to parse in 2.0

2016-08-19 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429193#comment-15429193
 ] 

Sital Kedia commented on SPARK-17164:
-

cc - [~hvanhovell], [~rxin]

> Query with colon in the table name fails to parse in 2.0
> 
>
> Key: SPARK-17164
> URL: https://issues.apache.org/jira/browse/SPARK-17164
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> Running a simple query with a colon in the table name fails to parse in 2.0
> {code}
> == SQL ==
> SELECT * FROM a:b
> ---^^^
>   at 
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
>   at 
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
>   ... 48 elided
> {code}
> Please note that this is a regression from Spark 1.6 as the query runs fine 
> in 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17164) Query with colon in the table name fails to parse in 2.0

2016-08-19 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-17164:
---

 Summary: Query with colon in the table name fails to parse in 2.0
 Key: SPARK-17164
 URL: https://issues.apache.org/jira/browse/SPARK-17164
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Sital Kedia


Running a simple query with a colon in the table name fails to parse in 2.0

{code}
== SQL ==
SELECT * FROM a:b
---^^^

  at 
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
  at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
  at 
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46)
  at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
  ... 48 elided

{code}

Please note that this is a regression from Spark 1.6 as the query runs fine in 
1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-18 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427429#comment-15427429
 ] 

Sital Kedia commented on SPARK-16922:
-

Kryo 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-18 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427259#comment-15427259
 ] 

Sital Kedia commented on SPARK-16922:
-

>> Could you also try to disable the dense mode?

I tried disabling the dense mode; that did not help either. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-18 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427250#comment-15427250
 ] 

Sital Kedia commented on SPARK-16922:
-

The failure is deterministic; we are reproducing the issue on every run of the 
job (it's not only one job, there are multiple jobs failing because of this). 
For now, we have made a change to not use the LongHashedRelation to work around 
this issue. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17113) Job failure due to Executor OOM in offheap mode

2016-08-17 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-17113:

Summary: Job failure due to Executor OOM in offheap mode  (was: Job failure 
due to Executor OOM)

> Job failure due to Executor OOM in offheap mode
> ---
>
> Key: SPARK-17113
> URL: https://issues.apache.org/jira/browse/SPARK-17113
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.2, 2.0.0
>Reporter: Sital Kedia
>
> We have been seeing many job failures due to executor OOM with the following 
> stack trace - 
> {code}
> java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
>   at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Digging into the code, we found out that this is an issue with cooperative 
> memory management for off-heap memory allocation.  
> In the code 
> https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463,
>  when the UnsafeExternalSorter checks whether a memory page is being used by 
> the upstream consumer, the base object in the case of off-heap memory is 
> always null, so the UnsafeExternalSorter does not spill the memory pages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17113) Job failure due to Executor OOM

2016-08-17 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425216#comment-15425216
 ] 

Sital Kedia commented on SPARK-17113:
-

cc - [~davies] 

> Job failure due to Executor OOM
> ---
>
> Key: SPARK-17113
> URL: https://issues.apache.org/jira/browse/SPARK-17113
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.2, 2.0.0
>Reporter: Sital Kedia
>
> We have been seeing many job failures due to executor OOM with the following 
> stack trace - 
> {code}
> java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
>   at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Digging into the code, we found out that this is an issue with cooperative 
> memory management for off-heap memory allocation.  
> In the code 
> https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463,
>  when the UnsafeExternalSorter checks whether a memory page is being used by 
> the upstream consumer, the base object in the case of off-heap memory is 
> always null, so the UnsafeExternalSorter does not spill the memory pages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17113) Job failure due to Executor OOM

2016-08-17 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-17113:
---

 Summary: Job failure due to Executor OOM
 Key: SPARK-17113
 URL: https://issues.apache.org/jira/browse/SPARK-17113
 Project: Spark
  Issue Type: Bug
Affects Versions: 2.0.0, 1.6.2
Reporter: Sital Kedia


We have been seeing many job failures due to executor OOM with the following 
stack trace - 

{code}
java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:271)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

Digging into the code, we found out that this is an issue with cooperative 
memory management for off-heap memory allocation.

In the code 
https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463,
 when the UnsafeExternalSorter checks whether a memory page is still in use 
upstream, the base object is always null for off-heap memory, so the 
UnsafeExternalSorter never spills the memory pages.
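
To make the failure mode concrete, here is a tiny self-contained Scala sketch. This is 
not the actual Spark source; MemoryBlock and the check below are simplified stand-ins 
that only illustrate why a base-object comparison cannot tell off-heap pages apart.

{code}
// Simplified model only: in off-heap mode every page reports a null base object,
// so a "is this the page still being read upstream?" check based on base objects
// matches every page, and no page is ever freed or spilled.
case class MemoryBlock(baseObject: AnyRef, size: Long)

object OffHeapSpillSketch {
  // Pages considered safe to spill: everything whose base object differs from
  // the page currently being read upstream (the problematic comparison).
  def pagesToSpill(allocated: Seq[MemoryBlock], upstreamPage: MemoryBlock): Seq[MemoryBlock] =
    allocated.filter(_.baseObject != upstreamPage.baseObject)

  def main(args: Array[String]): Unit = {
    val onHeap  = Seq(MemoryBlock(new Array[Byte](16), 16), MemoryBlock(new Array[Byte](16), 16))
    val offHeap = Seq(MemoryBlock(null, 16), MemoryBlock(null, 16))
    println(pagesToSpill(onHeap, onHeap.head).size)   // 1 -> the page not in use can be freed
    println(pagesToSpill(offHeap, offHeap.head).size) // 0 -> nothing is spilled, memory is never released
  }
}
{code}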





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-15 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421524#comment-15421524
 ] 

Sital Kedia commented on SPARK-16922:
-

Yes, I have the above mentioned PR as well. 

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419458#comment-15419458
 ] 

Sital Kedia commented on SPARK-16922:
-

I am using the fix in https://github.com/apache/spark/pull/14464/files, but the 
issue still remains. The joining key lies in the range [43304915L to 
10150946266075397L].

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0

2016-08-12 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16922:

Summary: Query with Broadcast Hash join fails due to executor OOM in Spark 
2.0  (was: Query failure due to executor OOM in Spark 2.0)

> Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
> -
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419383#comment-15419383
 ] 

Sital Kedia edited comment on SPARK-16922 at 8/12/16 8:06 PM:
--

I found that the regression was introduced in 
https://github.com/apache/spark/pull/12278, which introduced a new data 
structure (LongHashedRelation) for long types. I made a hack to use 
UnsafeHashedRelation instead of LongHashedRelation in 
https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105
 and things started working fine.  This might be due to some data corruption 
happening in LongHashedRelation. 

cc - [~davies]


was (Author: sitalke...@gmail.com):
I found that the regression was introduced in 
https://github.com/apache/spark/pull/12278, whichintroduced a new data 
structure (LongHashedRelation) for long types. I made a hack to use 
UnsafeHashedRelation instead of LongHashedRelation in 
https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105
 and things started working fine.  This might be due to some data corruption 
happening in LongHashedRelation. 

cc - [~davies]

> Query failure due to executor OOM in Spark 2.0
> --
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(Li

[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-12 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419383#comment-15419383
 ] 

Sital Kedia commented on SPARK-16922:
-

I found that the regression was introduced in 
https://github.com/apache/spark/pull/12278, which introduced a new data 
structure (LongHashedRelation) for long types. I made a hack to use 
UnsafeHashedRelation instead of LongHashedRelation in 
https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105
 and things started working fine.  This might be due to some data corruption 
happening in LongHashedRelation. 

cc - [~davies]

> Query failure due to executor OOM in Spark 2.0
> --
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-06 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410703#comment-15410703
 ] 

Sital Kedia commented on SPARK-16922:
-

Update - The query works fine when broadcast hash join is turned off, so the 
issue might be in the broadcast hash join. I put a debug print in the 
UnsafeRowWriter class 
(https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L214)
 and found that it is receiving a row of around 800MB and OOMing while trying 
to grow the buffer holder. This suggests that there may be some data corruption 
going on, probably in the broadcast hash join.

cc - [~davies] - Any pointers on how to debug this issue further?
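
For reference, this is roughly how broadcast hash join was taken out of the picture 
while narrowing this down. It is only a minimal sketch: the table and column names are 
placeholders reconstructed from the query plans in the issue description, and it assumes 
a Spark 2.0 SparkSession. Setting the threshold to -1 makes the planner fall back to a 
sort-merge join.

{code}
import org.apache.spark.sql.SparkSession

object DisableBroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-16922-debug")
      .enableHiveSupport()
      .getOrCreate()

    // -1 disables broadcast hash join entirely; the join falls back to sort-merge join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Stand-in for the failing query (names taken from the plans in the description).
    spark.sql(
      """SELECT b.field1, sum(a.field2 / 100.0)
        |FROM table1 a JOIN table2 b
        |  ON a.field5 = cast(cast(b.field4 AS decimal(20,0)) AS bigint)
        |GROUP BY b.field1""".stripMargin).explain(true)
  }
}
{code}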

> Query failure due to executor OOM in Spark 2.0
> --
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-05 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409878#comment-15409878
 ] 

Sital Kedia commented on SPARK-16922:
-

PS - Rerunning the query with spark.sql.codegen.aggregate.map.columns.max=0 to 
disable vectorized aggregation for the columnar map also OOMs, but with a 
different stack trace. 

{code}
[Stage 1:>(0 + 4) / 
184]16/08/05 11:26:31 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 4, 
hadoop4774.prn2.facebook.com): java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

{code}
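
For reference, a minimal sketch (spark-shell style, assuming Spark 2.0) of the conf used 
for this rerun; allowing zero columns keeps the planner from using the vectorized 
columnar aggregate hash map.

{code}
import org.apache.spark.sql.SparkSession

// Minimal sketch: disable the vectorized (columnar) aggregate hash map so that
// aggregation falls back to the row-based code path for this rerun.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
{code}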

> Query failure due to executor OOM in Spark 2.0
> --
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)

[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-05 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409813#comment-15409813
 ] 

Sital Kedia commented on SPARK-16922:
-

cc - [~rxin] 

> Query failure due to executor OOM in Spark 2.0
> --
>
> Key: SPARK-16922
> URL: https://issues.apache.org/jira/browse/SPARK-16922
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>   at 
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
> 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>   +- Project [field1#101,field2#74]
>  +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
> decimal(20,0)) as bigint)], BuildRight
> :- ConvertToUnsafe
> :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation 
> foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
> +- ConvertToUnsafe
>+- HiveTableScan [field1#101,field4#97], MetastoreRelation 
> foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
> 100.0))])
>   +- *Project [field2#133, field1#160]
>  +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
> decimal(20,0)) as bigint)], Inner, BuildRight
> :- *Filter isnotnull(field5#122L)
> :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
> foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
> 2013-12-31)]
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
> decimal(20,0)) as bigint)))
>+- *Filter isnotnull(field4#156)
>   +- HiveTableScan [field4#156, field1#160], 
> MetastoreRelation foo, table2, b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16922) Query failure due to executor OOM in Spark 2.0

2016-08-05 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-16922:
---

 Summary: Query failure due to executor OOM in Spark 2.0
 Key: SPARK-16922
 URL: https://issues.apache.org/jira/browse/SPARK-16922
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 2.0.0
Reporter: Sital Kedia


A query which used to work in Spark 1.6 fails with executor OOM in 2.0.

Stack trace - 
{code}
at 
org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

{code}

Query plan in Spark 1.6

{code}
== Physical Plan ==
TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
+- TungstenExchange hashpartitioning(field1#101,200), None
   +- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 
100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
  +- Project [field1#101,field2#74]
 +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as 
decimal(20,0)) as bigint)], BuildRight
:- ConvertToUnsafe
:  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation foo, 
table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
+- ConvertToUnsafe
   +- HiveTableScan [field1#101,field4#97], MetastoreRelation foo, 
table2, Some(b)
{code}


Query plan in 2.0
{code}
== Physical Plan ==
*HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
+- Exchange hashpartitioning(field1#160, 200)
   +- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 
100.0))])
  +- *Project [field2#133, field1#160]
 +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as 
decimal(20,0)) as bigint)], Inner, BuildRight
:- *Filter isnotnull(field5#122L)
:  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation 
foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 
2013-12-31)]
+- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as 
decimal(20,0)) as bigint)))
   +- *Filter isnotnull(field4#156)
  +- HiveTableScan [field4#156, field1#160], MetastoreRelation 
foo, table2, b
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-08-01 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402461#comment-15402461
 ] 

Sital Kedia commented on SPARK-16827:
-

Performance is worse for this job, but I suspect this is not because of the 
shuffle data write. Will update once I figure out the reason.

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces accurate output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401523#comment-15401523
 ] 

Sital Kedia commented on SPARK-16827:
-

Actually, it seems like this is a bug in the shuffle write metrics calculation; 
the actual amount of shuffle data written might be the same, because the final 
output is exactly the same. 
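
One way to cross-check this is to compare the reported metric against the actual shuffle 
files on disk. The helper below is a hypothetical sketch, not part of the original job; 
it assumes the Spark 2.0 listener API and simply sums the per-task bytesWritten metric 
per stage.

{code}
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper: aggregates the shuffle write metric per stage so it can be
// compared against the real size of the shuffle files written on the executors.
class ShuffleWriteAudit extends SparkListener {
  // Listener events are delivered on a single bus thread, so a plain map suffices.
  private val bytesByStage = mutable.Map.empty[Int, Long].withDefaultValue(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      bytesByStage(taskEnd.stageId) += metrics.shuffleWriteMetrics.bytesWritten
    }
  }

  def report(): Unit = bytesByStage.toSeq.sortBy(_._1).foreach { case (stage, bytes) =>
    println(s"stage $stage: $bytes shuffle bytes written (as reported by metrics)")
  }
}
{code}

Registering it with sc.addSparkListener(new ShuffleWriteAudit) before running the job and 
calling report() afterwards should make it clear whether only the metric is inflated or 
the written data itself.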

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces accurate output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401413#comment-15401413
 ] 

Sital Kedia edited comment on SPARK-16827 at 8/1/16 12:52 AM:
--

That is not the case. There is no broadcast join involved; it's using a shuffle 
join in both 1.6 and 2.0.

Plan in 1.6 -
{code}
Project [target_id#136L AS userid#115L]
+- SortMergeJoin [source_id#135L], [id#140L]
   :- Sort [source_id#135L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(source_id#135L,800), None
   : +- ConvertToUnsafe
   :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation 
foo, table1, Some(a), [(ds#134 = 2016-07-15)]
   +- Sort [id#140L ASC], false, 0
  +- TungstenExchange hashpartitioning(id#140L,800), None
 +- ConvertToUnsafe
+- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), 
[(ds#139 = 2016-07-15)]

{code}

Plan in 2.0  - 

{code}
*Project [target_id#151L AS userid#111L]
+- *SortMergeJoin [source_id#150L], [id#155L], Inner
   :- *Sort [source_id#150L ASC], false, 0
   :  +- Exchange hashpartitioning(source_id#150L, 800)
   : +- *Filter isnotnull(source_id#150L)
   :+- HiveTableScan [source_id#150L, target_id#151L], 
MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)]
   +- *Sort [id#155L ASC], false, 0
  +- Exchange hashpartitioning(id#155L, 800)
 +- *Filter isnotnull(id#155L)
+- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, 
[isnotnull(ds#154), (ds#154 = 2016-07-15)]
{code}



was (Author: sitalke...@gmail.com):
That is not the case. There is no broadcast join involved, its using shuffle 
join in both 1.6 and 2.0.

Plan in 1.6 -

Project [target_id#136L AS userid#115L]
+- SortMergeJoin [source_id#135L], [id#140L]
   :- Sort [source_id#135L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(source_id#135L,800), None
   : +- ConvertToUnsafe
   :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation 
foo, table1, Some(a), [(ds#134 = 2016-07-15)]
   +- Sort [id#140L ASC], false, 0
  +- TungstenExchange hashpartitioning(id#140L,800), None
 +- ConvertToUnsafe
+- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), 
[(ds#139 = 2016-07-15)]


Plan in 2.0  - 

*Project [target_id#151L AS userid#111L]
+- *SortMergeJoin [source_id#150L], [id#155L], Inner
   :- *Sort [source_id#150L ASC], false, 0
   :  +- Exchange hashpartitioning(source_id#150L, 800)
   : +- *Filter isnotnull(source_id#150L)
   :+- HiveTableScan [source_id#150L, target_id#151L], 
MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)]
   +- *Sort [id#155L ASC], false, 0
  +- Exchange hashpartitioning(id#155L, 800)
 +- *Filter isnotnull(id#155L)
+- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, 
[isnotnull(ds#154), (ds#154 = 2016-07-15)]

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces accurate output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401413#comment-15401413
 ] 

Sital Kedia commented on SPARK-16827:
-

That is not the case. There is no broadcast join involved; it's using a shuffle 
join in both 1.6 and 2.0.

Plan in 1.6 -

Project [target_id#136L AS userid#115L]
+- SortMergeJoin [source_id#135L], [id#140L]
   :- Sort [source_id#135L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(source_id#135L,800), None
   : +- ConvertToUnsafe
   :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation 
foo, table1, Some(a), [(ds#134 = 2016-07-15)]
   +- Sort [id#140L ASC], false, 0
  +- TungstenExchange hashpartitioning(id#140L,800), None
 +- ConvertToUnsafe
+- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), 
[(ds#139 = 2016-07-15)]


Plan in 2.0  - 

*Project [target_id#151L AS userid#111L]
+- *SortMergeJoin [source_id#150L], [id#155L], Inner
   :- *Sort [source_id#150L ASC], false, 0
   :  +- Exchange hashpartitioning(source_id#150L, 800)
   : +- *Filter isnotnull(source_id#150L)
   :+- HiveTableScan [source_id#150L, target_id#151L], 
MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)]
   +- *Sort [id#155L ASC], false, 0
  +- Exchange hashpartitioning(id#155L, 800)
 +- *Filter isnotnull(id#155L)
+- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, 
[isnotnull(ds#154), (ds#154 = 2016-07-15)]

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces accurate output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16827:

Description: 
One of our Hive jobs, which looks like this -

{code}
 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ON  a.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id
{code}

After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
into it, we found out that one of the stages produces an excessive amount of 
shuffle data. Please note that this is a regression from Spark 1.6: Stage 2 of 
the job, which used to produce 32KB of shuffle data with 1.6, now produces more 
than 400GB with Spark 2.0. We also tried turning off whole-stage code 
generation, but that did not help. 

PS - Even if the intermediate shuffle data size is huge, the job still produces 
accurate output.
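
(For reference, a minimal sketch of how whole-stage code generation was turned off for 
the run mentioned above, assuming a Spark 2.0 SparkSession named spark.)

{code}
// Minimal sketch: run the job once with whole-stage code generation disabled.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
{code}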

  was:
One of our hive job which looks like this -

{code}
 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id
{code}

After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
into it, we found out that one of the stages produces excessive amount of 
shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 of 
the job which used to produce 32KB shuffle data with 1.6, now produces more 
than 400GB with Spark 2.0. We also tried turning off whole stage code 
generation but that did not help.




> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help. 
> PS - Even if the intermediate shuffle data size is huge, the job still 
> produces accurate output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401405#comment-15401405
 ] 

Sital Kedia commented on SPARK-16827:
-

[~rxin] - Any idea how to debug this issue?

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16827) Query with Join produces excessive shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16827:

Description: 
One of our hive job which looks like this -

{code}
 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id
{code}

After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
into it, we found out that one of the stages produces excessive amount of 
shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 of 
the job which used to produce 32KB shuffle data with 1.6, now produces more 
than 400GB with Spark 2.0. We also tried turning off whole stage code 
generation but that did not help.



  was:
One of our hive job which looks like this -

{code]
 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id
{code}

After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
into it, we found out that one of the stages produces excessive amount of 
shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 of 
the job which used to produce 32KB shuffle data with 1.6, now produces more 
than 400GB with Spark 2.0. We also tried turning off whole stage code 
generation but that did not help.




> Query with Join produces excessive shuffle data
> ---
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our hive job which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrade to Spark 2.0 the job is significantly slow.  Digging a little 
> into it, we found out that one of the stages produces excessive amount of 
> shuffle data.  Please note that this is a regression from Spark 1.6. Stage 2 
> of the job which used to produce 32KB shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole stage code 
> generation but that did not help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16827) Query with Join produces excessive amount of shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16827:

Summary: Query with Join produces excessive amount of shuffle data  (was: 
Query with Join produces excessive shuffle data)

> Query with Join produces excessive amount of shuffle data
> -
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our Hive jobs, which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
> into it, we found out that one of the stages produces an excessive amount of 
> shuffle data. Please note that this is a regression from Spark 1.6: Stage 2 
> of the job, which used to produce 32KB of shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole-stage code 
> generation, but that did not help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16827) Query with Join produces excessive shuffle data

2016-07-31 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-16827:

Description: 
One of our Hive jobs, which looks like this -

{code}
 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id
{code}

After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
into it, we found out that one of the stages produces an excessive amount of 
shuffle data. Please note that this is a regression from Spark 1.6: Stage 2 of 
the job, which used to produce 32KB of shuffle data with 1.6, now produces more 
than 400GB with Spark 2.0. We also tried turning off whole-stage code 
generation, but that did not help.



  was:
One of our Hive jobs, which looks like this -

 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id

After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
into it, we found out that one of the stages produces an excessive amount of 
shuffle data. Please note that this is a regression from Spark 1.6.


> Query with Join produces excessive shuffle data
> ---
>
> Key: SPARK-16827
> URL: https://issues.apache.org/jira/browse/SPARK-16827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
>  Labels: performance
>
> One of our Hive jobs, which looks like this -
> {code}
>  SELECT  userid
>  FROM  table1 a
>  JOIN table2 b
>   ONa.ds = '2016-07-15'
>   AND  b.ds = '2016-07-15'
>   AND  a.source_id = b.id
> {code}
> After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
> into it, we found out that one of the stages produces an excessive amount of 
> shuffle data. Please note that this is a regression from Spark 1.6: Stage 2 
> of the job, which used to produce 32KB of shuffle data with 1.6, now produces 
> more than 400GB with Spark 2.0. We also tried turning off whole-stage code 
> generation, but that did not help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16827) Query with Join produces excessive shuffle data

2016-07-31 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-16827:
---

 Summary: Query with Join produces excessive shuffle data
 Key: SPARK-16827
 URL: https://issues.apache.org/jira/browse/SPARK-16827
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 2.0.0
Reporter: Sital Kedia


One of our Hive jobs, which looks like this -

 SELECT  userid
 FROM  table1 a
 JOIN table2 b
  ONa.ds = '2016-07-15'
  AND  b.ds = '2016-07-15'
  AND  a.source_id = b.id

After upgrading to Spark 2.0 the job is significantly slower. Digging a little 
into it, we found out that one of the stages produces an excessive amount of 
shuffle data. Please note that this is a regression from Spark 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-15958) Make initial buffer size for the Sorter configurable

2016-06-14 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-15958:
---

 Summary: Make initial buffer size for the Sorter configurable
 Key: SPARK-15958
 URL: https://issues.apache.org/jira/browse/SPARK-15958
 Project: Spark
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Sital Kedia


Currently the initial buffer size in the sorter is hard-coded in the code 
(https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java#L88)
 and is too small for large workloads. As a result, the sorter spends 
significant time expanding the buffer and copying the data. It would be 
useful to have it configurable. 
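A rough sketch of what the proposal amounts to is below. The config key 
spark.sql.sort.initialBufferSize is an assumption for illustration only, not necessarily the 
name an eventual patch would use, and the default is a placeholder standing in for the 
hard-coded value at the linked line.

{code}
import org.apache.spark.SparkConf

object SorterBufferSize {
  // Placeholder standing in for the value currently hard-coded in UnsafeExternalRowSorter.
  val HardCodedDefault = 4096

  // Assumed key, for illustration only; a large workload could then raise it, e.g.
  //   --conf spark.sql.sort.initialBufferSize=4194304
  // to avoid repeated buffer growth and copying inside the sorter.
  def initialBufferSize(conf: SparkConf): Int =
    conf.getInt("spark.sql.sort.initialBufferSize", HardCodedDefault)
}
{code}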



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-15569) Executors spending significant time in DiskObjectWriter.updateBytesWritten function

2016-05-26 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-15569:
---

 Summary: Executors spending significant time in 
DiskObjectWriter.updateBytesWritten function
 Key: SPARK-15569
 URL: https://issues.apache.org/jira/browse/SPARK-15569
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Reporter: Sital Kedia


Profiling a Spark job that spills a large amount of intermediate data, we found 
that a significant portion of time is spent in the 
DiskObjectWriter.updateBytesWritten function. Looking at the code 
(https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala#L206),
 we see that the function is called very frequently to update the number 
of bytes written to disk. We should reduce the call frequency to avoid this overhead. 
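A minimal sketch of the proposed mitigation, with illustrative names rather than Spark's 
actual DiskBlockObjectWriter internals: batch the expensive bytes-written update so it runs 
once every N records instead of on every write.

{code}
// Illustrative only; not the real DiskBlockObjectWriter.
class ThrottledWriteMetrics(reportEvery: Int = 16384) {
  private var recordsSinceLastReport = 0

  // Call once per record written. `reportBytesWritten` stands for the expensive
  // update (querying the file position and updating the shuffle write metrics);
  // it now runs once every `reportEvery` records instead of on every write.
  def recordWritten(reportBytesWritten: () => Unit): Unit = {
    recordsSinceLastReport += 1
    if (recordsSinceLastReport >= reportEvery) {
      reportBytesWritten()
      recordsSinceLastReport = 0
    }
  }
}
{code}

The trade-off is slightly stale metrics between updates in exchange for far fewer calls on the 
hot write path.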



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15391) Spark executor OOM during TimSort

2016-05-18 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15289791#comment-15289791
 ] 

Sital Kedia commented on SPARK-15391:
-

cc- [~davies]  - Any idea how to fix this issue?

> Spark executor OOM during TimSort
> -
>
> Key: SPARK-15391
> URL: https://issues.apache.org/jira/browse/SPARK-15391
> Project: Spark
>  Issue Type: Bug
>Reporter: Sital Kedia
>
> While running a query, we are seeing a lot of executor OOMs during 
> TimSort.
> Stack trace - 
> {code}
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198)
>   at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
> {code}
> Out of the total 32g available to the executors, we are allocating 24g onheap and 
> 8g offheap memory. Looking at the code 
> (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87),
>  we see that during TimSort we allocate the memory buffer onheap 
> irrespective of the memory mode, which is the reason for the executor OOM. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15391) Spark executor OOM during TimSort

2016-05-18 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-15391:

Description: 
While running a query, we are seeing a lot of executor OOMs during TimSort.

Stack trace - 

{code}
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
at 
org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
at 
org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198)
at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)

{code}

Out of the total 32g available to the executors, we are allocating 24g onheap and 
8g offheap memory. Looking at the code 
(https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87),
 we see that during TimSort we allocate the memory buffer onheap 
irrespective of the memory mode, which is the reason for the executor OOM. 

  was:
While running a query, we are seeing a lot of executor OOMs during TimSort.

Stack trace - 


at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
at 
org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
at 
org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198)
at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)

Out of the total 32g available to the executors, we are allocating 24g onheap and 
8g offheap memory. Looking at the code 
(https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87),
 we see that during TimSort we allocate the memory buffer onheap 
irrespective of the memory mode, which is the reason for the executor OOM. 


> Spark executor OOM during TimSort
> -
>
> Key: SPARK-15391
> URL: https://issues.apache.org/jira/browse/SPARK-15391
> Project: Spark
>  Issue Type: Bug
>Reporter: Sital Kedia
>
> While running a query, we are seeing a lot of executor OOMs during 
> TimSort.
> Stack trace - 
> {code}
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
>  

[jira] [Updated] (SPARK-15391) Spark executor OOM during TimSort

2016-05-18 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-15391:

Description: 
While running a query, we are seeing a lot of executor OOMs during TimSort.

Stack trace - 


at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
at 
org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
at 
org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
at 
org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198)
at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)

Out of the total 32g available to the executors, we are allocating 24g onheap and 
8g offheap memory. Looking at the code 
(https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87),
 we see that during TimSort we allocate the memory buffer onheap 
irrespective of the memory mode, which is the reason for the executor OOM. 

> Spark executor OOM during TimSort
> -
>
> Key: SPARK-15391
> URL: https://issues.apache.org/jira/browse/SPARK-15391
> Project: Spark
>  Issue Type: Bug
>Reporter: Sital Kedia
>
> While running a query, we are seeing a lot of executor OOMs during 
> TimSort.
> Stack trace - 
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198)
>   at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
> Out of the total 32g available to the executors, we are allocating 24g onheap and 
> 8g offheap memory. Looking at the code 
> (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87),
>  we see that during TimSort we allocate the memory buffer onheap 
> irrespective of the memory mode, which is the reason for the executor OOM. 
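To make the mismatch concrete, here is a small illustrative sketch of a memory-mode-aware 
allocation path for the sort buffer. The types below are stand-ins, not Spark's actual 
MemoryConsumer or LongArray APIs; the report's point is that the current code effectively 
always takes the on-heap branch.

{code}
// Illustrative stand-ins only; these are not Spark internals.
sealed trait MemoryMode
case object OnHeap extends MemoryMode
case object OffHeap extends MemoryMode

trait SortBuffer { def length: Int }

final class HeapBuffer(n: Int) extends SortBuffer {
  val data = new Array[Long](n)  // always lands on the Java heap
  def length: Int = n
}

final class OffHeapBuffer(n: Int) extends SortBuffer {
  val address: Long = 0L         // placeholder for an off-heap allocation
  def length: Int = n
}

object SortBufferAllocator {
  // Route the allocation by memory mode, so a task configured for off-heap
  // execution memory does not put its large sort buffer on the Java heap.
  def allocate(n: Int, mode: MemoryMode): SortBuffer = mode match {
    case OnHeap  => new HeapBuffer(n)
    case OffHeap => new OffHeapBuffer(n)
  }
}
{code}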



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-15391) Spark executor OOM during TimSort

2016-05-18 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-15391:
---

 Summary: Spark executor OOM during TimSort
 Key: SPARK-15391
 URL: https://issues.apache.org/jira/browse/SPARK-15391
 Project: Spark
  Issue Type: Bug
Reporter: Sital Kedia






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13850) TimSort Comparison method violates its general contract

2016-05-17 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286906#comment-15286906
 ] 

Sital Kedia commented on SPARK-13850:
-

I am not 100% sure of the root cause, but I suspect this happens when the JVM 
tries to allocate a very large buffer for the pointer array. The issue might 
be that the JVM is unable to allocate the large buffer in a contiguous memory 
region on the heap, and since the unsafe operations assume a contiguous memory 
layout for the objects, any unsafe operation on the large buffer results in memory 
corruption, which manifests as the TimSort failure.
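While the root cause is investigated, one pragmatic mitigation consistent with this hypothesis 
is to raise the shuffle parallelism so that each task sorts fewer rows and the per-task pointer 
array stays smaller. A sketch under that assumption is below; this is not the workaround in 
the PR.

{code}
// Spark 1.6-style sketch; assumes an existing SQLContext (e.g. in spark-shell).
// More shuffle partitions mean each task sorts fewer rows, so the pointer array
// handed to TimSort stays smaller. 2000 is an arbitrary example; tune to the dataset.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
{code}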

> TimSort Comparison method violates its general contract
> ---
>
> Key: SPARK-13850
> URL: https://issues.apache.org/jira/browse/SPARK-13850
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.0
>Reporter: Sital Kedia
>
> While running a query which does a group by on a large dataset, the query 
> fails with the following stack trace. 
> {code}
> Job aborted due to stage failure: Task 4077 in stage 1.3 failed 4 times, most 
> recent failure: Lost task 4077.3 in stage 1.3 (TID 88702, 
> hadoop3030.prn2.facebook.com): java.lang.IllegalArgumentException: Comparison 
> method violates its general contract!
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:794)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:228)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:186)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:175)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:318)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:333)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Please note that the same query used t

[jira] [Commented] (SPARK-13850) TimSort Comparison method violates its general contract

2016-05-13 Thread Sital Kedia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15283110#comment-15283110
 ] 

Sital Kedia commented on SPARK-13850:
-

I have found a workaround for this issue. Please take a look at the PR. 

> TimSort Comparison method violates its general contract
> ---
>
> Key: SPARK-13850
> URL: https://issues.apache.org/jira/browse/SPARK-13850
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.0
>Reporter: Sital Kedia
>
> While running a query which does a group by on a large dataset, the query 
> fails with the following stack trace. 
> {code}
> Job aborted due to stage failure: Task 4077 in stage 1.3 failed 4 times, most 
> recent failure: Lost task 4077.3 in stage 1.3 (TID 88702, 
> hadoop3030.prn2.facebook.com): java.lang.IllegalArgumentException: Comparison 
> method violates its general contract!
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:794)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
>   at 
> org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:228)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:186)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:175)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:318)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:333)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
>   at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Please note that the same query used to succeed in Spark 1.5 so it seems like 
> a regression in 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-15233) Spark task metrics should include hdfs read write latency

2016-05-09 Thread Sital Kedia (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sital Kedia updated SPARK-15233:

Affects Version/s: 1.6.1
 Priority: Minor  (was: Major)
  Description: Currently the Spark task metrics do not include HDFS 
read/write latency. It would be very useful to have those metrics to find the 
bottleneck in a query.
   Issue Type: Improvement  (was: Bug)
  Summary: Spark task metrics should include hdfs read write 
latency  (was: Spark UI should show metrics for hdfs read write latency)

> Spark task metrics should include hdfs read write latency
> -
>
> Key: SPARK-15233
> URL: https://issues.apache.org/jira/browse/SPARK-15233
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.6.1
>Reporter: Sital Kedia
>Priority: Minor
>
> Currently the Spark task metrics do not include HDFS read/write latency. It 
> would be very useful to have those metrics to find the bottleneck in a query.
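Until such metrics exist in the task metrics, the latency in question can be sampled by hand 
around the Hadoop FileSystem calls. A rough probe is sketched below; the URI and path are 
placeholders, and it times a single read rather than aggregating per task the way a built-in 
metric would.

{code}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsReadLatencyProbe {
  def main(args: Array[String]): Unit = {
    // Placeholder URI and path; point these at a real file to try it.
    val fs = FileSystem.get(new URI("hdfs:///"), new Configuration())
    val in = fs.open(new Path("/tmp/sample-file"))
    val buf = new Array[Byte](1 << 20)

    // Time a single 1MB read as a crude proxy for HDFS read latency.
    val start = System.nanoTime()
    val bytesRead = in.read(buf)
    val latencyMs = (System.nanoTime() - start) / 1e6

    println(s"read $bytesRead bytes in $latencyMs ms")
    in.close()
  }
}
{code}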



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-15233) Spark UI should show metrics for hdfs read write latency

2016-05-09 Thread Sital Kedia (JIRA)
Sital Kedia created SPARK-15233:
---

 Summary: Spark UI should show metrics for hdfs read write latency
 Key: SPARK-15233
 URL: https://issues.apache.org/jira/browse/SPARK-15233
 Project: Spark
  Issue Type: Bug
Reporter: Sital Kedia






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


