I didn't realize there was a commitOffset() method on the high level consumer (the code is abstracted by the Spring Integration classes). Yes, this actually suits my needs and I was able to get it to work for my use case. Thank you very much - that was extremely helpful.
In case it's of any use to someone else, here's the solution I came up with. Spring Configuration file <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="kafkaConsumerContext" auto-startup="true" channel="exchangeKafkaFusionInboundSpringExecutorChannel" > <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5" > <!-- Kafka 0.8.1.1 does not have the concept of transactions. Therefore we must handle this on our own. --> <!-- To do so on the consumer we've set the "autocommit.enable=false" so commits won't automatically be performed as we read msgs. --> <!-- The advice-chain below will call consumerConfig.getConsumerConnector().commitOffsets() only if no exceptions occurred during the processing of this chunk of msgs. --> <int:advice-chain> <bean id="kafkaConsumerAfterAdvice" class="com.citigroup.tmg.exchgateway.common.util.springintegration.KafkaConsumerAfterAdvice"> <property name="consumerContext" ref="kafkaConsumerContext"/> <property name="consumerGroupId" value="consumerGroupG"/> </bean> </int:advice-chain> </int:poller> <int-kafka:producer-context id="kafkaProducerContext"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="175.65.76.12:9092" key-class-type="java.lang.String" value-class-type="java.lang.String" topic="test" compression-codec="default"/> </int-kafka:producer-configurations> </int-kafka:producer-context> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="175.65.76.12:2181" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" /> <!-- See http://kafka.apache.org/documentation.html#consumerconfigs --> <!-- or "high-level consumer" on http://kafka.apache.org/07/configuration.html or https://kafka.apache.org/08/configuration.html --> <!-- http://grokbase.com/t/kafka/users/12b9bmsy7k/question-about-resetting-offsets-and-the-high-level-consumer --> <bean id="kafkaConsumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="autocommit.enable">false</prop> <prop key="auto.offset.reset">largest</prop> </props> </property> </bean> <int-kafka:consumer-context id="kafkaConsumerContext" consumer-timeout="4000" zookeeper-connect="zookeeperConnect" consumer-properties="kafkaConsumerProperties"> <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="consumerGroupG" max-messages="20000"> <int-kafka:topic id="test-multi" streams="3"/> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> Java Advice Class public class KafkaConsumerAfterAdvice implements AfterReturningAdvice, InitializingBean { private KafkaConsumerContext consumerContext; private String consumerGroupId; public void setConsumerContext(KafkaConsumerContext consumerContext) { this.consumerContext = consumerContext; } public void setConsumerGroupId(String consumerGroupId) { this.consumerGroupId = consumerGroupId; } /** * Spring calls this after the bean has be initialized within the ApplicationContext. */ @Override public void afterPropertiesSet() throws Exception { Assert.notNull(consumerContext, "[consumerContext] cannot be null"); Assert.notNull(consumerGroupId, "[consumerGroupId] cannot be null"); } @Override public void afterReturning(Object returnValue, Method method, Object[] args, Object target) throws Throwable { // If there were messages then returnValue=true otherwise returnValue=false. // Only if true do we need to take the hit to commit the offsets. if (returnValue.equals(true)) { Iterator<ConsumerConfiguration<String, Object>> consumerConfigIterator = consumerContext.getConsumerConfigurations().iterator(); while (consumerConfigIterator.hasNext()) { ConsumerConfiguration<String, Object> consumerConfig = consumerConfigIterator.next(); if (consumerGroupId.equals(consumerConfig.getConsumerMetadata().getGroupId())) { consumerConfig.getConsumerConnector().commitOffsets(); } } } } }