@Daniel

Not an expert either, I'm just going by what I see performance-wise
currently. Our groupByKey implementation was more than an order of
magnitude slower than using the self join and then reduceByKey.
FTA:

*"pairs on the same machine with the same key are combined (by using the
lamdba function passed into reduceByKey) before the data is shuffled. [...]
On the other hand, when calling groupByKey - all the key-value pairs are
shuffled around. This is a lot of unnessary data to being transferred over
the network."*
Maybe this is the reason that using the self-join performs better.
Hopefully someone with more knowledge can enlighten us.

Back on the topic, using Koert's suggestion of creating an explicit
Partitioner and caching the result I've been able to get ~1.5x improvement
in runtime for large datasets. I'm using 100 partitions currently, I'll
check if
increasing this value improves performance. Decreasing the number of
partitions has a large negative effect on the runtime.

On Mon, Dec 8, 2014 at 5:46 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

>
> On Mon, Dec 8, 2014 at 5:26 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> @Daniel
>> It's true that the first map in your code is needed, i.e. mapping so that
>> dstID is the new RDD key.
>>
>
> You wrote groupByKey is "highly inefficient due to the need to shuffle
> all the data", but you seem to agree that the shuffle cannot be avoided. Both
> approaches cause 1 shuffle.
>
> I still don't see why you expect a speedup from doing this with a join.
> But don't let me discourage you or anything. I'm not an expert, just trying
> to learn.
>
> The self-join on the dstKey will then create all the pairs of incoming
>> edges (plus self-referential and duplicates that need to be filtered out).
>>
>> @Koert
>> Are there any guidelines about setting the number of partitions in
>> HashParitioner then?
>>
>> What I know about my data is that the distribution of indegree value
>> (number of incoming edges for a vertex) will be similar to a power law
>> <https://en.wikipedia.org/wiki/Power_law>, i.e.
>> there will be a small number of keys with a high number of incoming
>> edges, while most of the keys will
>> have incoming few edges.
>>
>> What is a good partitioning strategy for a self-join on an RDD with
>> unbalanced key distributions?
>>
>>
>>
>> On Mon, Dec 8, 2014 at 3:59 PM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> I do not see how you hope to generate all incoming edge pairs without
>>> repartitioning the data by dstID. You need to perform this shuffle for
>>> joining too. Otherwise two incoming edges could be in separate partitions
>>> and never meet. Am I missing something?
>>>
>>> On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> Using groupByKey was our first approach, and as noted in the docs is
>>>> highly inefficient due to the need to shuffle all the data. See
>>>> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
>>>>
>>>> On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos <
>>>> daniel.dara...@lynxanalytics.com> wrote:
>>>>
>>>>> Could you not use a groupByKey instead of the join? I mean something
>>>>> like this:
>>>>>
>>>>> val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
>>>>> byDst.groupByKey.map { case (dst, edges) =>
>>>>>   for {
>>>>>     (src1, w1) <- edges
>>>>>     (src2, w2) <- edges
>>>>>   } {
>>>>>     ??? // Do something.
>>>>>   }
>>>>>   ??? // Return something.
>>>>> }
>>>>>
>>>>> On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> spark can do efficient joins if both RDDs have the same partitioner.
>>>>>> so in case of self join I would recommend to create an rdd that has
>>>>>> explicit partitioner and has been cached.
>>>>>> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <
>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I am working on a graph problem using vanilla Spark (not GraphX) and
>>>>>>> at some
>>>>>>> point I would like to do a
>>>>>>> self join on an edges RDD[(srcID, dstID, w)] on the dst key, in
>>>>>>> order to get
>>>>>>> all pairs of incoming edges.
>>>>>>>
>>>>>>> Since this is the performance bottleneck for my code, I was
>>>>>>> wondering if
>>>>>>> there any steps to take before
>>>>>>> performing the self-join in order to make it as efficient as
>>>>>>> possible.
>>>>>>>
>>>>>>> In the  Learning Spark book
>>>>>>> <
>>>>>>> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>>>>>>> >
>>>>>>> for example, in the "Data partitioning" section they recommend
>>>>>>> performing .partitionBy(new HashPartitioner(100)) on an RDD before
>>>>>>> joining
>>>>>>> it with another.
>>>>>>>
>>>>>>> Are there any guidelines for optimizing self-join performance?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Theodore
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to