[ https://issues.apache.org/jira/browse/SPARK-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Trystan Leftwich updated SPARK-14318:
-------------------------------------
    Description: 
TPCDS Q14 parses successfully, and its plans are created successfully. Spark 
then tries to run the query (I used only a 1GB text file) but "hangs": tasks 
are extremely slow to process AND the executor JVMs use 100% of all CPUs.

It is very easy to reproduce:
1. Use the spark-sql CLI to run query 14 (TPCDS) against a database built from 
a 1GB text file (assuming you know how to generate the CSV data). My command 
looks like this:

{noformat}
/TestAutomation/downloads/spark-master/bin/spark-sql --driver-memory 10g \
  --verbose --master yarn-client --packages com.databricks:spark-csv_2.10:1.3.0 \
  --executor-memory 8g --num-executors 4 --executor-cores 4 \
  --conf spark.sql.join.preferSortMergeJoin=true --database hadoopds1g -f $f > q14.out
{noformat}

The Spark console output:
{noformat}
16/03/31 15:45:37 INFO scheduler.TaskSetManager: Starting task 26.0 in stage 17.0 (TID 65, bigaperf138.svl.ibm.com, partition 26,RACK_LOCAL, 4515 bytes)
16/03/31 15:45:37 INFO cluster.YarnClientSchedulerBackend: Launching task 65 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:45:37 INFO scheduler.TaskSetManager: Finished task 23.0 in stage 17.0 (TID 62) in 829687 ms on bigaperf138.svl.ibm.com (15/200)
16/03/31 15:45:52 INFO scheduler.TaskSetManager: Starting task 27.0 in stage 17.0 (TID 66, bigaperf138.svl.ibm.com, partition 27,RACK_LOCAL, 4515 bytes)
16/03/31 15:45:52 INFO cluster.YarnClientSchedulerBackend: Launching task 66 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:45:52 INFO scheduler.TaskSetManager: Finished task 26.0 in stage 17.0 (TID 65) in 15505 ms on bigaperf138.svl.ibm.com (16/200)
16/03/31 15:46:17 INFO scheduler.TaskSetManager: Starting task 28.0 in stage 17.0 (TID 67, bigaperf138.svl.ibm.com, partition 28,RACK_LOCAL, 4515 bytes)
16/03/31 15:46:17 INFO cluster.YarnClientSchedulerBackend: Launching task 67 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:46:17 INFO scheduler.TaskSetManager: Finished task 27.0 in stage 17.0 (TID 66) in 24929 ms on bigaperf138.svl.ibm.com (17/200)
16/03/31 15:51:53 INFO scheduler.TaskSetManager: Starting task 29.0 in stage 17.0 (TID 68, bigaperf137.svl.ibm.com, partition 29,NODE_LOCAL, 4515 bytes)
16/03/31 15:51:53 INFO cluster.YarnClientSchedulerBackend: Launching task 68 on executor id: 2 hostname: bigaperf137.svl.ibm.com.
16/03/31 15:51:53 INFO scheduler.TaskSetManager: Finished task 10.0 in stage 17.0 (TID 47) in 3775585 ms on bigaperf137.svl.ibm.com (18/200)
{noformat}

Notice that the time intervals between tasks are unusually long: 2 to 5 minutes.
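
To put numbers on the console output, the "Finished task" lines can be parsed for per-task run time and for the interval between consecutive completions. The following is a minimal sketch, not part of the original report: the regular expression, the function names, and the choice to measure intervals between completions are assumptions made for illustration, and the sample lines are the four "Finished task" entries from the log with their wrapped lines rejoined.

```python
import re
from datetime import datetime

# The four "Finished task" lines from the console output (wrapped lines rejoined).
LOG = """\
16/03/31 15:45:37 INFO scheduler.TaskSetManager: Finished task 23.0 in stage 17.0 (TID 62) in 829687 ms on bigaperf138.svl.ibm.com (15/200)
16/03/31 15:45:52 INFO scheduler.TaskSetManager: Finished task 26.0 in stage 17.0 (TID 65) in 15505 ms on bigaperf138.svl.ibm.com (16/200)
16/03/31 15:46:17 INFO scheduler.TaskSetManager: Finished task 27.0 in stage 17.0 (TID 66) in 24929 ms on bigaperf138.svl.ibm.com (17/200)
16/03/31 15:51:53 INFO scheduler.TaskSetManager: Finished task 10.0 in stage 17.0 (TID 47) in 3775585 ms on bigaperf137.svl.ibm.com (18/200)
"""

# Hypothetical pattern written for this log layout only.
FINISHED = re.compile(
    r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) INFO scheduler\.TaskSetManager: "
    r"Finished task (\S+) in stage (\S+) \(TID (\d+)\) in (\d+) ms"
)


def parse_finished(text):
    """Return (timestamp, task, tid, duration_ms) for each 'Finished task' line."""
    rows = []
    for line in text.splitlines():
        m = FINISHED.match(line)
        if m:
            ts = datetime.strptime(m.group(1), "%y/%m/%d %H:%M:%S")
            rows.append((ts, m.group(2), int(m.group(4)), int(m.group(5))))
    return rows


def completion_gaps(rows):
    """Seconds elapsed between consecutive task completions."""
    return [(b[0] - a[0]).total_seconds() for a, b in zip(rows, rows[1:])]


rows = parse_finished(LOG)
for ts, task, tid, ms in rows:
    print(f"task {task} (TID {tid}): {ms / 60000:.1f} min")
print("seconds between completions:", completion_gaps(rows))
```

Running the same functions over the full driver log, rather than this excerpt, would show whether the long run times are spread across all tasks in stage 17.0 or concentrated on a few.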

According to the Linux 'perf' tool, the top two CPU consumers are:
    86.48%        java  [unknown]   
    12.41%        libjvm.so

Using the Java HotSpot profiling tools, I can show what the hotspot methods 
are (top 5):
{noformat}
org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten()    46.845276    9,654,179 ms (46.8%)    9,654,179 ms    9,654,179 ms    9,654,179 ms
org.apache.spark.unsafe.Platform.copyMemory()    18.631157    3,848,442 ms (18.6%)    3,848,442 ms    3,848,442 ms    3,848,442 ms
org.apache.spark.util.collection.CompactBuffer.$plus$eq()    6.8570185    1,418,411 ms (6.9%)    1,418,411 ms    1,517,960 ms    1,517,960 ms
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeValue()    4.6126328    955,495 ms (4.6%)    955,495 ms    2,153,910 ms    2,153,910 ms
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write()    4.581077    949,930 ms (4.6%)    949,930 ms    19,967,510 ms    19,967,510 ms
{noformat}
As you can see, the test has been running for 1.5 hours, with 46% of CPU time 
spent in the 
org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten() method. 
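
The millisecond figures in the hotspot listing are easier to compare once converted to hours. The sketch below does only that arithmetic on the self-time column of the top-5 listing; the dictionary keys are shortened method names and the helper name is hypothetical. It also assumes, without confirmation from the profiler's documentation, that these figures are time accumulated across all sampled threads, which would explain why they can exceed the 1.5 hours of wall-clock time.

```python
# Self-time figures (ms) copied from the top-5 hotspot listing.
SELF_MS = {
    "DiskBlockObjectWriter.updateBytesWritten": 9_654_179,
    "Platform.copyMemory": 3_848_442,
    "CompactBuffer.$plus$eq": 1_418_411,
    "UnsafeRowSerializerInstance$$anon$2.writeValue": 955_495,
    "BypassMergeSortShuffleWriter.write": 949_930,
}

# Share reported by the profiler for the first method (46.845276%).
TOP_SHARE = 0.46845276


def to_hours(ms):
    """Convert milliseconds to hours."""
    return ms / 3_600_000


top5_ms = sum(SELF_MS.values())
# Total sampled time implied by the first row's share (an inference, not a logged value).
implied_total_ms = SELF_MS["DiskBlockObjectWriter.updateBytesWritten"] / TOP_SHARE

for name, ms in SELF_MS.items():
    print(f"{name}: {to_hours(ms):.2f} h")
print(f"top 5 combined: {to_hours(top5_ms):.2f} h")
print(f"top 5 share of implied total: {top5_ms / implied_total_ms:.1%}")
```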

The stacks for the top two are:
{noformat}
Marshalling
  java/io/DataOutputStream.writeInt() line 197
org.apache.spark.sql
  org/apache/spark/sql/execution/UnsafeRowSerializerInstance$$anon$2.writeValue() line 60
org.apache.spark.storage
  org/apache/spark/storage/DiskBlockObjectWriter.write() line 185
org.apache.spark.shuffle
  org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write() line 150
org.apache.spark.scheduler
  org/apache/spark/scheduler/ShuffleMapTask.runTask() line 78
  org/apache/spark/scheduler/ShuffleMapTask.runTask() line 46
  org/apache/spark/scheduler/Task.run() line 82
org.apache.spark.executor
  org/apache/spark/executor/Executor$TaskRunner.run() line 231
Dispatching Overhead, Standard Library Worker Dispatching
  java/util/concurrent/ThreadPoolExecutor.runWorker() line 1142
  java/util/concurrent/ThreadPoolExecutor$Worker.run() line 617
  java/lang/Thread.run() line 745
{noformat}

and 

{noformat}
org.apache.spark.unsafe
  org/apache/spark/unsafe/Platform.copyMemory() line 172
org.apache.spark.sql
  org/apache/spark/sql/catalyst/expressions/UnsafeRow.copy() line 504
Class Loading
  org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator.processNext()
org.apache.spark.sql
  org/apache/spark/sql/execution/BufferedRowIterator.hasNext() line 41
  org/apache/spark/sql/execution/WholeStageCodegen$$anonfun$doExecute$2$$anon$2.hasNext() line 375
scala.collection
  scala/collection/Iterator$$anon$11.hasNext() line 369
org.apache.spark.shuffle
  org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write() line 147
org.apache.spark.scheduler
  org/apache/spark/scheduler/ShuffleMapTask.runTask() line 78
  org/apache/spark/scheduler/ShuffleMapTask.runTask() line 46
  org/apache/spark/scheduler/Task.run() line 82
org.apache.spark.executor
  org/apache/spark/executor/Executor$TaskRunner.run() line 231
Dispatching Overhead, Standard Library Worker Dispatching
  java/util/concurrent/ThreadPoolExecutor.runWorker() line 1142
  java/util/concurrent/ThreadPoolExecutor$Worker.run() line 617
  java/lang/Thread.run() line 745
{noformat}




> TPCDS query 14 causes Spark SQL to hang
> ---------------------------------------
>
>                 Key: SPARK-14318
>                 URL: https://issues.apache.org/jira/browse/SPARK-14318
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: JESSE CHEN
>              Labels: hangs
>         Attachments: threaddump-1459461915668.tdump


