I'm trying to implement a Spark Streaming program that calculates, for each
key, the number of times it was encountered and the minimum and maximum times
at which it was encountered. updateStateByKey seems to be just the thing, but
when I define the "state" to be a tuple, I get compile errors that I can't
find a way around. Perhaps it's something simple, but I'm stumped.

    var lines =  ssc.textFileStream(dirArg)
    var linesArray = lines.map( line => (line.split("\t")))
    var newState = linesArray.map( lineArray => (lineArray(4), 1,

    val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) =>
      val newCount = newValues.map( x => x._1).sum
      val newMinTime = newValues.map( x => x._2).min
      val newMaxTime = newValues.map( x => x._3).max
      val (count, minTime, maxTime) = state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue)))

      Some((count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime,
      //(count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime,

    var DhcpSvrCum = newState.updateStateByKey[(Int, Time,
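
A note on the update function above: the Spark Streaming programming guide says the update function is applied to every key that already has state in each batch, whether or not that key has new data, so newValues can be empty and newValues.map(x => x._2).min would then throw. The sketch below is a minimal plain-Scala version of the same (count, minTime, maxTime) update that guards against that case. It assumes Long milliseconds in place of org.apache.spark.streaming.Time so that it runs without Spark; the names DhcpStateUpdate, State and update are hypothetical.

```scala
// Hypothetical sketch: Long milliseconds stand in for Spark's Time type.
object DhcpStateUpdate {
  type State = (Int, Long, Long) // (count, minTime, maxTime)

  // Same shape as an updateStateByKey function: the new values for one key
  // in this batch, plus that key's previous state (None if the key is new).
  def update(newValues: Seq[State], state: Option[State]): Option[State] = {
    // An existing key with no new data in this batch: keep its old state
    // instead of calling .min/.max on an empty Seq.
    if (newValues.isEmpty) state
    else {
      val newCount = newValues.map(_._1).sum
      val newMin = newValues.map(_._2).min
      val newMax = newValues.map(_._3).max
      state match {
        case Some((count, minTime, maxTime)) =>
          Some((count + newCount, math.min(minTime, newMin), math.max(maxTime, newMax)))
        case None =>
          Some((newCount, newMin, newMax))
      }
    }
  }
}
```

Adapting this back to Spark's Time type would mainly mean replacing math.min/math.max with a comparison on Time, which this sketch does not show.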

The error I get is

    [info] Compiling 3 Scala sources to
    [error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int, org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)]
    [error]     var DhcpSvrCum = newState.updateStateByKey[(Int, Time,
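
The element type in the error is a flat 4-tuple, (String, Int, Time, Time). As far as I can tell from the Spark API docs, updateStateByKey is not defined on DStream itself but on PairDStreamFunctions, which a DStream only gets through an implicit conversion when its elements are 2-tuples, DStream[(K, V)]; in Spark 1.x releases before 1.3 that conversion also required import org.apache.spark.streaming.StreamingContext._. A 4-tuple is not a (K, V) pair, hence "not a member". The String is the key (lineArray(4)), so the three state fields would have to be nested into a single value. The sketch below models that reshape in plain Scala; Long milliseconds stand in for Time, and ReshapeToPairs, toPair and valuesByKey are hypothetical names (valuesByKey is only a stand-in for a pair-only operation, not a Spark method).

```scala
// Hypothetical plain-Scala model; Long milliseconds stand in for Spark's Time.
object ReshapeToPairs {
  // What the original map produces: one flat 4-tuple per record.
  type Flat = (String, Int, Long, Long)
  // What a pair-only API needs: a 2-tuple (key, value), where the value
  // is itself a tuple (count, minTime, maxTime).
  type Pair = (String, (Int, Long, Long))

  def toPair(rec: Flat): Pair = rec match {
    case (key, count, minT, maxT) => (key, (count, minT, maxT))
  }

  // Stand-in for a pair-only operation: it accepts only (K, V) 2-tuples,
  // so passing a Seq[Flat] directly would not compile.
  def valuesByKey[K, V](pairs: Seq[(K, V)]): Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

In the Spark code this would correspond to building the pair inside the existing map, roughly linesArray.map(lineArray => (lineArray(4), (1, t, t))), where t is a placeholder for whatever Time value the truncated original line computes.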

I don't understand why the String is being prepended to the tuple I expect,
(Int, Time, Time). In the main example (StatefulNetworkWordCount), the data
is a stream of (String, Int) tuples created by

    val wordDstream = words.map(x => (x, 1))

and the updateFunc doesn't mention the String key anywhere in its definition:

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

Is there some special-casing of a key with a simple (non-tuple) value?  How
could this work with a tuple value?  
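
On the special-casing question: updateStateByKey in PairDStreamFunctions takes an update function of type (Seq[V], Option[S]) => Option[S], with V and S as type parameters, so the key is never passed to the function and a tuple is as valid a V or S as an Int. The model below is a simplified, hypothetical illustration of that per-key contract in plain Scala, not Spark's implementation: it groups one batch's values by key, calls the update function once for each key that has new values or existing state, and drops keys whose function returns None. UpdateStateModel, wordCountUpdate and tupleUpdate are made-up names, and Long milliseconds stand in for Time.

```scala
// Simplified, hypothetical model of updateStateByKey's per-batch behaviour.
object UpdateStateModel {
  def updateStateByKey[K, V, S](
      batch: Seq[(K, V)],
      previous: Map[K, S],
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    // The framework, not the update function, deals with the key.
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every key with new values or existing state gets one call;
    // a None result removes that key from the state.
    val keys = newValues.keySet ++ previous.keySet
    keys.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty), previous.get(k)).map(s => k -> s)
    }.toMap
  }

  // The word-count function from the example: V = Int, S = Int.
  val wordCountUpdate: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Same mechanism with a tuple: V = S = (count, minTime, maxTime).
  val tupleUpdate: (Seq[(Int, Long, Long)], Option[(Int, Long, Long)]) => Option[(Int, Long, Long)] =
    (values, state) => {
      val all = state.toList ++ values // previous state merged with this batch
      if (all.isEmpty) None
      else Some((all.map(_._1).sum, all.map(_._2).min, all.map(_._3).max))
    }
}
```

In this model both update functions go through the same code path; the only requirement on the input is that each element is a (key, value) pair.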

Thanks in advance.

Sent from the Apache Spark User List mailing list archive at Nabble.com.
