You could do something like:

    val stream = kafkaStream.getStream().repartition(1).mapPartitions(x => x.take(10))

Here stream will have at most 10 elements from the kafkaStream in each batch.

Thanks
Best Regards

On Wed, Dec 3, 2014 at 1:05 PM, Hafiz Mujadid <hafizmujadi...@gmail.com> wrote:

> Hi Experts!
>
> Is there a way to read the first N messages from a Kafka stream, put them
> in some collection, return them to the caller for visualization, and then
> close Spark Streaming?
>
> I will be glad to hear from you and will be thankful to you.
>
> Currently I have the following code:
>
> def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
>   if (params.contains("zookeeperQourum"))
>     zkQuorum = params.get("zookeeperQourum").get
>   if (params.contains("userGroup"))
>     group = params.get("userGroup").get
>   if (params.contains("topics"))
>     topics = params.get("topics").get
>   if (params.contains("numberOfThreads"))
>     numThreads = params.get("numberOfThreads").get
>   if (params.contains("sink"))
>     sink = params.get("sink").get
>   if (params.contains("batchInterval"))
>     interval = params.get("batchInterval").get.toInt
>   val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
>   val ssc = new StreamingContext(sparkConf, Seconds(interval))
>   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>   var consumerConfig = scala.collection.immutable.Map.empty[String, String]
>   consumerConfig += ("auto.offset.reset" -> "smallest")
>   consumerConfig += ("zookeeper.connect" -> zkQuorum)
>   consumerConfig += ("group.id" -> group)
>   var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
>     ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
>   val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
>   streams.foreach(rdd => rdd.foreachPartition(itr => {
>     while (itr.hasNext && size >= 0) {
>       var msg = itr.next
>       println(msg)
>       sample.append(msg)
>       sample.append("\n")
>       size -= 1
>     }
>   }))
>   ssc.start()
>   ssc.awaitTermination(5000)
>   ssc.stop(true)
> }
>
> Where sample is a StringBuilder. When I print the contents of this string
> builder after the getsample method call returns, I get nothing in it.
>
> Any help will be appreciated.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227.html
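
On why sample comes back empty: the function passed to foreachPartition runs on the executors, so with a cluster master each task appends to its own deserialized copy of the StringBuilder and the driver's copy is never touched. One way around this is to pull the records back to the driver with rdd.take inside foreachRDD. The following is only a rough sketch under some assumptions: KafkaSample, firstN and timeoutMs are names made up for illustration (they are not part of Spark), and stream is assumed to be a DStream[String] such as streams in the quoted code.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of Spark.
object KafkaSample {
  /** Collect up to n messages on the driver, then stop streaming.
    * Gives up after timeoutMs and returns whatever has arrived so far. */
  def firstN(ssc: StreamingContext, stream: DStream[String],
             n: Int, timeoutMs: Long): Seq[String] = {
    val sample = ArrayBuffer.empty[String] // lives on the driver only

    // The body of foreachRDD runs on the driver; take() runs a job on the
    // executors and ships at most `remaining` records back.
    stream.foreachRDD { rdd =>
      val remaining = sample.synchronized(n - sample.size)
      if (remaining > 0) {
        val batch = rdd.take(remaining)
        sample.synchronized { sample ++= batch }
      }
    }

    ssc.start()
    // Poll until enough messages have arrived or the timeout expires.
    val deadline = System.currentTimeMillis() + timeoutMs
    while (sample.synchronized(sample.size) < n &&
           System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    sample.synchronized(sample.take(n).toList)
  }
}
```

In the quoted getsample this would stand in for the streams.foreach(...) block and the start/awaitTermination/stop calls, for example val sample = KafkaSample.firstN(ssc, streams, 10, 5000L), with the method returning the collected Seq[String] instead of Unit.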