This is how I implemented a map-side join using a broadcast variable:

import org.apache.spark.rdd.RDD

// The two inputs: listings (the side intended to be small enough to
// broadcast) and view-item events, keyed by itemId (field 14).
val listings = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
// Collect the smaller side as a Map on the driver and broadcast it to every executor.
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap()
val broadCastMap = sc.broadcast(lstgItemMap)

// The "join" happens on the executors: each partition looks events up in the
// broadcast map, so no shuffle is needed.
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions { iter =>
    val lstgItemMap = broadCastMap.value
    for {
      (itemId, viDetail) <- iter
      if lstgItemMap.contains(itemId)
    } yield {
      val listing = lstgItemMap(itemId)
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = "0"
      viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
    }
  }

Usage: collectAsMap, broadcast, Scala for/yield.

Learning: Since lstgItemMap is collected as a Map on the driver, the job throws an OOM error when the map's size exceeds driver memory. As I had a 12G limit on driver memory and my dataset is around 100G, I could not use the map-side join and switched back to join() (a minimal sketch of that co-partitioned join fallback is appended at the end of this thread). Sharing this so others can use the code example in case they want to implement a map-side join with Spark + Scala.

On Tue, Apr 21, 2015 at 7:32 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Sorry, I was typing from mobile, hence could not elaborate earlier.
>
> I presume you want to do a map-side join, i.e. you want to join 2 RDDs
> without a shuffle?
>
> Please have a quick look:
> http://apache-spark-user-list.1001560.n3.nabble.com/Text-file-and-shuffle-td5973.html#none
>
> 1) Co-partition your data for cogroup:
>
> val par = new HashPartitioner(128)
> val x = sc.textFile(..).map(...).partitionBy(par)
> val y = sc.textFile(...).map(...).partitionBy(par)
> ...
>
> This should enable a join with (much less) shuffle.
>
> Another option provided in the same thread: broadcast, in case one of
> the tables is small(ish).
>
> Hope this helps.
>
> Best
> Ayan
>
> On Tue, Apr 21, 2015 at 3:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> These are pair RDDs: (itemId, item) & (itemId, listing).
>>
>> What do you mean by re-partitioning these RDDs?
>> And what do you mean by "your partitioner"?
>>
>> Can you elaborate?
>>
>> On Tue, Apr 21, 2015 at 11:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> If you are using a pair RDD, then you can use the partitionBy method to
>>> provide your partitioner.
>>> On 21 Apr 2015 15:04, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>
>>>> What is re-partitioning?
>>>>
>>>> On Tue, Apr 21, 2015 at 10:23 AM, ayan guha <guha.a...@gmail.com>
>>>> wrote:
>>>>
>>>>> In my understanding you need to create a key out of the data and
>>>>> repartition both datasets to achieve a map-side join.
>>>>> On 21 Apr 2015 14:10, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> Can someone share their working code for a map-side join in Spark +
>>>>>> Scala? (No Spark SQL.)
>>>>>>
>>>>>> The only resource I could find was this (open it in Chrome with the
>>>>>> Chinese-to-English translator):
>>>>>>
>>>>>> http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>
>> --
>> Deepak
>>
>
> --
> Best Regards,
> Ayan Guha

--
Deepak
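P.S. For anyone landing on this thread later, here is a minimal, self-contained sketch of the co-partitioned join() fallback discussed above. It is not the production code from my job: the input paths, the tab-delimited parsing, and the (Long, String) element types are placeholders, and the partition count of 128 just mirrors Ayan's example.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoPartitionedJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("co-partitioned-join"))

    // One partitioner shared by both sides, so records with the same key
    // land in the same partition of each RDD.
    val par = new HashPartitioner(128)

    // Placeholder parser: tab-delimited (itemId, payload) lines.
    def parse(line: String): (Long, String) = {
      val fields = line.split("\t", 2)
      (fields(0).toLong, fields(1))
    }

    // Placeholder paths; both sides are keyed and partitioned identically.
    val listings: RDD[(Long, String)] =
      sc.textFile("hdfs:///data/listings").map(parse).partitionBy(par)
    val viEvents: RDD[(Long, String)] =
      sc.textFile("hdfs:///data/vi_events").map(parse).partitionBy(par)

    // Each side is shuffled once by partitionBy; the join itself then reuses
    // the common partitioning instead of shuffling both sides again.
    val joined: RDD[(Long, (String, String))] = viEvents.join(listings)

    joined.take(10).foreach(println)
    sc.stop()
  }
}

If the partitioned RDDs feed more than one join, persisting them (e.g. listings.persist()) avoids repeating the partitionBy shuffle each time.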