Actually, in 0.8.2.2 only KafkaProducer is fully implemented, not
KafkaConsumer.

Here is the implementation of the KafkaConsumer poll() method in 0.8.2.2:

    @Override
    public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
        // TODO Auto-generated method stub
        return null;
    }
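
To make the effect on calling code concrete, here is a small self-contained sketch. It does not use the Kafka classes at all: StubPollDemo, StubConsumer, orEmpty and countRecords are hypothetical names, and the Map<String, List<V>> return type only loosely mirrors the shape of the signature above. It just shows that a method stubbed this way hands back null on every call, and one way a caller might guard against that.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Self-contained stand-in; these are NOT the Kafka classes. All names here
// are hypothetical and only imitate the shape of the stubbed poll() method.
class StubPollDemo {

    /** Imitates the stub: poll() does nothing except return null. */
    static class StubConsumer<K, V> {
        Map<String, List<V>> poll(long timeoutMs) {
            return null; // same behaviour as the unimplemented method
        }
    }

    /** Treats a null poll result as "no records" so callers never dereference null. */
    static <V> Map<String, List<V>> orEmpty(Map<String, List<V>> polled) {
        return polled == null ? Collections.<String, List<V>>emptyMap() : polled;
    }

    /** Counts the records across all topics in one poll result. */
    static <V> int countRecords(Map<String, List<V>> polled) {
        int total = 0;
        for (List<V> records : orEmpty(polled).values()) {
            total += records.size();
        }
        return total;
    }

    public static void main(String[] args) {
        StubConsumer<String, String> consumer = new StubConsumer<>();
        for (int attempt = 1; attempt <= 3; attempt++) {
            Map<String, List<String>> polled = consumer.poll(100L);
            System.out.println("attempt " + attempt + ": null=" + (polled == null)
                    + ", records=" + countRecords(polled));
        }
    }
}
```

However long the loop runs, the stub never yields a record, so a null from poll() in 0.8.2.2 says nothing about whether messages are waiting on the broker.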

A working KafkaConsumer will be released in 0.9 (perhaps this November, as
some people on this mailing list have stated).
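
Until then, the legacy consumer from the kafka_2.10 artifact (the workaround quoted below) reads its settings from the ZooKeeper-style properties rather than from bootstrap.servers. As a rough sketch only: the class name LegacyConsumerProps and the group id "example-group" are made up for illustration, and the timeout values are simply copied from the quoted message.

```java
import java.util.Properties;

// Sketch only: builds the settings the legacy (ZooKeeper-based) consumer reads.
// LegacyConsumerProps is a hypothetical helper, not part of Kafka.
class LegacyConsumerProps {

    /**
     * zookeeperConnect and groupId are supplied by the caller; the two
     * timeouts reuse the values from the quoted message.
     */
    static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        return props;
    }

    public static void main(String[] args) {
        // "example-group" is a made-up group id for illustration.
        Properties props = build("localhost:2181", "example-group");
        // With kafka_2.10 on the classpath, these would be handed to
        // new kafka.consumer.ConsumerConfig(props) and then to
        // kafka.consumer.Consumer.createJavaConsumerConnector(...).
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The sketch deliberately stops at the Properties object so that it compiles without any Kafka jar; the actual connector calls are only named in the comment.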

Best Regards,
Hemant
9810752184 / 9013982184
On 31-Oct-2015 3:29 am, "Stu Smith" <stu26c...@gmail.com> wrote:

> Note that I did work around this issue, by including the entire Kafka:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.2</version>
> </dependency>
>
> dependency, and using the legacy Consumer API,
> instead of the kafka-clients dependency:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.8.2.2</version>
> </dependency>
>
> Listed in the documentation:
>
> http://kafka.apache.org/documentation.html#producerapi
>
> Since this dependency is called out right before it outlines the Consumer
> API, and the Consumer API docs don't mention that the Consumer API in the
> kafka-clients dependency is broken, it might be helpful if the documentation
> pointed out that the kafka-clients dependency contains a broken Consumer, and
> that the kafka_2.10 dependency should be used to access the legacy API.
>
> Take care,
>   -stu
>
> On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <stu26c...@gmail.com> wrote:
>
> > Hello!
> >
> >   I'm running into trouble using the latest Kafka client.
> >
> > 0.8.2.2 appears to be listed as a stable release on Maven Central:
> >
> > http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> >
> > And it only includes the:
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer client
> >
> > All the other Consumers listed in the documents do not appear to be
> > available in this release.
> >
> > However, in the example code for the 0.8.2.2 branch, it covers the
> > ConsumerConnector client:
> >
> > https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
> >
> > Which no longer exists in the 0.8.2.2 release.
> >
> > The KafkaConsumer client I get always returns null on poll(), similar to
> > behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E
> >
> > So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> > but removed the older, working ConsumerConnector / MessageStream interface.
> >
> > Is KafkaConsumer expected to work in 0.8.2.2 ?
> > Or are we expected to use the old client, and I'm somehow not seeing the
> > package?
> >
> > I confirmed I have messages waiting by using the Java producer API and
> > listening with the ConsoleConsumer application, and it happily prints
> > whatever the producer sends. However, the ConsoleConsumer appears to be
> > using the Scala API, so it can't provide any leads on how to use the Java
> > one.
> >
> > Or am I doing something wrong ?
> >
> > scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> > scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
> > scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> > scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
> > scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> > scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> > scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
> >
> > ...
> > private static final String DESERIALIZER =
> >     "org.apache.kafka.common.serialization.StringDeserializer";
> > private static final String SERIALIZER =
> >     "org.apache.kafka.common.serialization.StringSerializer";...
> >
> > kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
> > kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER);
> > kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SERIALIZER);
> > kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SERIALIZER);
> > kafkaProperties.put("offsets.storage","kafka");
> > kafkaProperties.put("dual.commit.enabled", "false");
> > ...
> > this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> > this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> > ...
> > TopicPartition topicPartition = new TopicPartition(this.topic,0);
> > this.kafkaConsumer.subscribe(topicPartition);
> > ...
> > while( this.running ) {
> >     Map<String, ConsumerRecords<String, String>> messages =
> >         this.kafkaConsumer.poll(messageWaitTimeout);
> >     if( messages == null ) {
> >         //this.log.debug("Finished polling, no messages received.");
> >         for( int i = 0; i < 200; ++i ) {
> >             this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0, "test", "test"));
> >         }
> >         continue;
> >     }
> > ....
> > (And to reiterate, the console consumer does pick up the messages if I
> > run it, but the Java API does not.)
> >
> > Or is the 0.8.2.2 High-Level Java API simply not usable?
> >
> > Take care,
> >   -stu
> >
>
