[ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864097#comment-15864097 ]
Nicholas Chammas commented on SPARK-19578:
------------------------------------------

I'm seeing the same thing. You get a much quicker response in PySpark by using the CSV reader, which suggests that the Python daemons are unnecessarily involved in the {{textFile()}} version.

{code}
>>> spark.sparkContext.textFile('yes.txt').count()  # slow
120465408
>>> spark.read.csv('yes.txt').count()  # fast
120465408
{code}

If I interrupt the {{textFile()}} operation I get this stack trace:

{code}
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", line 1008, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", line 999, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", line 873, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/rdd.py", line 776, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
{code}

That may explain the slowdown: the count is being done in Python, in the Python workers. With the CSV/DataFrame version, the count stays on the JVM side, and I just see this:

{code}
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/apache-spark/2.0.2/libexec/python/pyspark/sql/dataframe.py", line 299, in count
    return int(self._jdf.count())
{code}
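For anyone who wants to compare the two paths side by side, here is a minimal timing sketch. It assumes a running SparkSession bound to {{spark}} (as in the shells above) and the {{yes.txt}} file from the reproduction steps below; the {{timed}} helper is hypothetical, just for illustration.

{code}
import time

def timed(label, fn):
    # Hypothetical helper for this comparison only.
    start = time.time()
    result = fn()
    print("%s: %d lines in %.1f s" % (label, result, time.time() - start))

# RDD path: each partition is shipped to a Python worker and counted there.
timed("textFile", lambda: spark.sparkContext.textFile('yes.txt').count())

# DataFrame path: the count is executed entirely inside the JVM.
timed("read.csv", lambda: spark.read.csv('yes.txt').count())
{code}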
> Poor pyspark performance + incorrect UI input-size metrics
> ----------------------------------------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
>                      Spark 2.0.1 MapR
>                      Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: pyspark_incorrect_inputsize.png, reproduce_log, spark_shell_correct_inputsize.png
>
>
> A simple job in pyspark takes 14 minutes to complete.
> The text file used to reproduce contains many millions of lines of the single word "yes" (which might be the cause of the poor performance):
> {code}
> a = sc.textFile("/tmp/yes.txt")
> a.count()
> {code}
> The same code took 33 seconds in spark-shell.
> Reproduce steps:
> Run this to generate a big file (press Ctrl+C after 5-6 seconds):
> [spark@c6401 ~]$ yes > /tmp/yes.txt
> [spark@c6401 ~]$ ll /tmp/
> -rw-r--r-- 1 spark hadoop 516079616 Feb 13 11:10 yes.txt
> [spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
> [spark@c6401 ~]$ pyspark
> {code}
> Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
> 17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
> {code}
> >>> a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
> {code}
> >>> a.count()
> {code}
> 17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
> 17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 output partitions
> 17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
> 17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at count at <stdin>:1), which has no missing parents
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.7 KB, free 375.1 KB)
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 378.6 KB)
> 17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
> 17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
> 17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:13:03 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> 17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> 17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
> 17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init = 445, finish = 212573
> 17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 bytes result sent to driver
> 17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:16:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213605 ms on localhost (1/4)
> 17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init = 122, finish = 208186
> 17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 bytes result sent to driver
> 17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:20:05 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 208302 ms on localhost (2/4)
> 17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init = 45, finish = 212003
> 17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 bytes result sent to driver
> 17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:23:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 212072 ms on localhost (3/4)
> 17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 9, finish = 177874
> 17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 bytes result sent to driver
> 17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 811.885 s
> 17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 177937 ms on localhost (4/4)
> 17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 812.147354 s
> {code}
> 258039808
> [spark@c6401 ~]$ spark-shell
> scala> var a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
> a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21
> {code}
> scala> a.count()
> {code}
> 17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
> 17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 output partitions
> 17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at <console>:24)
> 17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.0 KB, free 372.4 KB)
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1801.0 B, free 374.1 KB)
> 17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
> 17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
> 17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:32:46 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> 17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> 17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
> 17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 bytes result sent to driver
> 17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:32:55 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 8857 ms on localhost (1/4)
> 17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 bytes result sent to driver
> 17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:33:02 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 7928 ms on localhost (2/4)
> 17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 bytes result sent to driver
> 17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:33:12 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9517 ms on localhost (3/4)
> 17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 bytes result sent to driver
> 17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) finished in 32.724 s
> 17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 6443 ms on localhost (4/4)
> 17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 32.929721 s
> {code}
> res0: Long = 258039808
> Also, the Input Size metric in the Spark UI is wrong when running pyspark: it shows 64.0 KB (hadoop), whereas spark-shell shows the correct value, 128.1 MB (hadoop).
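One side note on the reproduction above: the {{yes}} + Ctrl+C step produces a different file size on every run, which makes timings hard to compare. Here is a sketch of a deterministic alternative; the chunk size and count are my own approximations, chosen to land near the ~516 MB file shown in the listing.

{code}
# Deterministic stand-in for `yes > /tmp/yes.txt` + Ctrl+C, so that runs
# are comparable. Sizes are approximate; adjust NUM_CHUNKS as needed.
CHUNK = 'yes\n' * 1000000   # 4,000,000 bytes per chunk
NUM_CHUNKS = 129            # ~516 MB total, close to the listing above

with open('/tmp/yes.txt', 'w') as f:
    for _ in range(NUM_CHUNKS):
        f.write(CHUNK)
{code}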