I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1
version.For High level consumer, config options can be set and passed via
consumerConfig as follows
kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);

How can I achieve the same for SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties
and restarted kafka server, zookeeper and producer. But, that does not
work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is how my code looks like

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new
 OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
correlationId,    clientName));


Map<TopicAndPartition, OffsetMetadataAndError> offsets =
offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
           .addFetch(a_topic, a_partition,
offsets.get(topicAndPartition).offset(), 100000)   .build();

long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);

//Consume messages from fetchResponse


Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new
HashMap<>  ();
requestInfo.put(topicAndPartition, new
OffsetMetadataAndError(readOffset, "metadata", (short)0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new
        OffsetCommitRequest("testGroup", requestInfo, (short)0,
correlationId, clientName));


If above code crashes before committing offset, I still get latest offset
as result of offsets.get(topicAndPartition).offset() in next run which
makes me to think that auto commit of offset happens as code is executed.

Reply via email to