I am creating a workflow. I have an existing call to updateStateByKey that
works fine, but when I added a second use where the key is a Tuple2, it now
fails with the dreaded "overloaded method value updateStateByKey with
alternatives ... cannot be applied to ..." error. Comparing the two uses, I
don't see anything broken, though I do note that all the messages below
describe the Time type my code provides as org.apache.spark.streaming.Time.

a) Could the difference between Time and org.apache.spark.streaming.Time be
causing this? (I use Time the same way in the first case, which appears to
work properly.)

b) Any suggestions for what else could be causing the error?

------code--------
    val ssc = new StreamingContext(conf, Seconds(timeSliceArg))
    ssc.checkpoint(".")

    var lines = ssc.textFileStream(dirArg)

    var linesArray = lines.map(line => line.split("\t"))
    var DnsSvr = linesArray.map(lineArray =>
      ((lineArray(4), lineArray(5)),
       (1, Time((lineArray(0).toDouble * 1000).toLong))))

    val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
      val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
      val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min

      val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))

      (currentCount + previousCount, Seq(minTime, newMinTime).min)
    }

    var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount)  // <=== error here


------compilation output----------
[error] /Users/spr/Documents/.../cyberSCinet.scala:513: overloaded method
value updateStateByKey with alternatives:
[error]   (updateFunc: Iterator[((String, String), Seq[(Int,
org.apache.spark.streaming.Time)], Option[(Int,
org.apache.spark.streaming.Time)])] => Iterator[((String, String), (Int,
org.apache.spark.streaming.Time))],partitioner:
org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit
evidence$5: scala.reflect.ClassTag[(Int,
org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String,
String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)],
Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int,
org.apache.spark.streaming.Time)],partitioner:
org.apache.spark.Partitioner)(implicit evidence$4:
scala.reflect.ClassTag[(Int,
org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String,
String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)],
Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int,
org.apache.spark.streaming.Time)],numPartitions: Int)(implicit evidence$3:
scala.reflect.ClassTag[(Int,
org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String,
String), (Int, org.apache.spark.streaming.Time))] <and>
[error]   (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)],
Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int,
org.apache.spark.streaming.Time)])(implicit evidence$2:
scala.reflect.ClassTag[(Int,
org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String,
String), (Int, org.apache.spark.streaming.Time))]
[error]  cannot be applied to ((Seq[(Int, org.apache.spark.streaming.Time)],
Option[(Int, org.apache.spark.streaming.Time)]) => (Int,
org.apache.spark.streaming.Time))
[error]     var DnsSvrCum = DnsSvr.updateStateByKey[(Int,
Time)](updateDnsCount) 
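For what it's worth, comparing the last "cannot be applied to" line against the alternatives: every listed overload expects an update function returning Option[(Int, org.apache.spark.streaming.Time)], while updateDnsCount as written returns a bare (Int, Time). A hedged sketch of what a matching function might look like, assuming that return-type mismatch is indeed the problem:

```scala
import org.apache.spark.streaming.Time

// Sketch only: same logic as updateDnsCount above, but wrapping the
// result in Some so its type matches the Option[(Int, Time)] return
// type that every updateStateByKey alternative expects.
val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
  val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
  val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min

  val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))

  // Some keeps the state for this key; returning None would drop the key.
  Some((currentCount + previousCount, Seq(minTime, newMinTime).min))
}
```

If that is the issue, the error message is just hard to read: the mismatch is buried in the function's result type, not in the key or value types.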





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/in-function-prototypes-tp18642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
