Sending the response back to the dev list so this is indexable and
searchable by others.
---------- Forwarded message ----------
From: Milos Nikolic milos.nikoli...@gmail.com
Date: Sat, Aug 30, 2014 at 5:50 PM
Subject: Re: Partitioning strategy changed in Spark 1.0.x?
To: Reynold Xin r...@databricks.com
Thank you, your insights were very helpful, and we managed to find a
solution that works for us.
Best,
Milos
On Aug 27, 2014, at 11:20 PM, Reynold Xin r...@databricks.com wrote:
I don't think you can ever rely on a fixed mapping from data to physical nodes in Spark, even in Spark 0.9. That is because the scheduler needs to be fault-tolerant: what if a node is busy or down?
What is deterministic is the partitioning of the data, i.e. a given key is always hashed into the same partition (given the same partition count). And if you don't run foreach twice, but instead simply zip two RDDs that are both hash partitioned with the same partitioner, the scheduler will not create extra stages.
e.g.
// Let's say I have 10 nodes
val partitioner = new HashPartitioner(10)
// Create RDD
val rdd = sc.parallelize(0 until 10).map(k => (k, computeValue(k)))
// Partition twice using the same partitioner
val p1 = rdd.partitionBy(partitioner)
val p2 = rdd.partitionBy(partitioner)
p1.zip(p2) // this should work
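Note that RDD.zip assumes the two RDDs have the same number of partitions and the same number of elements in each corresponding partition; partitioning the same RDD twice with the same partitioner satisfies both conditions.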
On Wed, Aug 27, 2014 at 1:50 PM, Milos Nikolic milos.nikoli...@gmail.com wrote:
Sure.
Suppose we have two SQL relations, expressed as two RDDs, and we want to do a hash join between them. First, we would partition each RDD on the join key, which should collocate matching join keys on one node. Then, we would zip the corresponding partitions of the two relations and do a local join on each node.
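Concretely, the plan looks something like the sketch below (the sample data, key ranges, and names are made up for illustration; sc is the usual spark-shell context):

import org.apache.spark.HashPartitioner

// Two hypothetical "relations" keyed by the join key
val left  = sc.parallelize(0 until 1000).map(k => (k % 100, "L" + k))
val right = sc.parallelize(0 until 1000).map(k => (k % 100, "R" + k))

// Partition both sides with the same partitioner
val partitioner = new HashPartitioner(10)
val l = left.partitionBy(partitioner)
val r = right.partitionBy(partitioner)

// zipPartitions pairs the i-th partition of each RDD in a single task,
// so the only shuffles are the two partitionBy calls above
val joined = l.zipPartitions(r) { (lIter, rIter) =>
  // build a hash table from the left partition, probe with the right
  val table = lIter.toSeq.groupBy(_._1)
  rIter.flatMap { case (k, w) =>
    table.getOrElse(k, Nil).map { case (_, v) => (k, (v, w)) }
  }
}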
This approach makes sense only if Spark always places key X on node Y for both RDDs, which no longer seems to hold. And I have no idea how to circumvent this issue given the recent changes in hashing you mentioned.
Milos
On Aug 27, 2014, at 10:05 PM, Reynold Xin r...@databricks.com wrote:
Can you elaborate your problem?
I am not sure I understand what you mean by "on one node, I get two different sets of keys".
On Tue, Aug 26, 2014 at 2:16 AM, Milos Nikolic milos.nikoli...@gmail.com wrote:
Hi Reynold,
The problem still exists even with more elements. On one node, I get two different sets of keys -- I want these local sets to be the same so that I can later zip local partitions together (rather than RDD.join them, which involves shuffling).
With this recent change in hashing, RDD.zip no longer seems useful, as I can no longer guarantee that local partitions from two RDDs will share the same set of keys on one node.
Do you have any ideas on how to resolve this problem?
Thanks,
Milos
On Aug 26, 2014, at 10:04 AM, Reynold Xin r...@databricks.com wrote:
It is better to use a larger number of elements than just 10 for this test. Can you try something larger, like 1000 or 10000?
IIRC, the hash function changed to murmur hash:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala#L205
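(For the data-to-partition side, HashPartitioner is essentially the key's hashCode taken modulo the partition count. Below is a paraphrase of HashPartitioner#getPartition, not the exact source:)

// Paraphrase of org.apache.spark.HashPartitioner#getPartition:
// hashCode mod numPartitions, kept non-negative so negative hash
// codes still map to a valid partition index
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _ =>
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
}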
On Tue, Aug 26, 2014 at 1:01 AM, Milos Nikolic milos.nikoli...@gmail.com wrote:
Hi guys,
I’ve noticed some changes in the behavior of partitioning under Spark 1.0.x. I’d appreciate it if someone could explain what has changed in the meantime.
Here is a small example. I want to create two RDD[(K, V)] objects and then collocate partitions with the same K on one node. Even when the same partitioner is used for both RDDs, partitions with the same K end up on different nodes.
// Let's say I have 10 nodes
val partitioner = new HashPartitioner(10)
// Create RDD
val rdd = sc.parallelize(0 until 10).map(k => (k, computeValue(k)))
// Partition twice using the same partitioner
rdd.partitionBy(partitioner).foreach { case (k, v) =>
  println("Dummy1 - k = " + k) }
rdd.partitionBy(partitioner).foreach { case (k, v) =>
  println("Dummy2 - k = " + k) }
The output on one node is:
Dummy1 - k = 2
Dummy2 - k = 7
I was expecting to see the same keys on each node. That was happening
under Spark 0.9.2, but not under Spark 1.0.x.
Does anyone have an idea what has changed in the meantime? Or how to get corresponding partitions onto one node?
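(One way to see the key-to-partition assignment itself, as opposed to which node runs each task, is mapPartitionsWithIndex; a sketch reusing the rdd and partitioner from the example above:)

// Print which partition index each key lands in, independent of
// which node the task happens to run on
rdd.partitionBy(partitioner)
  .mapPartitionsWithIndex { (idx, iter) =>
    iter.map { case (k, _) => "partition " + idx + " has key " + k }
  }
  .collect()
  .foreach(println)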
Thanks in advance,
Milos