Hello! I'm running into trouble using the latest Kafka client.
0.8.2.2 appears to be listed as a stable release on Maven Central:

http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

It only includes the org.apache.kafka.clients.consumer.KafkaConsumer client. None of the other consumers listed in the documentation appear to be available in this release.

However, the example code on the 0.8.2.2 branch uses the ConsumerConnector client:

https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java

which no longer exists in the 0.8.2.2 release.

The KafkaConsumer client I get always returns null on poll(), similar to the behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):

http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E

So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client, but removed the older, working ConsumerConnector / MessageStream interface.

Is KafkaConsumer expected to work in 0.8.2.2? Or are we expected to use the old client, and I'm somehow not seeing the package?

I confirmed that I have messages waiting by sending them with the Java producer API and listening with the ConsoleConsumer application, which happily prints whatever the producer sends. However, the ConsoleConsumer appears to use the Scala API, so it can't provide any leads on how to use the Java one.

Or am I doing something wrong?
    scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
    scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
    scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
    scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
    scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
    scannerKafkaProperties.put("zookeeper.connect", "localhost:2181");
    ...
    private static final String DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    ...
    kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
    kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
    kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SERIALIZER);
    kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SERIALIZER);
    kafkaProperties.put("offsets.storage", "kafka");
    kafkaProperties.put("dual.commit.enabled", "false");
    ...
    this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
    this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
    ...
    TopicPartition topicPartition = new TopicPartition(this.topic, 0);
    this.kafkaConsumer.subscribe(topicPartition);
    ...
    while( this.running ) {
        Map<String, ConsumerRecords<String, String>> messages = this.kafkaConsumer.poll(messageWaitTimeout);
        if( messages == null ) {
            //this.log.debug("Finished polling, no messages received.");
            for( int i = 0; i < 200; ++i ) {
                this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0, "test", "test"));
            }
            continue;
        }
        ....

(And to reiterate: the console consumer does pick up the messages if I run it, but the Java API does not.)

Or is the 0.8.2.2 high-level Java API simply not usable?

Take care,
-stu