The feature works as expected in Scala/Java, but it is not implemented in Python.
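
For anyone who wants to poke at this from the Python side, here is a minimal
sketch (run in the pyspark shell; the data and names are made up, and how much
detail the lineage output reveals depends on your Spark version):

    rddA = sc.parallelize([(i, i) for i in range(1000)]).partitionBy(8).cache()
    rddB = sc.parallelize([(i, 2 * i) for i in range(1000)]).partitionBy(8).cache()
    rddA.count()   # materialize the partitioned, cached RDDs first
    rddB.count()

    joined = rddA.join(rddB, 8)
    # In Scala/Java the same join can reuse the existing partitioning via
    # narrow dependencies; if the lineage below still shows an extra shuffle
    # stage, the Python join did not take advantage of the co-partitioning.
    print(joined.toDebugString())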

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com> wrote:
> I wonder if the issue is just that these lines need to add
> preservesPartitioning = true ?
>
> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>
> I am getting the feeling this is an issue w/ pyspark
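>
> As a rough sketch of what that flag means at the RDD level in python (this
> assumes a pyspark version that exposes rdd.partitioner; the identity
> mapPartitions is only for illustration):
>
>     rdd = sc.parallelize([(i, i) for i in range(100)]).partitionBy(4)
>     kept = rdd.mapPartitions(lambda it: it, preservesPartitioning=True)
>     dropped = rdd.mapPartitions(lambda it: it)   # preservesPartitioning defaults to False
>     print(kept.partitioner == rdd.partitioner)   # partitioner carried through
>     print(dropped.partitioner)                   # None, so a downstream join must reshuffle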
>
>
> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>
>> ah, sorry, I am not too familiar w/ pyspark and missed that part.  It
>> could be that pyspark doesn't properly support narrow dependencies, or maybe
>> you need to be more explicit about the partitioner.  I am looking into the
>> pyspark api, but you might have better guesses here than I do.
>>
>> My suggestion to do
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields
>> are hidden deeper inside and are not user-visible.  But I think a better way
>> (in scala, anyway) is to look at rdd.dependencies.  It's a little tricky,
>> though -- you need to look deep into the lineage (example at the end).
>>
>> Sean -- yes, it does require that both RDDs have the same partitioner, but
>> that should happen naturally if you just specify the same number of
>> partitions: you'll get equal HashPartitioners.  There is a little difference
>> between the scala & python apis that I missed here.  For partitionBy in
>> scala, you actually need to specify the partitioner, but not in python.  I
>> had thought it would work like groupByKey, which does just take an int.
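>>
>> For reference, the corresponding check from python would be something like
>> this (assuming a pyspark version that exposes rdd.partitioner; partitionBy
>> uses the same default hash function on both sides, so the same number of
>> partitions should give equal partitioners):
>>
>>     a = sc.parallelize([(1, 1), (2, 2)]).partitionBy(64)
>>     b = sc.parallelize([(2, 2), (3, 3)]).partitionBy(64)
>>     print(a.partitioner == b.partitioner)   # expect True: same numPartitions & hash func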
>>
>>
>> Here's a code example in scala -- not sure what is available from python.
>> Hopefully somebody knows a simpler way to confirm narrow dependencies?
>> (One possible check is sketched after the example.)
>>
>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> scala> d.partitioner == d2.partitioner
>>> res2: Boolean = true
>>> val joined = d.join(d2)
>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>> val badJoined = d.join(d3)
>>>
>>> d.setName("d")
>>> d2.setName("d2")
>>> d3.setName("d3")
>>> joined.setName("joined")
>>> badJoined.setName("badJoined")
>>>
>>>
>>> // unfortunately, just looking at the immediate dependencies of joined &
>>> // badJoined is misleading, b/c join actually creates
>>> // one more step after the shuffle
>>> scala> joined.dependencies
>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>> // even with the join that does require a shuffle, we still see a
>>> // OneToOneDependency, but that's just a simple flatMap step
>>> scala> badJoined.dependencies
>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>>
>>
>>>
>>> // so let's make a helper function to get all the dependencies recursively
>>>
>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>   val deps = rdd.dependencies
>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>> }
>>>
>>>
>>> //full dependencies of the good join
>>>
>>> scala> flattenDeps(joined).foreach{println}
>>> (joined FlatMappedValuesRDD[9] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>> (MappedValuesRDD[8] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>> (CoGroupedRDD[7] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>> (CoGroupedRDD[7] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>> (d ShuffledRDD[3] at groupByKey at
>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at
>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d2 ShuffledRDD[6] at groupByKey at
>>> <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>> (MappedRDD[5] at map at
>>> <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>
>>>
>>>
>>> //full dependencies of the bad join -- notice the ShuffleDependency!
>>>
>>> scala> flattenDeps(badJoined).foreach{println}
>>> (badJoined FlatMappedValuesRDD[15] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>> (MappedValuesRDD[14] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>> (CoGroupedRDD[13] at join at
>>> <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>> (CoGroupedRDD[13] at join at
>>> <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>> (d ShuffledRDD[3] at groupByKey at
>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at
>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d3 ShuffledRDD[12] at groupByKey at
>>> <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>> (MappedRDD[11] at map at
>>> <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>>
>>
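>> On the question of a simpler check: rdd.toDebugString exists on both the
>> scala and python RDDs and prints the full lineage, with shuffle boundaries
>> shown as separately indented "+-(n)" blocks in recent versions.  So a quick
>> look at the plan might be enough (a sketch, using Karlson's joinedRdd):
>>
>>     print(joinedRdd.toDebugString())
>>
>> If the co-partitioning is actually reused, there should be no extra shuffle
>> block beyond the two partitionBy shuffles that were already materialized.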
>>
>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>
>>> Hi Imran,
>>>
>>> thanks for your quick reply.
>>>
>>> Actually I am doing this:
>>>
>>>     rddA = rddA.partitionBy(n).cache()
>>>     rddB = rddB.partitionBy(n).cache()
>>>
>>> followed by
>>>
>>>     rddA.count()
>>>     rddB.count()
>>>
>>> then joinedRDD = rddA.join(rddB)
>>>
>>> I thought that the count() would force the evaluation, so any subsequent
>>> joins would be shuffle-free. I was wrong about the shuffle amounts, however.
>>> The shuffle write is actually 2GB (i.e. the size of the bigger RDD), while
>>> there is no shuffle read. A joinedRdd.count() does a shuffle read of about
>>> 1GB, though.
>>>
>>> The getPartitions method does not exist on the resulting RDD (I am using
>>> the Python API). There is, however, a foreachPartition(). What is the line
>>>
>>>     joinedRdd.getPartitions.foreach{println}
>>>
>>> supposed to do?
>>>
>>> Thank you,
>>>
>>> Karlson
>>>
>>> PS: Sorry for sending this twice, I accidentally did not reply to the
>>> mailing list first.
>>>
>>>
>>>
>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>
>>>> Hi Karlson,
>>>>
>>>> I think your assumptions are correct -- that join alone shouldn't require
>>>> any shuffling.  But it's possible you are getting tripped up by lazy
>>>> evaluation of RDDs.  After you do your partitionBy, are you sure those
>>>> RDDs are actually materialized & cached somewhere?  E.g., if you just did
>>>> this:
>>>>
>>>> val rddA = someData.partitionBy(N)
>>>> val rddB = someOtherData.partitionBy(N)
>>>> val joinedRdd = rddA.join(rddB)
>>>> joinedRdd.count() //or any other action
>>>>
>>>> then the partitioning isn't actually run until an action forces the
>>>> evaluation.  So even though the join step itself can happen without a
>>>> shuffle, joinedRdd.count() will trigger the evaluation of rddA & rddB,
>>>> which will require shuffles.  Note that even if you have some intervening
>>>> action on rddA & rddB that shuffles them, unless you persist the result,
>>>> you will need to reshuffle them for the join.
>>>>
>>>> If this doesn't help explain things, then for debugging you could try
>>>>
>>>> joinedRdd.getPartitions.foreach{println}
>>>>
>>>> This is getting into the weeds, but at least it will tell us whether or
>>>> not you are getting the narrow dependencies that would avoid the shuffle.
>>>> (Does anyone know of a simpler way to check this?)
>>>>
>>>> hope this helps,
>>>> Imran
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> using Pyspark, I create two RDDs (one with about 2M records (~200MB),
>>>>> the
>>>>> other with about 8M records (~2GB)) of the format (key, value).
>>>>>
>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>>>> both RDDs have the same number of partitions and that equal keys reside
>>>>> on
>>>>> the same partition (via mapPartitionsWithIndex).
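>>>>>
>>>>> For concreteness, a sketch of that kind of check via mapPartitionsWithIndex,
>>>>> with rddA and rddB standing in for the two partitioned RDDs:
>>>>>
>>>>>     def key_to_partition(idx, it):
>>>>>         # emit (key, partition index) for every record in this partition
>>>>>         return ((k, idx) for k, _ in it)
>>>>>
>>>>>     # collects (key, partition) pairs to the driver -- fine as a one-off sanity check
>>>>>     in_a = dict(rddA.mapPartitionsWithIndex(key_to_partition).collect())
>>>>>     in_b = dict(rddB.mapPartitionsWithIndex(key_to_partition).collect())
>>>>>     assert all(in_a[k] == in_b[k] for k in in_a if k in in_b)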
>>>>>
>>>>> Now I'd expect that no shuffling is necessary for a join on the two
>>>>> RDDs. Looking at the Web UI under http://driver:4040, however, reveals
>>>>> that this assumption is false.
>>>>>
>>>>> In fact I am seeing shuffle writes of about 200MB and reads of about
>>>>> 50MB.
>>>>>
>>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>>> assumption?
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Karlson
>>>>>
>>>>>
>>>>>
>>>
>>>
>>
>
