Actually, in 0.8.2.2 only KafkaProducer is fully implemented, not KafkaConsumer.
Here is the implementation of the KafkaConsumer poll method in 0.8.2.2:

    @Override
    public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
        // TODO Auto-generated method stub
        return null;
    }

KafkaConsumer will be released in 0.9 (perhaps this November, as stated by some people on this mailing list).

Best Regards,
Hemant
9810752184 / 9013982184

On 31-Oct-2015 3:29 am, "Stu Smith" <stu26c...@gmail.com> wrote:

> Note that I did work around this issue, by including the entire Kafka:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.10</artifactId>
>   <version>0.8.2.2</version>
> </dependency>
>
> dependency, and using the legacy Consumer API,
> instead of the kafka-clients dependency:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.8.2.2</version>
> </dependency>
>
> Listed in the documentation:
>
> http://kafka.apache.org/documentation.html#producerapi
>
> Since this dependency is called out right before it outlines the Consumer
> API, and the Consumer API docs don't mention that the Consumer API in the
> kafka-clients dependency is broken, it might be helpful if the documentation
> points out that the kafka-clients dependency contains a broken Consumer, and
> the kafka_2.10 dependency should be used to access the legacy api.
>
> Take care,
> -stu
>
> On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <stu26c...@gmail.com> wrote:
>
> > Hello!
> >
> > I'm running into trouble using the latest Kafka client.
> >
> > 0.8.2.2 appears to be listed as a stable release on Maven Central:
> >
> > http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> >
> > And it only includes the:
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer client
> >
> > All the other Consumers listed in the documents do not appear to be
> > available in this release.
> >
> > However, in the example code for the 0.8.2.2 branch, it covers the
> > ConsumerConnector client:
> >
> > https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
> >
> > Which no longer exists in the 0.8.2.2 release.
> >
> > The KafkaConsumer client I get always returns null on poll(), similar to
> > behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E
> >
> > So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> > but removed the older, working ConsumerConnector / MessageStream interface.
> >
> > Is KafkaConsumer expected to work in 0.8.2.2?
> > Or are we expected to use the old client, and I'm somehow not seeing the
> > package?
> >
> > I confirmed I have messages waiting by using the java producer api, and
> > listening with the consoleConsumer application, and it happily prints
> > whatever the producer sends. However, the ConsoleConsumer appears to be
> > using the scala API, so it can't provide any leads on how to use the java
> > one.
> >
> > Or am I doing something wrong?
> >
> > scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> > scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
> > scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> > scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
> > scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> > scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> > scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
> >
> > ...
> > private static final String DESERIALIZER =
> >     "org.apache.kafka.common.serialization.StringDeserializer";
> > private static final String SERIALIZER =
> >     "org.apache.kafka.common.serialization.StringSerializer";...
> >
> > kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
> > kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
> > kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SERIALIZER);
> > kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SERIALIZER);
> > kafkaProperties.put("offsets.storage","kafka");
> > kafkaProperties.put("dual.commit.enabled", "false");
> > ...
> > this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> > this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> > ...
> > TopicPartition topicPartition = new TopicPartition(this.topic,0);
> > this.kafkaConsumer.subscribe(topicPartition);
> > ...
> > while( this.running ) {
> >     Map<String, ConsumerRecords<String, String>> messages =
> >         this.kafkaConsumer.poll(messageWaitTimeout);
> >     if( messages == null ) {
> >         //this.log.debug("Finished polling, no messages received.");
> >         for( int i = 0; i < 200; ++i ) {
> >             this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0, "test", "test"));
> >         }
> >         continue;
> >     }
> >     ....
> >
> > (And to re-iterate, the console consumer does pick up the messages, if I
> > run it, but the Java API does not).
> >
> > Or is the 0.8.2.2 High-Level Java API simply not usable?
> >
> > Take care,
> > -stu
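
For anyone following the workaround described in this thread (the full kafka_2.10 artifact plus the legacy Consumer API): the legacy high-level consumer is configured through ZooKeeper settings rather than bootstrap.servers. Below is a minimal sketch of that configuration using only java.util.Properties. The class name, method name, group id and commit interval are illustrative assumptions; the ZooKeeper values are the ones quoted above. The Kafka calls appear only in comments because they need kafka_2.10 on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects the settings read by
// the legacy ZooKeeper-based high-level consumer in kafka_2.10 0.8.2.2.
class LegacyConsumerProps {

    static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        // The legacy consumer discovers brokers through ZooKeeper.
        props.put("zookeeper.connect", zookeeperConnect);
        // A consumer group id is required by the legacy high-level consumer.
        props.put("group.id", groupId);
        // Timeouts taken from the snippet quoted in this thread.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        // Illustrative value, an assumption rather than a recommendation.
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "example-group");
        System.out.println(props.getProperty("zookeeper.connect"));
        // With kafka_2.10 on the classpath, these properties would be passed on as:
        //   kafka.consumer.ConsumerConfig config = new kafka.consumer.ConsumerConfig(props);
        //   kafka.javaapi.consumer.ConsumerConnector connector =
        //       kafka.consumer.Consumer.createJavaConsumerConnector(config);
        // connector.createMessageStreams(...) then returns the KafkaStream objects to iterate.
    }
}
```

Note that the legacy ConsumerConfig lives in the kafka.consumer package, which is a different class from org.apache.kafka.clients.consumer.ConsumerConfig used in the original question.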