[ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890930#comment-15890930 ]

Nicholas Chammas commented on SPARK-19578:
------------------------------------------

Makes sense to me. I suppose the Apache Arrow integration work that is 
currently ongoing (SPARK-13534) will not help RDD.count(), since that work 
only benefits DataFrames. (Granted, in this specific example you can always 
read the file using spark.read.csv() or spark.read.text(), which avoids this 
problem.)
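
As an illustration, a minimal sketch of that workaround (assuming a Spark 2.x 
shell, where spark is the SparkSession the shell provides):

{code}
# Count lines via the DataFrame reader instead of RDD.count().
# spark.read.text() scans and counts entirely in the JVM, so no rows are
# serialized over to Python worker processes.
df = spark.read.text("/tmp/yes.txt")
print(df.count())
{code}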

So it sounds like the "poor PySpark performance" part of this issue is 
"Won't/Can't fix" at this time. The incorrect UI input-size metric sounds like 
a separate issue that should be split out.
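
For context on why the RDD path is slow: PySpark runs the counting closure in 
Python worker processes, so every record has to be shipped from the JVM to 
Python first. Roughly (paraphrased from pyspark/rdd.py; exact code varies by 
version):

{code}
# Approximately what rdd.count() does in PySpark: the lambda below runs in
# Python workers, so each line of yes.txt is read by the JVM, serialized
# over a local socket to a Python process, and only then counted.
def count(rdd):
    return rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).sum()
{code}

Shipping ~258 million single-word lines across that boundary is consistent 
with the ~14 minutes observed here versus ~33 seconds in spark-shell.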

> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
> Spark 2.0.1 MapR
> Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: pyspark_incorrect_inputsize.png, reproduce_log, 
> spark_shell_correct_inputsize.png
>
>
> A simple job in pyspark takes 14 minutes to complete.
> The text file used to reproduce the issue contains many millions of lines of 
> the single word "yes" (which might be the cause of the poor performance):
> {code}
> a = sc.textFile("/tmp/yes.txt")
> a.count()
> {code}
> The same code took 33 sec in spark-shell.
> Reproduce steps:
> Run this to generate a big file (press Ctrl+C after 5-6 seconds):
> [spark@c6401 ~]$ yes > /tmp/yes.txt
> [spark@c6401 ~]$ ll /tmp/
> -rw-r--r--  1 spark     hadoop  516079616 Feb 13 11:10 yes.txt
> [spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
> [spark@c6401 ~]$ pyspark
> {code}
> Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
> 17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
> {code}
> >>> a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at 
> NativeMethodAccessorImpl.java:-2
> {code}
> >>> a.count()
> {code}
> 17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
> 17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 
> output partitions
> 17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <stdin>:1)
> 17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] 
> at count at <stdin>:1), which has no missing parents
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 5.7 KB, free 375.1 KB)
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 3.5 KB, free 378.6 KB)
> 17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
> 17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
> 17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:13:03 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init 
> = 445, finish = 212573
> 17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 
> bytes result sent to driver
> 17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:16:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 213605 ms on localhost (1/4)
> 17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init 
> = 122, finish = 208186
> 17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 
> bytes result sent to driver
> 17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:20:05 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 208302 ms on localhost (2/4)
> 17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init 
> = 45, finish = 212003
> 17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 
> bytes result sent to driver
> 17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:23:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 212072 ms on localhost (3/4)
> 17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 
> 9, finish = 177874
> 17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 
> bytes result sent to driver
> 17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) 
> finished in 811.885 s
> 17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 177937 ms on localhost (4/4)
> 17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 
> 812.147354 s
> {code}
> 258039808
> [spark@c6401 ~]$ spark-shell 
> scala> var a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at 
> <console>:21
> a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at 
> textFile at <console>:21
> {code}
> scala> a.count()
> {code}
> 17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
> 17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 
> output partitions
> 17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <console>:24)
> 17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt 
> MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 3.0 KB, free 372.4 KB)
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 1801.0 B, free 374.1 KB)
> 17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
> 17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
> 17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:32:46 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 
> bytes result sent to driver
> 17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:32:55 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 8857 ms on localhost (1/4)
> 17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 
> bytes result sent to driver
> 17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:33:02 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 7928 ms on localhost (2/4)
> 17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 
> bytes result sent to driver
> 17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:33:12 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 9517 ms on localhost (3/4)
> 17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 
> bytes result sent to driver
> 17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) 
> finished in 32.724 s
> 17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 6443 ms on localhost (4/4)
> 17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, 
> took 32.929721 s
> {code}
> res0: Long = 258039808
> Also, the Input Size metric in the Spark UI is wrong when running pyspark: it 
> shows 64.0 KB (hadoop), whereas spark-shell shows the correct value, 128.1 MB 
> (hadoop). (Both logs show HadoopRDD input splits of ~134217728 bytes, i.e. 
> 128 MB per task, so the pyspark figure is clearly off.)
>  


