I ran that RDD through the following code and printed 4 elements from it; they all printed correctly.
    val x = viEvents.map { case (itemId, event) =>
      println((event.get("guid"), itemId, event.get("itemId"), event.get("siteId")))
      (itemId, event)
    }

The above code prints:

    (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
    (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
    (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
    (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
    (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
    (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
    (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
    (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
    (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
    (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)

Then I ran:

    viEvents.collect.foreach(a => println((a._2.get("guid"), a._1, a._2.get("itemId"), a._2.get("siteId"))))

*Now I have collected it, and this might have led to serialization of the RDD.* When I print the same 4 elements now, *I only get guid values for all of them. Has this got something to do with serialization?*

    (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
    (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
    (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
    (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
    (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
    (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
    (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
    (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
    (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
    (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)

The RDD is of type org.apache.spark.rdd.RDD[(Long, com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)].

At the time of context creation I did this:

    val conf = new SparkConf()
      .setAppName(detail)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
      .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
      .set("spark.yarn.maxAppAttempts", "1")
      .registerKryoClasses(Array(
        classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
        classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
        classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
        classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
        classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
        classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))

The class hierarchy is:

    DetailInputRecord extends InputRecord extends SessionRecord extends ExperimentationRecord
      extends org.apache.avro.generic.GenericData.Record(schema: Schema)

Please suggest.

On Sat, Apr 11, 2015 at 4:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I have two RDDs:
>
> leftRDD = RDD[(Long, (DetailInputRecord, VISummary, Long))]
> and
> rightRDD = RDD[(Long, com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum)]
>
> DetailInputRecord is an object that contains (guid, sessionKey,
> sessionStartDate, siteId).
>
> There are 10 records in leftRDD (confirmed with leftRDD.count), and each
> DetailInputRecord in leftRDD has data in its members.
>
> I do leftRDD.leftOuterJoin(rightRDD), where
>
> viEventsWithListings = leftRDD
> spsLvlMetric = rightRDD
>
>     val viEventsWithListingsJoinSpsLevelMetric =
>       viEventsWithListings.leftOuterJoin(spsLvlMetric).map {
>         case (viJoinSpsLevelMetric) => {
>           val (sellerId, ((viEventDetail, viSummary, itemId), spsLvlMetric)) =
>             viJoinSpsLevelMetric
>
>           println("sellerId:" + sellerId)
>           println("sessionKey:" + viEventDetail.get("sessionKey"))
>           println("guid:" + viEventDetail.get("guid"))
>           println("sessionStartDate:" + viEventDetail.get("sessionStartDate"))
>           println("siteId:" + viEventDetail.get("siteId"))
>
>           if (spsLvlMetric.isDefined) {
>             // do something
>           }
>         }
>       }
>
> I print each of the members of the DetailInputRecord (viEventDetail) in
> viEventsWithListings both before and within the leftOuterJoin. Before the
> leftOuterJoin I get the correct value for every member (10 records in total).
>
> Within the join, when I print, I get only the guid as the value of every member.
> How is this possible?
>
> Within the join (print statements; these values are all guids):
>
> sessionKey:27c9fbc014b4f61526f0574001b73b00
> guid:27c9fbc014b4f61526f0574001b73b00
> sessionStartDate:27c9fbc014b4f61526f0574001b73b00
> siteId:27c9fbc014b4f61526f0574001b73b00
>
> What went wrong? I have debugged this multiple times but fail to understand
> the reason.
> Appreciate your help
> --
> Deepak

--
Deepak
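The following stdlib-only Scala sketch shows one mechanism that would produce exactly this symptom. It is offered as an assumption, not a diagnosis.

As far as I know, Avro's `GenericData.Record` keeps its values in an array. Its `get(String)` first resolves a name through the schema to a field position and then indexes that array. Suppose a serialization round trip (the shuffle inside leftOuterJoin, or `collect`) left every schema field reporting position 0. Then every `get` would return the first value, which is the guid if guid is the first field in the schema.

Everything in the sketch is hypothetical: `Field`, `PositionalRecord`, `PositionDemo`, the field order and the placeholder values. It does not establish whether Kryo actually does this to these Avro classes.

```scala
// Hypothetical stand-ins for an Avro-style schema field and record (not Avro's API).
final case class Field(name: String, pos: Int)

final class PositionalRecord(fields: Seq[Field], values: Array[AnyRef]) {
  // Name -> field lookup, playing the role of schema.getField(name).
  private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

  // Resolve the name to a position, then index into the positional value array.
  def get(name: String): AnyRef = byName.get(name).map(f => values(f.pos)).orNull
}

object PositionDemo {
  // Assumed field order, with guid first; the values are placeholders, not real data.
  val names: Seq[String] = Seq("guid", "sessionKey", "sessionStartDate", "siteId")
  val values: Array[AnyRef] = names.map(n => s"$n-value").toArray[AnyRef]

  // Intact schema: each name keeps its own position.
  val intact = new PositionalRecord(names.zipWithIndex.map { case (n, i) => Field(n, i) }, values)

  // Simulated damage (assumption for illustration): every field's position has become 0.
  val damaged = new PositionalRecord(names.map(n => Field(n, 0)), values)

  def main(args: Array[String]): Unit = {
    println(names.map(intact.get))  // List(guid-value, sessionKey-value, sessionStartDate-value, siteId-value)
    println(names.map(damaged.get)) // List(guid-value, guid-value, guid-value, guid-value)
  }
}
```

Running `PositionDemo` prints the four distinct values for the intact record and `guid-value` four times for the damaged one. One way to test the hypothesis on the real data is to compare `event.getSchema.getField("siteId").pos()` on a record before `collect` with the same call on a collected record.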