@Daniel It's true that the first map in your code is needed, i.e. mapping so that dstID is the new RDD key.
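Concretely, that dst-keyed self-join behaves like the following plain-Scala sketch (toy data and ordinary collections stand in for RDDs here, just to make the produced pairs explicit; names like `edges` and `byDst` are illustrative):

```scala
// Toy edge list (srcID, dstID, w); plain collections stand in for RDDs.
val edges = Seq((1L, 10L, 0.5), (2L, 10L, 1.0), (3L, 10L, 2.0), (4L, 20L, 1.5))

// The first map: key each edge by its destination vertex.
val byDst = edges.map { case (src, dst, w) => dst -> (src, w) }

// A self-join on the dst key yields every pair of edges sharing a dst,
// including self-referential and mirrored duplicate pairs; the guard
// `src1 < src2` keeps exactly one copy of each distinct pair (it assumes
// at most one edge per (src, dst) — multi-edges would need a different key).
val pairs = for {
  (dst1, (src1, w1)) <- byDst
  (dst2, (src2, w2)) <- byDst
  if dst1 == dst2 && src1 < src2
} yield (dst1, ((src1, w1), (src2, w2)))
// Vertex 10 has indegree 3, so it contributes 3 pairs; vertex 20 has
// indegree 1 and contributes none.
```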
The self-join on the dst key will then create all the pairs of incoming edges (plus self-referential and duplicate pairs, which need to be filtered out).

@Koert Are there any guidelines for setting the number of partitions in HashPartitioner then? What I know about my data is that the distribution of indegree values (the number of incoming edges per vertex) will resemble a power law <https://en.wikipedia.org/wiki/Power_law>, i.e. a small number of keys will have a high number of incoming edges, while most keys will have few incoming edges.

What is a good partitioning strategy for a self-join on an RDD with an unbalanced key distribution?

On Mon, Dec 8, 2014 at 3:59 PM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:

> I do not see how you hope to generate all incoming edge pairs without
> repartitioning the data by dstID. You need to perform this shuffle for
> joining too. Otherwise two incoming edges could be in separate partitions
> and never meet. Am I missing something?
>
> On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>
>> Using groupByKey was our first approach, and as noted in the docs it is
>> highly inefficient due to the need to shuffle all the data. See
>> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
>>
>> On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Could you not use a groupByKey instead of the join? I mean something
>>> like this:
>>>
>>> val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
>>> byDst.groupByKey.map { case (dst, edges) =>
>>>   for {
>>>     (src1, w1) <- edges
>>>     (src2, w2) <- edges
>>>   } {
>>>     ??? // Do something.
>>>   }
>>>   ??? // Return something.
>>> }
>>>
>>> On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> Spark can do efficient joins if both RDDs have the same partitioner, so
>>>> in the case of a self-join I would recommend creating an RDD that has an
>>>> explicit partitioner and has been cached.
>>>>
>>>> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I am working on a graph problem using vanilla Spark (not GraphX), and
>>>>> at some point I would like to do a self-join on an edges
>>>>> RDD[(srcID, dstID, w)] on the dst key, in order to get all pairs of
>>>>> incoming edges.
>>>>>
>>>>> Since this is the performance bottleneck for my code, I was wondering
>>>>> if there are any steps to take before performing the self-join in
>>>>> order to make it as efficient as possible.
>>>>>
>>>>> In the Learning Spark book
>>>>> <https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>,
>>>>> for example, in the "Data partitioning" section, they recommend
>>>>> performing .partitionBy(new HashPartitioner(100)) on an RDD before
>>>>> joining it with another.
>>>>>
>>>>> Are there any guidelines for optimizing self-join performance?
>>>>>
>>>>> Regards,
>>>>> Theodore
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
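Putting Koert's suggestion together with the Learning Spark advice, a minimal sketch of the pre-partitioned, cached self-join might look like this (the function and RDD names are illustrative, the edge types assume RDD[(srcID, dstID, w)], and the partition count 100 is just the book's example value, not a tuned choice):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Sketch only: key the edges by dst, give the RDD an explicit partitioner,
// and cache it, so the self-join reuses one shuffled, materialized copy.
def incomingEdgePairs(
    edges: RDD[(Long, Long, Double)]
): RDD[(Long, ((Long, Double), (Long, Double)))] = {
  val byDst = edges
    .map { case (src, dst, w) => dst -> (src, w) }
    .partitionBy(new HashPartitioner(100)) // both join inputs share this partitioner
    .cache()                               // avoid recomputing the map + shuffle twice
  // Both sides have the same partitioner, so the join adds no extra shuffle.
  byDst.join(byDst)
    // Drop self-referential pairs and keep each unordered pair once
    // (assumes at most one edge per (src, dst)).
    .filter { case (_, ((src1, _), (src2, _))) => src1 < src2 }
}
```

Note this does not address the power-law skew question: HashPartitioner only balances keys, so a single very-high-indegree vertex still lands its whole quadratic pair set in one partition.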