[ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Artur Sukhenko updated SPARK-19578:
-----------------------------------
Description:
A simple job in pyspark takes 14 minutes to complete. The text file used to reproduce it contains several hundred million lines of the single word "y" (the default output of yes); this might be the cause of the poor performance.
{code}
a = sc.textFile("/tmp/yes.txt")
a.count()
{code}
The same code took 33 seconds in spark-shell.

Reproduce steps:
Run this to generate a big file (press Ctrl+C after 5-6 seconds):
[spark@c6401 ~]$ yes > /tmp/yes.txt
[spark@c6401 ~]$ ll /tmp/
-rw-r--r-- 1 spark hadoop 516079616 Feb 13 11:10 yes.txt
[spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
[spark@c6401 ~]$ pyspark
{code}
Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
{code}
>>> a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
{code}
>>> a.count()
{code}
17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 output partitions
17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at count at <stdin>:1), which has no missing parents
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.7 KB, free 375.1 KB)
17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 378.6 KB)
17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:13:03 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init = 445, finish = 212573
17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 bytes result sent to driver
17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:16:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213605 ms on localhost (1/4)
17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init = 122, finish = 208186
17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 bytes result sent to driver
17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:20:05 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 208302 ms on localhost (2/4)
17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init = 45, finish = 212003
17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 bytes result sent to driver
17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:23:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 212072 ms on localhost (3/4)
17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 9, finish = 177874
17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 bytes result sent to driver
17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 811.885 s
17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 177937 ms on localhost (4/4)
17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 812.147354 s
{code}
258039808
[spark@c6401 ~]$ spark-shell
scala> var a = sc.textFile("/tmp/yes.txt")
{code}
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21
{code}
scala> a.count()
{code}
17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 output partitions
17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at <console>:24)
17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.0 KB, free 372.4 KB)
17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1801.0 B, free 374.1 KB)
17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/02/13 11:32:46 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 bytes result sent to driver
17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/02/13 11:32:55 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 8857 ms on localhost (1/4)
17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 bytes result sent to driver
17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/02/13 11:33:02 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 7928 ms on localhost (2/4)
17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 bytes result sent to driver
17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/02/13 11:33:12 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9517 ms on localhost (3/4)
17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 bytes result sent to driver
17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) finished in 32.724 s
17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 6443 ms on localhost (4/4)
17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 32.929721 s
{code}
res0: Long = 258039808

Also, the Input Size metric in the Spark UI is wrong when running pyspark: it shows 64.0 KB (hadoop), whereas spark-shell shows the correct value, 128.1 MB (hadoop).
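Both shells return the same count, 258039808, and that number is consistent with the 516079616-byte file shown in the reproduce steps: yes with no arguments prints "y", so every line is 2 bytes. A quick arithmetic check (illustrative, not part of the original report):

```python
# Cross-check the count() result against the file size from "ll /tmp/".
# `yes` with no arguments emits "y", so each line on disk is "y\n" (2 bytes).
size_bytes = 516079616          # size of /tmp/yes.txt in the listing above
bytes_per_line = len("y\n")     # 2 bytes per line
lines = size_bytes // bytes_per_line
assert lines == 258039808       # matches both a.count() and res0
```

This also confirms the two runs processed identical input, so the 812 s vs. 33 s gap is not a data difference.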
> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>        Environment: Spark 1.6.2 Hortonworks
>                      Spark 2.0.1 MapR
>                      Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>        Attachments: pyspark_incorrect_inputsize.png, reproduce_log, spark_shell_correct_inputsize.png

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)