@Daniel
It's true that the first map in your code is needed, i.e. mapping the
edges so that dstID becomes the new RDD key.

The self-join on the dst key will then create all pairs of incoming
edges (plus self-referential pairs and duplicates, which need to be
filtered out).
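
To make sure we are talking about the same operation, here is a minimal
sketch of that approach (the types and the filtering step are only
illustrative, not my actual code):

import org.apache.spark.rdd.RDD

// edges: RDD[(srcID, dstID, w)], here with Long IDs and Double weights.
def incomingEdgePairs(edges: RDD[(Long, Long, Double)])
  : RDD[(Long, ((Long, Double), (Long, Double)))] = {
  // Re-key by destination, so all incoming edges of a vertex share a key.
  val byDst = edges.map { case (src, dst, w) => (dst, (src, w)) }
  // The self-join emits every ordered pair of incoming edges per dst.
  // Keeping src1 < src2 drops self-referential pairs and keeps each
  // unordered pair exactly once.
  byDst.join(byDst).filter { case (_, ((src1, _), (src2, _))) => src1 < src2 }
}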

@Koert
Are there any guidelines, then, for setting the number of partitions in
HashPartitioner?

What I know about my data is that the indegree distribution (the number
of incoming edges per vertex) will be similar to a power law
<https://en.wikipedia.org/wiki/Power_law>, i.e.
there will be a small number of keys with a high number of incoming edges,
while most of the keys will have few incoming edges.

What is a good partitioning strategy for a self-join on an RDD with such
an unbalanced key distribution?
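
For reference, my reading of Koert's suggestion below is something like
this (the partition count of 100 is just a placeholder, which is exactly
what my question is about):

import org.apache.spark.HashPartitioner

// edges is the RDD[(srcID, dstID, w)] from my original message.
val byDst = edges.map { case (src, dst, w) => (dst, (src, w)) }
// Give the RDD an explicit partitioner and cache it, so both sides of
// the self-join share the same partitioning and the shuffle happens once.
val partitioned = byDst.partitionBy(new HashPartitioner(100)).cache()
val pairs = partitioned.join(partitioned)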



On Mon, Dec 8, 2014 at 3:59 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> I do not see how you hope to generate all incoming edge pairs without
> repartitioning the data by dstID. You need to perform this shuffle for
> joining too. Otherwise two incoming edges could be in separate partitions
> and never meet. Am I missing something?
>
> On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Using groupByKey was our first approach, and, as noted in the docs, it is
>> highly inefficient due to the need to shuffle all the data. See
>> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
>>
>> On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Could you not use a groupByKey instead of the join? I mean something
>>> like this:
>>>
>>> val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
>>> byDst.groupByKey.map { case (dst, edges) =>
>>>   for {
>>>     (src1, w1) <- edges
>>>     (src2, w2) <- edges
>>>   } {
>>>     ??? // Do something.
>>>   }
>>>   ??? // Return something.
>>> }
>>>
>>> On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> Spark can do efficient joins if both RDDs have the same partitioner. So
>>>> in the case of a self-join, I would recommend creating an RDD that has an
>>>> explicit partitioner and has been cached.
>>>> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <
>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I am working on a graph problem using vanilla Spark (not GraphX), and
>>>>> at some point I would like to do a self-join on an edges RDD[(srcID,
>>>>> dstID, w)] on the dst key, in order to get all pairs of incoming edges.
>>>>>
>>>>> Since this is the performance bottleneck for my code, I was wondering
>>>>> if there are any steps to take before performing the self-join in order
>>>>> to make it as efficient as possible.
>>>>>
>>>>> In the Learning Spark book
>>>>> <https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>,
>>>>> for example, the "Data partitioning" section recommends performing
>>>>> .partitionBy(new HashPartitioner(100)) on an RDD before joining it
>>>>> with another.
>>>>>
>>>>> Are there any guidelines for optimizing self-join performance?
>>>>>
>>>>> Regards,
>>>>> Theodore
>>>>>
>>>
>>
>
