Ah, sorry -- I'm not too familiar with pyspark, and I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I'm looking into the pyspark api, but you probably have better guesses here than I do.
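For what it's worth, from pyspark I would try making the partitioning as explicit as possible, something like the sketch below -- untested, and I'm just going off the pyspark api docs for partitionBy's partitionFunc argument and join's numPartitions argument:

from pyspark import SparkContext
# portable_hash is, as far as I can tell, the default partitionFunc
from pyspark.rdd import portable_hash

sc = SparkContext(appName="narrow-join-sketch")
n = 64

# partition both sides identically (same number of partitions, same
# partition function), then cache and materialize them, so the join can
# reuse the partitioned data instead of reshuffling it
rddA = sc.parallelize(range(1, 10**6)).map(lambda x: (x, x)) \
    .partitionBy(n, partitionFunc=portable_hash).cache()
rddB = sc.parallelize(range(3, 10**6)).map(lambda x: (x, x)) \
    .partitionBy(n, partitionFunc=portable_hash).cache()
rddA.count()  # force evaluation so the partitioned data is actually cached
rddB.count()

joined = rddA.join(rddB, n)  # same partitioning on both sides

If that still shuffles, I'd start to suspect pyspark itself.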
My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see whether each partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. A better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).

Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a small difference between the scala and python apis that I missed here. For partitionBy in scala you actually need to specify the partitioner, but not in python; I had thought it would work like groupByKey, which just takes an int.

Here's a code example in scala -- I'm not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies?

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

// unfortunately, just looking at the immediate dependencies of joined &
// badJoined is misleading, b/c join actually creates one more step after
// the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

// even with the join that does require a shuffle, we still see a
// OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

// so let's make a helper function to get all the dependencies recursively
import org.apache.spark.Dependency
import org.apache.spark.rdd.RDD

def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

// full dependencies of the good join -- the CoGroupedRDD has only
// OneToOneDependencies; the ShuffleDependencies further down are just the
// original groupByKeys
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

// full dependencies of the bad join -- notice the ShuffleDependency on the
// CoGroupedRDD!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
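Actually, one possibly simpler check, and it should be available from python too: rdd.toDebugString prints the whole lineage. Just a sketch (I believe the python version may return bytes, hence the decode):

# inspect the lineage of the joined rdd from pyspark; each extra
# indentation level in the output marks a new stage, i.e. a shuffle
# boundary (as far as I understand the format)
lineage = joinedRdd.toDebugString()
if isinstance(lineage, bytes):
    lineage = lineage.decode("utf-8")
print(lineage)

So if the join is really narrow, you shouldn't see a new indentation level appear between the partitionBy output and the join itself.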
On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:

> Hi Imran,
>
> thanks for your quick reply.
>
> Actually I am doing this:
>
> rddA = rddA.partitionBy(n).cache()
> rddB = rddB.partitionBy(n).cache()
>
> followed by
>
> rddA.count()
> rddB.count()
>
> and then
>
> joinedRdd = rddA.join(rddB)
>
> I thought that the count() would force the evaluation, so any subsequent
> joins would be shuffleless. I was wrong about the shuffle amounts,
> however. The shuffle write is actually 2GB (i.e. the size of the bigger
> RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle
> read of about 1GB in size, though.
>
> The getPartitions method does not exist on the resulting RDD (I am using
> the Python API). There is, however, foreachPartition(). What is the line
>
> joinedRdd.getPartitions.foreach{println}
>
> supposed to do?
>
> Thank you,
>
> Karlson
>
> PS: Sorry for sending this twice, I accidentally did not reply to the
> mailing list first.
>
> On 2015-02-12 16:48, Imran Rashid wrote:
>
>> Hi Karlson,
>>
>> I think your assumptions are correct -- that join alone shouldn't
>> require any shuffling. But it's possible you are getting tripped up by
>> lazy evaluation of RDDs. After you do your partitionBy, are you sure
>> those RDDs are actually materialized & cached somewhere? E.g., if you
>> just did this:
>>
>> val rddA = someData.partitionBy(N)
>> val rddB = someOtherData.partitionBy(N)
>> val joinedRdd = rddA.join(rddB)
>> joinedRdd.count() // or any other action
>>
>> then the partitioning isn't actually getting run until you do the join.
>> So though the join itself can happen without partitioning,
>> joinedRdd.count() will trigger the evaluation of rddA & rddB, which will
>> require shuffles. Note that even if you have some intervening action on
>> rddA & rddB that shuffles them, unless you persist the result, you will
>> need to reshuffle them for the join.
>>
>> If this doesn't help explain things, then for debugging try
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> This is getting into the weeds, but at least it will tell us whether or
>> not you are getting narrow dependencies, which would avoid the shuffle.
>> (Does anyone know of a simpler way to check this?)
>>
>> hope this helps,
>> Imran
>>
>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>
>>> Hi All,
>>>
>>> using PySpark, I create two RDDs (one with about 2M records (~200MB),
>>> the other with about 8M records (~2GB)) of the format (key, value).
>>>
>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>> both RDDs have the same number of partitions and that equal keys reside
>>> on the same partition (via mapPartitionsWithIndex).
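>>> Roughly like this (a simplified sketch -- key_placement is just a
>>> hypothetical helper, and collecting every key's partition index to the
>>> driver is only feasible because the keys themselves are small):
>>>
>>> def key_placement(rdd):
>>>     # map every key to the index of the partition it lives in
>>>     return dict(rdd.mapPartitionsWithIndex(
>>>         lambda i, it: ((k, i) for k, _ in it)).collect())
>>>
>>> placement_a = key_placement(rddA)
>>> placement_b = key_placement(rddB)
>>> common_keys = set(placement_a) & set(placement_b)
>>> assert all(placement_a[k] == placement_b[k] for k in common_keys)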
>>>
>>> Now I'd expect that no shuffling is necessary for a join on the two
>>> RDDs. Looking at the Web UI under http://driver:4040, however, reveals
>>> that this assumption is false.
>>>
>>> In fact, I am seeing shuffle writes of about 200MB and reads of about
>>> 50MB.
>>>
>>> What's the explanation for that behaviour? Where am I wrong with my
>>> assumption?
>>>
>>> Thanks in advance,
>>>
>>> Karlson