[ 
https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864097#comment-15864097
 ] 

Nicholas Chammas commented on SPARK-19578:
------------------------------------------

I'm seeing the same thing too. You can get a much quicker response in PySpark 
by using the CSV reader, which suggests that the Python daemons are somehow 
unnecessarily involved in the {{textFile()}} version.

{code}
>>> spark.sparkContext.textFile('yes.txt').count()  # slow
120465408                                                                       
>>> spark.read.csv('yes.txt').count()  # fast
120465408                                                                       
{code}

If I interrupt the {{textFile()}} operation I get this stack trace:

{code}
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", 
line 1008, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", 
line 999, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", 
line 873, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", 
line 776, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
{code}

Which may explain the performance slowdown. The count is being done in Python.

With the CSV/DataFrame version, I just see this:

{code}
  File "<stdin>", line 1, in <module>
  File 
"/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/sql/dataframe.py", 
line 299, in count
    return int(self._jdf.count())
{code}

> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
> Spark 2.0.1 MapR
> Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: pyspark_incorrect_inputsize.png, reproduce_log, 
> spark_shell_correct_inputsize.png
>
>
> Simple job in pyspark takes 14 minutes to complete.
> The text file used to reproduce contains multiple millions lines of one word 
> "yes"
>  (it might be the cause of poor performance)
> {code}
> var a = sc.textFile("/tmp/yes.txt")
> a.count()
> {code}
> Same code took 33 sec in spark-shell
> Reproduce steps:
> Run this  to generate big file (press Ctrl+C after 5-6 seconds)
> [spark@c6401 ~]$ yes > /tmp/yes.txt
> [spark@c6401 ~]$ ll /tmp/
> -rw-r--r--  1 spark     hadoop  516079616 Feb 13 11:10 yes.txt
> [spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
> [spark@c6401 ~]$ pyspark
> {code}
> Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
> 17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
> {code}
> >>> a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at 
> NativeMethodAccessorImpl.java:-2
> {code}
> >>> a.count()
> {code}
> 17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
> 17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 
> output partitions
> 17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <stdin>:1)
> 17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] 
> at count at <stdin>:1), which has no missing parents
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 5.7 KB, free 375.1 KB)
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 3.5 KB, free 378.6 KB)
> 17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
> 17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
> 17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:13:03 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init 
> = 445, finish = 212573
> 17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 
> bytes result sent to driver
> 17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:16:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 213605 ms on localhost (1/4)
> 17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init 
> = 122, finish = 208186
> 17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 
> bytes result sent to driver
> 17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:20:05 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 208302 ms on localhost (2/4)
> 17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init 
> = 45, finish = 212003
> 17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 
> bytes result sent to driver
> 17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:23:37 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 212072 ms on localhost (3/4)
> 17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 
> 9, finish = 177874
> 17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 
> bytes result sent to driver
> 17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) 
> finished in 811.885 s
> 17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 177937 ms on localhost (4/4)
> 17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 
> 812.147354 s
> {code}
> 258039808
> [spark@c6401 ~]$ spark-shell 
> scala> var a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at 
> <console>:21
> a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at 
> textFile at <console>:21
> {code}
> scala> a.count()
> {code}
> 17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
> 17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 
> output partitions
> 17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at 
> <console>:24)
> 17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt 
> MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 3.0 KB, free 372.4 KB)
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 1801.0 B, free 374.1 KB)
> 17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
> 17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1008
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from 
> ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
> 17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:32:46 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use 
> mapreduce.task.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, 
> use mapreduce.task.attempt.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. 
> Instead, use mapreduce.task.ismap
> 17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. 
> Instead, use mapreduce.task.partition
> 17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use 
> mapreduce.job.id
> 17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 
> bytes result sent to driver
> 17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
> localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:32:55 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 8857 ms on localhost (1/4)
> 17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 
> bytes result sent to driver
> 17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
> localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:33:02 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
> in 7928 ms on localhost (2/4)
> 17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 
> bytes result sent to driver
> 17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
> localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:33:12 INFO HadoopRDD: Input split: 
> hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) 
> in 9517 ms on localhost (3/4)
> 17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 
> bytes result sent to driver
> 17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) 
> finished in 32.724 s
> 17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) 
> in 6443 ms on localhost (4/4)
> 17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool 
> 17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, 
> took 32.929721 s
> {code}
> res0: Long = 258039808
> Also Input Size metrics in Spark UI is wrong when running pyspark, it says 
> 64.0 KB (hadoop), however, when running in spark-shell it will show correct 
> info 128.1 MB (hadoop).
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to