Re: Message getting lost in Kafka + Spark Streaming
Thanks Sidney for your response.

To check whether all the messages are processed, I used an accumulator and also added a print statement for debugging:

    val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
    ...
    val mappedDataStream = dataStream.map(_._2)

    mappedDataStream.foreachRDD { rdd =>
      ...
      partition.foreach { row =>
        if (debug) println(row.mkString)
        val keyedMessage = new KeyedMessage[String, String](
          props.getProperty("outTopicUnharmonized"), null, row.toString())
        producer.send(keyedMessage)
        println("Messages sent to Kafka: " + keyedMessage.message)
        accum += 1
      }
      // hack, should be done with flush
      Thread.sleep(1000)
      producer.close()
      print("Accumulator's value is: " + accum)

Every message received by the stream shows up in the "Messages sent to Kafka" output, and the accumulator's value is also the same as the number of incoming messages.

Best Regards,

Vikash Pareek
Team Lead
InfoObjects Inc. <http://www.infoobjects.com/>
m: +91 8800206898
a: E5, Jhalana Institutional Area, Jaipur, Rajasthan 302004
w: www.linkedin.com/in/pvikash
e: vikash.par...@infoobjects.com

On Thu, Jun 1, 2017 at 11:24 AM, Sidney Feiner wrote:

> Are you sure that every message gets processed? It could be that some
> messages failed passing the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could override each other, and you'll
> end up with fewer final events.
>
> -----Original Message-----
> From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
> Sent: Tuesday, May 30, 2017 4:00 PM
> To: user@spark.apache.org
> Subject: Message getting lost in Kafka + Spark Streaming
>
> I am facing an issue related to Spark Streaming with Kafka. My use case is
> as follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to different Kafka
> topics: if the message is harmonized, it is written to the harmonized
> topic, otherwise to the unharmonized topic.
>
> The problem is that during streaming we are somehow losing some messages,
> i.e. not all incoming messages are written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch, sometimes it
> writes all of them to the output topics (the expected behaviour), but
> sometimes it writes only 27 (3 messages are lost; this number can vary).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for Kafka:
>
>     val props = new Properties()
>     props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
>     props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
>     props.put("group.id", properties.getProperty("group.id"))
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>     props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
>     props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
>     props.put("acks", "all")
>     props.put("retries", "5")
>     props.put("request.required.acks", "-1")
>
> Following is the piece of code where we write processed messages to Kafka:
>
>     val schemaRdd2 = finalHarmonizedDF.toJSON
>
>     schemaRdd2.foreachPartition { partition =>
>       val producerConfig = new ProducerConfig(props)
>       val producer = new Producer[String, String](producerConfig)
>
>       partition.foreach { row =>
>         if (debug) println(row.mkString)
>         val keyedMessage = new KeyedMessage[String, String](
>           props.getProperty("outTopicHarmonized"), null, row.toString())
>         producer.send(keyedMessage)
>       }
>       // hack, should be done with flush
>       Thread.sleep(1000)
>       producer.close()
>     }
>
> We explicitly added sleep(1000) for testing purposes, but it is not
> solving the problem either. :(
>
> Any suggestion would be appreciated.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
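One caveat about the verification above: both the accumulator and the "Messages sent to Kafka" println count at the moment `producer.send` is called, which only proves the message was handed to the (possibly asynchronous) producer, not that the broker acknowledged it. A minimal plain-Scala sketch of how those two counts can diverge; the `FlakyAsyncProducer` class is a made-up stand-in for illustration, not a Kafka API:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an async producer that silently drops some
// sends after its internal retries are exhausted.
class FlakyAsyncProducer(dropEvery: Int) {
  val handedOff = new AtomicInteger(0) // counted at send() time
  val delivered = new AtomicInteger(0) // counted only on broker ack

  def send(msg: String): Unit = {
    val n = handedOff.incrementAndGet() // this is what the accumulator measures
    if (n % dropEvery != 0) delivered.incrementAndGet()
    // a dropped message produces no error on the send() path
  }
}

object SendVsDelivered {
  def main(args: Array[String]): Unit = {
    val producer = new FlakyAsyncProducer(dropEvery = 10)
    (1 to 30).foreach(i => producer.send(s"msg-$i"))
    println(s"handed off: ${producer.handedOff.get}") // 30, like the accumulator
    println(s"delivered:  ${producer.delivered.get}") // 27: losses invisible at send()
  }
}
```

So a matching accumulator count does not by itself rule out loss between the producer's buffer and the broker.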
RE: Message getting lost in Kafka + Spark Streaming
Are you sure that every message gets processed? It could be that some messages failed passing the decoder.
And during the processing, are you maybe putting the events into a map? That way, events with the same key could override each other, and you'll end up with fewer final events.

-----Original Message-----
From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
Sent: Tuesday, May 30, 2017 4:00 PM
To: user@spark.apache.org
Subject: Message getting lost in Kafka + Spark Streaming

I am facing an issue related to Spark Streaming with Kafka. My use case is as follows:

1. A Spark Streaming (DirectStream) application reads data/messages from a Kafka topic and processes them.
2. Based on the processed message, the app writes it to different Kafka topics: if the message is harmonized, it is written to the harmonized topic, otherwise to the unharmonized topic.

The problem is that during streaming we are somehow losing some messages, i.e. not all incoming messages are written to the harmonized or unharmonized topics.
For example, if the app receives 30 messages in one batch, sometimes it writes all of them to the output topics (the expected behaviour), but sometimes it writes only 27 (3 messages are lost; this number can vary).

Versions are as follows:
Spark 1.6.0
Kafka 0.9

Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3

Following are the properties we are using for Kafka:

    val props = new Properties()
    props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
    props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
    props.put("group.id", properties.getProperty("group.id"))
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
    props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
    props.put("acks", "all")
    props.put("retries", "5")
    props.put("request.required.acks", "-1")

Following is the piece of code where we write processed messages to Kafka:

    val schemaRdd2 = finalHarmonizedDF.toJSON

    schemaRdd2.foreachPartition { partition =>
      val producerConfig = new ProducerConfig(props)
      val producer = new Producer[String, String](producerConfig)

      partition.foreach { row =>
        if (debug) println(row.mkString)
        val keyedMessage = new KeyedMessage[String, String](
          props.getProperty("outTopicHarmonized"), null, row.toString())
        producer.send(keyedMessage)
      }
      // hack, should be done with flush
      Thread.sleep(1000)
      producer.close()
    }

We explicitly added sleep(1000) for testing purposes, but it is not solving the problem either. :(

Any suggestion would be appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
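The "events with the same key could override each other" failure mode above can be reproduced in a few lines of plain Scala: if any keying step in the pipeline (e.g. a `.toMap` or a reduce-by-key) uses a non-unique key, later events silently replace earlier ones. The event values here are made up purely for illustration:

```scala
object MapOverrideDemo {
  def main(args: Array[String]): Unit = {
    // Three incoming events, two of which share the key "k1".
    val events = Seq("k1" -> "first", "k2" -> "second", "k1" -> "third")

    // Keying them into a Map keeps only the last value per key:
    val byKey = events.toMap
    println(byKey.size)  // 2 -- one event has silently disappeared
    println(byKey("k1")) // "third" overrode "first"
  }
}
```

If the observed loss count varies from batch to batch, it is worth checking whether the number of duplicate keys varies the same way.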
Re: Message getting lost in Kafka + Spark Streaming
First thing I noticed: you should be using a singleton Kafka producer, not recreating one for every partition. It's thread-safe.

On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek wrote:

> I am facing an issue related to Spark Streaming with Kafka. My use case is
> as follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to different Kafka
> topics: if the message is harmonized, it is written to the harmonized
> topic, otherwise to the unharmonized topic.
>
> The problem is that during streaming we are somehow losing some messages,
> i.e. not all incoming messages are written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch, sometimes it
> writes all of them to the output topics (the expected behaviour), but
> sometimes it writes only 27 (3 messages are lost; this number can vary).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for Kafka:
>
>     val props = new Properties()
>     props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
>     props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
>     props.put("group.id", properties.getProperty("group.id"))
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>     props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
>     props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
>     props.put("acks", "all")
>     props.put("retries", "5")
>     props.put("request.required.acks", "-1")
>
> Following is the piece of code where we write processed messages to Kafka:
>
>     val schemaRdd2 = finalHarmonizedDF.toJSON
>
>     schemaRdd2.foreachPartition { partition =>
>       val producerConfig = new ProducerConfig(props)
>       val producer = new Producer[String, String](producerConfig)
>
>       partition.foreach { row =>
>         if (debug) println(row.mkString)
>         val keyedMessage = new KeyedMessage[String, String](
>           props.getProperty("outTopicHarmonized"), null, row.toString())
>         producer.send(keyedMessage)
>       }
>       // hack, should be done with flush
>       Thread.sleep(1000)
>       producer.close()
>     }
>
> We explicitly added sleep(1000) for testing purposes, but it is not
> solving the problem either. :(
>
> Any suggestion would be appreciated.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
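The singleton suggestion above is usually implemented as a JVM-wide lazy value, so each executor builds the producer once on first use and every partition processed on that executor reuses it. A minimal sketch of the pattern; the `Client` class here is a hypothetical stand-in so the example is self-contained, whereas in the real code the `lazy val` would hold the Kafka producer built from `props`:

```scala
// Stand-in for the real Kafka producer; it just counts constructions.
class Client {
  Client.constructed += 1
  def send(msg: String): Unit = () // real code: producer.send(keyedMessage)
}

object Client { var constructed = 0 }

// One instance per JVM (i.e. per executor), created on first use.
// Scala guarantees thread-safe initialization of a lazy val.
object ProducerSingleton {
  lazy val client: Client = new Client
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    // Simulates many partitions being processed on one executor:
    (1 to 100).foreach(_ => ProducerSingleton.client.send("row"))
    println(Client.constructed) // 1 -- not one producer per partition
  }
}
```

Because the object initializes lazily on each executor's JVM, nothing needs to be serialized into the closure; `foreachPartition` just calls `ProducerSingleton.client.send(...)`.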
Message getting lost in Kafka + Spark Streaming
I am facing an issue related to Spark Streaming with Kafka. My use case is as follows:

1. A Spark Streaming (DirectStream) application reads data/messages from a Kafka topic and processes them.
2. Based on the processed message, the app writes it to different Kafka topics: if the message is harmonized, it is written to the harmonized topic, otherwise to the unharmonized topic.

The problem is that during streaming we are somehow losing some messages, i.e. not all incoming messages are written to the harmonized or unharmonized topics.
For example, if the app receives 30 messages in one batch, sometimes it writes all of them to the output topics (the expected behaviour), but sometimes it writes only 27 (3 messages are lost; this number can vary).

Versions are as follows:
Spark 1.6.0
Kafka 0.9

Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3

Following are the properties we are using for Kafka:

    val props = new Properties()
    props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
    props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
    props.put("group.id", properties.getProperty("group.id"))
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
    props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
    props.put("acks", "all")
    props.put("retries", "5")
    props.put("request.required.acks", "-1")

Following is the piece of code where we write processed messages to Kafka:

    val schemaRdd2 = finalHarmonizedDF.toJSON

    schemaRdd2.foreachPartition { partition =>
      val producerConfig = new ProducerConfig(props)
      val producer = new Producer[String, String](producerConfig)

      partition.foreach { row =>
        if (debug) println(row.mkString)
        val keyedMessage = new KeyedMessage[String, String](
          props.getProperty("outTopicHarmonized"), null, row.toString())
        producer.send(keyedMessage)
      }
      // hack, should be done with flush
      Thread.sleep(1000)
      producer.close()
    }

We explicitly added sleep(1000) for testing purposes, but it is not solving the problem either. :(

Any suggestion would be appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
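The `Thread.sleep(1000)` hack exists because the old `kafka.javaapi.producer.Producer` API exposes no `flush()`. The newer `org.apache.kafka.clients.producer.KafkaProducer` (available as of Kafka 0.9) does, and its `close()` also blocks until buffered records are sent. A sketch of the per-partition lifecycle that flush enables, written against a minimal stand-in trait so it is self-contained; with the real client, `send`/`flush`/`close` would be the corresponding `KafkaProducer` calls:

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for the newer producer API: send() only buffers,
// flush() is what actually drains the buffer to the "broker".
trait BufferingProducer {
  def send(topic: String, value: String): Unit
  def flush(): Unit
  def close(): Unit
}

class InMemoryProducer extends BufferingProducer {
  private val buffer = ArrayBuffer.empty[String]
  val delivered = ArrayBuffer.empty[String]
  def send(topic: String, value: String): Unit = buffer += s"$topic:$value"
  def flush(): Unit = { delivered ++= buffer; buffer.clear() }
  def close(): Unit = flush() // KafkaProducer.close() likewise drains in-flight sends
}

object FlushNotSleep {
  // The shape the foreachPartition body could take: flush then close,
  // instead of Thread.sleep(1000) followed by close().
  def writePartition(rows: Iterator[String], producer: BufferingProducer,
                     topic: String): Unit = {
    try {
      rows.foreach(row => producer.send(topic, row))
      producer.flush() // deterministic drain; no fixed sleep to tune
    } finally {
      producer.close()
    }
  }
}
```

With the real client, the same shape is `producer.flush(); producer.close()` on a `KafkaProducer[String, String]`: both calls block until buffered records are acknowledged, so no sleep of any length is needed.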