In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
to
vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))
?
As I understand, this would preserve the original partitioning.
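To illustrate the point, here is a plain-Python sketch (not pyspark -- it just models Spark's HashPartitioner as hash(key) % numPartitions) of why a values-only transform can never move a record to another partition, while rebuilding the whole (key, value) tuple gives no such guarantee:

```python
def partition_index(key, n):
    # toy model of Spark's HashPartitioner: partition = hash(key) mod n
    return hash(key) % n

n = 4
pairs = [(k, k * 10) for k in range(8)]

# mapValues-style transform: the key is untouched, so every record
# stays on the same partition -- the partitioner can be preserved
values_mapped = [(k, (1, v)) for k, v in pairs]
same = [partition_index(k1, n) == partition_index(k2, n)
        for (k1, _), (k2, _) in zip(pairs, values_mapped)]
print(all(same))               # True

# map-style transform rebuilds the whole tuple; if the key changes,
# records generally belong on different partitions afterwards
key_mapped = [(k + 1, v) for k, v in pairs]
same_after_rekey = [partition_index(k1, n) == partition_index(k2, n)
                    for (k1, _), (k2, _) in zip(pairs, key_mapped)]
print(all(same_after_rekey))   # False
```

Since mapValues can make that guarantee by construction, it can keep the parent's partitioner, whereas a general map over tuples cannot.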
On 2015-02-13 12:43, Karlson wrote:
Does that mean partitioning does not work in Python? Or does this only affect joining?
On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in
Python.
On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com>
wrote:
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
I am getting the feeling this is an issue w/ pyspark
On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com>
wrote:
ah, sorry, I am not too familiar w/ pyspark and missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api, but you might have some better guesses here than I do.
My suggestion to do
joinedRdd.getPartitions.foreach{println}
was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. A better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).
Sean -- yes, it does require both RDDs to have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However, I thought it would work like groupByKey, which does just take an int.
Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)
d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")
//unfortunately, just looking at the immediate dependencies of joined & badJoined is misleading, b/c join actually creates one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)
//so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}
//full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
//full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
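For the python side: pyspark doesn't expose rdd.dependencies, but the recursive walk above can be mimicked in plain Python over a toy lineage. The Rdd/Dep classes below are invented purely for illustration -- they are not pyspark API:

```python
# Toy stand-ins for an RDD lineage -- invented for illustration,
# not part of the pyspark API.
class Dep:
    def __init__(self, rdd, shuffle):
        self.rdd = rdd
        self.shuffle = shuffle  # True => ShuffleDependency, False => narrow

class Rdd:
    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)

def flatten_deps(rdd):
    """Recursively collect (rdd, dependency) pairs, like the Scala helper."""
    out = [(rdd, d) for d in rdd.deps]
    for d in rdd.deps:
        out.extend(flatten_deps(d.rdd))
    return out

# good join: both inputs were pre-shuffled with the same partitioner,
# so the cogroup step itself has only narrow dependencies
d  = Rdd("d shuffled")
d2 = Rdd("d2 shuffled")
good_cogroup = Rdd("cogroup", [Dep(d, False), Dep(d2, False)])
good_join = Rdd("joined", [Dep(good_cogroup, False)])

# bad join: d3 has a different number of partitions, so the cogroup
# step needs a ShuffleDependency on it
d3 = Rdd("d3 shuffled")
bad_cogroup = Rdd("cogroup", [Dep(d, False), Dep(d3, True)])
bad_join = Rdd("badJoined", [Dep(bad_cogroup, False)])

print(any(dep.shuffle for _, dep in flatten_deps(good_join)))  # False
print(any(dep.shuffle for _, dep in flatten_deps(bad_join)))   # True
```

(In the real Scala output above, both joins also show ShuffleDependencies further up the lineage from groupByKey; the distinguishing point is whether the CoGroupedRDD's own dependencies are narrow.)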
On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de>
wrote:
Hi Imran,
thanks for your quick reply.
Actually I am doing this:
rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()
followed by
rddA.count()
rddB.count()
then joinedRDD = rddA.join(rddB)
I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though.
The getPartitions method does not exist on the resulting RDD (I am using the Python API). There is however foreachPartition(). What is the line
joinedRdd.getPartitions.foreach{println}
supposed to do?
Thank you,
Karlson
PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first.
On 2015-02-12 16:48, Imran Rashid wrote:
Hi Karlson,
I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:
val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action
then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without a shuffle, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.
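The lazy-evaluation pitfall is not Spark-specific; a plain-Python sketch (no pyspark involved -- the "partitioning work" here is just a counter) shows the same pattern:

```python
partition_runs = 0

def partitioned_data():
    """Stands in for rddA.partitionBy(n): lazily yields records,
    counting how often the expensive 'shuffle' work actually runs."""
    global partition_runs
    partition_runs += 1
    return iter([("a", 1), ("b", 2)])

# no cache: each 'action' re-runs the partitioning work from scratch
list(partitioned_data())            # first action
list(partitioned_data())            # second action, work repeated
runs_without_cache = partition_runs
print(runs_without_cache)           # 2

# 'cache': materialize once, then later actions reuse the stored result
partition_runs = 0
cached = list(partitioned_data())   # like .cache() followed by .count()
list(cached)
list(cached)
runs_with_cache = partition_runs
print(runs_with_cache)              # 1
```

That is exactly why the partitionBy(n).cache() plus count() pattern matters: without materializing, every downstream action repeats the shuffle.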
If this doesn't help explain things, for debugging you could try
joinedRdd.getPartitions.foreach{println}
This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)
hope this helps,
Imran
On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de>
wrote:
Hi All,
using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value).
I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex).
Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
What's the explanation for that behaviour? Where am I wrong with my assumption?
Thanks in advance,
Karlson
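For what it's worth, the co-location check described above can be sketched in plain Python by modelling Spark's HashPartitioner as hash(key) % n (an assumption for illustration, not the pyspark implementation):

```python
# Hash-partition two (key, value) datasets with the same partition count
# and check that equal keys get equal partition indices -- the property
# that makes a shuffle-free join possible in the first place.

def key_to_partition(pairs, n):
    # models HashPartitioner: every key maps to hash(key) % n
    return {k: hash(k) % n for k, _ in pairs}

n = 8
rdd_a = [(k, "a") for k in range(100)]
rdd_b = [(k, "b") for k in range(50, 150)]

placement_a = key_to_partition(rdd_a, n)
placement_b = key_to_partition(rdd_b, n)

# every key present in both datasets lands on the same partition index
common = placement_a.keys() & placement_b.keys()
colocated = all(placement_a[k] == placement_b[k] for k in common)
print(colocated)   # True: same partition count => equal keys co-located
```

With a different n for the second dataset, the check would generally fail, which is the situation that forces a shuffle at join time.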
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org