I am facing an issue with Spark Streaming and Kafka; my use case is as
follows:
1. A Spark Streaming (DirectStream) application reads data/messages from a
Kafka topic and processes them.
2. Based on the processed message, the app writes it to different Kafka
topics, e.g. if the message is harmonized it is written to the harmonized
topic, otherwise to the unharmonized topic.
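For context, the routing decision in step 2 is just a per-message branch; a trivial sketch (the topic names here are placeholders for the configured ones):

```scala
// Hypothetical sketch of the step-2 routing: pick the output topic from a
// per-message "harmonized" flag. Topic names are placeholders.
def targetTopic(isHarmonized: Boolean): String =
  if (isHarmonized) "harmonizedTopic" else "unharmonizedTopic"
```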
 
The problem is that during streaming we are somehow losing messages, i.e.
not all incoming messages are written to the harmonized or unharmonized
topics. For example, if the app receives 30 messages in one batch, it
sometimes writes all of them to the output topics (the expected behaviour),
but sometimes it writes only 27 (3 messages are lost; this number can vary).
 
Versions are as follows:
Spark 1.6.0
Kafka 0.9
 
Kafka topic configuration is as follows:
# of brokers: 3
replication factor: 3
# of partitions: 3
 
Following are the properties we are using for Kafka:
          val props = new Properties()
          props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
          props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
          props.put("group.id", properties.getProperty("group.id"))
          props.put("serializer.class", "kafka.serializer.StringEncoder")
          props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
          props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
          props.put("acks", "all")
          props.put("retries", "5")
          props.put("request.required.acks", "-1")
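One thing worth double-checking here: "acks" and "retries" are configuration names for the newer org.apache.kafka.clients.producer API; the old Scala producer built from ProducerConfig below reads different names and only logs a warning for keys it does not recognise. A sketch of the equivalent old-producer settings (names taken from the Kafka 0.9 docs; verify against your version):

```scala
// Old (kafka.producer) configuration names; "acks"/"retries" above are
// not read by ProducerConfig.
props.put("producer.type", "sync")           // synchronous sends, no async batching
props.put("request.required.acks", "-1")     // wait for all in-sync replicas
props.put("message.send.max.retries", "5")   // old-producer equivalent of "retries"
```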
Following is the piece of code where we write the processed messages to
Kafka:
          val schemaRdd2 = finalHarmonizedDF.toJSON

          schemaRdd2.foreachPartition { partition =>
            val producerConfig = new ProducerConfig(props)
            val producer = new Producer[String, String](producerConfig)

            partition.foreach { row =>
              if (debug) println(row.mkString)
              val keyedMessage = new KeyedMessage[String, String](
                props.getProperty("outTopicHarmonized"), null, row.toString())
              producer.send(keyedMessage)
            }
            // hack: should be done with a flush
            Thread.sleep(1000)
            producer.close()
          }
We explicitly added the sleep(1000) for testing purposes, but it does not
solve the problem either :(
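The comment in the code is right that the sleep should be replaced by a flush, but the old kafka.producer.Producer API used here has no flush(). The newer org.apache.kafka.clients.producer.KafkaProducer (shipped in the 0.9 kafka-clients jar) does. A minimal, untested sketch of the partition loop rewritten against that API, assuming the same metadataBrokerList and topic properties:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

schemaRdd2.foreachPartition { partition =>
  val kp = new Properties()
  kp.put("bootstrap.servers", properties.getProperty("metadataBrokerList"))
  kp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kp.put("acks", "all")   // the new producer honours "acks" and "retries"
  kp.put("retries", "5")

  val producer = new KafkaProducer[String, String](kp)
  partition.foreach { row =>
    // send() is asynchronous: records sit in a buffer until flushed
    producer.send(new ProducerRecord[String, String](
      props.getProperty("outTopicHarmonized"), row.toString()))
  }
  producer.flush()  // blocks until every buffered record has completed
  producer.close()  // also waits for in-flight sends before returning
}
```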
 
Any suggestion would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
