I didn't realize there was a commitOffsets() method on the high level consumer
(the code is abstracted by the Spring Integration classes).
Yes, this actually suits my needs and I was able to get it to work for my use
case.
Thank you very much - that was extremely helpful.
In case it's of any use to someone else, here's the solution I came up with.
Spring Configuration file
<!-- Inbound adapter that polls the Kafka consumer context and forwards messages to the
     configured channel. Offsets are committed by the poller's advice chain, not by Kafka. -->
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="kafkaConsumerContext"
        auto-startup="true"
        channel="exchangeKafkaFusionInboundSpringExecutorChannel">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5">
        <!-- Kafka 0.8.1.1 does not have the concept of transactions. Therefore
             we must handle this on our own. -->
        <!-- To do so, auto-commit is disabled on the consumer (see the
             kafkaConsumerProperties bean) so commits won't automatically be
             performed as we read msgs. -->
        <!-- The advice-chain below will call
             consumerConfig.getConsumerConnector().commitOffsets() only if no
             exceptions occurred during the processing of this chunk of msgs. -->
        <int:advice-chain>
            <bean id="kafkaConsumerAfterAdvice"
                  class="com.citigroup.tmg.exchgateway.common.util.springintegration.KafkaConsumerAfterAdvice">
                <property name="consumerContext" ref="kafkaConsumerContext"/>
                <!-- Must match the group-id of the consumer-configuration whose offsets are committed. -->
                <property name="consumerGroupId" value="consumerGroupG"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int-kafka:inbound-channel-adapter>
<!-- Producer context with a single producer configuration: topic "test" on broker
     175.65.76.12:9092, with java.lang.String keys and values. -->
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="175.65.76.12:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="test"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<!-- ZooKeeper connection settings; referenced by kafkaConsumerContext through its
     zookeeper-connect attribute. NOTE(review): timeout/sync values are presumably
     milliseconds; confirm against the int-kafka schema. -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="175.65.76.12:2181"
zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000" />
<!-- See http://kafka.apache.org/documentation.html#consumerconfigs
-->
<!-- or "high-level consumer" on http://kafka.apache.org/07/configuration.html
or https://kafka.apache.org/08/configuration.html -->
<!--
http://grokbase.com/t/kafka/users/12b9bmsy7k/question-about-resetting-offsets-and-the-high-level-consumer
-->
<!-- Extra consumer properties handed to the Kafka high-level consumer. -->
<bean id="kafkaConsumerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <!-- Disable automatic offset commits so offsets are only committed explicitly
                 via commitOffsets(). Kafka 0.8.x reads "auto.commit.enable"; the 0.7-era
                 spelling "autocommit.enable" is not recognized by 0.8 and would leave
                 auto-commit at its default (enabled). -->
            <prop key="auto.commit.enable">false</prop>
            <prop key="auto.offset.reset">largest</prop>
        </props>
    </property>
</bean>
<!-- Consumer context used by kafkaInboundChannelAdapter. It wires in zookeeperConnect and
     kafkaConsumerProperties and declares one consumer configuration for group
     "consumerGroupG", the same group id that is passed to kafkaConsumerAfterAdvice.
     NOTE(review): this consumer reads topic "test-multi" while the producer configuration
     publishes to topic "test"; confirm that the difference is intended. -->
<int-kafka:consumer-context id="kafkaConsumerContext" consumer-timeout="4000"
zookeeper-connect="zookeeperConnect"
consumer-properties="kafkaConsumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="consumerGroupG"
max-messages="20000">
<int-kafka:topic id="test-multi" streams="3"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
Java Advice Class
public class KafkaConsumerAfterAdvice implements AfterReturningAdvice,
InitializingBean {
private KafkaConsumerContext consumerContext;
private String consumerGroupId;
public void setConsumerContext(KafkaConsumerContext consumerContext) {
this.consumerContext = consumerContext;
}
public void setConsumerGroupId(String consumerGroupId) {
this.consumerGroupId = consumerGroupId;
}
/**
* Spring calls this after the bean has be initialized within the
ApplicationContext.
*/
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(consumerContext, "[consumerContext] cannot be
null");
Assert.notNull(consumerGroupId, "[consumerGroupId] cannot be
null");
}
@Override
public void afterReturning(Object returnValue, Method method, Object[]
args, Object target) throws Throwable {
// If there were messages then returnValue=true otherwise
returnValue=false.
// Only if true do we need to take the hit to commit the offsets.
if (returnValue.equals(true)) {
Iterator<ConsumerConfiguration<String, Object>>
consumerConfigIterator = consumerContext.getConsumerConfigurations().iterator();
while (consumerConfigIterator.hasNext()) {
ConsumerConfiguration<String, Object> consumerConfig
= consumerConfigIterator.next();
if
(consumerGroupId.equals(consumerConfig.getConsumerMetadata().getGroupId())) {
consumerConfig.getConsumerConnector().commitOffsets();
}
}
}
}
}