Based on execution on small test cases, it appears that the construction below does what I intend. (Yes, all those Tuple1()s were superfluous.)
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.StreamingContext._  // pair DStream operations on older Spark releases

// ssc (the StreamingContext) and dirArg (the input directory) are defined elsewhere.
val lines = ssc.textFileStream(dirArg)
val linesArray = lines.map(line => line.split("\t"))

// Key: field 4. Value: (count, earliest time, latest time).
// Field 0 is a timestamp, apparently in seconds, converted here to milliseconds.
val newState = linesArray.map { lineArray =>
  val t = Time((lineArray(0).toDouble * 1000).toLong)
  (lineArray(4), (1, t, t))
}

val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) => {
  // Long sentinels: Int.MaxValue ms is only about 25 days after the epoch,
  // so as an initial minimum it would beat any realistic timestamp.
  val (count, minTime, maxTime) = state.getOrElse((0, Time(Long.MaxValue), Time(Long.MinValue)))
  // Prepending the previous value keeps min/max defined when a batch
  // brings no new values for a key.
  Option((count + newValues.map(_._1).sum,
          (minTime +: newValues.map(_._2)).min,
          (maxTime +: newValues.map(_._3)).max))
}

val DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)
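
For anyone who wants to check the merge logic without a streaming job, here is a minimal Spark-free sketch of the same idea. It assumes timestamps are plain Long epoch milliseconds instead of Spark's Time, and the names DhcpStateSketch and update are made up for illustration. As far as I understand, updateStateByKey can call the update function with an empty sequence for a key that has state but no new data in the current batch, so the sketch includes that case.

```scala
object DhcpStateSketch {
  // (count, earliest millis, latest millis); Long stands in for Spark's Time here
  type State = (Int, Long, Long)

  def update(newValues: Seq[State], state: Option[State]): Option[State] = {
    // Sentinels that any real timestamp will replace
    val (count, minTime, maxTime) = state.getOrElse((0, Long.MaxValue, Long.MinValue))
    Some((
      count + newValues.map(_._1).sum,
      // Prepending the previous value keeps min/max defined for an empty batch
      (minTime +: newValues.map(_._2)).min,
      (maxTime +: newValues.map(_._3)).max
    ))
  }

  def main(args: Array[String]): Unit = {
    val batch1 = update(Seq((1, 5000L, 5000L), (1, 2000L, 2000L)), None)
    val batch2 = update(Seq.empty, batch1)               // no new lines for this key
    val batch3 = update(Seq((1, 9000L, 9000L)), batch2)
    println(batch3)                                      // prints Some((3,2000,9000))
  }
}
```

Feeding it three successive batches for one key shows the count accumulating while the earliest and latest times are carried across the empty batch.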