Yeah, I thought the same thing at first too. I suggested something equivalent with preservesPartitioning = true, but that isn't enough: the join is done by union-ing the two transformed RDDs, which is very different from the way it works under the hood in Scala to enable narrow dependencies. It really needs a bigger change to PySpark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785
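For context, here is roughly what the union-based implementation in pyspark/join.py boils down to. This is a paraphrased sketch, not the verbatim source; the function name and the kv-indexing style are mine:

    def python_join_sketch(rdd, other, numPartitions):
        # Tag each value with the side it came from.
        vs = rdd.map(lambda kv: (kv[0], (1, kv[1])))
        ws = other.map(lambda kv: (kv[0], (2, kv[1])))

        def dispatch(seq):
            # Separate the tagged values per key and emit the cross
            # product, i.e. the inner-join result for that key.
            vbuf, wbuf = [], []
            for n, v in seq:
                (vbuf if n == 1 else wbuf).append(v)
            return [(v, w) for v in vbuf for w in wbuf]

        # union() produces a plain UnionRDD with no partitioner, so the
        # groupByKey() must shuffle even if rdd and other were already
        # co-partitioned. This is where the narrow dependency that
        # Scala's CoGroupedRDD-based join can give you is lost.
        return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)

Compare the Scala join, which goes through CoGroupedRDD and turns each co-partitioned input into a OneToOneDependency, as the lineage dump further down shows.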
(and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786)

Partitioning should still work in PySpark: you still need some notion of distributing work, and the PySpark functions have a partitionFunc to decide that. But I am not an authority on PySpark, so perhaps there are more holes I'm not aware of.

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson <ksonsp...@siberie.de> wrote:

> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines
>
>     vs = rdd.map(lambda (k, v): (k, (1, v)))
>     ws = other.map(lambda (k, v): (k, (2, v)))
>
> to
>
>     vs = rdd.mapValues(lambda v: (1, v))
>     ws = other.mapValues(lambda v: (2, v))
>
> ? As I understand it, this would preserve the original partitioning.
>
> On 2015-02-13 12:43, Karlson wrote:
>
>> Does that mean partitioning does not work in Python? Or does this only affect joining?
>>
>> On 2015-02-12 19:27, Davies Liu wrote:
>>
>>> The feature works as expected in Scala/Java, but is not implemented in Python.
>>>
>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>>
>>>> I wonder if the issue is that these lines just need to add preservesPartitioning = true?
>>>>
>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>
>>>> I am getting the feeling this is an issue with PySpark.
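As an aside on the mapValues suggestion: it is easy to check from a PySpark shell that mapValues, unlike map, keeps the partitioner. A hypothetical snippet, assuming a live SparkContext sc (the partitioner attribute may not be exposed on older PySpark versions):

    pairs = sc.parallelize([(i, i) for i in range(100)]).partitionBy(8)

    # map() discards the partitioner; this is what join.py's tagging step does:
    print(pairs.map(lambda kv: (kv[0], (1, kv[1]))).partitioner)               # None

    # mapValues() cannot change keys, so it safely preserves the partitioner:
    print(pairs.mapValues(lambda v: (1, v)).partitioner == pairs.partitioner)  # True

But as noted at the top of the thread, preserving the partitioner only fixes the inputs; the union/groupByKey step inside join.py still introduces the shuffle.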
>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>>>
>>>>> Ah, sorry, I am not too familiar with PySpark, so I missed that part. It could be that PySpark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the PySpark API, but you might have some better guesses here than I do.
>>>>>
>>>>> My suggestion to do
>>>>>
>>>>>     joinedRdd.getPartitions.foreach{println}
>>>>>
>>>>> was just to see whether the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep, but it was actually a bad suggestion: those fields are hidden deeper inside and are not user-visible. A better way (in Scala, anyway) is to look at rdd.dependencies. It's a little tricky, though; you need to look deep into the lineage (example at the end).
>>>>>
>>>>> Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a little difference between the Scala and Python APIs that I missed here. For partitionBy in Scala you actually need to specify the partitioner, but not in Python. However, I thought it would work like groupByKey, which does just take an int.
>>>>>
>>>>> Here's a code example in Scala (not sure what is available from Python). Hopefully somebody knows a simpler way to confirm narrow dependencies:
>>>>>
>>>>>     val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>     val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>     scala> d.partitioner == d2.partitioner
>>>>>     res2: Boolean = true
>>>>>     val joined = d.join(d2)
>>>>>     val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>>>>     val badJoined = d.join(d3)
>>>>>
>>>>>     d.setName("d")
>>>>>     d2.setName("d2")
>>>>>     d3.setName("d3")
>>>>>     joined.setName("joined")
>>>>>     badJoined.setName("badJoined")
>>>>>
>>>>>     // Unfortunately, just looking at the immediate dependencies of
>>>>>     // joined & badJoined is misleading, because join actually creates
>>>>>     // one more step after the shuffle.
>>>>>     scala> joined.dependencies
>>>>>     res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
>>>>>     // Even with the join that does require a shuffle, we still see a
>>>>>     // OneToOneDependency, but that's just a simple flatMap step.
>>>>>     scala> badJoined.dependencies
>>>>>     res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>
>>>>>     // So let's make a helper function to get all the dependencies recursively:
>>>>>     def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>>>       val deps = rdd.dependencies
>>>>>       deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>>>>     }
>>>>>
>>>>>     // Full dependencies of the good join -- all OneToOneDependencies
>>>>>     // down to the already-shuffled inputs:
>>>>>     scala> flattenDeps(joined).foreach{println}
>>>>>     (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>>>>     (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>>>>     (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>>>>     (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>>>>     (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>     (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>     (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>>>>     (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>>>
>>>>>     // Full dependencies of the bad join -- notice the ShuffleDependency
>>>>>     // on the CoGroupedRDD!
>>>>>     scala> flattenDeps(badJoined).foreach{println}
>>>>>     (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>     (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>>>>     (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>>>>     (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>>>>     (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>     (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>     (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>>>>     (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
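One candidate answer to the "simpler way" question is rdd.toDebugString, which exists in both the Scala and Python APIs and prints the lineage, with a new indentation level at each shuffle (stage) boundary. A sketch, assuming a live SparkContext sc:

    a = sc.parallelize([(i, i) for i in range(1000)]).partitionBy(16)
    b = sc.parallelize([(i, -i) for i in range(1000)]).partitionBy(16)
    info = a.join(b).toDebugString()
    # Some versions return bytes rather than str, so decode defensively:
    print(info.decode() if isinstance(info, bytes) else info)

If the join introduces a shuffle, a ShuffledRDD (at a new stage indent) shows up between the join result and its inputs.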
>>>>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>>
>>>>>> Hi Imran,
>>>>>>
>>>>>> thanks for your quick reply.
>>>>>>
>>>>>> Actually I am doing this:
>>>>>>
>>>>>>     rddA = rddA.partitionBy(n).cache()
>>>>>>     rddB = rddB.partitionBy(n).cache()
>>>>>>
>>>>>> followed by
>>>>>>
>>>>>>     rddA.count()
>>>>>>     rddB.count()
>>>>>>
>>>>>> and then joinedRDD = rddA.join(rddB).
>>>>>>
>>>>>> I thought that the count() would force the evaluation, so any subsequent joins would be shuffle-less. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though.
>>>>>>
>>>>>> The getPartitions method does not exist on the resulting RDD (I am using the Python API). There is however foreachPartition(). What is the line
>>>>>>
>>>>>>     joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> supposed to do?
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Karlson
>>>>>>
>>>>>> PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first.
>>>>>>
>>>>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>>>
>>>>>>> Hi Karlson,
>>>>>>>
>>>>>>> I think your assumptions are correct: that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized and cached somewhere? E.g., if you just did this:
>>>>>>>
>>>>>>>     val rddA = someData.partitionBy(N)
>>>>>>>     val rddB = someOtherData.partitionBy(N)
>>>>>>>     val joinedRdd = rddA.join(rddB)
>>>>>>>     joinedRdd.count() // or any other action
>>>>>>>
>>>>>>> then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA and rddB, which will require shuffles. Note that even if you have some intervening action on rddA and rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.
>>>>>>>
>>>>>>> If this doesn't help explain things, for debugging try
>>>>>>>
>>>>>>>     joinedRdd.getPartitions.foreach{println}
>>>>>>>
>>>>>>> This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Imran
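To put the persist-then-materialize advice in one runnable shape, here is the pattern being described, sketched with made-up toy data and assuming a SparkContext sc. Note that, per SPARK-5785 at the top of the thread, PySpark currently shuffles at the join even when both sides are co-partitioned and cached:

    N = 64
    rddA = sc.parallelize([(i, 'a') for i in range(1000)]).partitionBy(N).cache()
    rddB = sc.parallelize([(i, 'b') for i in range(1000)]).partitionBy(N).cache()

    # Force the partitionBy shuffles now and keep the results in memory,
    # so the join starts from already-partitioned, cached data:
    rddA.count()
    rddB.count()

    joined = rddA.join(rddB, N)  # narrow in Scala under these conditions;
                                 # still a shuffle in PySpark (SPARK-5785)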
>>>>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value).
>>>>>>>>
>>>>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex).
>>>>>>>>
>>>>>>>> Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040, however, reveals that that assumption is false.
>>>>>>>>
>>>>>>>> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>>>>>>>>
>>>>>>>> What's the explanation for that behaviour? Where am I wrong with my assumption?
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>>
>>>>>>>> Karlson