This is how I implemented a map-side join using a broadcast variable.

    // Load listings and view-item events; key the events by item id (field 14).
    val listings = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

    // Collect the listings side to the driver as a local Map, keyed by item id,
    // and broadcast it to every executor.
    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
    val broadCastMap = sc.broadcast(lstgItemMap)

    // Map-side join: look each event's item id up in the broadcast map,
    // keeping only events whose item exists in the listings map.
    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.mapPartitions { iter =>
        val lstgItemMap = broadCastMap.value
        for {
          (itemId, viDetail) <- iter
          if lstgItemMap.contains(itemId)
        } yield {
          val listing = lstgItemMap(itemId)
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = "0"
          viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0

          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
        }
      }

Techniques used: collectAsMap, broadcast, and a Scala for/yield comprehension.

Learning: Because lstgItemMap is collected as a Map on the driver, the job
throws an OOM error as soon as the map's size exceeds the driver memory. Since
I had a 12G limit on driver memory and my dataset is around 100G, I could not
use the map-side join and switched back to a regular join().
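
For completeness, the shuffle-based fallback looks roughly like this. It is a
minimal sketch that reuses the pair RDDs and classes from the snippet above;
the omitted VISummary fields would be filled in exactly as before.

    // Fallback: a regular shuffle join on itemId instead of a broadcast map.
    // Both sides stay distributed, so nothing has to fit in driver memory.
    val lstgItemPairs = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.join(lstgItemPairs).map { case (itemId, (viDetail, listing)) =>
        val viSummary = new VISummary
        viSummary.leafCategoryId = listing.getLeafCategId().toInt
        // ... populate the remaining VISummary fields as in the broadcast version ...
        (listing.getSlrId.toLong, (viDetail, viSummary, itemId))
      }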

Sharing this so others can use the code example in case they want to
implement a map-side join with Spark + Scala.

On Tue, Apr 21, 2015 at 7:32 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Sorry was typing from mobile hence could not elaborate earlier.
>
> I presume you want to do a map-side join, i.e. you want to join 2 RDDs
> without a shuffle?
>
> Please have a quick look
> http://apache-spark-user-list.1001560.n3.nabble.com/Text-file-and-shuffle-td5973.html#none
>
> 1) Co-partition your data for cogroup:
>
>     val par = new HashPartitioner(128)
>     val x = sc.textFile(..).map(...).partitionBy(par)
>     val y = sc.textFile(...).map(...).partitionBy(par)
>     ...
>
> This should enable the join with (much less) shuffle.
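>
> To make that concrete, here is a minimal, illustrative sketch; the file
> names and the tab-separated parsing are placeholders, not from this thread:
>
>     // Illustrative only: key both RDDs the same way and give them the same
>     // partitioner, so the subsequent join does not need another full shuffle.
>     import org.apache.spark.HashPartitioner
>
>     val par = new HashPartitioner(128)
>     val x = sc.textFile("items.tsv")
>       .map { line => val f = line.split("\t"); (f(0).toLong, f(1)) }
>       .partitionBy(par)
>     val y = sc.textFile("listings.tsv")
>       .map { line => val f = line.split("\t"); (f(0).toLong, f(1)) }
>       .partitionBy(par)
>
>     // x and y share a partitioner, so the join uses narrow dependencies;
>     // the shuffles happen once, at partitionBy, not again at the join.
>     val joined: org.apache.spark.rdd.RDD[(Long, (String, String))] = x.join(y)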
>
> Another option provided in the same thread is to broadcast, in case one of
> the tables is small(ish).
>
> Hope this helps.
>
> Best
> Ayan
>
> On Tue, Apr 21, 2015 at 3:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> These are pair RDDs: (itemId, item) & (itemId, listing).
>>
>> What do you mean by re-partitioning these RDDs?
>> And what do you mean by "your partitioner"?
>>
>> Can you elaborate ?
>>
>> On Tue, Apr 21, 2015 at 11:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> If you are using a pair RDD, then you can use the partitionBy method to
>>> provide your own partitioner
>>> On 21 Apr 2015 15:04, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>
>>>> What is re-partitioning?
>>>>
>>>> On Tue, Apr 21, 2015 at 10:23 AM, ayan guha <guha.a...@gmail.com>
>>>> wrote:
>>>>
>>>>> In my understanding, you need to create a key out of the data and
>>>>> repartition both datasets to achieve a map-side join.
>>>>> On 21 Apr 2015 14:10, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> Can someone share their working code for a map-side join in Spark +
>>>>>> Scala? (No Spark SQL.)
>>>>>>
>>>>>> The only resource I could find was this (open in Chrome with the Chinese-
>>>>>> to-English translator):
>>>>>>
>>>>>> http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>>
>>>>
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Deepak
