Hello everyone, I have use kafka_2.11-0.10.0.0, I write a simple consumer
code. But I can't find consumer offset in zookeeper(I have check
/consumers??/config/clients??/config/topics/__consumer_offsets). I want to know
where does kafka store consumer?
here is my consumer code:
package com.nfsq.fire.eye.message; import
org.apache.kafka.clients.consumer.ConsumerRecord; import
org.apache.kafka.clients.consumer.ConsumerRecords; import
org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList;
import java.util.Arrays; import java.util.List; import java.util.Properties;
/** * Created by letianyipin on 16/6/13. */ public class
KafkaSimpleConsumerDemo { public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers",
"10.4.250.5:9092,10.4.250.6:9092,10.4.250.7:9092");
props.put("group.id", "test"); props.put("enable.auto.commit",
"false"); // props.put("auto.commit.interval.ms", "1000000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("client.id", "22i am terry d342"); //
props.put("autooffset.reset", "smallest"); //
props.put("autocommit.enable", false); KafkaConsumer<String, String>
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("say-hello-test1")); final int
minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new
ArrayList<>(); while (true) { ConsumerRecords<String,
String> records = consumer.poll(100); for
(ConsumerRecord<String, String> record : records){
buffer.add(record); } consumer.commitSync(); }
} }