[ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artur Sukhenko updated SPARK-19578:
-----------------------------------
    Description: 
A simple job in pyspark takes 14 minutes to complete.
The text file used to reproduce the issue contains hundreds of millions of lines, each holding the single letter "y" (the default output of the yes command), which might be the cause of the poor performance.
{code}
a = sc.textFile("/tmp/yes.txt")
a.count()
{code}
The same code takes 33 seconds in spark-shell.
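A possible workaround (a minimal sketch, assuming the slowdown comes from piping every record through the Python worker) is to keep the counting on the JVM side with the DataFrame text reader; sqlContext below is the SQLContext that the pyspark 1.6 shell creates alongside sc:
{code}
# Workaround sketch, not a confirmed fix: sqlContext.read.text() loads the
# file as a DataFrame, so count() is executed entirely by the JVM instead
# of shipping every line to a Python worker.
df = sqlContext.read.text("/tmp/yes.txt")
df.count()
{code}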

Steps to reproduce:
Run the following to generate a big file (press Ctrl+C after 5-6 seconds):
[spark@c6401 ~]$ yes > /tmp/yes.txt

[spark@c6401 ~]$ ll /tmp/
-rw-r--r--  1 spark     hadoop  516079616 Feb 13 11:10 yes.txt
[spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
[spark@c6401 ~]$ pyspark
{code}
Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
{code}
>>> a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
localhost:43389 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at 
NativeMethodAccessorImpl.java:-2
{code}
>>> a.count()
{code}
17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 
output partitions
17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
<stdin>:1)
17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at 
count at <stdin>:1), which has no missing parents
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 5.7 KB, free 375.1 KB)
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 3.5 KB, free 378.6 KB)
17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
localhost:43389 (size: 3.5 KB, free: 517.4 MB)
17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:1008
17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from 
ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:13:03 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
mapreduce.task.id
17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, use 
mapreduce.task.attempt.id
17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. Instead, 
use mapreduce.task.ismap
17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use 
mapreduce.job.id

17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init = 
445, finish = 212573
17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 
bytes result sent to driver
17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:16:37 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 213605 ms on localhost (1/4)

17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init = 
122, finish = 208186
17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 
bytes result sent to driver
17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:20:05 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
in 208302 ms on localhost (2/4)
17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init = 
45, finish = 212003
17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 
bytes result sent to driver
17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:23:37 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
in 212072 ms on localhost (3/4)


17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 
9, finish = 177874
17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 
bytes result sent to driver
17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) 
finished in 811.885 s
17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
in 177937 ms on localhost (4/4)
17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool 
17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 
812.147354 s
{code}
258039808
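As a sanity check, the count is consistent with the file size: yes with no arguments prints the single letter "y", so every line is 2 bytes including the newline:
{code}
>>> 516079616 / 2   # file size in bytes / bytes per line ("y" + newline)
258039808
{code}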

[spark@c6401 ~]$ spark-shell 
scala> var a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
localhost:44733 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at 
<console>:21
a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at 
textFile at <console>:21
{code}

scala> a.count()

{code}
17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 
output partitions
17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
<console>:24)
17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt 
MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 3.0 KB, free 372.4 KB)
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 1801.0 B, free 374.1 KB)
17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
localhost:44733 (size: 1801.0 B, free: 517.4 MB)
17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:1008
17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from 
ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:32:46 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
mapreduce.task.id
17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, use 
mapreduce.task.attempt.id
17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. Instead, 
use mapreduce.task.ismap
17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use 
mapreduce.job.id
17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 
bytes result sent to driver
17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:32:55 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 8857 ms on localhost (1/4)
17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 
bytes result sent to driver
17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:33:02 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
in 7928 ms on localhost (2/4)
17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 
bytes result sent to driver
17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:33:12 INFO HadoopRDD: Input split: 
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
in 9517 ms on localhost (3/4)
17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 
bytes result sent to driver
17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) 
finished in 32.724 s
17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
in 6443 ms on localhost (4/4)
17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool 
17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, 
took 32.929721 s
{code}

res0: Long = 258039808

Also, the Input Size metric in the Spark UI is wrong when running pyspark: it shows 64.0 
KB (hadoop), whereas spark-shell shows the correct value of 128.1 MB (hadoop).
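For reference, the HadoopRDD splits in both logs above are 134217728 bytes, which is the ~128 MB per task that spark-shell's UI reports, so the 64.0 KB figure shown for pyspark cannot be the real number of bytes read:
{code}
>>> 134217728 / (1024.0 * 1024.0)   # split size in MB
128.0
{code}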

 

> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
> Spark 2.0.1 MapR
> Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: pyspark_incorrect_inputsize.png, reproduce_log, 
> spark_shell_correct_inputsize.png
>