Based on execution on small test cases, it appears that the construction
below does what I intend.  (Yes, all those Tuple1()s were superfluous.)

    // Stream of text lines read from files appearing under dirArg.
    var lines = ssc.textFileStream(dirArg)
    // Break each line into its tab-separated fields.
    var linesArray = lines.map(_.split("\t"))
    // Key every record by field 4 and seed it with (count = 1, earliest, latest).
    // Both times start out as the record's own timestamp: field 0 parsed as a
    // Double, scaled by 1000 and truncated to Long.
    // NOTE(review): presumably field 0 is epoch seconds, making this epoch
    // milliseconds — confirm against the input format.
    var newState = linesArray.map { fields =>
      val seenAt = Time((fields(0).toDouble * 1000).toLong)
      (fields(4), (1, seenAt, seenAt))
    }

    /**
     * State-update function intended for `updateStateByKey`.
     *
     * Folds the `(count, earliest, latest)` triples observed for one key in the
     * current batch into the running `(count, earliest, latest)` state of that key.
     *
     * - `newValues`: triples seen for the key in this batch; may be empty.
     * - `state`: the previously accumulated triple, or `None` for a new key.
     *
     * Returns the merged triple. When `newValues` is empty the previous state is
     * returned unchanged instead of calling `min`/`max` on an empty sequence
     * (which throws).
     */
    val updateDhcpState: (Seq[(Int, Time, Time)], Option[(Int, Time, Time)]) =>
        Option[(Int, Time, Time)] = (newValues, state) => {
      if (newValues.isEmpty) {
        // Nothing new for this key in the batch: keep whatever we had.
        state
      } else {
        val timeOrdering = implicitly[Ordering[Time]]
        // Identity element for the merge. The bounds must be Long-sized because
        // Time is built from a Long; Time(Int.MaxValue) is a small value that
        // would beat any larger real timestamp in the `min` comparison.
        val identity = (0, Time(Long.MaxValue), Time(Long.MinValue))
        val merged = newValues.foldLeft(state.getOrElse(identity)) {
          case ((count, minTime, maxTime), (n, first, last)) =>
            (count + n, timeOrdering.min(minTime, first), timeOrdering.max(maxTime, last))
        }
        Some(merged)
      }
    }

    // Per-key running (count, earliest, latest) state, maintained by applying
    // updateDhcpState to each key's new values and previous state.
    // NOTE(review): updateStateByKey presumably needs a checkpoint directory
    // configured on ssc — not shown in this snippet; confirm.
    var DhcpSvrCum =
      newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756p17828.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to