I'm trying to implement a Spark Streaming program that counts how many times each key is encountered and records the minimum and maximum times at which it was seen. updateStateByKey seems to be just the thing, but when I define the "state" to be a tuple, I get compile errors I can't find a way around. Perhaps it's something simple, but I'm stumped.
    var lines = ssc.textFileStream(dirArg)
    var linesArray = lines.map( line => (line.split("\t")))
    var newState = linesArray.map( lineArray => (lineArray(4), 1,
      Time((lineArray(0).toDouble*1000).toInt),
      Time((lineArray(0).toDouble*1000).toInt)))

    val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) => {
      val newCount = newValues.map( x => x._1).sum
      val newMinTime = newValues.map( x => x._2).min
      val newMaxTime = newValues.map( x => x._3).max
      val (count, minTime, maxTime) = state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue)))

      Some((count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max))
      //(count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)
    }

    var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

The error I get is:

    [info] Compiling 3 Scala sources to /Users/spr/Documents/.../target/scala-2.10/classes...
    [error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int, org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)]
    [error] var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

I don't understand why the String is being prepended to the tuple I expect, (Int, Time, Time). In the main example (StatefulNetworkWordCount, here <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala>), the data is a stream of (String, Int) tuples created by

    val wordDstream = words.map(x => (x, 1))

and the updateFunc ignores the String key in its definition:

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

Is there some special-casing of a key with a simple (non-tuple) value? How could this work with a tuple value?

Thanks in advance.
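A likely explanation: updateStateByKey is not a method of DStream itself. It lives in PairDStreamFunctions, which Spark makes available through an implicit conversion only for a DStream[(K, V)], i.e. a stream of 2-tuples. newState is a stream of 4-tuples (String, Int, Time, Time), so the conversion never applies and the compiler reports the method as "not a member". The String is not being prepended; it is the key, flattened into the same tuple as the values. StatefulNetworkWordCount has no special case for simple values: (x, 1) just happens to be a pair. Nesting the value, as in (lineArray(4), (1, t, t)), should give a DStream[(String, (Int, Time, Time))] on which updateStateByKey[(Int, Time, Time)] resolves. On Spark releases before 1.3, the conversion also needs import org.apache.spark.streaming.StreamingContext._ in scope. A second, runtime issue: updateStateByKey also calls the update function for keys that already have state but received no new values in the current batch, so newValues.map(...).min would throw on the empty Seq.

The sketch below is plain Scala with no Spark dependency, so it can run on its own. It assumes times are kept as epoch milliseconds in a Long rather than Spark's Time, and the names DhcpStateSketch, DhcpState and toPair are hypothetical. Its main method only imitates how updateStateByKey feeds batches to the update function; it is not Spark's implementation.

```scala
// Hypothetical standalone sketch: per-key (count, minMillis, maxMillis) state,
// shaped as a (key, value) PAIR so that updateStateByKey would apply in Spark.
object DhcpStateSketch {
  // Assumption: times stored as epoch milliseconds (Long) instead of Spark's Time.
  type DhcpState = (Int, Long, Long)

  // Parse a tab-separated line into (key, (1, t, t)).
  // The value is nested in its own tuple, so the element is a 2-tuple.
  def toPair(line: String): (String, DhcpState) = {
    val fields = line.split("\t")
    val millis = (fields(0).toDouble * 1000).toLong
    (fields(4), (1, millis, millis))
  }

  // Same signature shape updateStateByKey expects: (Seq[V], Option[S]) => Option[S].
  val updateDhcpState: (Seq[DhcpState], Option[DhcpState]) => Option[DhcpState] =
    (newValues, state) =>
      if (newValues.isEmpty) state // key seen before but not in this batch: keep state
      else {
        val newCount = newValues.map(_._1).sum
        val newMin = newValues.map(_._2).min
        val newMax = newValues.map(_._3).max
        state match {
          case Some((count, minT, maxT)) =>
            Some((count + newCount, math.min(minT, newMin), math.max(maxT, newMax)))
          case None =>
            Some((newCount, newMin, newMax))
        }
      }

  // In the Spark job (not run here), the pieces would plug in roughly as:
  //   val pairs = ssc.textFileStream(dirArg).map(toPair) // DStream[(String, DhcpState)]
  //   val cum   = pairs.updateStateByKey[DhcpState](updateDhcpState)
  // Stateful operations also require ssc.checkpoint(dir) to be set.

  def main(args: Array[String]): Unit = {
    // Made-up sample batches; only fields 0 (seconds) and 4 (key) are used.
    val batches = Seq(
      Seq("10.0\tx\tx\tx\tsrvA", "12.5\tx\tx\tx\tsrvA", "11.0\tx\tx\tx\tsrvB"),
      Seq("9.0\tx\tx\tx\tsrvA")
    )
    var state = Map.empty[String, DhcpState]
    for (batch <- batches) {
      // Group this batch's values by key, as Spark does before calling the update function.
      val grouped = batch.map(toPair).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      // Update every key that has prior state or new values; None would drop the key.
      val keys = state.keySet ++ grouped.keySet
      state = keys.flatMap { k =>
        updateDhcpState(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
      }.toMap
    }
    state.toSeq.sortBy(_._1).foreach(println)
  }
}
```

With these sample batches, srvA ends up with a count of 3 and a minimum of 9000 ms even though its earliest record arrived in the second batch, while srvB keeps its state through a batch in which it does not appear.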
--
Sent from the Apache Spark User List mailing list archive at Nabble.com.