Hello!

  I'm running into trouble using the latest Kafka client.

0.8.2.2 appears to be listed as a stable release on Maven Central:

http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

And it only includes one consumer class:

org.apache.kafka.clients.consumer.KafkaConsumer

None of the other consumers listed in the documentation appear to be
available in this release.

However, the example code on the 0.8.2.2 branch uses the
ConsumerConnector client:

https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java

which I cannot find anywhere in the 0.8.2.2 release.

The KafkaConsumer client I do get always returns null from poll(), similar
to the behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):

http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E

So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client,
but removed the older, working ConsumerConnector / MessageStream interface.

Is KafkaConsumer expected to work in 0.8.2.2?
Or are we expected to use the old client, and I'm somehow not seeing the
package?
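
To make the "not seeing the package" part concrete, here is a rough sketch
of the check I have in mind. It only tests whether a class name resolves on
the current classpath and uses nothing from Kafka itself. The names
ClasspathCheck and isOnClasspath are made up for illustration, and
kafka.javaapi.consumer.ConsumerConnector is, as far as I can tell, the
class imported by the linked example Consumer.java.

```java
public class ClasspathCheck {

    // Returns true if the named class can be located by this class's
    // class loader. The class is looked up but not initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the message above and the linked example;
        // adjust them to whatever you expect your build to provide.
        String[] candidates = {
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            "kafka.javaapi.consumer.ConsumerConnector",
        };
        for (String name : candidates) {
            System.out.println(name + " -> "
                + (isOnClasspath(name) ? "found" : "missing"));
        }
    }
}
```

Run with the same classpath as the application, this would show which of
the two consumer classes the build actually pulls in.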

I confirmed that messages are waiting by sending them with the Java
producer API and listening with the ConsoleConsumer application, which
happily prints whatever the producer sends. However, the ConsoleConsumer
appears to use the Scala API, so it can't provide any leads on how to use
the Java one.

Or am I doing something wrong?

scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    "localhost:9092");
scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
    "true");
scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
    "10");
scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
    "roundrobin");
scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
scannerKafkaProperties.put("zookeeper.connect", "localhost:2181");

...
private static final String DESERIALIZER =
    "org.apache.kafka.common.serialization.StringDeserializer";
private static final String SERIALIZER =
    "org.apache.kafka.common.serialization.StringSerializer";
...

kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    DESERIALIZER);
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    DESERIALIZER);
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SERIALIZER);
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    SERIALIZER);
kafkaProperties.put("offsets.storage", "kafka");
kafkaProperties.put("dual.commit.enabled", "false");
...
this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
...
TopicPartition topicPartition = new TopicPartition(this.topic,0);
this.kafkaConsumer.subscribe(topicPartition);
...
while( this.running ) {
    Map<String, ConsumerRecords<String, String>> messages =
        this.kafkaConsumer.poll(messageWaitTimeout);
    if( messages == null ) {
        //this.log.debug("Finished polling, no messages received.");
        for( int i = 0; i < 200; ++i ) {
            this.kafkaProducer.send(
                new ProducerRecord<>(this.topic, 0, "test", "test"));
        }
        continue;
    }
....
(And to reiterate: the console consumer does pick up the messages if I
run it, but the Java API does not.)

Or is the 0.8.2.2 high-level Java API simply not usable?

Take care,
  -stu
