I'm trying to implement a Spark Streaming program that counts the number of
instances of each key encountered and tracks the minimum and maximum times
at which it was encountered.  updateStateByKey seems to be just the thing,
but when I define the "state" to be a tuple, I get compile errors I can't
find a way around.  Perhaps it's something simple, but I'm stumped.

    var lines = ssc.textFileStream(dirArg)
    var linesArray = lines.map(line => line.split("\t"))
    var newState = linesArray.map(lineArray =>
      (lineArray(4),                                  // key
       1,                                             // count
       Time((lineArray(0).toDouble * 1000).toLong),   // .toLong: epoch millis overflow an Int
       Time((lineArray(0).toDouble * 1000).toLong)))

    val updateDhcpState = (newValues: Seq[(Int, Time, Time)],
                           state: Option[(Int, Time, Time)]) =>
    {
      val newCount   = newValues.map(_._1).sum
      val newMinTime = newValues.map(_._2).min   // assumes at least one new value per batch
      val newMaxTime = newValues.map(_._3).max
      // Long sentinels: Int.MaxValue milliseconds is only ~25 days past the
      // epoch, so an Int sentinel would always look earlier than any real timestamp.
      val (count, minTime, maxTime) =
        state.getOrElse((0, Time(Long.MaxValue), Time(Long.MinValue)))

      Some((count + newCount,
            Seq(minTime, newMinTime).min,
            Seq(maxTime, newMaxTime).max))
      //(count + newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)
    }

    var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

The error I get is:

    [info] Compiling 3 Scala sources to /Users/spr/Documents/.../target/scala-2.10/classes...
    [error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int, org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)]
    [error]     var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

I don't understand why the String is being prepended to the tuple I expect,
(Int, Time, Time).  In the main example (StatefulNetworkWordCount, at
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
), the data is a stream of (String, Int) tuples created by

    val wordDstream = words.map(x => (x, 1))

and the updateFunc ignores the String key in its definition

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
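
To spell out the types as I read them (the annotations below are my own
reading, not from the example):

    // wordDstream : DStream[(String, Int)] -- one (word, 1) pair per occurrence
    // updateFunc  : (Seq[Int], Option[Int]) => Option[Int]
    //   values : the new Ints seen for one key in this batch
    //   state  : the running count for that key, if any
    val wordCounts = wordDstream.updateStateByKey[Int](updateFunc)

So the Seq[Int] that updateFunc sees corresponds to the Int value half of the
(String, Int) pair, which is exactly the correspondence I tried to follow
with Seq[(Int, Time, Time)] above.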

Is there some special-casing of a key with a simple (non-tuple) value, or is
updateStateByKey perhaps only defined when the DStream elements are
two-element (key, value) tuples?  How could this work with a tuple value?
See the sketch below for the shape I suspect is required.
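
In case it clarifies what I'm after, here's a minimal sketch of that shape,
assuming updateStateByKey is only available on pair DStreams, i.e.
DStream[(K, V)]: keep lineArray(4) as the key and nest the three state
fields into a single value tuple.  (pairState is just a name invented for
illustration.)

    // Key -> nested (count, minTime, maxTime) value, giving a
    // DStream[(String, (Int, Time, Time))] rather than a flat 4-tuple.
    val pairState = linesArray.map(lineArray =>
      (lineArray(4),                                   // key
       (1,                                             // count for this record
        Time((lineArray(0).toDouble * 1000).toLong),   // min time seen
        Time((lineArray(0).toDouble * 1000).toLong)))) // max time seen

    // If that reading is right, this should now type-check, with
    // V = (Int, Time, Time) and state S = (Int, Time, Time):
    val DhcpSvrCum = pairState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

Is that the intended usage, or can the flat four-element tuple be made to
work directly?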

Thanks in advance.


