I did this:

    // Collect the small listings table to the driver as itemId -> listing
    // and broadcast it so each executor receives one read-only copy.
    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
    val broadCastMap = sc.broadcast(lstgItemMap)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.mapPartitions { iter =>
        val lstgItemMap = broadCastMap.value
        for {
          (itemId, viDetail) <- iter
          // Skip VI events with no matching listing (join did this implicitly).
          if lstgItemMap.contains(itemId)
        } yield {
          val listing = lstgItemMap(itemId)
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = "0"
          viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0

          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
        }
      }
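Punya's flatMap suggestion (quoted below) avoids the separate `contains` check: `Map.get` returns an `Option`, and `flatMap` emits nothing for `None`, so VI events without a listing are simply skipped. Here is a minimal plain-Scala sketch of that per-partition logic that runs without Spark. `Listing`, `ViDetail`, `FlatMapJoinSketch` and `joinPartition` are hypothetical stand-ins for the real listing, `DetailInputRecord` and `VISummary` types, and only the `isBin` field is carried over.

```scala
// Hypothetical stand-ins for the real listing and VI detail record types.
final case class Listing(itemId: Long, sellerId: Long, binPrice: Double)
final case class ViDetail(itemId: Long, page: String)

object FlatMapJoinSketch {
  // Joins one partition of the large side against the broadcast lookup map.
  // small.get returns None for a missing key, and flatMap emits nothing for
  // None, so events without a listing are skipped.
  def joinPartition(
      small: scala.collection.Map[Long, Listing],
      iter: Iterator[(Long, ViDetail)]): Iterator[(Long, (ViDetail, Int, Long))] =
    iter.flatMap { case (itemId, viDetail) =>
      small.get(itemId).map { listing =>
        val isBin = if (listing.binPrice > 0) 1 else 0
        (listing.sellerId, (viDetail, isBin, itemId))
      }
    }

  def main(args: Array[String]): Unit = {
    val small = Map(
      1L -> Listing(1L, sellerId = 100L, binPrice = 9.99),
      2L -> Listing(2L, sellerId = 200L, binPrice = 0.0))
    val events = Iterator(
      1L -> ViDetail(1L, "a"),
      3L -> ViDetail(3L, "b"), // no listing for item 3: skipped
      2L -> ViDetail(2L, "c"))
    joinPartition(small, events).foreach(println)
    // (100,(ViDetail(1,a),1,1))
    // (200,(ViDetail(2,c),0,2))
  }
}
```

In the Spark job, a version of `joinPartition` written against the real types would presumably be called as `viEvents.mapPartitions(iter => joinPartition(broadCastMap.value, iter))`. The parameter is typed as `scala.collection.Map` because that is what `collectAsMap` returns.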


Earlier (with a regular join):

    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
    val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents).map {
        case (itemId, (listing, viDetail)) =>
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = "0"
          viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
      }
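Replacing `join` with a broadcast lookup should return the same records as long as each item ID appears at most once in `listings`. `collectAsMap` keeps only one value per key, whereas `join` emits one output row per matching pair. A small plain-Scala sketch makes the difference visible, using hypothetical helpers `innerJoin` and `lookupJoin`, in-memory `Seq`s in place of RDDs, and `toMap` in place of `collectAsMap`:

```scala
object JoinEquivalenceSketch {
  // Inner-join semantics, like PairRDDFunctions.join: one output row per
  // matching (left, right) pair; keys missing on either side are dropped.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k1, a) <- left
      (k2, b) <- right
      if k1 == k2
    } yield (k1, (a, b))

  // Broadcast-style lookup join: the small side is collapsed to a Map first,
  // so a duplicated key keeps only one value (toMap keeps the last one).
  def lookupJoin[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val lookup = small.toMap
    large.flatMap { case (k, b) => lookup.get(k).map(a => (k, (a, b))) }
  }

  def main(args: Array[String]): Unit = {
    val listings = Seq(1L -> "listing-1", 2L -> "listing-2")
    val events   = Seq(1L -> "vi-a", 3L -> "vi-b", 2L -> "vi-c")
    // Unique keys on the small side: same rows (order may differ).
    println(innerJoin(listings, events).toSet == lookupJoin(listings, events).toSet) // true

    // Duplicate key on the small side: join yields 2 rows, lookup yields 1.
    val dupListings = Seq(1L -> "old", 1L -> "new")
    println(innerJoin(dupListings, events).size)  // 2
    println(lookupJoin(dupListings, events).size) // 1
  }
}
```

If `listings` can contain repeated item IDs, the broadcast version silently drops all but one of them, so it is worth confirming that the item ID is unique before switching.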


Waiting for run to complete.


On Tue, Apr 21, 2015 at 9:54 AM, Punyashloka Biswal <punya.bis...@gmail.com>
wrote:

> Could you do it using flatMap?
>
> Punya
>
> On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> The reason I am asking is that I don't understand how to skip an
>> element.
>>
>> 1) Broadcast small table-1 as a map.
>> 2) I just do .map() on large table-2.
>>        When you do .map() you must map each element to a new element.
>>        However, with a map-side join, when I get the broadcast map, I
>>        look up each key in it, and if the key is not found I want to
>>        skip that input altogether. (This is what happens with .join: it
>>        skips automatically.) With a map-side join you have to do it
>>        yourself. I am assuming you do it with mapPartitions & yield.
>>
>> Working code would help me understand it better.
>>
>> On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> Can someone share their working code for a map-side join in Spark + Scala?
>>> (No Spark SQL.)
>>>
>>> The only resource I could find was this (open it in Chrome with the
>>> Chinese-to-English translator):
>>>
>>> http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>


-- 
Deepak
