I did this
    // Collect the small table to the driver as itemId -> listing, then broadcast it.
    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap()
    val broadCastMap = sc.broadcast(lstgItemMap)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.mapPartitions { iter =>
        val lstgItemMap = broadCastMap.value
        // The guard drops events with no matching listing, as an inner join would.
        for {
          (itemId, viDetail) <- iter
          if lstgItemMap.contains(itemId)
        } yield {
          val listing = lstgItemMap(itemId)
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = "0"
          viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
        }
      }

Earlier (reduce-side join):

    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
    val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
    // Shuffle join: keys missing on either side are dropped automatically.
    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) =>
        val viSummary = new VISummary
        viSummary.leafCategoryId = listing.getLeafCategId().toInt
        viSummary.itemSiteId = listing.getItemSiteId().toInt
        viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
        viSummary.sellerCountryId = listing.getSlrCntryId().toInt
        viSummary.buyerSegment = "0"
        viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
        val sellerId = listing.getSlrId.toLong
        (sellerId, (viDetail, viSummary, itemId))
      }

Waiting for the run to complete.

On Tue, Apr 21, 2015 at 9:54 AM, Punyashloka Biswal <punya.bis...@gmail.com> wrote:
> Could you do it using flatMap?
>
> Punya
>
> On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> The reason I am asking is that I do not understand how to do a skip.
>>
>> 1) Broadcast the small table-1 as a map.
>> 2) I just do .map() on the large table-2.
>> When you do .map() you must map each element to a new element.
>> However, with a map-side join, when I get the broadcast map I search it by key, and if that key is not found in the map I want to skip that input altogether. (This is what happens with .join: it skips automatically.) With a map-side join you have to do it yourself. I am assuming you do it with mapPartitions and yield.
>>
>> Working code would help me understand it better.
>>
>> On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> Can someone share working code for a map-side join in Spark + Scala? (No Spark SQL.)
>>>
>>> The only resource I could find was this (open it in Chrome with the Chinese-to-English translator):
>>>
>>> http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
>>>
>>> --
>>> Deepak
>>
>> --
>> Deepak

--
Deepak
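The skip that .join performs implicitly can also be written with flatMap over an Option, as Punya suggests in the quoted reply. Below is a minimal sketch that assumes hypothetical `Listing` and `Detail` case classes standing in for the real listing and `DetailInputRecord` types. It runs in plain Scala without Spark. The Spark wiring (`sc.broadcast` plus `mapPartitions` or `flatMap`) appears only in a comment, and `buildRecord` there is a hypothetical name.

```scala
object MapSideJoinSketch {
  // Hypothetical stand-ins for the listing and DetailInputRecord types in the thread.
  final case class Listing(itemId: Long, sellerId: Long, leafCategId: Int)
  final case class Detail(itemId: Long, payload: String)

  // Inner map-side join over one partition's records: each large-side key is
  // looked up in the small (broadcast) map. A missing key yields None, which
  // flatMap drops, giving the same "skip" that RDD.join performs implicitly.
  def mapSideJoin[K, L, R, Out](small: collection.Map[K, L], large: Iterator[(K, R)])(
      combine: (K, L, R) => Out): Iterator[Out] =
    large.flatMap { case (k, r) => small.get(k).map(l => combine(k, l, r)) }

  // Spark wiring (sketch only, not compiled here; assumes a SparkContext `sc`):
  //   val bc = sc.broadcast(listings.map(l => (l.getItemId().toLong, l)).collectAsMap())
  //   viEvents.mapPartitions(iter => mapSideJoin(bc.value, iter)(buildRecord))
  // or, per record: viEvents.flatMap { case (k, v) => bc.value.get(k).map(l => ...) }
  // where buildRecord is a hypothetical (itemId, listing, viDetail) => result function.

  def main(args: Array[String]): Unit = {
    val listings = Map(
      1L -> Listing(1L, sellerId = 100L, leafCategId = 7),
      2L -> Listing(2L, sellerId = 200L, leafCategId = 8))
    val details = Iterator(
      1L -> Detail(1L, "a"),
      3L -> Detail(3L, "no listing, skipped"),
      2L -> Detail(2L, "c"))
    val joined = mapSideJoin(listings, details) { (itemId, listing, detail) =>
      (listing.sellerId, (detail, itemId))
    }.toList
    joined.foreach(println)
  }
}
```

Compared with the for/yield version, `get(k).map(...)` replaces the separate `contains` check and the second lookup. An absent key becomes None, and flatMap flattens it away, so no placeholder record is ever emitted.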