RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
I’ve encountered this issue again and am able to reproduce it about 10% of the 
time.

1. Here is the input:
RDD[ (a, 126232566, 1), (a, 126232566, 2) ]
RDD[ (a, 126232566, 1), (a, 126232566, 3) ]
RDD[ (a, 126232566, 3) ]
RDD[ (a, 126232566, 4) ]
RDD[ (a, 126232566, 2) ]
RDD[ (a, 126232566, 5), (a, 126232566, 5) ]

2. Here are the actual results (printed DStream; each line is a new RDD, and the RDD ID is the last number on the line):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(10,5,ArrayBuffer())),26)   <- empty elements Seq[V]
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
(((a,126232566),StateClass(26,9,ArrayBuffer())),53)  <- empty elements Seq[V]
(((a,126232566),StateClass(26,9,ArrayBuffer())),59)  <- empty elements Seq[V]

3. Here are the expected results (all tuples from #2 except those with an empty Seq[V]):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:
case class StateClass(sum: Integer, count: Integer, elements: Seq[Double])

val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
//  if (values.isEmpty) {
//    // if the RDD has no values for this key (the key comes from a prev RDD),
//    // the tuple will not be shown in this RDD with values of 0
//    None
//  } else {
  val previousState = state.getOrElse(StateClass(0, 0, Seq()))
  val currentCount = values.size + previousState.count
  // sum the Int field of the incoming values
  var currentSum = 0
  for (newValue <- values) {
    currentSum = currentSum + newValue._3
  }
  currentSum = currentSum + previousState.sum
  // keep the raw values in state so they are visible in the RDD
  val elements = for (newValue <- values) yield newValue._3.toDouble
  Some(StateClass(currentSum, currentCount, elements))
//  }
}
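
For reference, the same update can be written more compactly (a sketch with the same behavior, still not handling the empty-Seq case):

val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
  val prev = state.getOrElse(StateClass(0, 0, Seq()))
  // fold the new values' Int field into the running sum and count
  Some(StateClass(prev.sum + values.map(_._3).sum,
                  prev.count + values.size,
                  values.map(_._3.toDouble)))
}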

val partialResultSums = inputStream.map((x: (String, Long, Int)) => ((x._1, x._2), (x._1, x._2, x._3)))  // re-map to (key, value) pairs
  .updateStateByKey[StateClass](updateSumFunc)  // update state
  .transform(rdd => rdd.map(t => (t, rdd.id)))  // add the RDD ID to each tuple

partialResultSums.print()

Now, this is how I generate the RDDs; I suspect the delay between them is why the issue surfaces:

rddQueue += ssc.sparkContext.makeRDD(smallWindow1)  // smallWindow1 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 2))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow2)  // smallWindow2 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow3)  // smallWindow3 = List[(String, Long, Int)](("a", 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow4)  // smallWindow4 = List[(String, Long, Int)](("a", 126232566, 4))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow5)  // smallWindow5 = List[(String, Long, Int)](("a", 126232566, 2))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow6)  // smallWindow6 = List[(String, Long, Int)](("a", 126232566, 5), ("a", 126232566, 5))

Thread.sleep(3100)
//ssc.awaitTermination()
ssc.stop()
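
For completeness, the surrounding wiring isn't shown above; it is presumably something like the following (master, app name, and checkpoint path are placeholders, not from the original code):

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.SynchronizedQueue

val conf = new SparkConf().setMaster("local[2]").setAppName("updateStateByKey-repro")
val ssc = new StreamingContext(conf, Seconds(1))   // 1s batch interval, see follow-up below
ssc.checkpoint("checkpoint")                       // updateStateByKey requires a checkpoint dir

val rddQueue = new SynchronizedQueue[RDD[(String, Long, Int)]]()
val inputStream = ssc.queueStream(rddQueue)
// ... partialResultSums built from inputStream as above ...
ssc.start()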

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I return None so I can filter those tuples out. However, because Spark calls the update function with an empty Seq[V] when it should not, this messes up my logic.
I wonder how to bypass this bug/feature of Spark.
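
One possible workaround, sketched below: always return Some from the update function so the running state is preserved, and drop the empty-elements tuples downstream instead (cleaned is an arbitrary name):

// keep state intact in updateStateByKey, filter the no-new-data tuples here
val cleaned = partialResultSums.filter { case ((_, st), _) => st.elements.nonEmpty }
cleaned.print()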

Thanks
-Adrian
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question


Could be a bug. Can you share code with data that I can use to reproduce this?

TD
On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote:
Has anyone else noticed that sometimes the same tuple calls the update state function twice?
I have 2 tuples with the same key in one RDD of the DStream: RDD[ (a,1), (a,2) ]
When the update function is called the first time, Seq[V] has data 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2))
Then right away (I see this in my output) the same key is used and the function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

In the update function I also save Seq[V] to state so I can see it in the RDD. I also show a count and sum of the values.

RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
Forgot to mention my batch interval is 1 second:
val ssc = new StreamingContext(conf, Seconds(1))
hence the Thread.sleep(1100)


Re: another updateStateByKey question

2014-05-02 Thread Tathagata Das
Could be a bug. Can you share code with data that I can use to reproduce this?

TD
On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

  Has anyone else noticed that *sometimes* the same tuple calls the update
  state function twice?

  I have 2 tuples with the same key in one RDD of the DStream: RDD[ (a,1), (a,2) ]

  When the update function is called the first time, Seq[V] has data 1, 2,
  which is correct: StateClass(3,2, ArrayBuffer(1, 2))

  Then right away (I see this in my output) the same key is used and the
  function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

  In the update function I also save Seq[V] to state so I can see it in the
  RDD. I also show a count and sum of the values.

  StateClass(sum, count, Seq[V])

  Why is the update function called with an empty Seq[V] on the same key when
  all values for that key have already been taken care of in a previous update?

  -Adrian