PS: I am using Spark1.6.1, kafka 0.10.0.0

--------------------------------

 

Thanks&Best regards!
San.Luo

----- 原始邮件 -----
发件人:<luohui20...@sina.com>
收件人:"user" <user@spark.apache.org>
主题:How to get recommand result for users in a kafka SparkStreaming Application
日期:2016年08月03日 15点01分

hello guys:      I have an app which consumes json messages from kafka and 
recommend movies for the users in those messages ,the code like this :

    conf.setAppName("KafkaStreaming")
    val storageLevel = StorageLevel.DISK_ONLY
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt / 1000))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
storageLevel)    val topicMap = topics.split(",").map((_, 
numThreads.toInt)).toMap    kafkaStream.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      val ALSModel = MatrixFactorizationModel.load(rdd.sparkContext, 
"/user/hadoop/model/myCollaborativeFilter20160802/")
      val recRdd = sqlContext.read.json(rdd.values)
      val getRecResult = org.apache.spark.sql.functions.udf((x: Long) =>
        ALSModel.recommendProducts(x.toInt, 10).mkString
        )      val resultDF = recRdd.withColumn("recommandresult", 
getRecResult(recRdd.col("userid")))//      val resultDF2 = recRdd.map { x =>
//        ALSModel.recommendProducts(x.getLong(3).toInt, 10)
//      }
      println("output result:")
      resultDF.collect.foreach(println)    }
    ssc.start()
    ssc.awaitTermination()

here are the logs of my app:16/08/03 14:40:49 WARN TaskSetManager: Lost task 
0.0 in stage 20.0 (TID 133, slave62): org.apache.spark.SparkException: RDD 
transformations and actions can only be invoked by the driver, not inside of 
other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is 
invalid because the values transformation and count action cannot be performed 
inside of the rdd1.map transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at 
org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:928)
        at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:168)
        at 
org.brave.spark.streaming.KafkaSparkStreaming2$$anonfun$main$1$$anonfun$2.apply(KafkaSparkStreaming2.scala:51)
        at 
org.brave.spark.streaming.KafkaSparkStreaming2$$anonfun$main$1$$anonfun$2.apply(KafkaSparkStreaming2.scala:50)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
        at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at 
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

It seems that method MatrixFactorizationModel.recommendProducts is also a 
transformation.Is there another way to get the recommended movies for userids 
from a kafka streaming messages?

my data is like this:
ProducerRecord(topic=test, partition=null, key=null, 
value={"userid":29694,"movieid":6503,"rating":2.5,"timestamp":1.088441729E9}, 
timestamp=null)
ProducerRecord(topic=test, partition=null, key=null, 
value={"userid":33063,"movieid":36,"rating":3.0,"timestamp":9.02034829E8}, 
timestamp=null)     .........       

--------------------------------

 

Thanks&amp;Best regards!
San.Luo

Reply via email to