Dean,
      the following code tested and passed. Thank you again.

 if (args.length < 2) {
   System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
   System.exit(1)
 }


 // Per-key update: take the newest Seq[Int] from this batch if present,
 // otherwise carry the previous state forward (defaulting to Seq(0)).
 val updateFunc = (key: String, values: Seq[Seq[Int]], state: Option[Seq[Int]]) => {
   if (values.nonEmpty) {
     Some(values(0))
   } else {
     Some(state.getOrElse(Seq(0)))
   }
 }

 // Adapt the per-key function to the Iterator-based updateStateByKey overload;
 // flatMap keeps only the keys for which updateFunc returns Some.
 val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Option[Seq[Int]])]) => {
   iterator.flatMap(t => updateFunc(t._1, t._2, t._3).map(s => (t._1, s)))
 }

 val sparkConf = new SparkConf().setAppName("StatefulNetworkWordBiggest3Vaules")
 // Create the context with a 1 second batch size
 val ssc = new StreamingContext(sparkConf, Seconds(1))
 ssc.checkpoint("/user/spark/StatefulNetworkWordBiggest3Vaules1")

 // Initial RDD input to updateStateByKey
 val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(0)), ("world", Seq(0))))

 // Create a ReceiverInputDStream on target ip:port and read the
 // words in the input stream of \n delimited text (e.g. generated by 'nc')
 val lines = ssc.socketTextStream(args(0), args(1).toInt)
 val words = lines.flatMap(_.split(" "))
 // Pair each word with a random single-element Seq[Int] value.
 val wordDstream = words.map { x =>
   val v = scala.util.Random.nextInt(1000)
   (x, Seq(v))
 }
 // Update the per-key state using updateStateByKey.
 // This will give a DStream made of state (here, the latest Seq[Int] kept per word).
 // wordDstream.updateStateByKey(newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
 val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
   new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
 stateDstream.print()
 ssc.start()
 ssc.awaitTermination()
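To try it locally, start a socket source first (e.g. nc -lk <port> on the target host) and pass that hostname and port as the two program arguments; each line typed into nc is split into words and run through the state update above.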

Ricky Ou (欧锐)


 
From: our...@cnsuning.com
Date: 2015-12-23 14:19
To: Dean Wampler
CC: user; t...@databricks.com
Subject: Re: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?


 The following code is modified from the StatefulNetworkWordCount example in the examples package:
if (args.length < 2) {
  System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
  System.exit(1)
}
/**
 * state is min(max(3))
 */

val updateFunc = (key: String, values: Seq[Seq[Int]], state: Seq[Int]) => {
  values(0)
}

val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Seq[Int])]) => {
  iterator.flatMap(t => updateFunc(t._1, t._2, t._3).map(s => (t._1, s)))
}

val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(1)), ("world", Seq(1))))

// Create a ReceiverInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, scala.util.Random.nextInt(1000)))
// Update the cumulative count using updateStateByKey
// This will give a DStream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism))
stateDstream.print()
ssc.start()
ssc.awaitTermination() 

the compile error:

Error:(77, 52) overloaded method value updateStateByKey with alternatives:
  (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])], partitioner: org.apache.spark.Partitioner, rememberPartitioner: Boolean, initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$7: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], partitioner: org.apache.spark.Partitioner, initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$6: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])], partitioner: org.apache.spark.Partitioner, rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]], numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]])(implicit evidence$2: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
cannot be applied to (Iterator[(String, Seq[Seq[Int]], Seq[Int])] => Iterator[(String, Int)], org.apache.spark.HashPartitioner, Boolean, org.apache.spark.rdd.RDD[(String, Seq[Int])])
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
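The root of the mismatch is the return type of updateFunc: it returns a Seq[Int] directly, so the .map inside newUpdateFunc runs over the Seq's Int elements and flatMap yields Iterator[(String, Int)], which matches none of the alternatives above. A minimal sketch of the difference (plain Scala, no Spark needed); wrapping the result in Option, as in the corrected code at the top of this thread, yields one (String, Seq[Int]) pair per key:

val t = ("word", Seq(Seq(1, 2, 3)), Seq(0))

// updateFunc returns a Seq[Int], so .map runs once per element: Seq[(String, Int)]
val mismatched: Seq[(String, Int)] = t._2(0).map(s => (t._1, s))

// Wrapping in Option yields at most one pair: Option[(String, Seq[Int])]
val matched: Option[(String, Seq[Int])] = Option(t._2(0)).map(s => (t._1, s))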
 
Ricky Ou (欧锐)

Department: Suning Commerce, IT Headquarters Technology Support R&D Center, Big Data Center, Data Platform Development Department
tel: 18551600418
email: our...@cnsuning.com

 
From: Dean Wampler
Date: 2015-12-23 00:46
To: our...@cnsuning.com
CC: user; t...@databricks.com
Subject: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?
There are ClassTags for Array, List, and Map, as well as for Int, etc. that you 
might have inside those collections. What do you mean by sql? Could you post 
more of your code?
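
For instance, a quick check in a plain Scala REPL (a sketch; no Spark required) shows the compiler materializing ClassTags for those collection types:

import scala.reflect.ClassTag

// Each of these compiles because the compiler materializes the ClassTag instance:
implicitly[ClassTag[Array[Int]]]
implicitly[ClassTag[List[String]]]
implicitly[ClassTag[Map[String, Int]]]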

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Mon, Dec 21, 2015 at 8:51 PM, our...@cnsuning.com <our...@cnsuning.com> wrote:

Spark Streaming's updateStateByKey does not support an Array state type without a ClassTag? How can I solve this problem?

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

ClassTag does not support other types, e.g. HashMap, List, or SQL types.


my use case is as follows:
     save the latest three click logs for collected goods from different topics with the same member ID; the system will then recommend related products according to the latest three click logs for collected goods.
I want to use updateStateByKey state to save them; however, updateStateByKey state does not support types other than ClassTag-backed ones, such as List.
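What I want is roughly the following (a sketch, assuming Spark Streaming 1.x and a hypothetical clicks stream of (memberId, click) pairs):

import org.apache.spark.streaming.dstream.DStream

// Goal: keep the latest three values per member ID as List[Int] state;
// the ClassTag[List[Int]] is supplied implicitly by the compiler.
def latestThree(clicks: DStream[(String, Int)]): DStream[(String, List[Int])] =
  clicks.updateStateByKey[List[Int]] { (values: Seq[Int], state: Option[List[Int]]) =>
    Some((values.toList ++ state.getOrElse(Nil)).take(3))
  }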
 


Thanks for your help.
Ricky Ou (欧锐)
