Artur Sukhenko created SPARK-19578:
--------------------------------------
Summary: Poor pyspark performance + incorrect UI input-size metrics
Key: SPARK-19578
URL: https://issues.apache.org/jira/browse/SPARK-19578
Project: Spark
Issue Type: Bug
Components: PySpark, Web UI
Affects Versions: 2.0.1, 1.6.2, 1.6.1
Environment: Spark 1.6.2 Hortonworks
Spark 2.0.1 MapR
Spark 1.6.1 MapR
Reporter: Artur Sukhenko
Simple job in pyspark takes 14 minutes to complete
{code}
var a = sc.textFile("/tmp/yes.txt")
a.count()
{code}
Same code took 33 sec in spark-shell
Reproduce steps:
Run this to generate big file (press Ctrl+C after 5-6 seconds)
[spark@c6401 ~]$ yes > /tmp/yes.txt
[spark@c6401 ~]$ ll /tmp/
-rw-r--r-- 1 spark hadoop 516079616 Feb 13 11:10 yes.txt
[spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
[spark@c6401 ~]$ pyspark
{code}
Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
{code}
>>> a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in
memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on
localhost:43389 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at
NativeMethodAccessorImpl.java:-2
{code}
>>> a.count()
{code}
17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4
output partitions
17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at
<stdin>:1)
17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at
count at <stdin>:1), which has no missing parents
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 5.7 KB, free 375.1 KB)
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in
memory (estimated size 3.5 KB, free 378.6 KB)
17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on
localhost:43389 (size: 3.5 KB, free: 517.4 MB)
17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:1008
17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from
ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0,
localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:13:03 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use
mapreduce.task.id
17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, use
mapreduce.task.attempt.id
17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. Instead,
use mapreduce.task.ismap
17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated.
Instead, use mapreduce.task.partition
17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use
mapreduce.job.id
17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init =
445, finish = 212573
17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182
bytes result sent to driver
17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1,
localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:16:37 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0)
in 213605 ms on localhost (1/4)
17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init =
122, finish = 208186
17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182
bytes result sent to driver
17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2,
localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:20:05 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1)
in 208302 ms on localhost (2/4)
17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init =
45, finish = 212003
17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182
bytes result sent to driver
17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3,
localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:23:37 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2)
in 212072 ms on localhost (3/4)
17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init =
9, finish = 177874
17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182
bytes result sent to driver
17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1)
finished in 811.885 s
17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3)
in 177937 ms on localhost (4/4)
17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have
all completed, from pool
17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took
812.147354 s
{code}
258039808
[spark@c6401 ~]$ spark-shell
scala> var a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in
memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on
localhost:44733 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at
<console>:21
a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at
textFile at <console>:21
{code}
scala> a.count()
{code}
17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4
output partitions
17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at
<console>:24)
17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt
MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 3.0 KB, free 372.4 KB)
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in
memory (estimated size 1801.0 B, free 374.1 KB)
17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on
localhost:44733 (size: 1801.0 B, free: 517.4 MB)
17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:1008
17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from
ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0,
localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:32:46 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use
mapreduce.task.id
17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, use
mapreduce.task.attempt.id
17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. Instead,
use mapreduce.task.ismap
17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated.
Instead, use mapreduce.task.partition
17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use
mapreduce.job.id
17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082
bytes result sent to driver
17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1,
localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:32:55 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0)
in 8857 ms on localhost (1/4)
17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137
bytes result sent to driver
17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2,
localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:33:02 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1)
in 7928 ms on localhost (2/4)
17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137
bytes result sent to driver
17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3,
localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:33:12 INFO HadoopRDD: Input split:
hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2)
in 9517 ms on localhost (3/4)
17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137
bytes result sent to driver
17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24)
finished in 32.724 s
17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3)
in 6443 ms on localhost (4/4)
17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have
all completed, from pool
17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24,
took 32.929721 s
{code}
res0: Long = 258039808
Also Input Size metrics in Spark UI is wrong when running pyspark, it says 64.0
KB (hadoop), however, when running in spark-shell it will show correct info
128.1 MB (hadoop).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]