I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1 version.For High level consumer, config options can be set and passed via consumerConfig as follows kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);
How can I achieve the same for SimpleConsumer? I mainly want to disable auto commit. I tried setting auto commit to false in consumer.properties and restarted kafka server, zookeeper and producer. But, that does not work. I think I need to apply this setting through code, not in consumer.properties. Can anyone help here? Here is how my code looks like List<TopicAndPartition> topicAndPartitionList = new ArrayList<>(); topicAndPartitionList.add(topicAndPartition); OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0, correlationId, clientName)); Map<TopicAndPartition, OffsetMetadataAndError> offsets = offsetFetchResponse.offsets(); FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 100000) .build(); long readOffset = offsets.get(topicAndPartition).offset(); FetchResponse fetchResponse = consumer.fetch(req); //Consume messages from fetchResponse Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new HashMap<> (); requestInfo.put(topicAndPartition, new OffsetMetadataAndError(readOffset, "metadata", (short)0)); OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new OffsetCommitRequest("testGroup", requestInfo, (short)0, correlationId, clientName)); If above code crashes before committing offset, I still get latest offset as result of offsets.get(topicAndPartition).offset() in next run which makes me to think that auto commit of offset happens as code is executed.