Thanks, Sidney, for your response.

To check whether all the messages are processed, I used an accumulator and
also added a print statement for debugging.


          val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
          ...
          val mappedDataStream = dataStream.map(_._2)
          mappedDataStream.foreachRDD { rdd =>
            ...
            partition.foreach { row =>
              if (debug) println(row.mkString)
              val keyedMessage = new KeyedMessage[String, String](
                props.getProperty("outTopicUnharmonized"), null, row.toString())
              producer.send(keyedMessage)
              println("Messages sent to Kafka: " + keyedMessage.message)
              accum += 1
            }
            // hack, should be done with a flush
            Thread.sleep(1000)
            producer.close()
            print("Accumulator's value is: " + accum)

And every message received by the stream shows up in the "Messages sent to
Kafka" println, and the accumulator's value is the same as the number of
incoming messages.
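That said, the println and the accumulator only confirm that
producer.send() was invoked, not that the broker acknowledged every
message. A minimal sketch of a send that only counts after the broker acks,
assuming the new producer API bundled with Kafka 0.9 (the helper name
sendAcked is hypothetical):

          import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

          def sendAcked(producer: KafkaProducer[String, String], topic: String, msg: String): Unit = {
            // send() returns a Future[RecordMetadata]; get() blocks until the
            // broker acks and throws if the send failed, so a dropped message
            // becomes a visible exception instead of a silent loss
            producer.send(new ProducerRecord[String, String](topic, msg)).get()
          }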



Best Regards,


Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutional Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com




On Thu, Jun 1, 2017 at 11:24 AM, Sidney Feiner <sidney.fei...@startapp.com>
wrote:

> Are you sure that every message gets processed? It could be that some
> messages failed to pass the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could override each other, and you
> would end up with fewer final events.
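> For example, a minimal sketch of that second failure mode (the event
> values here are made up): building a Map keyed on some field silently
> drops earlier events that share a key.
>
>           val events = Seq(("id1", "a"), ("id2", "b"), ("id1", "c"))
>           val byKey = events.toMap  // Map(id1 -> c, id2 -> b): "a" is gone
>           println(events.size + " events in, " + byKey.size + " out")  // 3 in, 2 out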
>
> -----Original Message-----
> From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
> Sent: Tuesday, May 30, 2017 4:00 PM
> To: user@spark.apache.org
> Subject: Message getting lost in Kafka + Spark Streaming
>
> I am facing an issue with Spark Streaming and Kafka; my use case is as
> follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. On the basis of the processed message, the app writes it to different
> Kafka topics, e.g. if the message is harmonized it is written to the
> harmonized topic, otherwise to the unharmonized topic. (A sketch of the
> stream setup follows below.)
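> A minimal sketch of this direct-stream setup, assuming
> spark-streaming-kafka for Spark 1.6 (the input topic name "inTopic" is a
> placeholder):
>
>           import kafka.serializer.StringDecoder
>           import org.apache.spark.streaming.kafka.KafkaUtils
>
>           val kafkaParams = Map(
>             "metadata.broker.list" -> properties.getProperty("metadataBrokerList"),
>             "auto.offset.reset" -> properties.getProperty("autoOffsetReset"))
>           val dataStream = KafkaUtils.createDirectStream[String, String,
>             StringDecoder, StringDecoder](ssc, kafkaParams, Set("inTopic"))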
>
> The problem is that during streaming we are somehow losing some messages,
> i.e. not all the incoming messages get written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch, sometimes it
> writes all of them to the output topics (the expected behaviour), but
> sometimes it writes only 27 (3 messages are lost; this number can change).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for Kafka:
>
>           val props = new Properties()
>           props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
>           props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
>           props.put("group.id", properties.getProperty("group.id"))
>           props.put("serializer.class", "kafka.serializer.StringEncoder")
>           props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
>           props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
>           props.put("acks", "all")
>           props.put("retries", "5")
>           props.put("request.required.acks", "-1")
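> One note on these settings: "acks" and "retries" are configuration keys
> for the new producer API (org.apache.kafka.clients.producer); the old
> Scala producer built from kafka.producer.ProducerConfig below ignores
> them. A minimal sketch of the old-producer equivalents, assuming that
> producer is the one in use:
>
>           props.put("request.required.acks", "-1")    // wait for all in-sync replicas
>           props.put("message.send.max.retries", "5")  // retry a failed send
>           props.put("producer.type", "sync")          // synchronous send, fail fast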
> Following is the piece of code where we write the processed messages to
> Kafka:
>           val schemaRdd2 = finalHarmonizedDF.toJSON
>
>           schemaRdd2.foreachPartition { partition =>
>             val producerConfig = new ProducerConfig(props)
>             val producer = new Producer[String, String](producerConfig)
>
>             partition.foreach { row =>
>               if (debug) println(row.mkString)
>               val keyedMessage = new KeyedMessage[String, String](
>                 props.getProperty("outTopicHarmonized"), null, row.toString())
>               producer.send(keyedMessage)
>             }
>             // hack, should be done with a flush
>             Thread.sleep(1000)
>             producer.close()
>           }
> We explicitly added the sleep(1000) for testing purposes, but it does not
> solve the problem either. :(
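> Instead of the sleep, a flush-based version is possible with the new
> producer API that ships with Kafka 0.9. A minimal sketch (the
> newProducerProps values are assumptions based on the settings above):
>
>           import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
>
>           val newProducerProps = new java.util.Properties()
>           newProducerProps.put("bootstrap.servers", properties.getProperty("metadataBrokerList"))
>           newProducerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>           newProducerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>           newProducerProps.put("acks", "all")
>           newProducerProps.put("retries", "5")
>
>           schemaRdd2.foreachPartition { partition =>
>             val producer = new KafkaProducer[String, String](newProducerProps)
>             partition.foreach { row =>
>               producer.send(
>                 new ProducerRecord[String, String](props.getProperty("outTopicHarmonized"), row.toString),
>                 new Callback {
>                   // a failed send surfaces here instead of being lost silently
>                   def onCompletion(md: RecordMetadata, e: Exception): Unit =
>                     if (e != null) println("send failed: " + e)
>                 })
>             }
>             producer.flush()  // blocks until all buffered records are sent
>             producer.close()  // also waits for in-flight requests to complete
>           }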
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
