RE: another updateStateByKey question - updated w possible Spark bug
I’ve encountered this issue again and can reproduce it about 10% of the time.

1. Here is the input:

    RDD[ (a, 126232566, 1), (a, 126232566, 2) ]
    RDD[ (a, 126232566, 1), (a, 126232566, 3) ]
    RDD[ (a, 126232566, 3) ]
    RDD[ (a, 126232566, 4) ]
    RDD[ (a, 126232566, 2) ]
    RDD[ (a, 126232566, 5), (a, 126232566, 5) ]

2. Here are the actual results (printed DStream; each line is a new RDD, and the last number on each line is the RDD ID):

    (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
    (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
    (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
    (((a,126232566),StateClass(10,5,ArrayBuffer())),26)    <- empty elements Seq[V]
    (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
    (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
    (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
    (((a,126232566),StateClass(26,9,ArrayBuffer())),53)    <- empty elements Seq[V]
    (((a,126232566),StateClass(26,9,ArrayBuffer())),59)    <- empty elements Seq[V]

3. Here are the expected results (all tuples from #2 except those with an empty Seq[V]):

    (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
    (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
    (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
    (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
    (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
    (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:

    import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations such as updateStateByKey (Spark 1.0)

    case class StateClass(sum: Integer, count: Integer, elements: Seq[Double])

    val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
      // if (values.isEmpty) {
      //   // if this RDD has no values for the key (the key only exists in the previous state),
      //   // don't show the tuple in this RDD with values of 0
      //   None
      // } else {
      val previousState = state.getOrElse(StateClass(0, 0, Seq()))
      val currentCount = values.size + previousState.count
      // sum of the new values' third field, plus the running sum from the previous state
      val currentSum = values.map(_._3).sum + previousState.sum
      // keep only the values that arrived in this batch
      val elements = values.map(_._3.toDouble)
      Some(StateClass(currentSum, currentCount, elements))
      // }
    }

    val partialResultSums = inputStream
      .map((x: (String, Long, Int)) => (x._1, (x._1, x._2, x._3)))  // re-map to (key, value)
      .updateStateByKey[StateClass](updateSumFunc)                   // update state
      .transform(rdd => rdd.map(t => (t, rdd.id)))                   // add RDD ID to RDD tuples
    partialResultSums.print()

Now this is how I generate the RDDs; I suspect the delay is why the issue surfaces:

    rddQueue += ssc.sparkContext.makeRDD(smallWindow1) // smallWindow1 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 2))
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow2) // smallWindow2 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 3))
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow3) // smallWindow3 = List[(String, Long, Int)](("a", 126232566, 3))
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow4) // smallWindow4 = List[(String, Long, Int)](("a", 126232566, 4))
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow5) // smallWindow5 = List[(String, Long, Int)](("a", 126232566, 2))
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow6) // smallWindow6 = List[(String, Long, Int)](("a", 126232566, 5), ("a", 126232566, 5))
    Thread.sleep(3100)
    //ssc.awaitTermination()
    ssc.stop()

In my use case, when I detect an empty Seq[V] in the updateStateByKey function, I return None so I can filter those tuples out. However, because Spark calls the update function with an empty Seq[V] when it should not, my logic breaks. How can I work around this bug (or feature) of Spark?

Thanks
-Adrian

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question
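Returning None from the update function does more than hide the tuple: in updateStateByKey a None result removes the key's state, so the running sum and count would restart from zero on the next batch. A minimal sketch of one workaround, assuming it is acceptable to carry an extra flag in the state: keep the previous totals when no values arrive, record whether this batch contributed anything, and filter on that flag downstream. `FlaggedState`, `updatedThisBatch` and `flaggedUpdate` are hypothetical names, not Spark API; only the function shape `(Seq[V], Option[S]) => Option[S]` is what updateStateByKey expects.

```scala
// Hypothetical state type: the fields of StateClass plus a freshness flag.
final case class FlaggedState(
    sum: Int,
    count: Int,
    elements: Seq[Double],
    updatedThisBatch: Boolean // true only if this batch delivered new values for the key
)

object FlaggedUpdate {
  // Same shape as the function passed to updateStateByKey:
  // (new values for the key in this batch, previous state) => new state.
  def flaggedUpdate(
      values: Seq[(String, Long, Int)],
      state: Option[FlaggedState]
  ): Option[FlaggedState] = {
    val prev = state.getOrElse(FlaggedState(0, 0, Seq.empty, updatedThisBatch = false))
    if (values.isEmpty) {
      // No new data for this key: keep the accumulated totals instead of
      // returning None (which would drop the key's state entirely).
      Some(prev.copy(elements = Seq.empty, updatedThisBatch = false))
    } else {
      Some(
        FlaggedState(
          sum = prev.sum + values.map(_._3).sum,
          count = prev.count + values.size,
          elements = values.map(_._3.toDouble),
          updatedThisBatch = true
        )
      )
    }
  }
}

// In the streaming job this might be wired up roughly as (not compiled here):
//   inputStream.map(x => (x._1, x))
//     .updateStateByKey[FlaggedState](FlaggedUpdate.flaggedUpdate _)
//     .filter { case (_, s) => s.updatedThisBatch }
```

The design choice is to separate "what to keep" (the state) from "what to show" (the filter), so an empty Seq[V] no longer has to mean None.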
RE: another updateStateByKey question - updated w possible Spark bug
Forgot to mention: my batch interval is 1 second, hence the Thread.sleep(1100):

    val ssc = new StreamingContext(conf, Seconds(1))

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: May-05-14 12:06 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: RE: another updateStateByKey question - updated w possible Spark bug
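Pushing an RDD every 1100 ms while batches fire every 1000 ms can plausibly leave some batches with nothing new to dequeue. A simplified model, assuming `inputStream` was created with `ssc.queueStream(rddQueue)` (whose `oneAtATime` parameter defaults to true, so each batch takes at most one queued RDD) and ignoring scheduling jitter: given the time of the first batch relative to the first push, list the batches that find the queue empty. In such a batch the state stream still runs, and as I understand it every key that already has state is updated with an empty Seq[V]. `QueueTiming.emptyBatches` and its parameters are hypothetical; this illustrates the mechanism only and does not attempt to reproduce the observed 10% rate.

```scala
import scala.collection.mutable.ArrayBuffer

object QueueTiming {
  /** Indices of batches that find no queued RDD.
    *
    * @param pushTimesMs  times (ms) at which RDDs are added to the queue
    * @param firstBatchMs time (ms) of the first batch
    * @param intervalMs   batch interval (ms)
    * @param numBatches   number of batches to simulate
    */
  def emptyBatches(
      pushTimesMs: Seq[Long],
      firstBatchMs: Long,
      intervalMs: Long,
      numBatches: Int
  ): Seq[Int] = {
    var consumed = 0 // RDDs already taken by earlier batches
    val empties = ArrayBuffer.empty[Int]
    for (k <- 0 until numBatches) {
      val batchTime = firstBatchMs + k * intervalMs
      val queued = pushTimesMs.count(_ <= batchTime) - consumed
      if (queued > 0) consumed += 1 // take exactly one RDD (oneAtATime)
      else empties += k             // queue empty: no new data this batch
    }
    empties.toList
  }
}
```

For six pushes at 0, 1100, ..., 5500 ms and nine batches, a first batch at 50 ms gives empty batches 1, 7 and 8 (one gap between inputs plus two after the last input), while a first batch at 950 ms gives only 6, 7 and 8, so whether a mid-stream empty batch appears depends on the phase between the sleeps and the batch clock.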
Re: another updateStateByKey question
Could be a bug. Can you share code and data that I can use to reproduce this?

TD

On May 2, 2014 9:49 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:

Has anyone else noticed that the update state function is *sometimes* called twice for the same tuple?

I have 2 tuples with the same key in one RDD of a DStream: RDD[ (a,1), (a,2) ]

When the update function is called the first time, Seq[V] has data: 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2))

Then right away (I see this in my output) the same key is used and the function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

In the update function I also save Seq[V] to the state so I can see it in the RDD. I also show a count and sum of the values: StateClass(sum, count, Seq[V])

Why is the update function called with an empty Seq[V] on the same key when all values for that key have already been taken care of in a previous update?

-Adrian
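The Spark Streaming programming guide describes updateStateByKey as applying the update function, in every batch, to all keys that already have state, whether or not the batch contains new data for them; that would account for the second call with an empty Seq[V]. A minimal pure-Scala model of that rule, with no Spark dependency: `UpdateStateModel.updateBatch` is a hypothetical stand-in for one batch of updateStateByKey, not Spark's implementation.

```scala
object UpdateStateModel {
  /** One batch of a simplified updateStateByKey.
    * The update function runs for every key that has new values OR prior state;
    * keys whose update returns None are dropped from the resulting state.
    */
  def updateBatch[K, V, S](
      batch: Seq[(K, V)],
      state: Map[K, S],
      update: (Seq[V], Option[S]) => Option[S]
  ): Map[K, S] = {
    // Group this batch's values by key.
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Union of keys seen before and keys seen now.
    val keys = state.keySet ++ newValues.keySet
    keys.toList.flatMap { k =>
      // A key with state but no new data in this batch receives an empty Seq.
      update(newValues.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }
}
```

Feeding RDD[ (a,1), (a,2) ] and then an empty batch through a summing update calls it first with Seq(1, 2) and then with an empty Seq, leaving the state for a at 3. With an update function that returns None on an empty Seq, the second batch would instead remove a from the state.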