Thanks for your response, Sidney.

To check whether all the messages are processed, I used an accumulator and
also added a print statement for debugging.

    val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
    val mappedDataStream =;
    mappedDataStream.foreachRDD { rdd =>
        partition.foreach { row =>
          if (debug) println(row.mkString)
          val keyedMessage = new KeyedMessage[String,
            null, row.toString())
          producer.send(keyedMessage)
          println("Messges sent to Kafka: " + keyedMessage.message)
          accum += 1
        }
        //hack, should be done with the flush
        Thread.sleep(1000)
        producer.close()
        print("Accumulator's value is: " + accum)

All the messages received by the stream show up in the
println("Messges sent to Kafka: " + keyedMessage.message) output, and the
accumulator value also matches the number of incoming messages.
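
One caveat on this check: the accumulator counts calls to send(), which is not
necessarily the same as broker acknowledgements (for instance if the producer
runs in async mode). Below is a minimal sketch of how acknowledgements could be
counted instead, assuming the newer Java producer
(org.apache.kafka.clients.producer.KafkaProducer from kafka-clients 0.9) is
used rather than the old Scala Producer. KafkaSendSketch, sendPartition and its
parameters are hypothetical names for illustration only and are not part of
the application code. It calls flush() before close() instead of sleeping.

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Hypothetical helper object, not part of the original application.
object KafkaSendSketch {

  // Sends every row of one partition to `topic` and returns
  // (number of acknowledged records, number of failed records).
  def sendPartition(rows: Iterator[String], topic: String, brokers: String): (Long, Long) = {
    val props = new Properties()
    // Configuration keys of the new producer; `brokers` is an assumed
    // "host1:9092,host2:9092" style list supplied by the caller.
    props.put("bootstrap.servers", brokers)
    props.put("acks", "all")
    props.put("retries", "5")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val acked = new AtomicLong(0)
    val failed = new AtomicLong(0)
    val producer = new KafkaProducer[String, String](props)
    try {
      rows.foreach { row =>
        // send() is asynchronous; the callback fires once the broker has
        // answered (or the send has finally failed).
        producer.send(new ProducerRecord[String, String](topic, row), new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
            if (exception == null) {
              acked.incrementAndGet()
            } else {
              failed.incrementAndGet()
            }
          }
        })
      }
      // Block until all buffered records have completed.
      producer.flush()
    } finally {
      producer.close()
    }
    (acked.get, failed.get)
  }
}
```

Calling this from inside foreachPartition and printing the returned pair per
partition would show whether any record failed on the producer side, which
the send() counter alone cannot tell.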

Best Regards,

> Are you sure that every message gets processed? It could be that some
> messages failed to pass the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could overwrite each other and you
> would end up with fewer final events.
> I am facing an issue with Spark Streaming and Kafka. My use case is as
> follows:
> 1. A Spark Streaming (DirectStream) application reads messages from a
>    Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to one of several
>    Kafka topics, e.g. if the message is harmonized it goes to the
>    harmonized topic, otherwise to the unharmonized topic.
> The problem is that during streaming we are somehow losing some
> messages, i.e. not all incoming messages are written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch, sometimes it
> writes all the messages to the output topics (the expected behaviour), but
> sometimes it writes only 27 (3 messages are lost; this number varies).
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
> The Kafka topic configuration is as follows:
> # of brokers: 3
> replication factor: 3
> # of partitions: 3
> Following are the properties we are using for Kafka:
> *          val props = new Properties()
>           props.put("",
> properties.getProperty("metadataBrokerList"))
>           props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>           props.put("", properties.getProperty(""))
>           props.put("serializer.class", "kafka.serializer.StringEncoder")
>           props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>           props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>           props.put("acks", "all");
>           props.put("retries", "5");
>           props.put("request.required.acks", "-1")
> *
> Following is the piece of code where we write the processed messages to
> Kafka:
> *          val schemaRdd2 = finalHarmonizedDF.toJSON
>           schemaRdd2.foreachPartition { partition =>
>             val producerConfig = new ProducerConfig(props)
>             val producer = new Producer[String, String](producerConfig)
>             partition.foreach { row =>
>               if (debug) println(row.mkString)
>               val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
>                 null, row.toString())
>               producer.send(keyedMessage)
>             }
>             //hack, should be done with the flush
>             Thread.sleep(1000)
>             producer.close()
>           }
> *
> We explicitly added sleep(1000) for testing purposes,
> but this does not solve the problem either :(
> Any suggestions would be appreciated.
