I'm trying to understand updateStateByKey.

Here's an example I'm testing with:
Input data:
  DStream( RDD( ("a",2) ), RDD( ("a",3) ), RDD( ("a",4) ),
           RDD( ("a",5) ), RDD( ("a",6) ), RDD( ("a",7) ) )

Code:
  val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentSum = values.sum + previousState.sum
    val currentCount = values.size + previousState.count

    if (currentCount == previousState.count) {
      None    // if this RDD has no change then remove the tuple
    } else {
      Some(StateClass(currentSum, currentCount, values))
    }
  }

  intStream.updateStateByKey[StateClass](updateFunc)
    .transform(rdd => rdd.map(t => (t, rdd.id)))
    .print()

Results:
((a,StateClass(14,5,ArrayBuffer(2.0, 3.0, 3.0, 3.0, 3.0))),12)
((a,StateClass(17,6,ArrayBuffer(3.0))),22)
((a,StateClass(20,7,ArrayBuffer(3.0))),32)

Questions:
Why does the RDD with ID=12 contain these elements: (2.0, 3.0, 3.0, 3.0, 3.0)?
These values do not match the input data, so where do they come from? Well, 2
and 3 do exist, but not the other 3s, and 4, 5, 6 and 7 are missing altogether.
What is going on here?
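
For comparison, here is a minimal plain-Scala sketch (no Spark involved) that folds the same update function over the six input batches. It assumes each batch delivers exactly one value for key "a", in the order listed above. StateClass is a hypothetical stand-in, since the real definition is not shown here, and the object and value names are made up for illustration.

```scala
// Hypothetical stand-in for StateClass; the real definition is not shown in this post.
case class StateClass(sum: Int, count: Int, values: Seq[Int])

object UpdateSimulation {
  // Same logic as the update function above.
  val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentSum = values.sum + previousState.sum
    val currentCount = values.size + previousState.count
    if (currentCount == previousState.count) None
    else Some(StateClass(currentSum, currentCount, values))
  }

  // Assumption: one batch per input RDD, each holding a single value for key "a".
  val batches: Seq[Seq[Int]] = Seq(Seq(2), Seq(3), Seq(4), Seq(5), Seq(6), Seq(7))

  // Thread the state through the batches; drop the initial empty state.
  val states: Seq[Option[StateClass]] =
    batches
      .scanLeft(Option.empty[StateClass])((state, values) => updateFunc(values, state))
      .tail

  def main(args: Array[String]): Unit = states.foreach(println)
}
```

Under those assumptions the state after the last batch would have sum 27 and count 6, with only the value 7 in the last field, which is not what the printed results show.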

-Adrian
