Hello,

I have run into a problem when using the Scala API to send data to and receive data from Kafka brokers. I wrote a very simple producer and consumer (just like the official examples). The code using the Java API works correctly, but the code using the Scala API may lose data. Here are the details.

Config:
I downloaded the kafka_2.11-0.10.0.0.tgz binary release and started it in single-node mode: just one broker and one ZooKeeper, both with the default configuration.

Problem:

(1) Java API Test

I first wrote a simple producer and consumer with the Java API. The producer code is like this:

code A
    void produce() {
        int messageNo = 1;
        while (messageNo <= Config.count) {
            // Send one message per topic for every message number
            for (String topic : KafkaConfig.topics.split(",")) {
                String key = String.valueOf(messageNo);
                String data = topic + "-" + new Date();
                producer.send(new KeyedMessage<String, String>(topic, key, data));
                System.out.println(topic + "#" + key + "#" + data);
            }
            messageNo++;
        }
    }
--------------------------------------------------------------------------------

The consumer code is like this:

code B
    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        // One stream per topic
        for (String topic : Config.topics.split(",")) {
            topicCountMap.put(topic, new Integer(1));
        }
        final Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        // One consumer thread per topic, printing a running count of received messages
        for (final String topic : Config.topics.split(",")) {
            new Thread(new Runnable() {
                public void run() {
                    int count = 0;
                    KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
                    ConsumerIterator<String, String> it = stream.iterator();
                    while (it.hasNext()) {
                        count++;
                        MessageAndMetadata<String, String> message = it.next();
                        System.out.println(count + "#" + message.topic() + ":" + message.key() + ":" + message.message());
                    }
                }
            }).start();
        }
    }
--------------------------------------------------------------------------------

When I change Config.count (the number of messages sent to each topic; here I use two topics, a and b), the consumer always receives the same number of messages as were sent, whatever the count is. So the Java API behaves correctly. But when I do the same thing with the Scala API, some data seems to get lost when it is sent to the Kafka brokers.

(2) Scala API Test

I wrote a simple producer program with the Scala API. Part of it is like this:
--------------------------------------------------------------------------------
code C
    def main(args: Array[String]): Unit = {
      val producer = {
        // Producer connection properties (broker list, serializers, acks)
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.ACKS_CONFIG, "-1")
        new KafkaProducer[String, String](props)
      }

      // Send one message per topic for every key from 1 to Config.count
      (1 to Config.count).foreach(key => {
        Config.topics.split(",").foreach(topic => {
          val data = topic + new Date().toString
          val message = new ProducerRecord[String, String](topic, key + "", data)
          producer.send(message)
          System.out.println(topic + "#" + key + "#" + data)
        })
      })
    }
--------------------------------------------------------------------------------

Step:
(1) First I start a consumer node that waits to receive data (using code B above).
(2) Then I start a producer node to produce data (using code C above).

Problem:
When I start code C, the producer produces data very fast: sending 100 messages takes no more than 1 second, whereas the producer using the Java API (code A above) cannot send data that fast. The consumer (code B) receives only 34 messages from topic a and 34 from topic b and then sits idle, although I actually produced 100 messages.

I have changed the number of messages that I send to Kafka, but no matter how many I try to send, the consumer receives only about half of what I send, sometimes less. The more messages I send, the more data I lose. [By "lost" I cannot tell whether the messages were never delivered to the brokers, or were delivered but never received by the consumer. What I actually see is that I receive part of the data and then nothing more is consumed.]

Try:
Since the producer using the Scala API seems to send data too fast, I added Thread.sleep(time) after each send. This does help! With time = 100, data is rarely lost, but sometimes I still do not receive as many messages as I sent.

Question:
Am I using the Scala API incorrectly? Or is it something else?

I have put my code in the attachment. I look forward to your reply and advice.
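
One thing that may be worth checking, given that Thread.sleep helps: KafkaProducer.send() is asynchronous and returns a java.util.concurrent.Future, and the part of code C shown above never waits on those futures, nor does it call producer.flush() or producer.close() before main returns. The sketch below is only an illustration and not part of the attached code. SendAudit, Result and awaitAll are hypothetical names, and CompletableFuture objects stand in for the futures that producer.send(message) would return, so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a batch of asynchronous sends and counts
// how many completed normally and how many failed or timed out.
public class SendAudit {

    public static final class Result {
        public final int acked;
        public final int failed;

        public Result(int acked, int failed) {
            this.acked = acked;
            this.failed = failed;
        }
    }

    public static Result awaitAll(List<? extends Future<?>> pending, long timeoutMsPerSend) {
        int acked = 0;
        int failed = 0;
        for (int i = 0; i < pending.size(); i++) {
            try {
                // Block until this send has completed, or give up after the timeout
                pending.get(i).get(timeoutMsPerSend, TimeUnit.MILLISECONDS);
                acked++;
            } catch (ExecutionException | TimeoutException | CancellationException e) {
                failed++;
            } catch (InterruptedException e) {
                // Restore the interrupt flag and count the remaining sends as failed
                Thread.currentThread().interrupt();
                failed += pending.size() - i;
                break;
            }
        }
        return new Result(acked, failed);
    }

    public static void main(String[] args) {
        // Stand-ins for the futures that producer.send(...) would return (assumption)
        List<Future<String>> pending = new ArrayList<>();
        pending.add(CompletableFuture.completedFuture("a-1"));   // completed send
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new RuntimeException("simulated send failure"));
        pending.add(broken);                                     // failed send
        pending.add(new CompletableFuture<String>());            // never completes

        Result r = awaitAll(pending, 50);
        System.out.println("acked=" + r.acked + " failed=" + r.failed); // prints acked=1 failed=2
    }
}
```

In code C the idea would be to collect the value returned by each producer.send(message) into a list and pass that list to awaitAll before main returns. If acked comes back smaller than the number of records sent, the loss is on the producer side rather than in the consumer.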