The feature works as expected in Scala/Java, but it is not implemented in Python.
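For anyone who wants to reproduce the Python-side behaviour, here is a minimal sketch. It assumes the partitioner attribute that PySpark sets via partitionBy (exposed on RDDs in recent versions); the RDD names are made up:

    from pyspark import SparkContext

    sc = SparkContext(appName="copartitioned-join-check")

    # Two pair RDDs, hash-partitioned the same way and materialized.
    rdd_a = sc.parallelize([(i, i) for i in range(100000)]).partitionBy(64).cache()
    rdd_b = sc.parallelize([(i, -i) for i in range(100000)]).partitionBy(64).cache()
    rdd_a.count()
    rdd_b.count()

    # Same number of partitions and same (default) partition function,
    # so the two partitioners compare equal.
    print(rdd_a.partitioner == rdd_b.partitioner)

    # If the join below still shows shuffle read/write in the Web UI,
    # the co-partitioning was not exploited.
    rdd_a.join(rdd_b).count()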
On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com> wrote:

> I wonder if the issue is that these lines just need to add
> preservesPartitioning = true
> ?
>
> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>
> I am getting the feeling this is an issue w/ pyspark
>
>
> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>
>> ah, sorry, I am not too familiar w/ pyspark, so I missed that part. It
>> could be that pyspark doesn't properly support narrow dependencies, or
>> maybe you need to be more explicit about the partitioner. I am looking
>> into the pyspark api, but you may have better guesses here than I do.
>>
>> My suggestion to do
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those
>> fields are hidden deeper inside and are not user-visible. A better way
>> (in scala, anyway) is to look at rdd.dependencies. It's a little tricky,
>> though: you need to look deep into the lineage (example at the end).
>>
>> Sean -- yes, it does require both RDDs to have the same partitioner, but
>> that should happen naturally if you just specify the same number of
>> partitions; you'll get equal HashPartitioners. There is a little
>> difference in the scala & python apis that I missed here. For partitionBy
>> in scala, you actually need to specify the partitioner, but not in
>> python. However, I thought it would work like groupByKey, which does just
>> take an int.
>>
>> Here's a code example in scala -- I'm not sure what is available from
>> python. Hopefully somebody knows a simpler way to confirm narrow
>> dependencies?
>>
>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> scala> d.partitioner == d2.partitioner
>>> res2: Boolean = true
>>> val joined = d.join(d2)
>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>> val badJoined = d.join(d3)
>>>
>>> d.setName("d")
>>> d2.setName("d2")
>>> d3.setName("d3")
>>> joined.setName("joined")
>>> badJoined.setName("badJoined")
>>>
>>> // unfortunately, just looking at the immediate dependencies of joined &
>>> // badJoined is misleading, b/c join actually creates
>>> // one more step after the shuffle
>>> scala> joined.dependencies
>>> res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
>>> // even with the join that does require a shuffle, we still see a
>>> // OneToOneDependency, but that's just a simple flatMap step
>>> scala> badJoined.dependencies
>>> res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)
>>>
>>> // so let's make a helper function to get all the dependencies recursively
>>>
>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>   val deps = rdd.dependencies
>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>> }
>>>
>>> // full dependencies of the good join
>>>
>>> scala> flattenDeps(joined).foreach{println}
>>> (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>> (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>> (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>> (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>> (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>
>>> // full dependencies of the bad join -- notice the ShuffleDependency!
>>>
>>> scala> flattenDeps(badJoined).foreach{println}
>>> (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>> (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>> (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>> (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>> (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
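On the "simpler way" question: one lighter-weight option that exists in both the Scala and Python APIs is toDebugString, which prints an RDD's whole lineage; each increase in indentation in its output marks a stage (shuffle) boundary, so a narrow join keeps both parents at the join's own level. A minimal PySpark sketch, using hypothetical joined / bad_joined RDDs like those above:

    # In Python 3, toDebugString() returns bytes, hence the decode().
    print(joined.toDebugString().decode())
    print(bad_joined.toDebugString().decode())
    # A narrow join lists its parents without a new indentation level;
    # a shuffling join shows them nested one level deeper.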
>>
>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>
>>> Hi Imran,
>>>
>>> thanks for your quick reply.
>>>
>>> Actually I am doing this:
>>>
>>> rddA = rddA.partitionBy(n).cache()
>>> rddB = rddB.partitionBy(n).cache()
>>>
>>> followed by
>>>
>>> rddA.count()
>>> rddB.count()
>>>
>>> then joinedRDD = rddA.join(rddB)
>>>
>>> I thought that the count() would force the evaluation, so any subsequent
>>> joins would be shuffle-less. I was wrong about the shuffle amounts,
>>> however. The shuffle write is actually 2GB (i.e. the size of the bigger
>>> RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle
>>> read of about 1GB in size, though.
>>>
>>> The getPartitions method does not exist on the resulting RDD (I am using
>>> the Python API). There is, however, foreachPartition(). What is the line
>>>
>>> joinedRdd.getPartitions.foreach{println}
>>>
>>> supposed to do?
>>>
>>> Thank you,
>>>
>>> Karlson
>>>
>>> PS: Sorry for sending this twice, I accidentally did not reply to the
>>> mailing list first.
>>>
>>>
>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>
>>>> Hi Karlson,
>>>>
>>>> I think your assumptions are correct -- the join alone shouldn't
>>>> require any shuffling. But it's possible you are getting tripped up by
>>>> lazy evaluation of RDDs. After you do your partitionBy, are you sure
>>>> those RDDs are actually materialized & cached somewhere? E.g., if you
>>>> just did this:
>>>>
>>>> val rddA = someData.partitionBy(N)
>>>> val rddB = someOtherData.partitionBy(N)
>>>> val joinedRdd = rddA.join(rddB)
>>>> joinedRdd.count() //or any other action
>>>>
>>>> then the partitioning isn't actually getting run until you do the join.
>>>> So though the join itself can happen without partitioning,
>>>> joinedRdd.count() will trigger the evaluation of rddA & rddB, which
>>>> will require shuffles. Note that even if you have some intervening
>>>> action on rddA & rddB that shuffles them, unless you persist the
>>>> result, you will need to reshuffle them for the join.
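In PySpark terms, the distinction looks like this (a sketch with hypothetical someData / someOtherData pair RDDs):

    # Lazy version: partitionBy has not run yet, so the join's first
    # action triggers both shuffles.
    rddA = someData.partitionBy(N)
    rddB = someOtherData.partitionBy(N)
    rddA.join(rddB).count()   # shuffles rddA and rddB here

    # Persisted version: materialize the partitioned layout once, so
    # later jobs can reuse it instead of reshuffling the inputs.
    rddA = someData.partitionBy(N).cache()
    rddB = someOtherData.partitionBy(N).cache()
    rddA.count()
    rddB.count()
    joined = rddA.join(rddB)  # the cached inputs themselves are not
                              # reshuffled; whether the join still
                              # shuffles is the open question here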
>>>>
>>>> If this doesn't help explain things, then for debugging, try
>>>>
>>>> joinedRdd.getPartitions.foreach{println}
>>>>
>>>> This is getting into the weeds, but at least it will tell us whether or
>>>> not you are getting narrow dependencies, which would avoid the shuffle.
>>>> (Does anyone know of a simpler way to check this?)
>>>>
>>>> hope this helps,
>>>> Imran
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> using Pyspark, I create two RDDs (one with about 2M records (~200MB),
>>>>> the other with about 8M records (~2GB)) of the format (key, value).
>>>>>
>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>>>> both RDDs have the same number of partitions and that equal keys
>>>>> reside on the same partition (via mapPartitionsWithIndex).
>>>>>
>>>>> Now I'd expect that for a join on the two RDDs no shuffling is
>>>>> necessary. Looking at the Web UI under http://driver:4040, however,
>>>>> reveals that that assumption is false.
>>>>>
>>>>> In fact, I am seeing shuffle writes of about 200MB and reads of about
>>>>> 50MB.
>>>>>
>>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>>> assumption?
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Karlson
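For reference, the mapPartitionsWithIndex check described above might look roughly like this in PySpark (a sketch; rdd_a and rdd_b stand for the two partitioned RDDs):

    def keys_with_partition(index, iterator):
        # Tag each key with the index of the partition it lives in.
        return ((key, index) for key, _ in iterator)

    placement_a = dict(rdd_a.mapPartitionsWithIndex(keys_with_partition).collect())
    placement_b = dict(rdd_b.mapPartitionsWithIndex(keys_with_partition).collect())

    # Equal keys should sit in equally-numbered partitions in both RDDs.
    shared = set(placement_a) & set(placement_b)
    print(all(placement_a[k] == placement_b[k] for k in shared))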