Dean, the following test code is pasted below. Thanks again.

    // Assumed imports for this fragment:
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
      System.exit(1)
    }

    val updateFunc = (key: String, values: Seq[Seq[Int]], state: Option[Seq[Int]]) => {
      if (values.length > 0) {
        Some(values(0))
      } else {
        Some(state.getOrElse(Seq(0)))
      }
    }

    val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Option[Seq[Int]])]) => {
      iterator.flatMap(t => updateFunc(t._1, t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordBiggest3Vaules")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint("/user/spark/StatefulNetworkWordBiggest3Vaules1")

    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(0)), ("world", Seq(0))))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map { x =>
      val v = scala.util.Random.nextInt(1000)
      (x, Seq(v))
    }

    // Update the per-key state using updateStateByKey
    // This will give a DStream of per-word state (a Seq[Int] per word)
    val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()

Ricky Ou(欧 锐)
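Two changes make this version compile against the iterator-based overload: values is typed Seq[Seq[Int]] (one Seq[Int] per record in the batch) and the state parameter is Option[Seq[Int]] rather than a bare Seq[Int]. Since this particular updateFunc never uses the key, the same logic also fits the simpler (Seq[V], Option[S]) => Option[S] overload; a minimal sketch, assuming the same wordDstream of (String, Seq[Int]) pairs (the name stateDstream2 is hypothetical, not from the thread):

    // Same update logic without the iterator plumbing; the key is not
    // visible here, but this updateFunc never needed it anyway.
    val simpleUpdate = (values: Seq[Seq[Int]], state: Option[Seq[Int]]) => {
      if (values.nonEmpty) Some(values(0)) else Some(state.getOrElse(Seq(0)))
    }
    val stateDstream2 = wordDstream.updateStateByKey[Seq[Int]](simpleUpdate)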
From: our...@cnsuning.com
Date: 2015-12-23 14:19
To: Dean Wampler
CC: user; t...@databricks.com
Subject: Re: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

As in the following code, modified from StatefulNetworkWordCount in the examples package:

    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
      System.exit(1)
    }

    /**
     * state is min(max(3))
     */
    val updateFunc = (key: String, values: Seq[Seq[Int]], state: Seq[Int]) => {
      values(0)
    }

    val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Seq[Int])]) => {
      iterator.flatMap(t => updateFunc(t._1, t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(1)), ("world", Seq(1))))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, scala.util.Random.nextInt(1000)))

    // Update the cumulative count using updateStateByKey
    val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()

The compile error:

    Error:(77, 52) overloaded method value updateStateByKey with alternatives:
      (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])], partitioner: org.apache.spark.Partitioner, rememberPartitioner: Boolean, initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$7: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    <and>
      (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], partitioner: org.apache.spark.Partitioner, initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$6: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    <and>
      (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])], partitioner: org.apache.spark.Partitioner, rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    <and>
      (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    <and>
      (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    <and>
      (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]])(implicit evidence$2: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
    cannot be applied to (Iterator[(String, Seq[Seq[Int]], Seq[Int])] => Iterator[(String, Int)], org.apache.spark.HashPartitioner, Boolean, org.apache.spark.rdd.RDD[(String, Seq[Int])])
      val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,

Ricky Ou(欧 锐)
Department: Suning Commerce, IT Headquarters Technology Support R&D Center, Big Data Center, Data Platform Development Department
tel: 18551600418
email: our...@cnsuning.com

From: Dean Wampler
Date: 2015-12-23 00:46
To: our...@cnsuning.com
CC: user; t...@databricks.com
Subject: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

There are ClassTags for Array, List, and Map, as well as for Int, etc. that you might have inside those collections. What do you mean by sql? Could you post more of your code?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com
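To illustrate Dean's point that ClassTags already exist for the standard collections, here is a minimal sketch (not from the thread; `pairs` is a hypothetical DStream[(String, Int)]) showing that a List[Int] state needs no extra evidence:

    import scala.reflect.ClassTag
    import org.apache.spark.streaming.dstream.DStream

    // The compiler summons ClassTag[List[Int]] implicitly:
    val tag = implicitly[ClassTag[List[Int]]]  // resolves fine

    // updateStateByKey[S: ClassTag] therefore accepts S = List[Int].
    def accumulate(pairs: DStream[(String, Int)]): DStream[(String, List[Int])] =
      pairs.updateStateByKey[List[Int]] { (values: Seq[Int], state: Option[List[Int]]) =>
        Some(values.toList ++ state.getOrElse(Nil))
      }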
On Mon, Dec 21, 2015 at 8:51 PM, our...@cnsuning.com <our...@cnsuning.com> wrote:

Spark Streaming's updateStateByKey does not support an Array state type without a ClassTag? How can I solve this problem?

    def updateStateByKey[S: ClassTag](
        updateFunc: (Seq[V], Option[S]) => Option[S]
      ): DStream[(K, S)] = ssc.withScope {
      updateStateByKey(updateFunc, defaultPartitioner())
    }

ClassTag does not support other types, e.g. HashMap, List, SQL types.

My use case is as follows: save the latest three click logs on collected goods, from different topics, for the same member ID; the system then recommends related products according to those latest three click logs. I want to use updateStateByKey state to save them; however, updateStateByKey seems to accept only state types with a ClassTag, not types such as List.

Thanks for your help.

Ricky Ou(欧 锐)
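For the use case above, one possible shape (a sketch under assumptions, not code from the thread): keep a bounded List of the newest three click records per member ID. List has an implicit ClassTag, so it works as updateStateByKey state. Here clickLogs, a DStream[(String, String)] of (memberId, clickRecord) pairs, is hypothetical:

    // Prepend this batch's clicks (newest first) and keep at most three.
    val latestThree = (clicks: Seq[String], state: Option[List[String]]) => {
      Some((clicks.reverse.toList ++ state.getOrElse(Nil)).take(3))
    }
    val recentClicks = clickLogs.updateStateByKey[List[String]](latestThree)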