Does that mean partitioning does not work in Python? Or does this only affect joining?

On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but is not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com> wrote:
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:

Ah, sorry, I am not too familiar w/ pyspark and I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I do.

My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).

Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a small difference between the scala & python apis that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However, I thought it would work like groupByKey, which does just take an int.
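
To make the co-partitioning point concrete, here is a minimal sketch in plain Python (it is not Spark code). It assumes a hash partitioner that sends a key to partition hash(key) % num_partitions; the helper names partition_index and co_partitioned are made up for this illustration. With the same partition count on both sides, equal keys land at the same index, so a join could proceed partition by partition; with different counts they do not line up.

```python
def partition_index(key, num_partitions):
    # Assumed hash-partitioning rule: hash of the key modulo the partition count.
    return hash(key) % num_partitions


def co_partitioned(keys, n_left, n_right):
    # True when every key lands at the same index on both sides,
    # which is the condition for joining without moving data.
    return all(
        partition_index(k, n_left) == partition_index(k, n_right) for k in keys
    )


keys = range(1, 1001)
same = co_partitioned(keys, 64, 64)        # both sides use 64 partitions
different = co_partitioned(keys, 64, 100)  # 64 on one side, 100 on the other
print(same, different)
```

Integer keys are used so that Python's hash() is deterministic across runs.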


Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


// unfortunately, just looking at the immediate dependencies of joined & badJoined
// is misleading, b/c join actually creates one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
// even with the join that does require a shuffle, we still see a OneToOneDependency,
// but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)




// so let's make a helper function to get all the dependencies recursively

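import org.apache.spark.Dependency
import org.apache.spark.rdd.RDD

// pair each dependency with the RDD that owns it, then recurse into the parent RDDs
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}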


//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)



//full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
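
The two listings differ only in the dependencies of the CoGroupedRDD. As a rough illustration of how that check could be automated, here is a toy model in plain Python. It does not use the Spark API: the Node class, the "narrow"/"shuffle" labels, and the helpers flatten_deps and join_shuffles are all hypothetical names invented for this sketch, and the two lineages are written out by hand to mirror the output above.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy lineage node (hypothetical; not part of any Spark API)."""
    name: str
    # Each entry is (kind, parent), where kind is "narrow" or "shuffle".
    deps: list = field(default_factory=list)


def flatten_deps(node):
    # Same recursion as flattenDeps: this node's dependencies, then its parents'.
    result = [(node.name, kind) for kind, _ in node.deps]
    for _, parent in node.deps:
        result.extend(flatten_deps(parent))
    return result


def join_shuffles(node):
    # The join itself shuffles if the cogroup step has a shuffle dependency.
    return any(
        name == "cogroup" and kind == "shuffle"
        for name, kind in flatten_deps(node)
    )


def grouped(name):
    # parallelize -> map -> groupByKey; groupByKey always shuffles once.
    mapped = Node("map", [("narrow", Node("parallelize"))])
    return Node(name, [("shuffle", mapped)])


def join(name, cogroup_deps):
    # cogroup -> mapValues -> flatMapValues, as in the listings above.
    cogroup = Node("cogroup", cogroup_deps)
    map_values = Node("mapValues", [("narrow", cogroup)])
    return Node(name, [("narrow", map_values)])


d, d2, d3 = grouped("d"), grouped("d2"), grouped("d3")
joined = join("joined", [("narrow", d), ("narrow", d2)])
bad_joined = join("badJoined", [("shuffle", d), ("narrow", d3)])

print(join_shuffles(joined), join_shuffles(bad_joined))
```

Only the dependency kinds attached to the cogroup step matter for the verdict; the shuffles inside d, d2 and d3 come from groupByKey and are present in both cases.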



On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:

Hi Imran,

thanks for your quick reply.

Actually I am doing this:

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()

followed by

    rddA.count()
    rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though.

The getPartitions-method does not exist on the resulting RDD (I am using the Python API). There is however foreachPartition(). What is the line

    joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you,

Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the
mailing list first.



On 2015-02-12 16:48, Imran Rashid wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:

val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.
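
The effect of persisting can be sketched with a toy model in plain Python (again, not Spark code). LazyDataset and expensive_repartition are hypothetical names made up for this example; the assumption is simply that an uncached lazy dataset recomputes its expensive step on every action, while a cached one computes it once.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not a Spark class)."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the data
        self._cached = None
        self._use_cache = False
        self.runs = 0             # how many times the expensive step ran

    def cache(self):
        # Only marks the dataset for caching; nothing is computed yet.
        self._use_cache = True
        return self

    def collect(self):
        # An "action": forces evaluation, reusing the cached result if present.
        if self._use_cache and self._cached is not None:
            return self._cached
        self.runs += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data


def expensive_repartition():
    # Stands in for the shuffle performed by partitionBy.
    return sorted(range(10), key=lambda k: k % 3)


uncached = LazyDataset(expensive_repartition)
uncached.collect()
uncached.collect()

cached = LazyDataset(expensive_repartition).cache()
cached.collect()
cached.collect()

print(uncached.runs, cached.runs)
```

In this model the uncached dataset pays for the repartitioning on each action, which corresponds to the reshuffle described above.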

If this doesn't help explain things, try this for debugging:

joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:

Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.

What's the explanation for that behaviour? Where am I wrong with my assumption?

Thanks in advance,

Karlson

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


