You could do something like:

val stream = kafkaStream.getStream().repartition(1).mapPartitions(x => x.take(10))

Here each batch of stream will hold at most 10 elements from kafkaStream.
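
As a rough illustration of the idea (plain Scala, not Spark code), the sketch below models one batch as a list of partitions, merges them into a single iterator, and keeps the first 10 elements with `Iterator.take`. The names `FirstNSketch`, `firstN` and `batch` are made up for this example, and unlike this model, a real `repartition(1)` shuffles the data and does not guarantee ordering.

```scala
// Plain-Scala model of "merge into one partition, then take n".
// FirstNSketch, firstN and batch are hypothetical names used only for illustration.
object FirstNSketch {
  // Merge all partitions into a single iterator, then keep at most n elements.
  def firstN[A](partitions: Seq[Seq[A]], n: Int): List[A] = {
    // Roughly what repartition(1) achieves: one iterator over everything.
    val single: Iterator[A] = partitions.iterator.flatMap(_.iterator)
    // take is lazy, so nothing past the n-th element is read.
    single.take(n).toList
  }

  def main(args: Array[String]): Unit = {
    // Two "partitions" holding 25 messages in total.
    val batch = Seq((1 to 8).map(i => s"msg-$i"), (9 to 25).map(i => s"msg-$i"))
    println(firstN(batch, 10).mkString(", "))
  }
}
```

If a batch holds fewer than 10 elements, `take` simply returns what is there, so the result can be shorter than requested.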

On Wed, Dec 3, 2014 at 1:05 PM, Hafiz Mujadid wrote:

> Hi Experts!
> Is there a way to read the first N messages from a Kafka stream, put them in
> some collection, return them to the caller for visualization purposes, and
> then close Spark Streaming?
> I will be glad to hear from you and will be thankful to you.
> Currently I have the following code:
> def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
>     if (params.contains("zookeeperQourum"))
>       zkQuorum = params.get("zookeeperQourum").get
>     if (params.contains("userGroup"))
>       group = params.get("userGroup").get
>     if (params.contains("topics"))
>       topics = params.get("topics").get
>     if (params.contains("numberOfThreads"))
>       numThreads = params.get("numberOfThreads").get
>     if (params.contains("sink"))
>       sink = params.get("sink").get
>     if (params.contains("batchInterval"))
>       interval = params.get("batchInterval").get.toInt
>     val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
>     val ssc = new StreamingContext(sparkConf, Seconds(interval))
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     var consumerConfig = scala.collection.immutable.Map.empty[String, String]
>     consumerConfig += ("auto.offset.reset" -> "smallest")
>     consumerConfig += ("zookeeper.connect" -> zkQuorum)
>     consumerConfig += ("" -> group)
>     var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
>       ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
>     val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
>     streams.foreach(rdd => rdd.foreachPartition(itr => {
>       while (itr.hasNext && size >= 0) {
>         var msg = itr.next()
>         println(msg)
>         sample.append(msg)
>         sample.append("\n")
>         size -= 1
>       }
>     }))
>     ssc.start()
>     ssc.awaitTermination(5000)
>     ssc.stop(true)
>   }
> Here sample is a StringBuilder. When I print its contents after the
> getsample method call returns, it is empty.
> Any help will be appreciated.
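
One likely reason `sample` stays empty: the closure passed to `foreachPartition` runs on the executors, which work on their own deserialized copy of `sample`, so the `StringBuilder` on the driver is never touched. Fetching the records to the driver first (for example with `rdd.take(n)` inside `foreachRDD`) and appending there avoids that. The plain-Scala sketch below only models the driver-side bookkeeping. `SampleCollector`, `offer` and `remaining` are hypothetical names, and each `Seq[String]` stands in for what one batch would hand back to the driver.

```scala
// Driver-side bookkeeping for "keep the first N messages across batches".
// SampleCollector, offer and remaining are hypothetical names, not part of Spark.
final class SampleCollector(limit: Int) {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]

  // How many more messages are still wanted for the sample.
  def remaining: Int = limit - buffer.size

  // Append messages from one batch without exceeding the limit.
  // Returns true once the sample is complete and the stream can be stopped.
  def offer(batch: Seq[String]): Boolean = {
    buffer ++= batch.take(remaining)
    buffer.size >= limit
  }

  def result: List[String] = buffer.toList
}

object SampleCollectorDemo {
  def main(args: Array[String]): Unit = {
    val collector = new SampleCollector(5)
    val batches = Iterator(Seq("a", "b"), Seq("c", "d", "e", "f"), Seq("g"))
    // exists short-circuits, so batches stop being consumed once the sample is full.
    batches.exists(collector.offer)
    println(collector.result.mkString("\n"))
  }
}
```

In a real job, the point where `offer` returns true would be the natural moment to stop the streaming context and return the collected sample to the caller.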
