Yeah, I thought the same thing at first too and suggested something
equivalent with preservesPartitioning = true, but that isn't enough.  The
join is done by union-ing the two transformed RDDs, which is very different
from the way it works under the hood in Scala to enable narrow
dependencies.  It really needs a bigger change to PySpark.  I filed this
issue: https://issues.apache.org/jira/browse/SPARK-5785
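
For reference, the python join follows roughly this pattern (a paraphrased
sketch of pyspark/join.py, not the exact source; the function name is made up
and dispatch just splits the tagged values back apart):

    def python_join_sketch(rdd, other, numPartitions):
        # tag each side so the values can be told apart after the union
        vs = rdd.map(lambda kv: (kv[0], (1, kv[1])))
        ws = other.map(lambda kv: (kv[0], (2, kv[1])))

        def dispatch(seq):
            vbuf, wbuf = [], []
            for n, v in seq:
                if n == 1:
                    vbuf.append(v)
                else:
                    wbuf.append(v)
            return [(v, w) for v in vbuf for w in wbuf]

        # the groupByKey over the union repartitions everything through
        # Python, so even if the two maps above preserved the upstream
        # partitioning, the shuffle still happens here
        return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)

So marking the two maps with preservesPartitioning (or using mapValues) doesn't
remove the shuffle by itself; the union + groupByKey step is what needs to
change, which is what SPARK-5785 is about.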

(and the somewhat related issue about documentation:
https://issues.apache.org/jira/browse/SPARK-5786)

Partitioning itself should still work in PySpark: you still need some notion
of how work is distributed, and the PySpark functions take a partitionFunc to
decide that.  But I am not an authority on PySpark, so perhaps there are more
holes I'm not aware of ...
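
For example, partitionBy in the python api takes the number of partitions plus
an optional partitionFunc that maps a key to an int, which is then used to pick
the partition.  A small sketch, assuming an existing SparkContext named sc (the
RDD and variable names here are just made-up example data):

    pairs = sc.parallelize([(i, i * i) for i in range(1000)])

    # default: hash of the key decides the partition
    byHash = pairs.partitionBy(8)

    # custom partitionFunc: e.g. bucket keys by range instead
    byRange = pairs.partitionBy(8, partitionFunc=lambda k: k // 125)

    print(byHash.getNumPartitions(), byRange.getNumPartitions())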

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson <ksonsp...@siberie.de> wrote:

> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
> wouldn't it help to change the lines
>
>     vs = rdd.map(lambda (k, v): (k, (1, v)))
>     ws = other.map(lambda (k, v): (k, (2, v)))
>
> to
>
>     vs = rdd.mapValues(lambda v: (1, v))
>     ws = other.mapValues(lambda v: (2, v))
>
> ?
> As I understand, this would preserve the original partitioning.
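>
> A quick way to check (just a sketch; this assumes your PySpark version
> exposes rdd.partitioner, and the variables follow the snippet above):
>
>     p = rdd.partitionBy(8)
>     # mapValues keeps the upstream partitioner, a plain map drops it
>     print(p.mapValues(lambda v: (1, v)).partitioner == p.partitioner)  # True
>     print(p.map(lambda kv: (kv[0], (1, kv[1]))).partitioner)           # None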
>
>
>
> On 2015-02-13 12:43, Karlson wrote:
>
>> Does that mean partitioning does not work in Python? Or does this only
>> affect joining?
>>
>> On 2015-02-12 19:27, Davies Liu wrote:
>>
>>> The feature works as expected in Scala/Java, but it is not implemented in
>>> Python.
>>>
>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com>
>>> wrote:
>>>
>>>> I wonder if the issue is that these lines just need to add
>>>> preservesPartitioning = true?
>>>>
>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>
>>>> I am getting the feeling this is an issue with pyspark.
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Ah, sorry, I am not too familiar with pyspark and missed that part. It
>>>>> could be that pyspark doesn't properly support narrow dependencies, or
>>>>> maybe you need to be more explicit about the partitioner. I am looking
>>>>> into the pyspark api, but you might have better guesses here than I do.
>>>>>
>>>>> My suggestion to do
>>>>>
>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>
>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>>>>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, since
>>>>> those fields are hidden deeper inside and are not user-visible. A better
>>>>> way (in scala, anyway) is to look at rdd.dependencies. It's a little
>>>>> tricky, though: you need to look deep into the lineage (example at the
>>>>> end).
>>>>>
>>>>> Sean -- yes, it does require that both RDDs have the same partitioner,
>>>>> but that should happen naturally if you just specify the same number of
>>>>> partitions: you'll get equal HashPartitioners. There is a small
>>>>> difference between the scala & python apis that I missed here. For
>>>>> partitionBy in scala, you actually need to specify the partitioner, but
>>>>> not in python. However, I thought it would work like groupByKey, which
>>>>> just takes an int.
>>>>>
>>>>>
>>>>> Here's a code example in scala -- not sure what is available from python.
>>>>> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>>>>
>>>>>  val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>>>>> x}.groupByKey(64)
>>>>>> scala> d.partitioner == d2.partitioner
>>>>>> res2: Boolean = true
>>>>>> val joined = d.join(d2)
>>>>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>>>>> x}.groupByKey(100)
>>>>>> val badJoined = d.join(d3)
>>>>>>
>>>>>> d.setName("d")
>>>>>> d2.setName("d2")
>>>>>> d3.setName("d3")
>>>>>> joined.setName("joined")
>>>>>> badJoined.setName("badJoined")
>>>>>>
>>>>>>
>>>>>> // unfortunately, just looking at the immediate dependencies of joined &
>>>>>> // badJoined is misleading, b/c join actually creates
>>>>>> // one more step after the shuffle
>>>>>> scala> joined.dependencies
>>>>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>>>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>>>>> // even with the join that does require a shuffle, we still see a
>>>>>> // OneToOneDependency, but that's just a simple flatMap step
>>>>>> scala> badJoined.dependencies
>>>>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>>>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> // so let's make a helper function to get all the dependencies
>>>>>> // recursively
>>>>>>
>>>>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>>>>   val deps = rdd.dependencies
>>>>>>   deps.map{dep => (rdd, dep)} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>>>>> }
>>>>>>
>>>>>>
>>>>>> //full dependencies of the good join
>>>>>>
>>>>>> scala> flattenDeps(joined).foreach{println}
>>>>>> (joined FlatMappedValuesRDD[9] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>>>>> (MappedValuesRDD[8] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>>>>> (CoGroupedRDD[7] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>>>>> (CoGroupedRDD[7] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>>>>> (d ShuffledRDD[3] at groupByKey at
>>>>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>> (MappedRDD[2] at map at
>>>>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>> (d2 ShuffledRDD[6] at groupByKey at
>>>>>> <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>>>>> (MappedRDD[5] at map at
>>>>>> <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>>>>
>>>>>>
>>>>>>
>>>>>> //full dependencies of the bad join -- notice the ShuffleDependency!
>>>>>>
>>>>>> scala> flattenDeps(badJoined).foreach{println}
>>>>>> (badJoined FlatMappedValuesRDD[15] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>> (MappedValuesRDD[14] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>>>>> (CoGroupedRDD[13] at join at
>>>>>> <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>>>>> (CoGroupedRDD[13] at join at
>>>>>> <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>>>>> (d ShuffledRDD[3] at groupByKey at
>>>>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>> (MappedRDD[2] at map at
>>>>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>> (d3 ShuffledRDD[12] at groupByKey at
>>>>>> <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>>>>> (MappedRDD[11] at map at
>>>>>> <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi Imran,
>>>>>>
>>>>>> thanks for your quick reply.
>>>>>>
>>>>>> Actually I am doing this:
>>>>>>
>>>>>>     rddA = rddA.partitionBy(n).cache()
>>>>>>     rddB = rddB.partitionBy(n).cache()
>>>>>>
>>>>>> followed by
>>>>>>
>>>>>>     rddA.count()
>>>>>>     rddB.count()
>>>>>>
>>>>>> then joinedRDD = rddA.join(rddB)
>>>>>>
>>>>>> I thought that the count() would force the evaluation, so any subsequent
>>>>>> joins would be shuffle-less. I was wrong about the shuffle amounts,
>>>>>> however. The shuffle write is actually 2GB (i.e. the size of the bigger
>>>>>> RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle
>>>>>> read of about 1GB in size, though.
>>>>>>
>>>>>> The getPartitions method does not exist on the resulting RDD (I am using
>>>>>> the Python API). There is, however, foreachPartition(). What is the line
>>>>>>
>>>>>>     joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> supposed to do?
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Karlson
>>>>>>
>>>>>> PS: Sorry for sending this twice, I accidentally did not reply to the
>>>>>> mailing list first.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi Karlson,
>>>>>>>
>>>>>>> I think your assumptions are correct -- that join alone shouldn't
>>>>>>> require any shuffling. But it's possible you are getting tripped up by
>>>>>>> lazy evaluation of RDDs. After you do your partitionBy, are you sure
>>>>>>> those RDDs are actually materialized & cached somewhere? E.g., if you
>>>>>>> just did this:
>>>>>>>
>>>>>>> val rddA = someData.partitionBy(N)
>>>>>>> val rddB = someOtherData.partitionBy(N)
>>>>>>> val joinedRdd = rddA.join(rddB)
>>>>>>> joinedRdd.count() //or any other action
>>>>>>>
>>>>>>> then the partitioning isn't actually getting run until you do the join.
>>>>>>> So even though the join itself doesn't need a shuffle of its own,
>>>>>>> joinedRdd.count() will trigger the evaluation of rddA & rddB, which is
>>>>>>> where the shuffles come from. Note that even if you have some
>>>>>>> intervening action on rddA & rddB that shuffles them, unless you persist
>>>>>>> the result you will need to reshuffle them for the join.
>>>>>>>
>>>>>>> If this doesn't explain things, then for debugging you could try
>>>>>>>
>>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>>
>>>>>>> This is getting into the weeds, but at least it will tell us whether or
>>>>>>> not you are getting narrow dependencies, which would avoid the shuffle.
>>>>>>> (Does anyone know of a simpler way to check this?)
>>>>>>>
>>>>>>> hope this helps,
>>>>>>> Imran
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de>
>>>>>>> wrote:
>>>>>>>
>>>>>>>  Hi All,
>>>>>>>>
>>>>>>>> Using PySpark, I create two RDDs (one with about 2M records (~200MB),
>>>>>>>> the other with about 8M records (~2GB)) of the format (key, value).
>>>>>>>>
>>>>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>>>>>>> both RDDs have the same number of partitions and that equal keys reside
>>>>>>>> on the same partition (via mapPartitionsWithIndex).
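>>>>>>>>
>>>>>>>> (For illustration, such a check could look like this; just a sketch,
>>>>>>>> the helper and RDD names are made up:)
>>>>>>>>
>>>>>>>>     def keys_per_partition(index, iterator):
>>>>>>>>         # emit the partition index together with the keys it holds
>>>>>>>>         yield index, sorted(set(k for k, _ in iterator))
>>>>>>>>
>>>>>>>>     print(rddA.mapPartitionsWithIndex(keys_per_partition).collect())
>>>>>>>>     print(rddB.mapPartitionsWithIndex(keys_per_partition).collect())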
>>>>>>>>
>>>>>>>> Now I'd expect that for a join on the two RDDs no shuffling is
>>>>>>>> necessary. Looking at the Web UI under http://driver:4040, however,
>>>>>>>> reveals that that assumption is false.
>>>>>>>>
>>>>>>>> In fact I am seeing shuffle writes of about 200MB and reads of about
>>>>>>>> 50MB.
>>>>>>>>
>>>>>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>>>>>> assumption?
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>>
>>>>>>>> Karlson
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
