Unusual behavior with leftouterjoin

2015-04-11 Thread ๏̯͡๏
I have two RDDs:

leftRDD  = RDD[(Long, (DetailInputRecord, VISummary, Long))]
rightRDD = RDD[(Long, com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum)]

DetailInputRecord is an object that contains (guid, sessionKey,
sessionStartDate, siteId).

There are 10 records in leftRDD (confirmed with leftRDD.count), and every
DetailInputRecord in leftRDD has data in all of its members.

I call leftRDD.leftOuterJoin(rightRDD), where

viEventsWithListings = leftRDD
spsLvlMetric         = rightRDD

val viEventsWithListingsJoinSpsLevelMetric =
  viEventsWithListings.leftOuterJoin(spsLvlMetric).map {
    // spsLvlMetric here is the Option[SpsLevelMetricSum] from the right side;
    // it shadows the RDD of the same name.
    case (sellerId, ((viEventDetail, viSummary, itemId), spsLvlMetric)) =>
      println("sellerId:" + sellerId)
      println("sessionKey:" + viEventDetail.get("sessionKey"))
      println("guid:" + viEventDetail.get("guid"))
      println("sessionStartDate:" + viEventDetail.get("sessionStartDate"))
      println("siteId:" + viEventDetail.get("siteId"))

      if (spsLvlMetric.isDefined) {
        // do something
      }
  }
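For reference, leftOuterJoin keeps every left record and wraps the right-hand value in an Option, so each joined element has the shape (key, (leftValue, Option[rightValue])), which is what the pattern in the job destructures. A minimal local-collection model of those semantics, in plain Scala rather than Spark's implementation (`LeftOuterJoinModel` is a hypothetical name):

```scala
object LeftOuterJoinModel {
  // Local model of leftOuterJoin semantics (not Spark's implementation):
  // every left pair is kept; the right value becomes Some(w) or None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    // Group the right side by key, keeping only the values.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

    left.flatMap { case (k, v) =>
      rightByKey.get(k) match {
        // One output element per matching right value.
        case Some(ws) => ws.map(w => (k, (v, Some(w): Option[W])))
        // No match: the left record survives, paired with None.
        case None => Seq((k, (v, Option.empty[W])))
      }
    }
  }
}
```

A left key with no match on the right still appears exactly once, paired with None; a key with several matches appears once per match.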

I print each field of the DetailInputRecord (viEventDetail) in
viEventsWithListings both before and inside the leftOuterJoin. Before the
leftOuterJoin, every field of every record (10 in total) has its own value.

Inside the join, every field prints the guid value.
How is this possible?

Output inside the join (every value shown is a guid):
sessionKey:27c9fbc014b4f61526f0574001b73b00
guid:27c9fbc014b4f61526f0574001b73b00
sessionStartDate:27c9fbc014b4f61526f0574001b73b00
siteId:27c9fbc014b4f61526f0574001b73b00

What went wrong? I have debugged this multiple times but cannot figure out
the reason.
I'd appreciate your help.
-- 
Deepak


Re: Unusual behavior with leftouterjoin

2015-04-11 Thread ๏̯͡๏
I took that RDD, ran it through a map, and printed 4 values from each record;
they all printed correctly.


val x = viEvents.map {
  case (itemId, event) =>
    println((event.get("guid"), itemId, event.get("itemId"), event.get("siteId")))
    (itemId, event)
}

The above code prints

(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
(340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
(340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
(340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
(393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
(3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
(3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)

viEvents.collect.foreach(a =>
  println((a._2.get("guid"), a._1, a._2.get("itemId"), a._2.get("siteId"))))

*Next, I collected it, which might have led to serialization of the RDD.*
When I now print the same 4 values, *I get only guid values for all of them.
Does this have something to do with serialization?*

(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
(3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
(3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
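The map version prints on the executors, while collect must serialize every record to ship it to the driver. As a model of how a serialization round trip can silently reset state: a field that the serializer skips comes back with the JVM default (0 for an Int), not its initialized value. The sketch below uses plain Java serialization and a @transient field only to demonstrate that effect; whether Kryo drops anything comparable from the Avro-based record here is an assumption, not something this thread establishes. `Slot` and `TransientDemo` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class: `position` is @transient, so Java serialization skips it.
class Slot(val name: String) extends Serializable {
  @transient var position: Int = -1
}

object TransientDemo {
  // Serialize `value` to bytes and read it back (a stand-in for what a
  // shuffle or collect does with whatever serializer is configured).
  def javaRoundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Slot("siteId")
    original.position = 3
    val copy = javaRoundTrip(original)
    // Prints "siteId 3 -> 0": the name survives, the skipped position does not.
    println(s"${copy.name} ${original.position} -> ${copy.position}")
  }
}
```

Note that the copy's position is 0 rather than the initializer's -1: deserialization does not run Slot's constructor, so the skipped field keeps the JVM default.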



The RDD is of type
org.apache.spark.rdd.RDD[(Long, com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]

When creating the context, I did this:

val conf = new SparkConf()
  .setAppName(detail)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "1")
  .registerKryoClasses(Array(
    classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
    classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))
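Both the shuffle behind leftOuterJoin and the task results returned by collect go through the serializer configured here, so one way to narrow the problem down is to push a single record through Kryo with this same conf, outside of any job, and compare its fields before and after. A sketch, assuming spark-core is on the classpath; `KryoRoundTrip` and `roundTrip` are hypothetical helper names, while `KryoSerializer`, `newInstance`, `serialize` and `deserialize` are Spark's own serializer API.

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object KryoRoundTrip {
  // Hypothetical helper: serialize one value with the Kryo serializer built
  // from `conf` (same registrations as the job) and return the deserialized copy.
  def roundTrip[T: ClassTag](conf: SparkConf, value: T): T = {
    val ser = new KryoSerializer(conf).newInstance()
    val bytes: ByteBuffer = ser.serialize(value)
    ser.deserialize[T](bytes)
  }
}
```

For example, with `record` being one DetailInputRecord taken before the join (hypothetical name), compare `record.get("siteId")` with `KryoRoundTrip.roundTrip(conf, record).get("siteId")`. If the copy returns the guid, the join itself is not the culprit.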

The class hierarchy is:

DetailInputRecord extends InputRecord extends SessionRecord extends
ExperimentationRecord extends
   org.apache.avro.generic.GenericData.Record(schema: Schema)
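Avro's GenericData.Record stores its values in an array, and get(String) looks the name up in the schema and reads the value at that field's position. The simplified model below (hypothetical classes, not Avro's) shows that the output above is exactly what you would see if every field's position were read back as 0 after serialization: each lookup returns slot 0, which is guid assuming guid is the first field in the schema. That positions are actually lost here is an assumption to verify, not a confirmed diagnosis.

```scala
// Hypothetical stand-in for a schema field: a name plus its slot in the values array.
final case class FieldDef(name: String, pos: Int)

// Hypothetical stand-in for a positional record (not Avro's class).
final class PositionalRecord(fields: Seq[FieldDef], values: Array[String]) {
  private val byName: Map[String, FieldDef] = fields.map(f => f.name -> f).toMap
  // Lookup shape: name -> position -> value; unknown names return null.
  def get(name: String): String = byName.get(name).map(f => values(f.pos)).orNull
}

object PositionDemo {
  val names = Seq("guid", "sessionKey", "sessionStartDate", "siteId")
  // Illustrative placeholder values, one per field, in schema order.
  val values: Array[String] = names.map(_ + "-value").toArray

  def withPositions(positions: Seq[Int]): PositionalRecord =
    new PositionalRecord(names.zip(positions).map { case (n, p) => FieldDef(n, p) }, values)

  def main(args: Array[String]): Unit = {
    val healthy = withPositions(Seq(0, 1, 2, 3))
    val reset   = withPositions(Seq(0, 0, 0, 0)) // every position read back as 0
    names.foreach(n => println(s"$n: ${healthy.get(n)} | ${reset.get(n)}"))
  }
}
```

In the second column every field prints guid-value, mirroring the join output.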


Please suggest.






