[jira] [Commented] (SPARK-23722) Add support for inserting empty map or array to table
[ https://issues.apache.org/jira/browse/SPARK-23722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405181#comment-16405181 ] Sital Kedia commented on SPARK-23722: - Actually, this is a duplicate of SPARK-21281 and the issue has been resolved in 2.3. Closing. > Add support for inserting empty map or array to table > - > > Key: SPARK-23722 > URL: https://issues.apache.org/jira/browse/SPARK-23722 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Sital Kedia >Priority: Minor > > A query like the following fails in Spark currently - > ``` > CREATE TABLE arrayTypeTable (data array) > INSERT OVERWRITE TABLE arrayTypeTable select array() > ``` > This is because Spark does not allow casting an empty map/array to > array. This is a use case currently supported by both Hive and Presto, > and we need to support it in order to make Spark compatible with existing > Hive queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23722) Add support for inserting empty map or array to table
[ https://issues.apache.org/jira/browse/SPARK-23722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia resolved SPARK-23722. - Resolution: Duplicate > Add support for inserting empty map or array to table > - > > Key: SPARK-23722 > URL: https://issues.apache.org/jira/browse/SPARK-23722 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Sital Kedia >Priority: Minor > > A query like the following fails in Spark currently - > ``` > CREATE TABLE arrayTypeTable (data array) > INSERT OVERWRITE TABLE arrayTypeTable select array() > ``` > This is because Spark does not allow casting an empty map/array to > array. This is a use case currently supported by both Hive and Presto, > and we need to support it in order to make Spark compatible with existing > Hive queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23722) Add support for inserting empty map or array to table
Sital Kedia created SPARK-23722: --- Summary: Add support for inserting empty map or array to table Key: SPARK-23722 URL: https://issues.apache.org/jira/browse/SPARK-23722 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.0.2 Reporter: Sital Kedia A query like the following fails in Spark currently - ``` CREATE TABLE arrayTypeTable (data array) INSERT OVERWRITE TABLE arrayTypeTable select array() ``` This is because Spark does not allow casting an empty map/array to array. This is a use case currently supported by both Hive and Presto, and we need to support it in order to make Spark compatible with existing Hive queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
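For reference, a minimal sketch of the failing pattern above together with a possible explicit-cast workaround. It assumes the target column is array<string> (the element type is not spelled out in the report) and a Hive-enabled session; whether the explicit cast is accepted depends on the Spark version, so treat it as illustrative rather than a confirmed fix.

{code}
// Sketch only: the table name comes from the report above; the array<string>
// element type and the CAST workaround are assumptions, not a confirmed fix.
import org.apache.spark.sql.SparkSession

object EmptyArrayInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("empty-array-insert")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS arrayTypeTable (data array<string>)")

    // Fails on affected versions: array() is typed as array<null> and the
    // implicit cast to the column type is rejected.
    // spark.sql("INSERT OVERWRITE TABLE arrayTypeTable SELECT array()")

    // Possible workaround: make the cast explicit.
    spark.sql("INSERT OVERWRITE TABLE arrayTypeTable SELECT CAST(array() AS array<string>)")

    spark.stop()
  }
}
{code}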
[jira] [Commented] (SPARK-18134) SQL: MapType in Group BY and Joins not working
[ https://issues.apache.org/jira/browse/SPARK-18134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394571#comment-16394571 ] Sital Kedia commented on SPARK-18134: - [~hvanhovell] - What is the state of this JIRA? Do we expect to make progress on it any time soon? This is a very critical feature that we are missing in Spark, and it is blocking us from auto-migrating HiveQL to Spark. Please let us know if there is anything we can do to help resolve this issue. cc - [~sameerag], [~tejasp] > SQL: MapType in Group BY and Joins not working > -- > > Key: SPARK-18134 > URL: https://issues.apache.org/jira/browse/SPARK-18134 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0, 2.0.1, > 2.1.0 >Reporter: Christian Zorneck >Priority: Major > > Since version 1.5 and issue SPARK-9415, MapTypes can no longer be used in > GROUP BY and join clauses. This makes it incompatible with HiveQL. So, a Hive > feature was removed from Spark. This makes Spark incompatible with various > HiveQL statements. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
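As context for the limitation discussed above, here is a minimal sketch of one commonly used workaround: group on a deterministic string rendering of the map instead of the MapType column itself. The DataFrame, column names, and UDF are purely illustrative and are not part of any proposed fix for this JIRA.

{code}
// Illustrative workaround sketch: Spark rejects GROUP BY on a MapType column
// (SPARK-9415), so we group on a canonical string rendering of the map instead.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, udf}

object MapGroupByWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("map-groupby-workaround").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, Map("a" -> "x", "b" -> "y")),
      (2, Map("b" -> "y", "a" -> "x")), // same map, different insertion order
      (3, Map("c" -> "z"))
    ).toDF("id", "props")

    // Sort the entries so that equal maps always render to the same string.
    val mapKey = udf { m: scala.collection.Map[String, String] =>
      m.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(",")
    }

    df.withColumn("propsKey", mapKey(col("props")))
      .groupBy("propsKey")
      .agg(count("id").as("n"))
      .show()
  }
}
{code}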
[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup
[ https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380942#comment-16380942 ] Sital Kedia commented on SPARK-23502: - >> what happens when you operate on {{sc}} before it's initialized? We will wait for the future to complete before triggering any action based on user input, so that should be fine. >> Is it surprising if Spark shell starts but errors out 20 seconds later? Yes, that might be one of the side effects. Another major side effect, as I mentioned, is not being able to print messages like the UI link and app id when the spark-shell starts. I just wanted to get some opinions to see if this is something useful for the community. If we do not think so, we can close this. > Support async init of spark context during spark-shell startup > -- > > Key: SPARK-23502 > URL: https://issues.apache.org/jira/browse/SPARK-23502 > Project: Spark > Issue Type: Improvement > Components: Spark Shell >Affects Versions: 2.0.0 >Reporter: Sital Kedia >Priority: Minor > > Currently, whenever a user starts the spark shell, we initialize the spark > context before returning the prompt to the user. In environments where spark > context initialization takes several seconds, it is not a very good user > experience to wait for the prompt. Instead of waiting for the spark context > to initialize, we can initialize it in the background and return the prompt > to the user as soon as possible. Please note that even if we return the > prompt to the user early, we still need to make sure we wait for the spark > context initialization to complete before any query is executed. > Please note that the scala interpreter already does very similar async > initialization in order to return the prompt to the user faster - > https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414. > We will be emulating that behavior for Spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23502) Support async init of spark context during spark-shell startup
[ https://issues.apache.org/jira/browse/SPARK-23502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375051#comment-16375051 ] Sital Kedia commented on SPARK-23502: - I realized that we print the web UI link and the application id during spark-shell startup (https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414). If we do asynchronous initialization of the spark context, that info will not be available during startup, so we won't be able to print it. [~r...@databricks.com], [~srowen] - What do you think about async initialization of the spark context and getting rid of the web UI link and application id printed during startup? > Support async init of spark context during spark-shell startup > -- > > Key: SPARK-23502 > URL: https://issues.apache.org/jira/browse/SPARK-23502 > Project: Spark > Issue Type: Improvement > Components: Spark Shell >Affects Versions: 2.0.0 >Reporter: Sital Kedia >Priority: Minor > > Currently, whenever a user starts the spark shell, we initialize the spark > context before returning the prompt to the user. In environments where spark > context initialization takes several seconds, it is not a very good user > experience to wait for the prompt. Instead of waiting for the spark context > to initialize, we can initialize it in the background and return the prompt > to the user as soon as possible. Please note that even if we return the > prompt to the user early, we still need to make sure we wait for the spark > context initialization to complete before any query is executed. > Please note that the scala interpreter already does very similar async > initialization in order to return the prompt to the user faster - > https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414. > We will be emulating that behavior for Spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23502) Support async init of spark context during spark-shell startup
Sital Kedia created SPARK-23502: --- Summary: Support async init of spark context during spark-shell startup Key: SPARK-23502 URL: https://issues.apache.org/jira/browse/SPARK-23502 Project: Spark Issue Type: Improvement Components: Spark Shell Affects Versions: 2.0.0 Reporter: Sital Kedia Currently, whenever a user starts the spark shell, we initialize the spark context before returning the prompt to the user. In environments where spark context initialization takes several seconds, it is not a very good user experience to wait for the prompt. Instead of waiting for the spark context to initialize, we can initialize it in the background and return the prompt to the user as soon as possible. Please note that even if we return the prompt to the user early, we still need to make sure we wait for the spark context initialization to complete before any query is executed. Please note that the scala interpreter already does very similar async initialization in order to return the prompt to the user faster - https://github.com/scala/scala/blob/v2.12.2/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L414. We will be emulating that behavior for Spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
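A minimal, self-contained sketch of the idea (not the actual spark-shell change, which would live in the REPL startup path): kick off SparkSession creation in a background Future and block only the first time user code actually needs it. All names here are illustrative.

{code}
// Sketch of the proposed behaviour only; the real change would modify the
// spark-shell/REPL startup, not a standalone object like this.
import org.apache.spark.sql.SparkSession

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object AsyncContextInitSketch {
  // Initialization runs in the background while the prompt would be shown.
  private val sessionFuture: Future[SparkSession] = Future {
    SparkSession.builder().appName("async-init-sketch").getOrCreate()
  }

  // Any action based on user input waits for initialization to complete first.
  def spark: SparkSession = Await.result(sessionFuture, Duration.Inf)

  def main(args: Array[String]): Unit = {
    println("prompt could be shown here while the context initializes...")
    println(s"first query result: ${spark.range(10).count()}")
    spark.stop()
  }
}
{code}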
[jira] [Commented] (SPARK-23310) Perf regression introduced by SPARK-21113
[ https://issues.apache.org/jira/browse/SPARK-23310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351097#comment-16351097 ] Sital Kedia commented on SPARK-23310: - https://github.com/apache/spark/pull/20492 > Perf regression introduced by SPARK-21113 > - > > Key: SPARK-23310 > URL: https://issues.apache.org/jira/browse/SPARK-23310 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Yin Huai >Priority: Blocker > > While running all TPC-DS queries with SF set to 1000, we noticed that Q95 > (https://github.com/databricks/spark-sql-perf/blob/master/src/main/resources/tpcds_2_4/q95.sql) > has a noticeable regression (11%). After looking into it, we found that the > regression was introduced by SPARK-21113. Specifically, ReadAheadInputStream > gets lock congestion. After setting > spark.unsafe.sorter.spill.read.ahead.enabled to false, the regression > disappears and the overall performance of all TPC-DS queries improves. > > I am proposing that we set spark.unsafe.sorter.spill.read.ahead.enabled to > false by default for Spark 2.3 and re-enable it after addressing the lock > congestion issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23310) Perf regression introduced by SPARK-21113
[ https://issues.apache.org/jira/browse/SPARK-23310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350875#comment-16350875 ] Sital Kedia commented on SPARK-23310: - [~yhuai] - Sorry about introducing the regression for TPC-DS queries. Let's set spark.unsafe.sorter.spill.read.ahead.enabled to false to unblock the release. Let me know if you want me to make a PR, or you can do it yourself. >> ReadAheadInputStream gets lock congestion BTW, do you have any idea where the lock contention is occurring? I will file a separate JIRA and work on fixing it. > Perf regression introduced by SPARK-21113 > - > > Key: SPARK-23310 > URL: https://issues.apache.org/jira/browse/SPARK-23310 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Yin Huai >Priority: Blocker > > While running all TPC-DS queries with SF set to 1000, we noticed that Q95 > (https://github.com/databricks/spark-sql-perf/blob/master/src/main/resources/tpcds_2_4/q95.sql) > has a noticeable regression (11%). After looking into it, we found that the > regression was introduced by SPARK-21113. Specifically, ReadAheadInputStream > gets lock congestion. After setting > spark.unsafe.sorter.spill.read.ahead.enabled to false, the regression > disappears and the overall performance of all TPC-DS queries improves. > > I am proposing that we set spark.unsafe.sorter.spill.read.ahead.enabled to > false by default for Spark 2.3 and re-enable it after addressing the lock > congestion issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
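For completeness, a short sketch of how the flag named in this issue could be disabled for a single application while the lock contention is investigated; the same setting can also be passed with --conf on spark-submit. The flag is an internal setting, so treat this as illustrative.

{code}
// Sketch: disable the spill read-ahead path introduced by SPARK-21113 for one
// application, using the flag discussed in this issue.
import org.apache.spark.sql.SparkSession

object DisableSpillReadAhead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("disable-spill-read-ahead")
      .config("spark.unsafe.sorter.spill.read.ahead.enabled", "false")
      .getOrCreate()

    // ... run the spill-heavy workload that showed the regression ...

    spark.stop()
  }
}
{code}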
[jira] [Created] (SPARK-22827) Avoid throwing OutOfMemoryError in case of exception in spill
Sital Kedia created SPARK-22827: --- Summary: Avoid throwing OutOfMemoryError in case of exception in spill Key: SPARK-22827 URL: https://issues.apache.org/jira/browse/SPARK-22827 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Sital Kedia Currently, the task memory manager throws an OutOfMemory error when an IO exception happens in spill() - https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194. Similarly, there are many other places in the code where, if a task is not able to acquire memory due to an exception, we throw an OutOfMemory error, which kills the entire executor and hence fails all the tasks running on that executor instead of just failing the single task. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
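A small sketch of the general idea: translate an IO failure during spill into an exception that fails only the current task, instead of an OutOfMemoryError that takes down the whole executor. The exception class and method names below are hypothetical, not the actual TaskMemoryManager change.

{code}
// Hypothetical sketch: fail the task, not the executor, when spilling hits an
// IOException. SpillFailedException and acquireMemoryWithSpill are made-up names.
import java.io.IOException

class SpillFailedException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

object SpillErrorHandlingSketch {
  // Stand-in for a real spill that fails with a disk/IO problem.
  private def spill(): Long = throw new IOException("No space left on device")

  def acquireMemoryWithSpill(requestedBytes: Long): Long = {
    try {
      spill()
    } catch {
      case e: IOException =>
        // A RuntimeException fails only this task; throwing OutOfMemoryError
        // here would kill the executor and every task running on it.
        throw new SpillFailedException(s"error while spilling to free $requestedBytes bytes", e)
    }
  }

  def main(args: Array[String]): Unit = {
    try acquireMemoryWithSpill(1024L * 1024L)
    catch {
      case e: SpillFailedException => println(s"task failed cleanly: ${e.getMessage}")
    }
  }
}
{code}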
[jira] [Created] (SPARK-22312) Spark job stuck with no executor due to bug in Executor Allocation Manager
Sital Kedia created SPARK-22312: --- Summary: Spark job stuck with no executor due to bug in Executor Allocation Manager Key: SPARK-22312 URL: https://issues.apache.org/jira/browse/SPARK-22312 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.2.0 Reporter: Sital Kedia We often see Spark jobs get stuck because, when dynamic allocation is turned on, the Executor Allocation Manager does not ask for any executors even though there are pending tasks. Looking at the logic in the EAM that calculates the running tasks, the calculation can be wrong and the number of running tasks can become negative. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
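A toy illustration of the failure mode described above, together with a defensive clamp. This is not the ExecutorAllocationManager code; it only shows how a listener-driven counter can drift negative and then make a target derived from (pending + running) look like nothing is needed.

{code}
// Toy sketch: if task-end events arrive without matching task-start events
// (missed or reordered listener events), a naive running-task counter goes
// negative, and a target computed from (pending + running) can drop to zero
// even though tasks are pending. All names are illustrative.
object RunningTaskCounterSketch {
  private var running = 0

  def onTaskStart(): Unit = synchronized { running += 1 }

  def onTaskEnd(): Unit = synchronized {
    running -= 1
    if (running < 0) running = 0 // defensive clamp
  }

  def executorsNeeded(pendingTasks: Int, tasksPerExecutor: Int): Int = synchronized {
    math.max(0, (pendingTasks + running + tasksPerExecutor - 1) / tasksPerExecutor)
  }

  def main(args: Array[String]): Unit = {
    onTaskEnd() // end event without a matching start
    println(s"running = $running")                 // 0 instead of -1
    println(s"needed  = ${executorsNeeded(8, 4)}") // still requests executors
  }
}
{code}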
[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter
[ https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163705#comment-16163705 ] Sital Kedia commented on SPARK-21867: - [~rxin] - You are right, it is very tricky to get it right. Working on the PR now, will have it in a few days. > Support async spilling in UnsafeShuffleWriter > - > > Key: SPARK-21867 > URL: https://issues.apache.org/jira/browse/SPARK-21867 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Priority: Minor > > Currently, Spark tasks are single-threaded. But we see that it could greatly > improve job performance if we could multi-thread some parts of it. > For example, profiling our map tasks, which read large amounts of data from > HDFS and spill to disk, we see that we are blocked on HDFS reads and spilling > the majority of the time. Since both of these operations are IO intensive, the > average CPU consumption during the map phase is significantly low. In theory, > both HDFS reads and spilling could be done in parallel if we had additional > memory to store data read from HDFS while we are spilling the last batch read. > Let's say we have 1G of shuffle memory available per task. Currently, a map > task reads from HDFS and the records are stored in the available > memory buffer. Once we hit the memory limit and there is no more space to > store the records, we sort and spill the content to disk. While we are > spilling to disk, since we do not have any available memory, we cannot read > from HDFS concurrently. > Here we propose supporting async spilling in UnsafeShuffleWriter, so that we > can keep reading from HDFS while sort and spill happen > asynchronously. Let's say the total 1G of shuffle memory is split into > two regions - an active region and a spilling region - each of size 500 MB. We > start by reading from HDFS and filling the active region. Once we hit the > limit of the active region, we issue an asynchronous spill, while flipping the > active region and spilling region. While the spill is happening > asynchronously, we still have 500 MB of memory available to read data > from HDFS. This way we can amortize the high disk/network IO cost during > spilling. > We made a prototype hack to implement this feature and we could see our map > tasks were as much as 40% faster. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter
[ https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146060#comment-16146060 ] Sital Kedia commented on SPARK-21867: - cc - [~rxin], [~joshrosen], [~sameer] - What do you think of the idea? > Support async spilling in UnsafeShuffleWriter > - > > Key: SPARK-21867 > URL: https://issues.apache.org/jira/browse/SPARK-21867 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Priority: Minor > > Currently, Spark tasks are single-threaded. But we see that it could greatly > improve job performance if we could multi-thread some parts of it. > For example, profiling our map tasks, which read large amounts of data from > HDFS and spill to disk, we see that we are blocked on HDFS reads and spilling > the majority of the time. Since both of these operations are IO intensive, the > average CPU consumption during the map phase is significantly low. In theory, > both HDFS reads and spilling could be done in parallel if we had additional > memory to store data read from HDFS while we are spilling the last batch read. > Let's say we have 1G of shuffle memory available per task. Currently, a map > task reads from HDFS and the records are stored in the available > memory buffer. Once we hit the memory limit and there is no more space to > store the records, we sort and spill the content to disk. While we are > spilling to disk, since we do not have any available memory, we cannot read > from HDFS concurrently. > Here we propose supporting async spilling in UnsafeShuffleWriter, so that we > can keep reading from HDFS while sort and spill happen > asynchronously. Let's say the total 1G of shuffle memory is split into > two regions - an active region and a spilling region - each of size 500 MB. We > start by reading from HDFS and filling the active region. Once we hit the > limit of the active region, we issue an asynchronous spill, while flipping the > active region and spilling region. While the spill is happening > asynchronously, we still have 500 MB of memory available to read data > from HDFS. This way we can amortize the high disk/network IO cost during > spilling. > We made a prototype hack to implement this feature and we could see our map > tasks were as much as 40% faster. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21867) Support async spilling in UnsafeShuffleWriter
Sital Kedia created SPARK-21867: --- Summary: Support async spilling in UnsafeShuffleWriter Key: SPARK-21867 URL: https://issues.apache.org/jira/browse/SPARK-21867 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.2.0 Reporter: Sital Kedia Priority: Minor Currently, Spark tasks are single-threaded. But we see that it could greatly improve job performance if we could multi-thread some parts of it. For example, profiling our map tasks, which read large amounts of data from HDFS and spill to disk, we see that we are blocked on HDFS reads and spilling the majority of the time. Since both of these operations are IO intensive, the average CPU consumption during the map phase is significantly low. In theory, both HDFS reads and spilling could be done in parallel if we had additional memory to store data read from HDFS while we are spilling the last batch read. Let's say we have 1G of shuffle memory available per task. Currently, a map task reads from HDFS and the records are stored in the available memory buffer. Once we hit the memory limit and there is no more space to store the records, we sort and spill the content to disk. While we are spilling to disk, since we do not have any available memory, we cannot read from HDFS concurrently. Here we propose supporting async spilling in UnsafeShuffleWriter, so that we can keep reading from HDFS while sort and spill happen asynchronously. Let's say the total 1G of shuffle memory is split into two regions - an active region and a spilling region - each of size 500 MB. We start by reading from HDFS and filling the active region. Once we hit the limit of the active region, we issue an asynchronous spill, while flipping the active region and spilling region. While the spill is happening asynchronously, we still have 500 MB of memory available to read data from HDFS. This way we can amortize the high disk/network IO cost during spilling. We made a prototype hack to implement this feature and we could see our map tasks were as much as 40% faster. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
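A toy sketch of the double-buffering idea described above: records fill an active region while the previously filled region is sorted and spilled on a single background thread, and the two regions are flipped at the boundary. The types and sizes are simplified stand-ins, not the actual UnsafeShuffleWriter change.

{code}
// Toy model of async spilling with two flipped regions; records are plain
// strings and "spilling" is just a sorted print. Not Spark code.
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

class DoubleBufferedSpiller(regionCapacity: Int) {
  private implicit val spillEc: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private var active = new ArrayBuffer[String]()
  private var spillingRegion = new ArrayBuffer[String]()
  private var inFlightSpill: Future[Unit] = Future.successful(())

  def insert(record: String): Unit = {
    if (active.size >= regionCapacity) flipAndSpillAsync()
    active += record // the caller keeps reading input while the other region spills
  }

  private def flipAndSpillAsync(): Unit = {
    Await.result(inFlightSpill, Duration.Inf) // at most one spill in flight
    val tmp = spillingRegion; spillingRegion = active; active = tmp
    active.clear() // reuse the already-spilled buffer for new records
    val toSpill = spillingRegion
    inFlightSpill = Future { writeSorted(toSpill) }
  }

  private def writeSorted(records: ArrayBuffer[String]): Unit = {
    val sorted = records.sorted // stand-in for sort + write to a spill file
    println(s"spilled ${sorted.size} records")
  }

  def close(): Unit = {
    Await.result(inFlightSpill, Duration.Inf)
    if (active.nonEmpty) writeSorted(active)
  }
}

object DoubleBufferedSpillerDemo {
  def main(args: Array[String]): Unit = {
    val spiller = new DoubleBufferedSpiller(regionCapacity = 4)
    (1 to 10).foreach(i => spiller.insert(s"record-$i"))
    spiller.close()
  }
}
{code}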
[jira] [Created] (SPARK-21834) Incorrect executor request in case of dynamic allocation
Sital Kedia created SPARK-21834: --- Summary: Incorrect executor request in case of dynamic allocation Key: SPARK-21834 URL: https://issues.apache.org/jira/browse/SPARK-21834 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.2.0 Reporter: Sital Kedia The killExecutor api currently does not allow killing an executor without updating the total number of executors needed. When dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635), which is incorrect because the allocator already takes care of setting the required number of executors itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia closed SPARK-21833. --- Resolution: Duplicate Duplicate of SPARK-20540 > CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation > --- > > Key: SPARK-21833 > URL: https://issues.apache.org/jira/browse/SPARK-21833 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We have seen an issue in the coarse-grained scheduler where, when dynamic > executor allocation is turned on, the scheduler asks for more executors than > needed. Consider the following situation: the executor allocation > manager is ramping down the number of executors. It will lower the executor > target number by calling the requestTotalExecutors api. > Later, when the allocation manager finds some executors to be idle, it will > call the killExecutor api. The coarse-grained scheduler, in the killExecutor > function, replaces the total executors needed with current + pending, which > overrides the earlier target set by the allocation manager. > This results in the scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-21833: Description: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api. Later, when the allocation manager finds some executors to be idle, it will call killExecutor api. The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager. This results in scheduler spawning more executors than actually needed. was: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. Later, when the allocation manager finds some executors to be idle, it will call killExecutor api (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447). The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in scheduler spawning more executors than actually needed. > CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation > --- > > Key: SPARK-21833 > URL: https://issues.apache.org/jira/browse/SPARK-21833 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We have seen this issue in coarse grained scheduler that in case of dynamic > executor allocation is turned on, the scheduler asks for more executors than > needed. Consider the following situation where there are excutor allocation > manager is ramping down the number of executors. It will lower the executor > targer number by calling requestTotalExecutors api. > Later, when the allocation manager finds some executors to be idle, it will > call killExecutor api. The coarse grain scheduler, in the killExecutor > function replaces the total executor needed to current + pending which > overrides the earlier target set by the allocation manager. > This results in scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140533#comment-16140533 ] Sital Kedia commented on SPARK-21833: - Actually, SPARK-20540 already addressed this issue on the latest trunk. Will close this issue. > CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation > --- > > Key: SPARK-21833 > URL: https://issues.apache.org/jira/browse/SPARK-21833 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We have seen an issue in the coarse-grained scheduler where, when dynamic > executor allocation is turned on, the scheduler asks for more executors than > needed. Consider the following situation: the executor allocation > manager is ramping down the number of executors. It will lower the executor > target number by calling the requestTotalExecutors api. > Later, when the allocation manager finds some executors to be idle, it will > call the killExecutor api. The coarse-grained scheduler, in the killExecutor > function, replaces the total executors needed with current + pending, which > overrides the earlier target set by the allocation manager. > This results in the scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-21833: Description: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. Later, when the allocation manager finds some executors to be idle, it will call killExecutor api (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447). The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in scheduler spawning more executors than actually needed. was: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. Later, when the allocation manager finds some executors to be idle, it will call killExecutor api (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421). The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in scheduler spawning more executors than actually needed. > CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation > --- > > Key: SPARK-21833 > URL: https://issues.apache.org/jira/browse/SPARK-21833 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We have seen this issue in coarse grained scheduler that in case of dynamic > executor allocation is turned on, the scheduler asks for more executors than > needed. Consider the following situation where there are excutor allocation > manager is ramping down the number of executors. It will lower the executor > targer number by calling requestTotalExecutors api (see > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. > > Later, when the allocation manager finds some executors to be idle, it will > call killExecutor api > (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L447). 
> The coarse grain scheduler, in the killExecutor function replaces the total > executor needed to current + pending which overrides the earlier target set > by the allocation manager > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. > > This results in scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-21833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-21833: Description: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. Later, when the allocation manager finds some executors to be idle, it will call killExecutor api (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421). The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in scheduler spawning more executors than actually needed. was: We have seen this issue in coarse grained scheduler that in case of dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation where there are excutor allocation manager is ramping down the number of executors. It will lower the executor targer number by calling requestTotalExecutors api (see https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L321). Later, when the allocation manager finds some executors to be idle, it will call killExecutor api (https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421). The coarse grain scheduler, in the killExecutor function replaces the total executor needed to current + pending which overrides the earlier target set by the allocation manager https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in scheduler spawning more executors than actually needed. > CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation > --- > > Key: SPARK-21833 > URL: https://issues.apache.org/jira/browse/SPARK-21833 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We have seen this issue in coarse grained scheduler that in case of dynamic > executor allocation is turned on, the scheduler asks for more executors than > needed. Consider the following situation where there are excutor allocation > manager is ramping down the number of executors. It will lower the executor > targer number by calling requestTotalExecutors api (see > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L326. > > Later, when the allocation manager finds some executors to be idle, it will > call killExecutor api > (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421). 
> The coarse grain scheduler, in the killExecutor function replaces the total > executor needed to current + pending which overrides the earlier target set > by the allocation manager > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. > > This results in scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21833) CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation
Sital Kedia created SPARK-21833: --- Summary: CoarseGrainedSchedulerBackend leaks executors in case of dynamic allocation Key: SPARK-21833 URL: https://issues.apache.org/jira/browse/SPARK-21833 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.2.0 Reporter: Sital Kedia We have seen an issue in the coarse-grained scheduler where, when dynamic executor allocation is turned on, the scheduler asks for more executors than needed. Consider the following situation: the executor allocation manager is ramping down the number of executors. It will lower the executor target number by calling the requestTotalExecutors api (see https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L321). Later, when the allocation manager finds some executors to be idle, it will call the killExecutor api (https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L421). The coarse-grained scheduler, in the killExecutor function, replaces the total executors needed with current + pending, which overrides the earlier target set by the allocation manager: https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L523. This results in the scheduler spawning more executors than actually needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
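A toy model of the race described above (the same interplay as in SPARK-21834): the allocation manager lowers the executor target, and the kill path then recomputes the target as current + pending, silently overriding the ramp-down. This is illustrative pseudologic, not Spark's scheduler code.

{code}
// Toy model only; all fields and method names are illustrative.
object ExecutorTargetRaceSketch {
  private var targetExecutors = 100
  private var currentExecutors = 100
  private var pendingExecutors = 0

  // ExecutorAllocationManager ramping down the target.
  def requestTotalExecutors(newTarget: Int): Unit = synchronized {
    targetExecutors = newTarget
  }

  // Old killExecutor behaviour: recompute the target instead of respecting it.
  def killExecutorAndReplaceTarget(executorId: String): Unit = synchronized {
    currentExecutors -= 1
    targetExecutors = currentExecutors + pendingExecutors // overrides the ramp-down
  }

  def main(args: Array[String]): Unit = {
    requestTotalExecutors(10)          // manager wants to shrink to 10
    killExecutorAndReplaceTarget("1")  // manager kills one idle executor
    println(s"target is now $targetExecutors, not 10") // 99: far more than needed
  }
}
{code}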
[jira] [Commented] (SPARK-19112) add codec for ZStandard
[ https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109784#comment-16109784 ] Sital Kedia commented on SPARK-19112: - [~sowen], [~tgraves] - Using zstd compression for our Spark jobs spilling 100s of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to a significant latency gain because of reduced disk IO operations. There is a CPU time degradation of 2 - 5% because of the zstd compression overhead, but for jobs which are bottlenecked by disk IO, this hit can be taken. We are going to enable zstd as the default compression for all of our jobs. We should support zstd in open source Spark as well. I will reopen the PR with some minor changes. > add codec for ZStandard > --- > > Key: SPARK-19112 > URL: https://issues.apache.org/jira/browse/SPARK-19112 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Thomas Graves >Priority: Minor > > ZStandard: https://github.com/facebook/zstd and > http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was > recently released. Hadoop > (https://issues.apache.org/jira/browse/HADOOP-13578) and others > (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it. > Zstd seems to give great results => Gzip-level compression with Lz4-level CPU. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
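For reference, a sketch of switching Spark's IO (shuffle/spill) compression to zstd once a build that ships the codec is available; the exact configuration keys and defaults may differ across versions, so verify them against the documentation for the release in use.

{code}
// Sketch, assuming a Spark build where the zstd codec from this issue exists.
// The zstd.level key is an optional tuning knob and may vary by version.
import org.apache.spark.sql.SparkSession

object ZstdCompressionConfig {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("zstd-io-compression")
      .config("spark.io.compression.codec", "zstd")
      .config("spark.io.compression.zstd.level", "1")
      .getOrCreate()

    // ... run a shuffle/spill-heavy workload to compare disk bytes written ...

    spark.stop()
  }
}
{code}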
[jira] [Commented] (SPARK-21444) Fetch failure due to node reboot causes job failure
[ https://issues.apache.org/jira/browse/SPARK-21444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090800#comment-16090800 ] Sital Kedia commented on SPARK-21444: - Any idea how to fix this issue? > Fetch failure due to node reboot causes job failure > --- > > Key: SPARK-21444 > URL: https://issues.apache.org/jira/browse/SPARK-21444 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.0.2 >Reporter: Sital Kedia > > We started seeing this issue after merging the PR - > https://github.com/apache/spark/pull/17955. > This PR introduced a change to keep the map-output statuses only in the map > output tracker and whenever there is a fetch failure, the scheduler tries to > invalidate the map-output statuses by talking all the the block manager slave > end point. However, in case the fetch failure is because node reboot, the > block manager slave end point would not be reachable and so the driver fails > the job as a result. See exception below - > {code} > 2017-07-15 05:20:50,255 WARN storage.BlockManagerMaster > (Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with > removeFromMaster = true - Connection reset by peer > java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:192) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) > at > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) > at > io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:745) > 2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop > (Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting > down SparkContext > org.apache.spark.SparkException: Exception thrown in awaitResult > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) > at > org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143) > at > org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271) > at > org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165) > at 
org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111) > at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98) > at > org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197) > at > org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105) > at > org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > Caused by: java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > a
[jira] [Commented] (SPARK-21444) Fetch failure due to node reboot causes job failure
[ https://issues.apache.org/jira/browse/SPARK-21444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090799#comment-16090799 ] Sital Kedia commented on SPARK-21444: - cc - [~joshrosen] > Fetch failure due to node reboot causes job failure > --- > > Key: SPARK-21444 > URL: https://issues.apache.org/jira/browse/SPARK-21444 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.0.2 >Reporter: Sital Kedia > > We started seeing this issue after merging the PR - > https://github.com/apache/spark/pull/17955. > This PR introduced a change to keep the map-output statuses only in the map > output tracker and whenever there is a fetch failure, the scheduler tries to > invalidate the map-output statuses by talking all the the block manager slave > end point. However, in case the fetch failure is because node reboot, the > block manager slave end point would not be reachable and so the driver fails > the job as a result. See exception below - > {code} > 2017-07-15 05:20:50,255 WARN storage.BlockManagerMaster > (Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with > removeFromMaster = true - Connection reset by peer > java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:192) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) > at > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) > at > io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:745) > 2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop > (Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting > down SparkContext > org.apache.spark.SparkException: Exception thrown in awaitResult > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) > at > org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143) > at > org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271) > at > org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165) > at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111) > 
at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98) > at > org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197) > at > org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105) > at > org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > Caused by: java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.S
[jira] [Created] (SPARK-21444) Fetch failure due to node reboot causes job failure
Sital Kedia created SPARK-21444: --- Summary: Fetch failure due to node reboot causes job failure Key: SPARK-21444 URL: https://issues.apache.org/jira/browse/SPARK-21444 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.0.2 Reporter: Sital Kedia We started seeing this issue after merging the PR - https://github.com/apache/spark/pull/17955. This PR introduced a change to keep the map-output statuses only in the map output tracker and whenever there is a fetch failure, the scheduler tries to invalidate the map-output statuses by talking all the the block manager slave end point. However, in case the fetch failure is because node reboot, the block manager slave end point would not be reachable and so the driver fails the job as a result. See exception below - {code} 2017-07-15 05:20:50,255 WARN storage.BlockManagerMaster (Logging.scala:logWarning(87)) - Failed to remove broadcast 10 with removeFromMaster = true - Connection reset by peer java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:745) 2017-07-15 05:20:50,275 ERROR scheduler.DAGSchedulerEventProcessLoop (Logging.scala:logError(91)) - DAGSchedulerEventProcessLoop failed; shutting down SparkContext org.apache.spark.SparkException: Exception thrown in awaitResult at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:271) at org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:165) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98) at org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:197) at org.apache.spark.ShuffleStatus.removeMapOutput(MapOutputTracker.scala:105) at 
org.apache.spark.MapOutputTrackerMaster.unregisterMapOutput(MapOutputTracker.scala:420) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1324) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1715) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1673) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1662) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) at io.netty.buffer.AbstractByteBuf.writeBytes(Abstract
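A sketch of one defensive pattern for the failure described above: treat the best-effort cleanup RPC (removing the serialized map-output broadcast on executors) as non-fatal when the remote block manager is unreachable, rather than letting the IOException escape into the DAGScheduler event loop and shut down the SparkContext. The names below are illustrative, not the actual MapOutputTracker change.

{code}
// Illustrative pattern only; method names are hypothetical stand-ins.
import java.io.IOException

object BestEffortCleanupSketch {
  // Stand-in for the broadcast-removal RPC that fails when a node has rebooted.
  private def removeBroadcastOnAllExecutors(): Unit =
    throw new IOException("Connection reset by peer")

  def invalidateSerializedMapOutputCache(): Unit = {
    try {
      removeBroadcastOnAllExecutors()
    } catch {
      // The remote executor may simply be gone (e.g. node reboot); log and
      // continue so the fetch-failure handling itself can proceed.
      case e: Exception =>
        println(s"best-effort broadcast cleanup failed, ignoring: ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    invalidateSerializedMapOutputCache()
    println("fetch failure handled without shutting down the SparkContext")
  }
}
{code}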
[jira] [Created] (SPARK-21113) Support for read ahead input stream to amortize disk IO cost in the Spill reader
Sital Kedia created SPARK-21113: --- Summary: Support for read ahead input stream to amortize disk IO cost in the Spill reader Key: SPARK-21113 URL: https://issues.apache.org/jira/browse/SPARK-21113 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.0.2 Reporter: Sital Kedia Priority: Minor Profiling some of our big jobs, we see that around 30% of the time is being spent reading the spill files from disk. In order to amortize the disk IO cost, the idea is to implement a read-ahead input stream which asynchronously reads ahead from the underlying input stream once a specified amount of data has been read from the current buffer. It does this by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains the data that should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream, and once the current active buffer is exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer without being blocked on disk I/O. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
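A simplified toy version of the double-buffered read-ahead stream described above, with no error propagation or partial-fill handling; it only illustrates the active/read-ahead buffer flip and is not the Spark implementation.

{code}
// Toy sketch: serve reads from an active buffer while a single background
// thread refills the other buffer; flip them when the active one is exhausted.
import java.io.InputStream
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

class ReadAheadInputStreamSketch(underlying: InputStream, bufferSize: Int) extends InputStream {
  private implicit val readAheadEc: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private final class Buffer {
    val bytes = new Array[Byte](bufferSize)
    var length = 0
    var pos = 0
    def exhausted: Boolean = pos >= length
  }

  private var active = new Buffer
  private var readAhead = new Buffer
  fill(active) // first fill happens synchronously
  private var pending: Future[Unit] = Future(fill(readAhead)) // read ahead in background

  private def fill(b: Buffer): Unit = {
    b.length = math.max(underlying.read(b.bytes), 0) // 0 length means end of stream
    b.pos = 0
  }

  override def read(): Int = {
    if (active.exhausted) {
      Await.result(pending, Duration.Inf)                      // wait for the read-ahead
      val tmp = active; active = readAhead; readAhead = tmp    // flip the buffers
      if (active.exhausted) return -1                          // underlying stream is done
      pending = Future(fill(readAhead))                        // start reading ahead again
    }
    val b = active.bytes(active.pos) & 0xff
    active.pos += 1
    b
  }
}

object ReadAheadDemo {
  def main(args: Array[String]): Unit = {
    val data = new java.io.ByteArrayInputStream("hello read-ahead".getBytes("UTF-8"))
    val in = new ReadAheadInputStreamSketch(data, bufferSize = 4)
    println(Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toChar).mkString)
  }
}
{code}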
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046957#comment-16046957 ] Sital Kedia commented on SPARK-18838: - [~joshrosen] - The PR for my change to multi-thread the event processor has been closed inadvertently as a stale PR. Can you take a look at the approach and let me know what you think. Please note that this change is very critical for running large jobs in our environment. https://github.com/apache/spark/pull/16291 > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
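For reference, a minimal Scala sketch of the per-listener queue idea described in this issue (my reading of the proposal, not the code in the PR above): each listener gets its own bounded, single-threaded executor, so per-listener ordering is preserved and a slow listener only delays itself.
{code}
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

trait Listener { def onEvent(event: Any): Unit }

class PerListenerBus(listeners: Seq[Listener], queueCapacity: Int) {
  private val executors: Seq[(Listener, ThreadPoolExecutor)] = listeners.map { l =>
    val queue = new LinkedBlockingQueue[Runnable](queueCapacity)
    // A bounded queue keeps the bus from growing memory without bound;
    // CallerRunsPolicy is one possible back-pressure choice here, the real
    // design might instead drop events and log a warning.
    val exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue,
      new ThreadPoolExecutor.CallerRunsPolicy())
    (l, exec)
  }

  // Post an event to every listener's private queue; per-listener ordering is
  // preserved because each executor has exactly one worker thread.
  def post(event: Any): Unit = executors.foreach { case (l, exec) =>
    exec.execute(new Runnable { override def run(): Unit = l.onEvent(event) })
  }

  def stop(): Unit = executors.foreach { case (_, exec) => exec.shutdown() }
}
{code}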
[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030284#comment-16030284 ] Sital Kedia commented on SPARK-20178: - https://github.com/apache/spark/pull/18150 > Improve Scheduler fetch failures > > > Key: SPARK-20178 > URL: https://issues.apache.org/jira/browse/SPARK-20178 > Project: Spark > Issue Type: Epic > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Thomas Graves > > We have been having a lot of discussions around improving the handling of > fetch failures. There are 4 jira currently related to this. > We should try to get a list of things we want to improve and come up with one > cohesive design. > SPARK-20163, SPARK-20091, SPARK-14649 , and SPARK-19753 > I will put my initial thoughts in a follow on comment. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027036#comment-16027036 ] Sital Kedia commented on SPARK-20178: - >> So to get the robustness for now I'm fine with just invalidating it >> immediately and see how that works. [~tgraves] - Let me know if you want me to resurrect - https://github.com/apache/spark/pull/17088 which exactly does that. It was closed inadvertently as a stale PR. > Improve Scheduler fetch failures > > > Key: SPARK-20178 > URL: https://issues.apache.org/jira/browse/SPARK-20178 > Project: Spark > Issue Type: Epic > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Thomas Graves > > We have been having a lot of discussions around improving the handling of > fetch failures. There are 4 jira currently related to this. > We should try to get a list of things we want to improve and come up with one > cohesive design. > SPARK-20163, SPARK-20091, SPARK-14649 , and SPARK-19753 > I will put my initial thoughts in a follow on comment. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016217#comment-16016217 ] Sital Kedia commented on SPARK-18838: - [~joshrosen] - >> Alternatively, we could use two queues, one for internal listeners and another for external ones. This wouldn't be as fine-grained as thread-per-listener but might buy us a lot of the benefits with perhaps less code needed. Actually that is exactly what my PR is doing. https://github.com/apache/spark/pull/16291. I have not been able to work on it recently, but you can take a look and let me know how it looks. I can prioritize working on it. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20640) Make rpc timeout and retry for shuffle registration configurable
Sital Kedia created SPARK-20640: --- Summary: Make rpc timeout and retry for shuffle registration configurable Key: SPARK-20640 URL: https://issues.apache.org/jira/browse/SPARK-20640 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.0.2 Reporter: Sital Kedia Currently the shuffle service registration timeout and retry count are hardcoded (see https://github.com/sitalkedia/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java#L144 and https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L197). This works well for small workloads, but under heavy workload, when the shuffle service is busy transferring large amounts of data, we see significant delays in responding to the registration request. As a result, we often see executors fail to register with the shuffle service, eventually failing the job. We need to make these two parameters configurable. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
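A hedged sketch of what the client-side change could look like (REPL-style Scala; the property names and defaults are illustrative assumptions, not established Spark configuration keys):
{code}
import org.apache.spark.SparkConf

// Illustrative only: read the registration timeout and retry count from
// SparkConf instead of hardcoding them in ExternalShuffleClient/BlockManager.
val conf = new SparkConf()
val registrationTimeoutMs =
  conf.getTimeAsMs("spark.shuffle.registration.timeout", "5000ms")
val registrationMaxAttempts =
  conf.getInt("spark.shuffle.registration.maxAttempts", 3)
{code}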
[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957341#comment-15957341 ] Sital Kedia commented on SPARK-20178: - [~tgraves] - Thanks for creating the JIRA and driving the discussion. [~imranr], [~markhamstra], [~kayousterhout] - As discussed over the PR, I have created a doc consolidating the issues related to fetch failure and the high-level design goals for the scheduler: https://docs.google.com/document/d/1D3b_ishMfm5sXmRS494JrOJmL9V_TRVZUB4TgK1l1fY/edit?usp=sharing. Please take a look and let me know what you think. Once we reach a consensus about the desired behavior and identify the changes that need to be made, I can work on a more detailed design doc and also split the change into multiple small logical PRs. > Improve Scheduler fetch failures > > > Key: SPARK-20178 > URL: https://issues.apache.org/jira/browse/SPARK-20178 > Project: Spark > Issue Type: Epic > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Thomas Graves > > We have been having a lot of discussions around improving the handling of > fetch failures. There are 4 jira currently related to this. > We should try to get a list of things we want to improve and come up with one > cohesive design. > SPARK-20163, SPARK-20091, SPARK-14649 , and SPARK-19753 > I will put my initial thoughts in a follow on comment. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure
[ https://issues.apache.org/jira/browse/SPARK-20163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia closed SPARK-20163. --- Resolution: Duplicate > Kill all running tasks in a stage in case of fetch failure > -- > > Key: SPARK-20163 > URL: https://issues.apache.org/jira/browse/SPARK-20163 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.0.1 >Reporter: Sital Kedia > > Currently, the scheduler does not kill the running tasks in a stage when it > encounters fetch failure, as a result, we might end up running many duplicate > tasks in the cluster. There is already a TODO in TaskSetManager to kill all > running tasks which has not been implemented. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure
[ https://issues.apache.org/jira/browse/SPARK-20163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951738#comment-15951738 ] Sital Kedia commented on SPARK-20163: - Thanks [~imranr], closing this as this is duplicate of SPARK-2666. > Kill all running tasks in a stage in case of fetch failure > -- > > Key: SPARK-20163 > URL: https://issues.apache.org/jira/browse/SPARK-20163 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.0.1 >Reporter: Sital Kedia > > Currently, the scheduler does not kill the running tasks in a stage when it > encounters fetch failure, as a result, we might end up running many duplicate > tasks in the cluster. There is already a TODO in TaskSetManager to kill all > running tasks which has not been implemented. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20163) Kill all running tasks in a stage in case of fetch failure
Sital Kedia created SPARK-20163: --- Summary: Kill all running tasks in a stage in case of fetch failure Key: SPARK-20163 URL: https://issues.apache.org/jira/browse/SPARK-20163 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.0.1 Reporter: Sital Kedia Currently, the scheduler does not kill the running tasks in a stage when it encounters a fetch failure; as a result, we might end up running many duplicate tasks in the cluster. There is already a TODO in TaskSetManager to kill all running tasks, but it has not been implemented. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
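A small Scala sketch of the TODO described above (purely illustrative; the object, method, and parameter names are not Spark's TaskSetManager API): on the first fetch failure in a stage, the scheduler would ask the backend to kill the remaining running attempts so they do not produce duplicate work.
{code}
// Hypothetical helper: kill every still-running attempt of a stage that is
// about to be retried because of a fetch failure.
object KillOnFetchFailure {
  def killRunningTasks(stageId: Int,
                       runningTaskIds: Set[Long],
                       killTask: (Long, String) => Unit): Unit = {
    runningTaskIds.foreach { taskId =>
      killTask(taskId, s"Stage $stageId will be retried due to a fetch failure")
    }
  }
}
{code}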
[jira] [Commented] (SPARK-3577) Add task metric to report spill time
[ https://issues.apache.org/jira/browse/SPARK-3577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947669#comment-15947669 ] Sital Kedia commented on SPARK-3577: I am making a change to report correct spill data size on disk. > Add task metric to report spill time > > > Key: SPARK-3577 > URL: https://issues.apache.org/jira/browse/SPARK-3577 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 1.1.0 >Reporter: Kay Ousterhout >Priority: Minor > Attachments: spill_size.jpg > > > The {{ExternalSorter}} passes its own {{ShuffleWriteMetrics}} into > {{ExternalSorter}}. The write time recorded in those metrics is never used. > We should probably add task metrics to report this spill time, since for > shuffles, this would have previously been reported as part of shuffle write > time (with the original hash-based sorter). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20091) DagScheduler should allow running concurrent attempts of a stage in case of multiple fetch failure
Sital Kedia created SPARK-20091: --- Summary: DagScheduler should allow running concurrent attempts of a stage in case of multiple fetch failure Key: SPARK-20091 URL: https://issues.apache.org/jira/browse/SPARK-20091 Project: Spark Issue Type: Improvement Components: Scheduler Affects Versions: 2.0.1 Reporter: Sital Kedia Currently, the DAG scheduler does not allow running concurrent attempts of a stage in case of multiple fetch failures. As a result, when multiple fetch failures are detected, the serial re-execution of the map stage delays the job significantly. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20074) Make buffer size in unsafe external sorter configurable
Sital Kedia created SPARK-20074: --- Summary: Make buffer size in unsafe external sorter configurable Key: SPARK-20074 URL: https://issues.apache.org/jira/browse/SPARK-20074 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.0.1 Reporter: Sital Kedia Currently, the buffer size is hardcoded to 32 KB, see - https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L123 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
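A hedged sketch of the shape of the change (REPL-style Scala; the property name is illustrative, not an established Spark configuration key):
{code}
import org.apache.spark.SparkConf

// Illustrative only: read the sorter's file buffer size from SparkConf
// instead of hardcoding 32 KB in UnsafeExternalSorter.
val conf = new SparkConf()
val fileBufferSizeBytes =
  conf.getSizeAsBytes("spark.unsafe.sorter.file.buffer.size", "32k")
{code}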
[jira] [Updated] (SPARK-20014) Optimize mergeSpillsWithFileStream method
[ https://issues.apache.org/jira/browse/SPARK-20014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-20014: Description: When the individual partition size in a spill is small, mergeSpillsWithTransferTo method does many small disk ios which is really inefficient. One way to improve the performance will be to use mergeSpillsWithFileStream method by turning off transfer to and using buffered file read/write to improve the io throughput. However, the current implementation of mergeSpillsWithFileStream does not do a buffer read/write of the files and in addition to that it unnecessarily flushes the output files for each partitions. was: When the individual partition size in a spill is small, mergeSpillsWithTransferTo method does many small disk ios which is really inefficient. One way to improve the performance will be to use mergeSpillsWithFileStream method by turning off transfer to and using buffered file read/write to improve the io throughput. However, the current implementation of mergeSpillsWithFileStream does not do a buffer read/write of the files and in addition to that it unnecessarily opens/close the output files for each partitions. > Optimize mergeSpillsWithFileStream method > - > > Key: SPARK-20014 > URL: https://issues.apache.org/jira/browse/SPARK-20014 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > When the individual partition size in a spill is small, > mergeSpillsWithTransferTo method does many small disk ios which is really > inefficient. One way to improve the performance will be to use > mergeSpillsWithFileStream method by turning off transfer to and using > buffered file read/write to improve the io throughput. > However, the current implementation of mergeSpillsWithFileStream does not do > a buffer read/write of the files and in addition to that it unnecessarily > flushes the output files for each partitions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20014) Optimize mergeSpillsWithFileStream method
Sital Kedia created SPARK-20014: --- Summary: Optimize mergeSpillsWithFileStream method Key: SPARK-20014 URL: https://issues.apache.org/jira/browse/SPARK-20014 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.0.0 Reporter: Sital Kedia When the individual partition size in a spill is small, the mergeSpillsWithTransferTo method performs many small disk I/Os, which is really inefficient. One way to improve the performance is to use the mergeSpillsWithFileStream method by turning off transferTo and using buffered file reads/writes to improve the I/O throughput. However, the current implementation of mergeSpillsWithFileStream does not do buffered reads/writes of the files, and in addition it unnecessarily opens/closes the output files for each partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
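A simplified Scala sketch of the buffered-merge idea (it just concatenates the spill files and ignores the per-partition interleaving of the real mergeSpillsWithFileStream; names and buffer sizes are illustrative):
{code}
import java.io._

// Illustrative only: buffered reads/writes turn many small IOs into large
// sequential ones, and the output is flushed once at the end instead of
// once per partition.
object BufferedSpillMerge {
  def mergeSpills(spills: Seq[File], out: File, bufferSize: Int = 1 << 20): Unit = {
    val output = new BufferedOutputStream(new FileOutputStream(out), bufferSize)
    try {
      spills.foreach { spill =>
        val input = new BufferedInputStream(new FileInputStream(spill), bufferSize)
        try {
          val buf = new Array[Byte](8192)
          var n = input.read(buf)
          while (n != -1) {
            output.write(buf, 0, n)  // no flush per partition or per spill
            n = input.read(buf)
          }
        } finally input.close()
      }
    } finally {
      output.close()                 // flushes once, when the merge completes
    }
  }
}
{code}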
[jira] [Commented] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter
[ https://issues.apache.org/jira/browse/SPARK-19837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897961#comment-15897961 ] Sital Kedia commented on SPARK-19837: - `SparkHiveDynamicPartitionWriterContainer` has been refactored in latest master. Not sure if the issue still exists. Will close this JIRA and reopen if we still see issue with latest. > Fetch failure throws a SparkException in SparkHiveWriter > > > Key: SPARK-19837 > URL: https://issues.apache.org/jira/browse/SPARK-19837 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.1 >Reporter: Sital Kedia > > Currently Fetchfailure in SparkHiveWriter fails the job with following > exception > {code} > 0_0): org.apache.spark.SparkException: Task failed while writing rows. > at > org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by > peer > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692) > at > org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854) > at > org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887) > at > 
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) > at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) > at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsu
[jira] [Closed] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter
[ https://issues.apache.org/jira/browse/SPARK-19837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia closed SPARK-19837. --- Resolution: Fixed > Fetch failure throws a SparkException in SparkHiveWriter > > > Key: SPARK-19837 > URL: https://issues.apache.org/jira/browse/SPARK-19837 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.1 >Reporter: Sital Kedia > > Currently Fetchfailure in SparkHiveWriter fails the job with following > exception > {code} > 0_0): org.apache.spark.SparkException: Task failed while writing rows. > at > org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by > peer > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692) > at > org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854) > at > org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887) > at > org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) > at 
scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) > at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19837) Fetch failure throws a SparkException in SparkHiveWriter
Sital Kedia created SPARK-19837: --- Summary: Fetch failure throws a SparkException in SparkHiveWriter Key: SPARK-19837 URL: https://issues.apache.org/jira/browse/SPARK-19837 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.0.1 Reporter: Sital Kedia Currently Fetchfailure in SparkHiveWriter fails the job with following exception {code} 0_0): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:385) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.shuffle.FetchFailedException: Connection reset by peer at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731) at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692) at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854) at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887) at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:343) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
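A hedged Scala sketch of one possible fix pattern for SPARK-19837 (an assumption on my part, not the refactoring mentioned in the comment above): rethrow FetchFailedException unwrapped so the scheduler can still recognize it as a fetch failure instead of an ordinary task failure.
{code}
import org.apache.spark.SparkException
import org.apache.spark.shuffle.FetchFailedException

// Hypothetical wrapper, not the SparkHiveWriter code: preserve fetch-failure
// semantics by letting FetchFailedException escape before wrapping anything
// else in a generic SparkException.
object WriteRows {
  def withFetchFailurePassthrough(write: () => Unit): Unit = {
    try write()
    catch {
      case ffe: FetchFailedException => throw ffe
      case t: Throwable => throw new SparkException("Task failed while writing rows.", t)
    }
  }
}
{code}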
[jira] [Created] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests
Sital Kedia created SPARK-19803: --- Summary: Flaky BlockManagerProactiveReplicationSuite tests Key: SPARK-19803 URL: https://issues.apache.org/jira/browse/SPARK-19803 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: Sital Kedia The tests added in BlockManagerProactiveReplicationSuite have made the Jenkins build flaky. Please refer to the build for more details - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/ -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19753) Remove all shuffle files on a host in case of slave lost or fetch failure
Sital Kedia created SPARK-19753: --- Summary: Remove all shuffle files on a host in case of slave lost or fetch failure Key: SPARK-19753 URL: https://issues.apache.org/jira/browse/SPARK-19753 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.0.1 Reporter: Sital Kedia Currently, when we detect a fetch failure, we only remove the shuffle files produced by the failed executor, while the host itself might be down and none of its shuffle files would be accessible. When we are running multiple executors on a host, a host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we instead remove all the shuffle files on that host on the first fetch failure, we can rerun all the affected tasks in a single stage retry. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
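A minimal Scala sketch of the bookkeeping change implied above (illustrative types, not Spark's MapOutputTrackerMaster): on a fetch failure that indicates a lost host, drop every map output registered on that host rather than only the failed executor's.
{code}
// Illustrative data model: map outputs keyed by map id, each tagged with the
// executor and host that produced it.
case class OutputLocation(execId: String, host: String)

class SimpleOutputTracker(private var outputs: Map[Int, OutputLocation]) {
  // Existing behaviour: forget the outputs of a single executor.
  def removeOutputsOnExecutor(execId: String): Unit =
    outputs = outputs.filterNot { case (_, loc) => loc.execId == execId }

  // Proposed behaviour: forget every output on the host, so one stage retry
  // regenerates them all instead of one retry per executor on that host.
  def removeOutputsOnHost(host: String): Unit =
    outputs = outputs.filterNot { case (_, loc) => loc.host == host }
}
{code}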
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15753535#comment-15753535 ] Sital Kedia commented on SPARK-18838: - cc - [~kayousterhout] > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-18838: Description: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might hurt the job performance significantly or even fail the job. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to mistakenly remove an executor which is not idle. The problem is that the event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. This single threaded processor often becomes the bottleneck for large jobs. Also, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. In addition to that a slow listener can cause events to be dropped from the event queue which might be fatal to the job. To solve the above problems, we propose to get rid of the event queue and the single threaded event processor. Instead each listener will have its own dedicate single threaded executor service . When ever an event is posted, it will be submitted to executor service of all the listeners. The Single threaded executor service will guarantee in order processing of the events per listener. The queue used for the executor service will be bounded to guarantee we do not grow the memory indefinitely. The downside of this approach is separate event queue per listener will increase the driver memory footprint. was: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is causing job failure. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. The single threaded processor often becomes the bottleneck for large jobs. In addition to that, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. To solve the above problems, we plan to have a per listener single threaded executor service and separate event queue. That way we are not bottlenecked by the single threaded event processor and also critical listeners will not be penalized by the slow listeners. The downside of this approach is separate event queue per listener will increase the driver memory footprint. 
> High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedic
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750217#comment-15750217 ] Sital Kedia commented on SPARK-18838: - [~zsxwing] - It's not only the ExecutorAllocationManager; other critical listeners like HeartbeatReceiver also depend on it. In addition, there might be some latency-sensitive user-added listeners. Making the event processing faster by multi-threading will fix all these issues. I have an initial version of the PR for this; I would appreciate it if you could take a look and give feedback on the overall design. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is > causing job failure. For example, a significant delay in receiving the > `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to > remove an executor which is not idle. The event processor in `ListenerBus` > is a single thread which loops through all the Listeners for each event and > processes each event synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > > The single threaded processor often becomes the bottleneck for large jobs. > In addition to that, if one of the Listener is very slow, all the listeners > will pay the price of delay incurred by the slow listener. > To solve the above problems, we plan to have a per listener single threaded > executor service and separate event queue. That way we are not bottlenecked > by the single threaded event processor and also critical listeners will not > be penalized by the slow listeners. The downside of this approach is separate > event queue per listener will increase the driver memory footprint. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-18838: Description: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is causing job failure. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. The single threaded processor often becomes the bottleneck for large jobs. In addition to that, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. To solve the above problems, we plan to have a per listener single threaded executor service and separate event queue. That way we are not bottlenecked by the single threaded event processor and also critical listeners will not be penalized by the slow listeners. The downside of this approach is separate event queue per listener will increase the driver memory footprint. was: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is causing job failure. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. The single threaded processor often becomes the bottleneck for large jobs. In addition to that, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. To solve the above problems, we plan to have a single threaded executor service and separate event queue per listener. That way we are not bottlenecked by the single threaded processor and also critical listeners will not be penalized by the slow listeners. The downside of this approach is separate event queue per listener will increase the driver memory footprint. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is > causing job failure. For example, a significant delay in receiving the > `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to > remove an executor which is not idle. 
The event processor in `ListenerBus` > is a single thread which loops through all the Listeners for each event and > processes each event synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > > The single threaded processor often becomes the bottleneck for large jobs. > In addition to that, if one of the Listener is very slow, all the listeners > will pay the price of delay incurred by the slow listener. > To solve the above problems, we plan to have a per listener single threaded > executor service and separate event queue. That way we are not bottlenecked > by the single threaded event processor and also critical listeners will not > be penalized by the slow listeners. The downside of this approach is separate > event queue per listener will increase the driver memory footprint. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744086#comment-15744086 ] Sital Kedia commented on SPARK-18838: - [~rxin], [~zsxwing] - Any thoughts on this? > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is > causing job failure. For example, a significant delay in receiving the > `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to > remove an executor which is not idle. The event processor in `ListenerBus` > is a single thread which loops through all the Listeners for each event and > processes each event synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > > The single threaded processor often becomes the bottleneck for large jobs. > In addition to that, if one of the Listener is very slow, all the listeners > will pay the price of delay incurred by the slow listener. > To solve the above problems, we plan to have a single threaded executor > service and separate event queue per listener. That way we are not > bottlenecked by the single threaded processor and also critical listeners > will not be penalized by the slow listeners. The downside of this approach is > separate event queue per listener will increase the driver memory footprint. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-18838: Description: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is causing job failure. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. The single threaded processor often becomes the bottleneck for large jobs. In addition to that, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. To solve the above problems, we plan to have a single threaded executor service and separate event queue per listener. That way we are not bottlenecked by the single threaded processor and also critical listeners will not be penalized by the slow listeners. The downside of this approach is separate event queue per listener will increase the driver memory footprint. was: Currently we are observing the issue of very high event processing delay in driver's `ListenerBus` for large jobs with many tasks. Many critical component of the scheduler like `ExecutorAllocationManager`, `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is causing job failure. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the Listeners for each event and processes each event synchronously. The single threaded processor often becomes the bottleneck for large jobs. In addition to that, if one of the Listener is very slow, all the listeners will pay the price of delay incurred by the slow listener. To solve the above problems, we plan to have a single threaded executor service and separate event queue per listener. That way we are not bottlenecked by the single threaded processor and also critical listeners will not be penalized by the slow listeners. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and these delay is > causing job failure. For example, a significant delay in receiving the > `SparkListenerTaskStart` might cause `ExecutorAllocationManager` manager to > remove an executor which is not idle. The event processor in `ListenerBus` > is a single thread which loops through all the Listeners for each event and > processes each event synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. 
> > The single threaded processor often becomes the bottleneck for large jobs. > In addition to that, if one of the Listener is very slow, all the listeners > will pay the price of delay incurred by the slow listener. > To solve the above problems, we plan to have a single threaded executor > service and separate event queue per listener. That way we are not > bottlenecked by the single threaded processor and also critical listeners > will not be penalized by the slow listeners. The downside of this approach is > separate event queue per listener will increase the driver memory footprint. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18838) High latency of event processing for large jobs
Sital Kedia created SPARK-18838: --- Summary: High latency of event processing for large jobs Key: SPARK-18838 URL: https://issues.apache.org/jira/browse/SPARK-18838 Project: Spark Issue Type: Improvement Affects Versions: 2.0.0 Reporter: Sital Kedia Currently we are observing very high event processing delay in the driver's `ListenerBus` for large jobs with many tasks. Many critical components of the scheduler, like `ExecutorAllocationManager` and `HeartbeatReceiver`, depend on the `ListenerBus` events, and this delay is causing job failures. For example, a significant delay in receiving the `SparkListenerTaskStart` might cause `ExecutorAllocationManager` to remove an executor which is not idle. The event processor in `ListenerBus` is a single thread which loops through all the listeners for each event and processes each event synchronously. The single threaded processor often becomes the bottleneck for large jobs. In addition, if one of the listeners is very slow, all the listeners will pay the price of the delay incurred by the slow listener. To solve the above problems, we plan to have a single threaded executor service and a separate event queue per listener. That way we are not bottlenecked by the single threaded processor, and critical listeners will not be penalized by slow listeners. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13510) Shuffle may throw FetchFailedException: Direct buffer memory
[ https://issues.apache.org/jira/browse/SPARK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671415#comment-15671415 ] Sital Kedia commented on SPARK-13510: - [~shenhong] - We are seeing the same issue on our side. Do you have a PR for this yet? > Shuffle may throw FetchFailedException: Direct buffer memory > > > Key: SPARK-13510 > URL: https://issues.apache.org/jira/browse/SPARK-13510 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Hong Shen > Attachments: spark-13510.diff > > > In our cluster, when I test spark-1.6.0 with a sql, it throw exception and > failed. > {code} > 16/02/17 15:36:03 INFO storage.ShuffleBlockFetcherIterator: Sending request > for 1 blocks (915.4 MB) from 10.196.134.220:7337 > 16/02/17 15:36:03 INFO shuffle.ExternalShuffleClient: External shuffle fetch > from 10.196.134.220:7337 (executor id 122) > 16/02/17 15:36:03 INFO client.TransportClient: Sending fetch chunk request 0 > to /10.196.134.220:7337 > 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in > connection from /10.196.134.220:7337 > java.lang.OutOfMemoryError: Direct buffer memory > at java.nio.Bits.reserveMemory(Bits.java:658) > at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) > at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645) > at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228) > at io.netty.buffer.PoolArena.allocate(PoolArena.java:212) > at io.netty.buffer.PoolArena.allocate(PoolArena.java:132) > at > io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271) > at > io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155) > at > io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146) > at > io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107) > at > io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:744) > 16/02/17 15:36:36 ERROR client.TransportResponseHandler: Still have 1 > requests outstanding when connection from /10.196.134.220:7337 is closed > 16/02/17 15:36:36 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block > shuffle_3_81_2, and will not retry (0 retries) > {code} > The reason is that when shuffle a big block(like 1G), task will allocate > the same memory, it will easily throw "FetchFailedException: Direct buffer > memory". 
> If I add -Dio.netty.noUnsafe=true to spark.executor.extraJavaOptions, it will > throw > {code} > java.lang.OutOfMemoryError: Java heap space > at > io.netty.buffer.PoolArena$HeapArena.newUnpooledChunk(PoolArena.java:607) > at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237) > at io.netty.buffer.PoolArena.allocate(PoolArena.java:215) > at io.netty.buffer.PoolArena.allocate(PoolArena.java:132) > {code} > > The MapReduce shuffle first checks whether a block can be cached in > memory, but Spark doesn't. > If the block is larger than what we can cache in memory, we should write it to > disk. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
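The last two sentences of the description suggest a MapReduce-style policy: only buffer a remote block in memory if it is small enough, otherwise stream it to disk. A rough sketch of that decision is below; the threshold, helper names, and types are illustrative assumptions, not Spark's shuffle API. (Later Spark releases added a fetch-to-disk path for oversized remote blocks along these lines.)

{code}
import java.io.{File, FileOutputStream, InputStream, OutputStream, ByteArrayOutputStream}

object FetchPolicySketch {
  // Hypothetical cut-off for keeping a fetched block in memory.
  val maxInMemoryBlockSize: Long = 64L * 1024 * 1024

  /** Returns either the in-memory bytes (small block) or a local file (large block). */
  def fetchBlock(blockSize: Long, open: () => InputStream, target: File): Either[Array[Byte], File] = {
    val in = open()
    try {
      if (blockSize <= maxInMemoryBlockSize) {
        // Small block: buffering in memory is safe.
        val out = new ByteArrayOutputStream(blockSize.toInt)
        copy(in, out)
        Left(out.toByteArray)
      } else {
        // Large block (e.g. the 915 MB request in the log above): stream to
        // disk so the whole block is never held in direct or heap memory.
        val out = new FileOutputStream(target)
        try { copy(in, out) } finally { out.close() }
        Right(target)
      }
    } finally in.close()
  }

  private def copy(in: InputStream, out: OutputStream): Unit = {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
  }
}
{code}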
[jira] [Updated] (SPARK-16827) Stop reporting spill metrics as shuffle metrics
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16827: Summary: Stop reporting spill metrics as shuffle metrics (was: Query with Join produces excessive amount of shuffle data) > Stop reporting spill metrics as shuffle metrics > --- > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our Hive jobs looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ON a.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrading to Spark 2.0 the job is significantly slower. Digging a little > into it, we found out that one of the stages produces an excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job, which used to produce 32KB of shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole-stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17839) Use Nio's directbuffer instead of BufferedInputStream in order to avoid additional copy from os buffer cache to user buffer
[ https://issues.apache.org/jira/browse/SPARK-17839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-17839: Summary: Use Nio's directbuffer instead of BufferedInputStream in order to avoid additional copy from os buffer cache to user buffer (was: UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files in order to avoid additional copy) > Use Nio's directbuffer instead of BufferedInputStream in order to avoid > additional copy from os buffer cache to user buffer > > > Key: SPARK-17839 > URL: https://issues.apache.org/jira/browse/SPARK-17839 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.0.1 >Reporter: Sital Kedia >Priority: Minor > > Currently we use BufferedInputStream to read the shuffle file which copies > the file content from os buffer cache to the user buffer. This adds > additional latency in reading the spill files. We made a change to use java > nio's direct buffer to read the spill files and for certain jobs spilling > significant amount of data, we see between 5 - 7% speedup. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
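For illustration, here is a minimal sketch of what reading a spill file through an NIO direct buffer looks like, as opposed to a `BufferedInputStream`. The buffer size and names are assumptions for the example, not the actual `UnsafeSorterSpillReader` code.

{code}
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

object DirectBufferReadSketch {
  def readSpillFile(file: File)(consume: ByteBuffer => Unit): Unit = {
    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
    // A direct buffer lets the read go from the OS buffer cache into this
    // buffer without the extra on-heap copy a BufferedInputStream makes.
    val buffer = ByteBuffer.allocateDirect(1024 * 1024)
    try {
      while (channel.read(buffer) != -1) {
        buffer.flip()    // switch from filling to draining
        consume(buffer)  // hand the freshly read bytes to the caller
        buffer.clear()   // reuse the buffer for the next read
      }
    } finally channel.close()
  }
}
{code}

Whether this wins in practice depends on how the bytes are consumed; the 5-7% speedup quoted in the description was observed on spill-heavy jobs.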
[jira] [Created] (SPARK-17839) UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files in order to avoid additional copy
Sital Kedia created SPARK-17839: --- Summary: UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files in order to avoid additional copy Key: SPARK-17839 URL: https://issues.apache.org/jira/browse/SPARK-17839 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 2.0.1 Reporter: Sital Kedia Priority: Minor Currently we use BufferedInputStream to read the shuffle file which copies the file content from os buffer cache to the user buffer. This adds additional latency in reading the spill files. We made a change to use java nio's direct buffer to read the spill files and for certain jobs spilling significant amount of data, we see between 5 - 7% speedup. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17509) When wrapping catalyst datatype to Hive data type avoid pattern matching
[ https://issues.apache.org/jira/browse/SPARK-17509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-17509: Affects Version/s: 2.0.0 Description: Profiling a job, we saw that pattern matching in the wrap function of HiveInspector is consuming around 10% of the time, which can be avoided. A similar change in the unwrap function was made in SPARK-15956. Component/s: SQL Issue Type: Improvement (was: Bug) > When wrapping catalyst datatype to Hive data type avoid pattern matching > > > Key: SPARK-17509 > URL: https://issues.apache.org/jira/browse/SPARK-17509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Profiling a job, we saw that pattern matching in the wrap function of > HiveInspector is consuming around 10% of the time, which can be avoided. A > similar change in the unwrap function was made in SPARK-15956. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
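The optimization being described, resolving the data-type dispatch once instead of pattern matching per value, can be sketched as follows. The types are simplified stand-ins, not Spark's HiveInspector code; SPARK-15956 applied the same idea on the unwrap side.

{code}
// Simplified stand-ins for Catalyst data types; illustrative only.
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType

object WrapSketch {
  // Done once per field: pattern match on the data type and return a
  // conversion function that can be reused for every row.
  def wrapperFor(dt: DataType): Any => AnyRef = dt match {
    case IntType    => (v: Any) => Int.box(v.asInstanceOf[Int])
    case StringType => (v: Any) => v.asInstanceOf[String]
  }

  // Hot path: no per-value pattern matching, just apply the precomputed wrappers.
  def wrapRow(values: Seq[Any], wrappers: Seq[Any => AnyRef]): Seq[AnyRef] =
    values.zip(wrappers).map { case (v, w) => w(v) }
}

// Usage: compute `val wrappers = schema.map(WrapSketch.wrapperFor)` once,
// then call `wrapRow(row, wrappers)` for every row.
{code}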
[jira] [Created] (SPARK-17509) When wrapping catalyst datatype to Hive data type avoid pattern matching
Sital Kedia created SPARK-17509: --- Summary: When wrapping catalyst datatype to Hive data type avoid pattern matching Key: SPARK-17509 URL: https://issues.apache.org/jira/browse/SPARK-17509 Project: Spark Issue Type: Bug Reporter: Sital Kedia -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469092#comment-15469092 ] Sital Kedia commented on SPARK-16922: - There is no noticable performance gain I observed comparing to BytesToBytesMap. Part of the reason might be due to the fact that BroadcastHashJoin was consuming very less percentage of the total job time. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia >Assignee: Davies Liu > Fix For: 2.0.1, 2.1.0 > > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- 
HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468339#comment-15468339 ] Sital Kedia commented on SPARK-16922: - [~davies] - Thanks for looking into this. I tested the failing job with the fix and it works fine now. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia >Assignee: Davies Liu > Fix For: 2.0.1, 2.1.0 > > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 
>= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456744#comment-15456744 ] Sital Kedia commented on SPARK-16922: - Thanks for the fix [~davies]. I will test this change with our job to see if that fixes the issue. PS - I am away this week. Will get to it some time next week. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia >Assignee: Davies Liu > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, 
table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-17164) Query with colon in the table name fails to parse in 2.0
[ https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia closed SPARK-17164. --- Resolution: Won't Fix > Query with colon in the table name fails to parse in 2.0 > > > Key: SPARK-17164 > URL: https://issues.apache.org/jira/browse/SPARK-17164 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Running a simple query with colon in table name fails to parse in 2.0 > {code} > == SQL == > SELECT * FROM a:b > ---^^^ > at > org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) > at > org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > ... 48 elided > {code} > Please note that this is a regression from Spark 1.6 as the query runs fine > in 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17164) Query with colon in the table name fails to parse in 2.0
[ https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431031#comment-15431031 ] Sital Kedia commented on SPARK-17164: - Thanks [~rxin], [~hvanhovell], that makes sense. The issue is on our side. For Spark 1.6 we were using our internal version of the Hive parser, which supports table names containing a colon. Since Spark 2.0 no longer uses the Hive parser, this broke on our side. We will use backticks for such tables to work around this issue. > Query with colon in the table name fails to parse in 2.0 > > > Key: SPARK-17164 > URL: https://issues.apache.org/jira/browse/SPARK-17164 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Running a simple query with a colon in the table name fails to parse in 2.0 > {code} > == SQL == > SELECT * FROM a:b > ---^^^ > at > org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) > at > org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > ... 48 elided > {code} > Please note that this is a regression from Spark 1.6 as the query runs fine > in 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
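For reference, the backtick workaround mentioned above looks like this in Spark SQL (the table name is just an example):

{code}
// Quoting the identifier with backticks lets the 2.0 parser accept a colon
// in the table name; `spark` is an existing SparkSession.
spark.sql("SELECT * FROM `a:b`").show()
{code}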
[jira] [Commented] (SPARK-17164) Query with colon in the table name fails to parse in 2.0
[ https://issues.apache.org/jira/browse/SPARK-17164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429193#comment-15429193 ] Sital Kedia commented on SPARK-17164: - cc - [~hvanhovell], [~rxin] > Query with colon in the table name fails to parse in 2.0 > > > Key: SPARK-17164 > URL: https://issues.apache.org/jira/browse/SPARK-17164 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > Running a simple query with colon in table name fails to parse in 2.0 > {code} > == SQL == > SELECT * FROM a:b > ---^^^ > at > org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) > at > org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > ... 48 elided > {code} > Please note that this is a regression from Spark 1.6 as the query runs fine > in 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17164) Query with colon in the table name fails to parse in 2.0
Sital Kedia created SPARK-17164: --- Summary: Query with colon in the table name fails to parse in 2.0 Key: SPARK-17164 URL: https://issues.apache.org/jira/browse/SPARK-17164 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0 Reporter: Sital Kedia Running a simple query with colon in table name fails to parse in 2.0 {code} == SQL == SELECT * FROM a:b ---^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) ... 48 elided {code} Please note that this is a regression from Spark 1.6 as the query runs fine in 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427429#comment-15427429 ] Sital Kedia commented on SPARK-16922: - Kryo > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > 
decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427259#comment-15427259 ] Sital Kedia commented on SPARK-16922: - >> Could you also try to disable the dense mode? I tried disabling the dense mode, that did not help either. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] 
> +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427250#comment-15427250 ] Sital Kedia commented on SPARK-16922: - The failure is deterministic, we are reproducing the issue for every run of the job (Its not only one job, there are multiple jobs that are failing because of this). For now, we have made a change to not use the LongHashedRelation to workaround this issue. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : 
+- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
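The workaround described above (avoiding `LongHashedRelation`) required a code change on their side. A purely user-level way to sidestep a failing broadcast hash join, at some performance cost and not what this thread actually did, is to lower the broadcast threshold so the planner falls back to a sort-merge join:

{code}
// Disable automatic broadcast joins so the planner chooses a sort-merge join
// instead of BroadcastHashJoin; `spark` is an existing SparkSession and
// `query` holds the failing SQL statement.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.sql(query).explain()  // confirm the plan no longer contains BroadcastHashJoin
{code}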
[jira] [Updated] (SPARK-17113) Job failure due to Executor OOM in offheap mode
[ https://issues.apache.org/jira/browse/SPARK-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-17113: Summary: Job failure due to Executor OOM in offheap mode (was: Job failure due to Executor OOM) > Job failure due to Executor OOM in offheap mode > --- > > Key: SPARK-17113 > URL: https://issues.apache.org/jira/browse/SPARK-17113 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.2, 2.0.0 >Reporter: Sital Kedia > > We have been seeing many job failure due to executor OOM with following stack > trace - > {code} > java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0 > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Digging into the code, we found out that this is an issue with cooperative > memory management for off heap memory allocation. > In the code > https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463, > when the UnsafeExternalSorter is checking if memory page is being used by > upstream, the base object in case of off heap memory is always null so the > UnsafeExternalSorter does not spill the memory pages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
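The root cause in the description comes down to an object-identity check that cannot work off-heap. A simplified sketch of the shape of that check is below; it is not the actual `UnsafeExternalSorter` code, and the types are stand-ins.

{code}
// Stand-in for a memory page: on-heap pages have a non-null baseObject,
// off-heap pages have baseObject == null and are identified by baseOffset.
final case class Page(baseObject: AnyRef, baseOffset: Long)

object PageCheckSketch {
  // Shape of the problematic check: with off-heap pages both base objects are
  // null, so every page looks like "the page the upstream iterator is reading"
  // and is therefore never considered safe to spill.
  def inUseByUpstream(page: Page, upstreamBaseObject: AnyRef): Boolean =
    page.baseObject eq upstreamBaseObject

  // One way to make the check meaningful off-heap: also compare the base
  // offset of the page the upstream record lives in, which is unique per page.
  def inUseByUpstreamOffHeapSafe(page: Page,
                                 upstreamBaseObject: AnyRef,
                                 upstreamBaseOffset: Long): Boolean =
    (page.baseObject eq upstreamBaseObject) && page.baseOffset == upstreamBaseOffset
}
{code}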
[jira] [Commented] (SPARK-17113) Job failure due to Executor OOM
[ https://issues.apache.org/jira/browse/SPARK-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425216#comment-15425216 ] Sital Kedia commented on SPARK-17113: - cc - [~davies] > Job failure due to Executor OOM > --- > > Key: SPARK-17113 > URL: https://issues.apache.org/jira/browse/SPARK-17113 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.2, 2.0.0 >Reporter: Sital Kedia > > We have been seeing many job failure due to executor OOM with following stack > trace - > {code} > java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0 > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Digging into the code, we found out that this is an issue with cooperative > memory management for off heap memory allocation. > In the code > https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463, > when the UnsafeExternalSorter is checking if memory page is being used by > upstream, the base object in case of off heap memory is always null so the > UnsafeExternalSorter does not spill the memory pages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17113) Job failure due to Executor OOM
Sital Kedia created SPARK-17113: --- Summary: Job failure due to Executor OOM Key: SPARK-17113 URL: https://issues.apache.org/jira/browse/SPARK-17113 Project: Spark Issue Type: Bug Affects Versions: 2.0.0, 1.6.2 Reporter: Sital Kedia We have been seeing many job failure due to executor OOM with following stack trace - {code} java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170) at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90) at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:736) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:307) at org.apache.spark.rdd.RDD.iterator(RDD.scala:271) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} Digging into the code, we found out that this is an issue with cooperative memory management for off heap memory allocation. In the code https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L463, when the UnsafeExternalSorter is checking if memory page is being used by upstream, the base object in case of off heap memory is always null so the UnsafeExternalSorter does not spill the memory pages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421524#comment-15421524 ] Sital Kedia commented on SPARK-16922: - Yes, I have the above mentioned PR as well. > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > 
HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419458#comment-15419458 ] Sital Kedia commented on SPARK-16922: - I am using the fix in https://github.com/apache/spark/pull/14464/files, still the issue remains. The joining key lies in the range [43304915L to 10150946266075397L] > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, 
[isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
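For scale, the join keys quoted in the comment above span roughly 10^16 possible long values, which is useful context for the LongHashedRelation discussion further down in this ticket. A quick check in plain Scala (no Spark dependencies; the bounds are taken directly from the comment):
{code}
// Bounds reported for the joining key; the span is so large that any
// structure sized by key range rather than by key count would be pathological.
val minKey = 43304915L
val maxKey = 10150946266075397L
println((maxKey - minKey).toDouble)   // ~1.015e16
{code}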
[jira] [Updated] (SPARK-16922) Query with Broadcast Hash join fails due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16922: Summary: Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 (was: Query failure due to executor OOM in Spark 2.0) > Query with Broadcast Hash join fails due to executor OOM in Spark 2.0 > - > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > 
HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419383#comment-15419383 ] Sital Kedia edited comment on SPARK-16922 at 8/12/16 8:06 PM: -- I found that the regression was introduced in https://github.com/apache/spark/pull/12278, which introduced a new data structure (LongHashedRelation) for long types. I made a hack to use UnsafeHashedRelation instead of LongHashedRelation in https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105 and things started working fine. This might be due to some data corruption happening in LongHashedRelation. cc - [~davies] was (Author: sitalke...@gmail.com): I found that the regression was introduced in https://github.com/apache/spark/pull/12278, whichintroduced a new data structure (LongHashedRelation) for long types. I made a hack to use UnsafeHashedRelation instead of LongHashedRelation in https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105 and things started working fine. This might be due to some data corruption happening in LongHashedRelation. cc - [~davies] > Query failure due to executor OOM in Spark 2.0 > -- > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], 
[cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(Li
[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419383#comment-15419383 ] Sital Kedia commented on SPARK-16922: - I found that the regression was introduced in https://github.com/apache/spark/pull/12278, whichintroduced a new data structure (LongHashedRelation) for long types. I made a hack to use UnsafeHashedRelation instead of LongHashedRelation in https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L105 and things started working fine. This might be due to some data corruption happening in LongHashedRelation. cc - [~davies] > Query failure due to executor OOM in Spark 2.0 > -- > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], 
functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
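For context on why this query hits the LongHashedRelation path at all: in Spark 2.0 the long-specialized relation is used when the build-side join key is a single long-typed expression, which is the case here since the key is cast(... as bigint). The snippet below only paraphrases that selection rule for illustration; it is not the actual dispatch code in HashedRelation.scala, whose signature and details may differ.
{code}
import org.apache.spark.sql.types.{DataType, LongType}

// Paraphrased selection rule (an assumption, not copied from the Spark source):
// a single LongType key selects the long-specialized hashed relation.
def takesLongHashedRelationPath(keyTypes: Seq[DataType]): Boolean =
  keyTypes.length == 1 && keyTypes.head == LongType

// Build-side key of this query: cast(cast(field4#156 as decimal(20,0)) as bigint)
println(takesLongHashedRelationPath(Seq(LongType)))   // true
{code}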
[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410703#comment-15410703 ] Sital Kedia commented on SPARK-16922: - Update - The query works fine when Broadcast hash join in turned off, so the issue might be in broadcast hash join. I put some debug print in UnsafeRowWriter class (https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L214) and I found that it is receiving a row of size around 800MB and OOMing while trying to grow the buffer holder. It might suggest that there is some data corruption going on probably in the Broadcast hash join. cc- [~davies] - Any pointer on how to debug this issue further? > Query failure due to executor OOM in Spark 2.0 > -- > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange 
hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
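The observation above (the query succeeds once broadcast hash join is turned off) can be reproduced without code changes: setting the broadcast threshold to -1 disables broadcast joins and forces the planner back to a shuffle-based join. A minimal sketch, assuming a SparkSession named spark:
{code}
// Disable broadcast hash joins for subsequent queries in this session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// Re-run the failing query and confirm via Dataset.explain() that the
// physical plan no longer contains a BroadcastHashJoin node.
{code}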
[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409878#comment-15409878 ] Sital Kedia commented on SPARK-16922: - PS - Rerunning the query with spark.sql.codegen.aggregate.map.columns.max=0 to disabled vectorized aggregation for columnar map also OOMs, but with a different stack trace. {code} [Stage 1:>(0 + 4) / 184]16/08/05 11:26:31 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 4, hadoop4774.prn2.facebook.com): java.lang.OutOfMemoryError: Java heap space at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73) at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} > Query failure due to executor OOM in Spark 2.0 > -- > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. 
> Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b)
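The knob used in the comment above is an internal Spark 2.0 setting; per that comment, setting it to 0 disables the vectorized (columnar) aggregate hash map. One way to set it, assuming a SparkSession named spark:
{code}
// Internal setting referenced in the comment above; 0 disables the
// vectorized (columnar) aggregate hash map, per that comment.
spark.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
{code}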
[jira] [Commented] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409813#comment-15409813 ] Sital Kedia commented on SPARK-16922: - cc - [~rxin] > Query failure due to executor OOM in Spark 2.0 > -- > > Key: SPARK-16922 > URL: https://issues.apache.org/jira/browse/SPARK-16922 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.0 >Reporter: Sital Kedia > > A query which used to work in Spark 1.6 fails with executor OOM in 2.0. > Stack trace - > {code} > at > org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Query plan in Spark 1.6 > {code} > == Physical Plan == > TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) > +- TungstenExchange hashpartitioning(field1#101,200), None >+- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / > 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) > +- Project [field1#101,field2#74] > +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as > decimal(20,0)) as bigint)], BuildRight > :- ConvertToUnsafe > : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation > foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] > +- ConvertToUnsafe >+- HiveTableScan [field1#101,field4#97], MetastoreRelation > foo, table2, Some(b) > {code} > Query plan in 2.0 > {code} > == Physical Plan == > *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) > +- Exchange hashpartitioning(field1#160, 200) >+- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / > 100.0))]) > +- *Project [field2#133, field1#160] > +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as > decimal(20,0)) as bigint)], Inner, BuildRight > :- *Filter isnotnull(field5#122L) > : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation > foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= > 2013-12-31)] > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as > decimal(20,0)) as 
bigint))) >+- *Filter isnotnull(field4#156) > +- HiveTableScan [field4#156, field1#160], > MetastoreRelation foo, table2, b > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-16922) Query failure due to executor OOM in Spark 2.0
Sital Kedia created SPARK-16922: --- Summary: Query failure due to executor OOM in Spark 2.0 Key: SPARK-16922 URL: https://issues.apache.org/jira/browse/SPARK-16922 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.0.0 Reporter: Sital Kedia A query which used to work in Spark 1.6 fails with executor OOM in 2.0. Stack trace - {code} at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} Query plan in Spark 1.6 {code} == Physical Plan == TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3]) +- TungstenExchange hashpartitioning(field1#101,200), None +- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111]) +- Project [field1#101,field2#74] +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as decimal(20,0)) as bigint)], BuildRight :- ConvertToUnsafe : +- HiveTableScan [field2#74,field5#63L], MetastoreRelation foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)] +- ConvertToUnsafe +- HiveTableScan [field1#101,field4#97], MetastoreRelation foo, table2, Some(b) {code} Query plan in 2.0 {code} == Physical Plan == *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))]) +- Exchange hashpartitioning(field1#160, 200) +- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 100.0))]) +- *Project [field2#133, field1#160] +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as decimal(20,0)) as bigint)], Inner, BuildRight :- *Filter isnotnull(field5#122L) : +- HiveTableScan [field5#122L, field2#133], MetastoreRelation foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 2013-12-31)] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as decimal(20,0)) as bigint))) +- *Filter isnotnull(field4#156) +- HiveTableScan [field4#156, field1#160], MetastoreRelation foo, table2, b {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402461#comment-15402461 ] Sital Kedia commented on SPARK-16827: - Performance is worse for this job. But I suspect this is not because of shuffle data write. Will update once I figure out the reason. > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401523#comment-15401523 ] Sital Kedia commented on SPARK-16827: - Actually, it seems like this is a bug in the shuffle write metrics calculation; the actual amount of shuffle data written might be the same, because the final output is exactly the same. > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401413#comment-15401413 ] Sital Kedia edited comment on SPARK-16827 at 8/1/16 12:52 AM: -- That is not the case. There is no broadcast join involved, its using shuffle join in both 1.6 and 2.0. Plan in 1.6 - {code} Project [target_id#136L AS userid#115L] +- SortMergeJoin [source_id#135L], [id#140L] :- Sort [source_id#135L ASC], false, 0 : +- TungstenExchange hashpartitioning(source_id#135L,800), None : +- ConvertToUnsafe :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation foo, table1, Some(a), [(ds#134 = 2016-07-15)] +- Sort [id#140L ASC], false, 0 +- TungstenExchange hashpartitioning(id#140L,800), None +- ConvertToUnsafe +- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), [(ds#139 = 2016-07-15)] {code} Plan in 2.0 - {code} *Project [target_id#151L AS userid#111L] +- *SortMergeJoin [source_id#150L], [id#155L], Inner :- *Sort [source_id#150L ASC], false, 0 : +- Exchange hashpartitioning(source_id#150L, 800) : +- *Filter isnotnull(source_id#150L) :+- HiveTableScan [source_id#150L, target_id#151L], MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)] +- *Sort [id#155L ASC], false, 0 +- Exchange hashpartitioning(id#155L, 800) +- *Filter isnotnull(id#155L) +- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, [isnotnull(ds#154), (ds#154 = 2016-07-15)] {code} was (Author: sitalke...@gmail.com): That is not the case. There is no broadcast join involved, its using shuffle join in both 1.6 and 2.0. Plan in 1.6 - Project [target_id#136L AS userid#115L] +- SortMergeJoin [source_id#135L], [id#140L] :- Sort [source_id#135L ASC], false, 0 : +- TungstenExchange hashpartitioning(source_id#135L,800), None : +- ConvertToUnsafe :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation foo, table1, Some(a), [(ds#134 = 2016-07-15)] +- Sort [id#140L ASC], false, 0 +- TungstenExchange hashpartitioning(id#140L,800), None +- ConvertToUnsafe +- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), [(ds#139 = 2016-07-15)] Plan in 2.0 - *Project [target_id#151L AS userid#111L] +- *SortMergeJoin [source_id#150L], [id#155L], Inner :- *Sort [source_id#150L ASC], false, 0 : +- Exchange hashpartitioning(source_id#150L, 800) : +- *Filter isnotnull(source_id#150L) :+- HiveTableScan [source_id#150L, target_id#151L], MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)] +- *Sort [id#155L ASC], false, 0 +- Exchange hashpartitioning(id#155L, 800) +- *Filter isnotnull(id#155L) +- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, [isnotnull(ds#154), (ds#154 = 2016-07-15)] > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. 
We also tried turning off whole stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401413#comment-15401413 ] Sital Kedia commented on SPARK-16827: - That is not the case. There is no broadcast join involved, its using shuffle join in both 1.6 and 2.0. Plan in 1.6 - Project [target_id#136L AS userid#115L] +- SortMergeJoin [source_id#135L], [id#140L] :- Sort [source_id#135L ASC], false, 0 : +- TungstenExchange hashpartitioning(source_id#135L,800), None : +- ConvertToUnsafe :+- HiveTableScan [target_id#136L,source_id#135L], MetastoreRelation foo, table1, Some(a), [(ds#134 = 2016-07-15)] +- Sort [id#140L ASC], false, 0 +- TungstenExchange hashpartitioning(id#140L,800), None +- ConvertToUnsafe +- HiveTableScan [id#140L], MetastoreRelation foo, table2, Some(b), [(ds#139 = 2016-07-15)] Plan in 2.0 - *Project [target_id#151L AS userid#111L] +- *SortMergeJoin [source_id#150L], [id#155L], Inner :- *Sort [source_id#150L ASC], false, 0 : +- Exchange hashpartitioning(source_id#150L, 800) : +- *Filter isnotnull(source_id#150L) :+- HiveTableScan [source_id#150L, target_id#151L], MetastoreRelation foo, table1, a, [isnotnull(ds#149), (ds#149 = 2016-07-15)] +- *Sort [id#155L ASC], false, 0 +- Exchange hashpartitioning(id#155L, 800) +- *Filter isnotnull(id#155L) +- HiveTableScan [id#155L], MetastoreRelation foo, table2, b, [isnotnull(ds#154), (ds#154 = 2016-07-15)] > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
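The physical plans pasted in these comments can be regenerated with Dataset.explain (or an EXPLAIN statement) against the query from the ticket description, which makes it easy to diff the 1.6 and 2.0 plans. A sketch, assuming a SparkSession named spark and the (redacted) table names from the description:
{code}
val query =
  """SELECT userid
    |FROM table1 a
    |JOIN table2 b
    |  ON a.ds = '2016-07-15'
    | AND b.ds = '2016-07-15'
    | AND a.source_id = b.id""".stripMargin

// Prints the '== Physical Plan ==' block shown in the comments above.
spark.sql(query).explain()
{code}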
[jira] [Updated] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16827: Description: One of our hive job which looks like this - {code} SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id {code} After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 of the job which used to produce 32KB shuffle data with 1.6, now produces more than 400GB with Spark 2.0. We also tried turning off whole stage code generation but that did not help. PS - Even if the intermediate shuffle data size is huge, the job still produces accurate output. was: One of our hive job which looks like this - {code} SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id {code} After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 of the job which used to produce 32KB shuffle data with 1.6, now produces more than 400GB with Spark 2.0. We also tried turning off whole stage code generation but that did not help. > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. > PS - Even if the intermediate shuffle data size is huge, the job still > produces accurate output. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401405#comment-15401405 ] Sital Kedia commented on SPARK-16827: - [~rxin] - Any idea how to debug this issue? > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-16827) Query with Join produces excessive shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16827: Description: One of our hive job which looks like this - {code} SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id {code} After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 of the job which used to produce 32KB shuffle data with 1.6, now produces more than 400GB with Spark 2.0. We also tried turning off whole stage code generation but that did not help. was: One of our hive job which looks like this - {code] SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id {code} After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 of the job which used to produce 32KB shuffle data with 1.6, now produces more than 400GB with Spark 2.0. We also tried turning off whole stage code generation but that did not help. > Query with Join produces excessive shuffle data > --- > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-16827) Query with Join produces excessive amount of shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16827: Summary: Query with Join produces excessive amount of shuffle data (was: Query with Join produces excessive shuffle data) > Query with Join produces excessive amount of shuffle data > - > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code} > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-16827) Query with Join produces excessive shuffle data
[ https://issues.apache.org/jira/browse/SPARK-16827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-16827: Description: One of our hive job which looks like this - {code] SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id {code} After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 of the job which used to produce 32KB shuffle data with 1.6, now produces more than 400GB with Spark 2.0. We also tried turning off whole stage code generation but that did not help. was: One of our hive job which looks like this - SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id After upgrade to Spark 2.0 the job is significantly slow. Digging a little into it, we found out that one of the stages produces excessive amount of shuffle data. Please note that this is a regression from Spark 1.6 > Query with Join produces excessive shuffle data > --- > > Key: SPARK-16827 > URL: https://issues.apache.org/jira/browse/SPARK-16827 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Labels: performance > > One of our hive job which looks like this - > {code] > SELECT userid > FROM table1 a > JOIN table2 b > ONa.ds = '2016-07-15' > AND b.ds = '2016-07-15' > AND a.source_id = b.id > {code} > After upgrade to Spark 2.0 the job is significantly slow. Digging a little > into it, we found out that one of the stages produces excessive amount of > shuffle data. Please note that this is a regression from Spark 1.6. Stage 2 > of the job which used to produce 32KB shuffle data with 1.6, now produces > more than 400GB with Spark 2.0. We also tried turning off whole stage code > generation but that did not help. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-16827) Query with Join produces excessive shuffle data
Sital Kedia created SPARK-16827: --- Summary: Query with Join produces excessive shuffle data Key: SPARK-16827 URL: https://issues.apache.org/jira/browse/SPARK-16827 Project: Spark Issue Type: Bug Components: Shuffle, Spark Core Affects Versions: 2.0.0 Reporter: Sital Kedia One of our Hive jobs, which looks like this - SELECT userid FROM table1 a JOIN table2 b ONa.ds = '2016-07-15' AND b.ds = '2016-07-15' AND a.source_id = b.id After upgrading to Spark 2.0 the job is significantly slower. Digging a little into it, we found out that one of the stages produces an excessive amount of shuffle data. Please note that this is a regression from Spark 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-15958) Make initial buffer size for the Sorter configurable
Sital Kedia created SPARK-15958: --- Summary: Make initial buffer size for the Sorter configurable Key: SPARK-15958 URL: https://issues.apache.org/jira/browse/SPARK-15958 Project: Spark Issue Type: Bug Affects Versions: 2.0.0 Reporter: Sital Kedia Currently the initial buffer size in the sorter is hard-coded in the code (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java#L88) and is too small for large workloads. As a result, the sorter spends significant time expanding the buffer and copying the data. It would be useful to make it configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
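A hedged sketch of the direction proposed in this ticket: replace the hard-coded constant with a configuration lookup. The config key and default below are placeholders for illustration only, not an existing Spark setting, and the real class (UnsafeExternalRowSorter) is Java; the Scala fragment just shows the shape of the change.
{code}
import org.apache.spark.SparkEnv

// Hypothetical config key and default -- the point is only that the sorter's
// initial buffer size would come from SparkConf instead of a constant.
val initialSortBufferSize: Int =
  SparkEnv.get.conf.getInt("spark.sql.sort.initialBufferSize", 4096)
{code}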
[jira] [Created] (SPARK-15569) Executors spending significant time in DiskObjectWriter.updateBytesWritten function
Sital Kedia created SPARK-15569: --- Summary: Executors spending significant time in DiskObjectWriter.updateBytesWritten function Key: SPARK-15569 URL: https://issues.apache.org/jira/browse/SPARK-15569 Project: Spark Issue Type: Bug Components: Shuffle Reporter: Sital Kedia While profiling a Spark job that spills a large amount of intermediate data, we found that a significant portion of time is spent in the DiskObjectWriter.updateBytesWritten function. Looking at the code (https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala#L206), we see that the function is called very frequently to update the number of bytes written to disk. We should reduce the update frequency to avoid this overhead. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
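A self-contained sketch of the mitigation suggested above: amortize the expensive bytes-written refresh over many records instead of refreshing on every write. Class and method names are illustrative only; this is not the actual DiskBlockObjectWriter change.
{code}
// Illustrative only: refresh the bytes-written figure every `refreshInterval`
// records; the per-record work stays a cheap counter increment.
class BatchedWriteMetrics(refreshInterval: Int = 16384) {
  private var recordsWritten = 0L
  private var bytesWritten = 0L

  // `currentFilePosition` stands in for the costly call that asks the output
  // stream for its position (the work updateBytesWritten does today).
  def recordWritten(currentFilePosition: () => Long): Unit = {
    recordsWritten += 1
    if (recordsWritten % refreshInterval == 0) {
      bytesWritten = currentFilePosition()
    }
  }

  def snapshot: (Long, Long) = (recordsWritten, bytesWritten)
}
{code}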
[jira] [Commented] (SPARK-15391) Spark executor OOM during TimSort
[ https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15289791#comment-15289791 ] Sital Kedia commented on SPARK-15391: - cc- [~davies] - Any idea how to fix this issue? > Spark executor OOM during TimSort > - > > Key: SPARK-15391 > URL: https://issues.apache.org/jira/browse/SPARK-15391 > Project: Spark > Issue Type: Bug >Reporter: Sital Kedia > > While running a query, we are seeing a lot of executor OOM while doing > TimSort. > Stack trace - > {code} > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) > at > org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) > at > org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) > at > org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) > at > org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) > at > org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198) > at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) > {code} > Out of total 32g available to the executors, we are allocating 24g onheap and > 8g offheap memory. Looking at the code > (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87), > we see that during TimSort we are allocating the memory buffer onheap > irrespective of the memory mode, which is the reason for the executor OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
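A hedged sketch of the fix direction implied by the description above, written in Scala even though UnsafeSortDataFormat is a Java class: routing the scratch-buffer allocation through the task's MemoryConsumer would make it respect the configured memory mode (on-heap vs. off-heap) and participate in spill accounting, instead of always allocating a plain on-heap array. The helper name is hypothetical.
{code}
import org.apache.spark.memory.MemoryConsumer
import org.apache.spark.unsafe.array.LongArray

// Each in-memory sorter record is a (pointer, key-prefix) pair, i.e. two longs,
// so a buffer for numRecords records needs 2 * numRecords long slots.
def allocateSortBuffer(consumer: MemoryConsumer, numRecords: Int): LongArray =
  consumer.allocateArray(numRecords.toLong * 2)
{code}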
[jira] [Updated] (SPARK-15391) Spark executor OOM during TimSort
[ https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-15391: Description: While running a query, we are seeing a lot of executor OOM while doing TimSort. Stack trace - {code} at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198) at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) {code} Out of total 32g available to the executors, we are allocating 24g onheap and 8g onheap memory. Looking at the code (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87), we see that during TimSort we are allocating the memory buffer onheap irrespective of memory mode that is reason of executor OOM. was: While running a query, we are seeing a lot of executor OOM while doing TimSort. Stack trace - at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198) at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) Out of total 32g available to the executors, we are allocating 24g onheap and 8g onheap memory. 
Looking at the code (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87), we see that during TimSort we are allocating the memory buffer onheap irrespective of the memory mode, which is the reason for the executor OOM. > Spark executor OOM during TimSort > - > > Key: SPARK-15391 > URL: https://issues.apache.org/jira/browse/SPARK-15391 > Project: Spark > Issue Type: Bug >Reporter: Sital Kedia > > While running a query, we are seeing a lot of executor OOMs while doing > TimSort. > Stack trace - > {code} > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) > at > org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) > at > org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) > at > org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) > at > org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) >
[jira] [Updated] (SPARK-15391) Spark executor OOM during TimSort
[ https://issues.apache.org/jira/browse/SPARK-15391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-15391: Description: While running a query, we are seeing a lot of executor OOMs while doing TimSort. Stack trace - at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) at org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198) at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) Out of the total 32g available to the executors, we are allocating 24g onheap and 8g offheap memory. Looking at the code (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87), we see that during TimSort we are allocating the memory buffer onheap irrespective of the memory mode, which is the reason for the executor OOM. > Spark executor OOM during TimSort > - > > Key: SPARK-15391 > URL: https://issues.apache.org/jira/browse/SPARK-15391 > Project: Spark > Issue Type: Bug >Reporter: Sital Kedia > > While running a query, we are seeing a lot of executor OOMs while doing > TimSort. 
> Stack trace - > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:86) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeSortDataFormat.allocate(UnsafeSortDataFormat.java:32) > at > org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:951) > at > org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:699) > at > org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) > at > org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) > at > org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:235) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:198) > at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:58) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:356) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) > Out of the total 32g available to the executors, we are allocating 24g onheap and > 8g offheap memory. Looking at the code > (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java#L87), > we see that during TimSort we are allocating the memory buffer onheap > irrespective of the memory mode, which is the reason for the executor OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
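As a point of reference for the 24g/8g split mentioned in the description, an executor memory layout like the one described would typically be configured with the standard off-heap settings shown below. Only the property names (spark.executor.memory, spark.memory.offHeap.enabled, spark.memory.offHeap.size) are standard Spark configuration; the values are simply the figures quoted in the description and are illustrative.
{code}
# spark-defaults.conf (illustrative values matching the description above)
spark.executor.memory           24g
spark.memory.offHeap.enabled    true
spark.memory.offHeap.size       8g
{code}
With this layout, sort data is expected to live in the 8g off-heap region, which is why an unconditional on-heap scratch allocation during TimSort can push the executor past its heap budget.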
[jira] [Created] (SPARK-15391) Spark executor OOM during TimSort
Sital Kedia created SPARK-15391: --- Summary: Spark executor OOM during TimSort Key: SPARK-15391 URL: https://issues.apache.org/jira/browse/SPARK-15391 Project: Spark Issue Type: Bug Reporter: Sital Kedia -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13850) TimSort Comparison method violates its general contract
[ https://issues.apache.org/jira/browse/SPARK-13850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286906#comment-15286906 ] Sital Kedia commented on SPARK-13850: - I am not 100% sure of the root cause, but I suspect this happens when the JVM tries to allocate a very large buffer for the pointer array. The JVM may not be able to allocate such a large buffer in a contiguous memory location on the heap, and since the unsafe operations assume the objects occupy contiguous memory, any unsafe operation on the large buffer results in memory corruption, which manifests as the TimSort issue. > TimSort Comparison method violates its general contract > --- > > Key: SPARK-13850 > URL: https://issues.apache.org/jira/browse/SPARK-13850 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.6.0 >Reporter: Sital Kedia > > While running a query which does a group by on a large dataset, the query > fails with following stack trace. > {code} > Job aborted due to stage failure: Task 4077 in stage 1.3 failed 4 times, most > recent failure: Lost task 4077.3 in stage 1.3 (TID 88702, > hadoop3030.prn2.facebook.com): java.lang.IllegalArgumentException: Comparison > method violates its general contract! > at > org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:794) > at > org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) > at > org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) > at > org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:228) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:186) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:175) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249) > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:318) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:333) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Please note that the same query used t
[jira] [Commented] (SPARK-13850) TimSort Comparison method violates its general contract
[ https://issues.apache.org/jira/browse/SPARK-13850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15283110#comment-15283110 ] Sital Kedia commented on SPARK-13850: - I have found a workaround for this issue. Please take a look at the PR. > TimSort Comparison method violates its general contract > --- > > Key: SPARK-13850 > URL: https://issues.apache.org/jira/browse/SPARK-13850 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.6.0 >Reporter: Sital Kedia > > While running a query which does a group by on a large dataset, the query > fails with following stack trace. > {code} > Job aborted due to stage failure: Task 4077 in stage 1.3 failed 4 times, most > recent failure: Lost task 4077.3 in stage 1.3 (TID 88702, > hadoop3030.prn2.facebook.com): java.lang.IllegalArgumentException: Comparison > method violates its general contract! > at > org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:794) > at > org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525) > at > org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453) > at > org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:228) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:186) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:175) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249) > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:318) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:333) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90) > at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Please note that the same query used to succeed in Spark 1.5 so it seems like > a regression in 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
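For context on the error itself: java.util.TimSort raises this IllegalArgumentException whenever the comparisons it observes contradict each other, whatever the underlying cause, whether a buggy comparator or, as hypothesized in the earlier comment, comparisons reading through corrupted memory. The following minimal, self-contained illustration of that error class is unrelated to Spark's internals, and it may or may not actually throw depending on the merge pattern TimSort happens to take on the data.
{code}
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

public class ContractViolationSketch {
  public static void main(String[] args) {
    Random rnd = new Random(42);
    Integer[] data = new Integer[100_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = rnd.nextInt();
    }
    // A comparator whose answers are not self-consistent. When java.util.TimSort
    // (used by Arrays.sort for object arrays) detects the inconsistency during a
    // merge, it throws:
    //   java.lang.IllegalArgumentException: Comparison method violates its general contract!
    Comparator<Integer> inconsistent = (a, b) -> rnd.nextInt(3) - 1;
    Arrays.sort(data, inconsistent);
  }
}
{code}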
[jira] [Updated] (SPARK-15233) Spark task metrics should include hdfs read write latency
[ https://issues.apache.org/jira/browse/SPARK-15233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sital Kedia updated SPARK-15233: Affects Version/s: 1.6.1 Priority: Minor (was: Major) Description: Currently the Spark task metrics do not include HDFS read/write latency. It would be very useful to have these metrics to find the bottleneck in a query. Issue Type: Improvement (was: Bug) Summary: Spark task metrics should include hdfs read write latency (was: Spark UI should show metrics for hdfs read write latency) > Spark task metrics should include hdfs read write latency > - > > Key: SPARK-15233 > URL: https://issues.apache.org/jira/browse/SPARK-15233 > Project: Spark > Issue Type: Improvement >Affects Versions: 1.6.1 >Reporter: Sital Kedia >Priority: Minor > > Currently the Spark task metrics do not include HDFS read/write latency. It > would be very useful to have these metrics to find the bottleneck in a query. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
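Since the ticket does not spell out how such a metric would be collected, here is a rough, hypothetical sketch of the kind of measurement a per-task HDFS read-latency metric could be built on: wall-clock time spent reading through a Hadoop FSDataInputStream. Only the standard Hadoop FileSystem API is used; the class name and the idea of surfacing the number as a task metric are illustrative, not an existing Spark feature.
{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadLatencySketch {
  // Reads the whole file and returns the wall-clock nanoseconds spent doing so,
  // the kind of value a task-level "HDFS read time" metric could accumulate.
  public static long timedReadNanos(String pathStr) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    byte[] buf = new byte[64 * 1024];
    long start = System.nanoTime();
    try (FSDataInputStream in = fs.open(new Path(pathStr))) {
      while (in.read(buf) != -1) {
        // drain the stream; only the elapsed time matters for this sketch
      }
    }
    return System.nanoTime() - start;
  }
}
{code}
In a real implementation the timing would be taken around the existing read path rather than by re-reading files, and the accumulated value would be reported alongside the other task metrics; the sketch only shows where the number would come from.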
[jira] [Created] (SPARK-15233) Spark UI should show metrics for hdfs read write latency
Sital Kedia created SPARK-15233: --- Summary: Spark UI should show metrics for hdfs read write latency Key: SPARK-15233 URL: https://issues.apache.org/jira/browse/SPARK-15233 Project: Spark Issue Type: Bug Reporter: Sital Kedia -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org