Try to use the DataFrame API instead of RDDs.
Here's an introduction to DataFrames:
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
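
For the word-filter job below, a rough sketch of a DataFrame version might look
like this (untested, written against the Spark 1.6 Python API; the S3 path is
the same placeholder used later in the thread):

# minimal sketch only -- assumes the usual Zeppelin-provided SparkContext "sc"
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode, split, col

sqlContext = SQLContext(sc)

# read.text() returns a DataFrame with a single string column named "value"
lines = sqlContext.read.text("s3://[test-bucket]/output2/")
words = lines.select(explode(split(col("value"), " ")).alias("word"))
counts = (words
          .where(col("word").isin("Gutenberg", "flower", "a"))
          .groupBy("word")
          .count())
counts.show()

Because these expressions are planned and executed inside the JVM by Catalyst,
the per-record Python overhead of the RDD version largely goes away.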

2016-05-06 21:52 GMT+07:00 pratik gawande <pratik.gawa...@hotmail.com>:

> Thanks, Shao, for the quick reply. I will look into how pyspark jobs are
> executed. Any suggestions or references to docs on how to tune pyspark jobs?
>
> On Thu, May 5, 2016 at 10:12 PM -0700, "Saisai Shao" <
> sai.sai.s...@gmail.com> wrote:
>
> Writing an RDD-based application in pyspark brings in additional overhead:
> Spark runs on the JVM while your Python code runs in the Python runtime, so
> data has to be passed between the JVM world and the Python world, which
> requires extra serialization/deserialization and IPC. Other parts of the
> stack add overhead as well. So the performance difference is expected, but
> you can tune the application to reduce the gap.
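>
> If you want to stay on RDDs, a couple of Python-side knobs worth trying are
> sketched below (illustrative only, not from this thread; check the docs for
> your Spark version), shown as a standalone pyspark script since Zeppelin
> creates sc for you:
>
> from pyspark import SparkConf, SparkContext
> from pyspark.serializers import MarshalSerializer
>
> conf = SparkConf().setAppName("word-filter")  # hypothetical app name
>
> # MarshalSerializer is faster than the default PickleSerializer for simple
> # record types such as strings and ints, but supports fewer object types.
> sc = SparkContext(conf=conf, serializer=MarshalSerializer())
>
> # use_unicode=False keeps each line as a raw byte string and skips the
> # per-record unicode decoding in the Python workers.
> lines = sc.textFile("s3://[test-bucket]/output2/", use_unicode=False)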
>
> Also, because the Python RDD API adds several wrapper layers, the DAG you see
> is different from the Scala one; that is expected as well.
>
> Thanks
> Saisai
>
>
> On Fri, May 6, 2016 at 12:47 PM, pratik gawande <
> pratik.gawa...@hotmail.com> wrote:
>
>> Hello,
>>
>> I am new to Spark. For one of my jobs I am seeing a significant performance
>> difference when it is run in pyspark vs. Scala. Could you please let me know
>> if this is known, and whether Scala is preferred over Python for writing
>> Spark jobs? Also, the DAG visualization shows completely different DAGs for
>> Scala and pyspark. I have pasted the DAG for both using the toDebugString()
>> method. Let me know if you need any additional information.
>>
>> *Time for job in Scala*: 52 secs
>>
>> *Time for job in pyspark*: 4.2 min
>>
>>
>> *Scala code in Zeppelin:*
>>
>> val lines = sc.textFile("s3://[test-bucket]/output2/")
>> val words = lines.flatMap(line => line.split(" "))
>> val filteredWords = words.filter(word => word.equals("Gutenberg") ||
>> word.equals("flower") || word.equals("a"))
>> val wordMap = filteredWords.map(word => (word, 1)).reduceByKey(_ + _)
>> wordMap.collect()
>>
>> *pyspark code in Zeppelin:*
>>
>> lines = sc.textFile("s3://[test-bucket]/output2/")
>> words = lines.flatMap(lambda x: x.split())
>> filteredWords = words.filter(lambda x: (x == "Gutenberg" or x ==
>> "flower" or x == "a"))
>> result = filteredWords.map(lambda x: (x, 1)).reduceByKey(lambda a,b:
>> a+b).collect()
>> print result
>>
>> *Scala final RDD:*
>>
>>
>> *println(wordMap.toDebugString)*
>>
>> lines: org.apache.spark.rdd.RDD[String] = s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at <console>:30
>> words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[109] at flatMap at <console>:31
>> filteredWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[110] at filter at <console>:33
>> wordMap: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[112] at reduceByKey at <console>:35
>>
>> (10) ShuffledRDD[112] at reduceByKey at <console>:35 []
>>  +-(10) MapPartitionsRDD[111] at map at <console>:35 []
>>     |  MapPartitionsRDD[110] at filter at <console>:33 []
>>     |  MapPartitionsRDD[109] at flatMap at <console>:31 []
>>     |  s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at <console>:30 []
>>     |  s3://[test-bucket]/output2/ HadoopRDD[107] at textFile at <console>:30 []
>>
>>
>> *PySpark final RDD:*
>>
>>
>> *print wordMap.toDebugString()*
>>
>> (10) PythonRDD[119] at RDD at PythonRDD.scala:43 []
>>  |  s3://[test-bucket]/output2/ MapPartitionsRDD[114] at textFile at null:-1 []
>>  |  s3://[test-bucket]/output2/ HadoopRDD[113] at textFile at null:-1 []
>>
>> PythonRDD[120] at RDD at PythonRDD.scala:43
>>
>>
>> Thanks,
>>
>> Pratik
>>
>
>
