Also, you could use a Producer singleton to improve performance: right now
you create a Producer for each partition in each batch duration, but you
could create a singleton object and reuse it (the Producer is thread-safe,
as far as I know).
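
For example, a minimal sketch of such a singleton (the object name is
illustrative, assuming the kafka.producer 0.8 API used in your code):

    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig}

    // One Producer per executor JVM, created lazily and reused across
    // batches and partitions instead of being rebuilt in foreachPartition.
    object KafkaProducerSingleton {
      @volatile private var producer: Producer[String, String] = _

      def getOrCreate(brokers: String): Producer[String, String] = {
        if (producer == null) synchronized {
          if (producer == null) {
            val props = new Properties()
            props.put("metadata.broker.list", brokers)
            props.put("serializer.class", "kafka.serializer.StringEncoder")
            producer = new Producer[String, String](new ProducerConfig(props))
          }
        }
        producer
      }
    }

Inside foreachPartition you would then call
KafkaProducerSingleton.getOrCreate(brokers) instead of new Producer(...).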

-Jerry

2015-03-30 15:13 GMT+08:00 Saisai Shao <sai.sai.s...@gmail.com>:

> Yeah, after reviewing your code again: the reason you cannot receive any
> data is that your previous code lacked an output action on the DStream, so
> the code never actually executed. After changing to the style I mentioned,
> `foreachRDD` triggers and runs the jobs as you wrote them.
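>
> For example, a minimal sketch of the difference: transformations alone are
> lazy, while an output action like print() or foreachRDD actually triggers
> jobs:
>
>     val lines = messages.map(_._2)            // lazy transformation, no job yet
>     lines.foreachRDD { rdd => rdd.count() }   // output action: runs each batch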
>
> Yes, your understanding is correct.
>
> Thanks
> Jerry
>
>
>
> 2015-03-30 14:58 GMT+08:00 <luohui20...@sina.com>:
>
>>
>> To Saisai:
>>
>>         It works after I corrected my code following your advice, as below.
>>
>>         Furthermore, I am not quite clear about which code runs on the
>> driver and which runs on the executors, so I wrote my understanding in the
>> comments. Would you help check? Thank you.
>>
>>
>>
>> To Akhil:
>>
>>       Yes, Kafka has enough messages. I tested it with a Kafka producer
>> sending Scala random Ints, and it also works. Thanks.
>>
>>
>>
>> object SparkStreamingSampleDirectApproach2 {
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>>
>>     // Running on the driver
>>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>>     val conf = new SparkConf().setMaster("local[2]")
>>       .setAppName("SparkStreamingSampleDirectApproach")
>>       .set("log4j.rootCategory", "WARN, console")
>>     val ssc = new StreamingContext(conf, Seconds(1))
>>
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>> //    val lines = messages.map(_._2)
>> //    val words = lines.flatMap(_.split(" "))
>> //    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>> //    wordCounts.print()
>>
>>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>>     val props = new Properties()
>>     props.put("metadata.broker.list", brokers2)
>>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>>
>>     // foreachRDD returns Unit, so there is nothing to assign here;
>>     // the closure below runs on the executors, once per partition
>>     messages.foreachRDD { rdd =>
>>       rdd.foreachPartition { records =>
>>         val config = new ProducerConfig(props)
>>         val producer = new Producer[String, String](config)
>>         records.foreach { piece =>
>>           val msg = new KeyedMessage[String, String](topic2, piece._2)
>>           producer.send(msg)
>>         }
>>         producer.close()
>>       }
>>     }
>>
>>     //    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>>     //    println(messages2)
>>
>>     // Running on the driver again
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> --------------------------------
>>
>> Thanks & Best regards!
>> 罗辉 San.Luo
>>
>>  ----- Original Message -----
>> From: Saisai Shao <sai.sai.s...@gmail.com>
>> To: luohui20...@sina.com
>> Cc: user <user@spark.apache.org>
>> Subject: Re: How SparkStreaming output messages to Kafka?
>> Date: 2015-03-30 14:03
>>
>> Hi Hui,
>>
>> Did you try the direct Kafka stream example under Spark Streaming's
>> examples? Does it still fail to receive the messages? Also, would you
>> please check all the setups, including Kafka, and test with the Kafka
>> console consumer to see whether Kafka itself is OK?
>>
>> Besides, looking at your code, there are some problems here:
>>
>>     val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>>     println(messages2)
>>
>>     producer.send(messages2)
>>
>> This code snippet is not lazily evaluated; it is executed ONLY ONCE when
>> control reaches it, so you may not actually write any data into Kafka. You
>> need to write it like this:
>>
>> messages.foreachRDD { r =>
>>     r.foreachPartition { iter =>
>>         // create a Producer
>>         // turn this partition of data (iter) into KeyedMessages and
>>         // write them into Kafka
>>     }
>> }
>>
>> This is the basic style; sorry for any missing parts and typos. Also pay
>> attention to serialization issues when the closure is shipped to executors
>> on the remote side. Please take a try again.
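>>
>> For example, a minimal sketch of that pitfall (assuming the same props,
>> config, and topic2 as in your code): a Producer created on the driver is
>> not serializable, so create one per partition inside the closure instead:
>>
>>     val producer = new Producer[String, String](config)  // created on the driver
>>     messages.foreachRDD { r =>
>>       r.foreachPartition { iter =>
>>         // producer.send(...) here would fail: the closure would capture the
>>         // driver-side producer, which is not serializable
>>         val p = new Producer[String, String](new ProducerConfig(props))  // per partition: OK
>>         iter.foreach(m => p.send(new KeyedMessage[String, String](topic2, m._2)))
>>         p.close()
>>       }
>>     }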
>>
>> Thanks
>> Jerry
>>
>>
>>
>> 2015-03-30 13:34 GMT+08:00 <luohui20...@sina.com>:
>>
>>  Hi guys,
>>
>>           I am using Spark Streaming to receive messages from Kafka,
>> process them, and then send them back to Kafka. However, the Kafka consumer
>> cannot receive any messages. Can anyone share ideas?
>>
>>
>>
>> here is my code:
>>
>>
>>
>> object SparkStreamingSampleDirectApproach {
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>>
>>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>>     val conf = new SparkConf().setMaster("local[2]")
>>       .setAppName("SparkStreamingSampleDirectApproach")
>>       .set("log4j.rootCategory", "WARN, console")
>>     val ssc = new StreamingContext(conf, Seconds(1))
>>
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>>     val lines = messages.map(_._2)
>>     val words = lines.flatMap(_.split(" "))
>>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>     wordCounts.print()
>>
>>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>>     val props = new Properties()
>>     props.put("metadata.broker.list", brokers)
>>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>>
>>     val config = new ProducerConfig(props)
>>     val producer = new Producer[String, String](config)
>>
>> //    val messages2 = messages.map{line =>
>> //      new KeyedMessage[String, String](topic2, wordCounts.toString())
>> //    }.toArray
>>
>>     val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>>     println(messages2)
>>
>>     producer.send(messages2)
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> --------------------------------
>>
>> Thanks & Best regards!
>> 罗辉 San.Luo
>>
>>
>>
>
