Hi,
I'm trying to run a simple Kafka Spark Streaming example in spark-shell:

// Stop the SparkContext that spark-shell created so that a new one can be
// built from the custom SparkConf below.
sc.stop()
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
// A receiver-based input stream (KafkaUtils.createStream) permanently occupies
// one thread. With master "local" there is only a single thread, so the
// receiver takes it and no thread is left to run the batch jobs: count() and
// foreach() below would never finish. Always run locally with more threads
// than receivers, hence "local[2]".
val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local[2]")
// Micro-batches are cut every 10 seconds.
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Connection settings for the Kafka high-level consumer (via ZooKeeper).
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "127.0.0.1:2181",
  "group.id" -> "test")
// Consume topic "test" with one consumer thread, keeping raw bytes for both
// key and value; only the message value (second tuple element) is retained.
val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
// For every batch: print the RDD itself, the number of records in it, and
// then each record.
messages.foreachRDD { pairRDD =>
  println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
  // count() launches a Spark job, so it needs a free worker thread.
  println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
  // NOTE: each row is an Array[Byte], so the interpolation prints the array's
  // default toString rather than the decoded payload.
  pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
}
ssc.start()
ssc.awaitTermination()


In the Spark output I'm able to find only the following println log:
println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

but unfortunately I can't find the output of:
println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]") and 
println(s"DataListener.listen() [row = ${row}]")

Here is my full spark-shell output - http://pastebin.com/sfxbYYga 
<http://pastebin.com/sfxbYYga>

Any ideas what I'm doing wrong? Thanks!

Reply via email to