[ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890846#comment-15890846 ]

holdenk commented on SPARK-19578:
---------------------------------

[~nchammas] It's an interesting idea, but I don't think it would work in the 
general case, and it wouldn't help as much as you would expect.
First, as to why it won't work in the general case: we frequently use batch 
serialization inside of PySpark, so one JRDD element can correspond to a 
different number of Python RDD objects.
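
To make that concrete, here is a minimal sketch in plain Python (illustrative 
only - these aren't PySpark's actual serializer classes) of why counting on 
the JVM side goes wrong under batching:

{code}
# Sketch of batched serialization: ten Python records are pickled into
# a single payload, so the JVM sees one element where Python sees ten.
# A JVM-side count over these payloads would therefore undercount.
import pickle

records = ["yes"] * 10            # 10 Python RDD elements
batch = pickle.dumps(records)     # 1 serialized batch = 1 JRDD element

print(len(records))               # 10 -> the count Python must report
print(len([batch]))               # 1  -> what a JVM-side count would see
{code}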

As for why it wouldn't help as much as you'd expect: even if we kept the 
count in the JVM (assuming we didn't use batch serialization or something 
similar), we would still need to round-trip the data through Python to do any 
meaningful Python work. Computing the sum in Python isn't the expensive part - 
it's copying the data from the JVM into Python, which leads nicely into our 
next problem.
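
A rough way to see the proportions (an illustrative timing sketch, not a 
benchmark of PySpark itself - numbers will vary by machine):

{code}
# Deserializing the payload dominates; counting is trivial once the
# records are already in Python.
import pickle
import timeit

lines = [str(i) for i in range(1000000)]
payload = pickle.dumps(lines)

print(timeit.timeit(lambda: pickle.loads(payload), number=5))  # deserialize/copy cost
print(timeit.timeit(lambda: sum(1 for _ in lines), number=5))  # counting cost
{code}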

Now, as for skipping Python entirely, that's a bit more tricky. In the past, 
people have suggested optimizing count in Scala through a similar mechanism 
(e.g. if we know the number of input records, and we know we didn't drop or 
add any records, we can just return the number of input records). The problem 
with that is that people depend on count to force evaluation of their RDD, to 
do things like materialize it in the cache. This is further compounded in 
Python, since the Scala code can't tell whether we've applied a `map` (won't 
change the number of records) or a `flatMap` (could change the number of 
records) - see the sketch below - so implementing the optimization would be 
even more involved in Python. Plumbing some of this information through to 
the Scala side could improve the UI and debugging experience, but the bigger 
issue is that the optimization would break a lot of people's workflows.
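
For example (a PySpark sketch, assuming an active SparkContext `sc` as in the 
pyspark shell):

{code}
# map() never changes the number of records; flatMap() can. Both reach
# the JVM as an opaque pickled Python function inside a PythonRDD, so
# the Scala side can't tell which one was applied.
rdd = sc.parallelize(["yes", "yes"])       # 2 records

mapped = rdd.map(lambda x: x.upper())      # still 2 records
flattened = rdd.flatMap(lambda x: [x, x])  # now 4 records

print(mapped.count(), flattened.count())   # 2 4
{code}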

Now, in this very specific case (data loaded in the JVM and no Python 
transformations applied), we _could_ optimize it, since our input data hasn't 
been touched by Python at all - but I'm not sure that is something worth 
doing.
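
If we did pursue it, the shape would be roughly this (a hypothetical sketch - 
`_jrdd` is a private attribute, and this is not how PySpark's `count()` is 
actually implemented):

{code}
# Hypothetical: when no Python transformation has been applied, delegate
# the count to the underlying JVM RDD and skip the Python roundtrip.
def jvm_side_count(rdd, has_python_transforms):
    if not has_python_transforms:
        return rdd._jrdd.count()   # counted entirely in the JVM via Py4J
    return rdd.count()             # normal path: data crosses into Python
{code}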

This is only after my first cup of coffee, so if some of my explanation 
doesn't make sense, let me know and I can drink more coffee and explain that 
part specifically.

> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
> Spark 2.0.1 MapR
> Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: pyspark_incorrect_inputsize.png, reproduce_log, 
> spark_shell_correct_inputsize.png
>
>
> A simple job in pyspark takes 14 minutes to complete.
> The text file used to reproduce it contains millions of lines of the single 
> word "yes" (which might be the cause of the poor performance).
> {code}
> a = sc.textFile("/tmp/yes.txt")
> a.count()
> {code}
> The same code took 33 sec in spark-shell.
> Steps to reproduce:
> Run this to generate a big file (press Ctrl+C after 5-6 seconds):
> {code}
> [spark@c6401 ~]$ yes > /tmp/yes.txt
> [spark@c6401 ~]$ ll /tmp/
> -rw-r--r--  1 spark     hadoop  516079616 Feb 13 11:10 yes.txt
> [spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
> {code}
> [spark@c6401 ~]$ pyspark
> {code}
> Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
> 17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
> {code}
> >>> a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at 
> NativeMethodAccessorImpl.java:-2
> {code}
> >>> a.count()
> {code}
> 17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
> 17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 
> output partitions
> 17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <stdin>:1)
> 17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] 
> at count at <stdin>:1), which has no missing parents
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 5.7 KB, free 375.1 KB)
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 3.5 KB, free 378.6 KB)
> 17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
> 17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
> 17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:13:03 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init 
> = 445, finish = 212573
> 17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 
> bytes result sent to driver
> 17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:16:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 213605 ms on localhost (1/4)
> 17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init 
> = 122, finish = 208186
> 17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 
> bytes result sent to driver
> 17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:20:05 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 208302 ms on localhost (2/4)
> 17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init 
> = 45, finish = 212003
> 17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 
> bytes result sent to driver
> 17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:23:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 212072 ms on localhost (3/4)
> 17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 
> 9, finish = 177874
> 17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 
> bytes result sent to driver
> 17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) 
> finished in 811.885 s
> 17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 177937 ms on localhost (4/4)
> 17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 
> 812.147354 s
> {code}
> 258039808
> [spark@c6401 ~]$ spark-shell 
> scala> var a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at 
> <console>:21
> a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at 
> textFile at <console>:21
> {code}
> scala> a.count()
> {code}
> 17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
> 17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 
> output partitions
> 17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <console>:24)
> 17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt 
> MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 3.0 KB, free 372.4 KB)
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 1801.0 B, free 374.1 KB)
> 17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
> 17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
> 17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:32:46 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 
> bytes result sent to driver
> 17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:32:55 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 8857 ms on localhost (1/4)
> 17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 
> bytes result sent to driver
> 17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:33:02 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 7928 ms on localhost (2/4)
> 17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 
> bytes result sent to driver
> 17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:33:12 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 9517 ms on localhost (3/4)
> 17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 
> bytes result sent to driver
> 17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) 
> finished in 32.724 s
> 17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 6443 ms on localhost (4/4)
> 17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, 
> took 32.929721 s
> {code}
> res0: Long = 258039808
> Also, the Input Size metric in the Spark UI is wrong when running pyspark: 
> it shows 64.0 KB (hadoop), whereas spark-shell shows the correct value, 
> 128.1 MB (hadoop).


