I didn't realize there was a commitOffsets() method on the high-level consumer 
(the code is abstracted away by the Spring Integration classes).
Yes, this actually suits my needs and I was able to get it to work for my use 
case.
Thank you very much - that was extremely helpful.

In case it's of any use to someone else, here's the solution I came up with.

Spring Configuration file
<!-- Inbound adapter: polls Kafka through kafkaConsumerContext and publishes to the executor channel. -->
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
       kafka-consumer-context-ref="kafkaConsumerContext"
       auto-startup="true"
       channel="exchangeKafkaFusionInboundSpringExecutorChannel">
       <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5">
              <!-- Kafka 0.8.1.1 does not have the concept of transactions. Therefore we must handle this on our own. -->
              <!-- To do so, auto-commit is disabled on the consumer (see the kafkaConsumerProperties bean) so
                   commits won't automatically be performed as we read msgs. -->
              <!-- The advice-chain below will call consumerConfig.getConsumerConnector().commitOffsets()
                   only if no exceptions occurred during the processing of this chunk of msgs. -->
              <int:advice-chain>
                     <bean id="kafkaConsumerAfterAdvice"
                            class="com.citigroup.tmg.exchgateway.common.util.springintegration.KafkaConsumerAfterAdvice">
                            <property name="consumerContext" ref="kafkaConsumerContext"/>
                            <property name="consumerGroupId" value="consumerGroupG"/>
                     </bean>
              </int:advice-chain>
       </int:poller>
</int-kafka:inbound-channel-adapter>


<!-- Producer context: one producer configuration for topic "test" with String keys and String values. -->
<int-kafka:producer-context id="kafkaProducerContext">
       <int-kafka:producer-configurations>
              <int-kafka:producer-configuration
                     broker-list="175.65.76.12:9092"
                     key-class-type="java.lang.String"
                     value-class-type="java.lang.String"
                     topic="test"
                     compression-codec="default"/>
       </int-kafka:producer-configurations>
</int-kafka:producer-context>


<!-- ZooKeeper connection settings referenced by kafkaConsumerContext.
     NOTE(review): timeout/sync values are presumably milliseconds; confirm against the schema docs. -->
<int-kafka:zookeeper-connect
       id="zookeeperConnect"
       zk-connect="175.65.76.12:2181"
       zk-connection-timeout="6000"
       zk-session-timeout="6000"
       zk-sync-time="2000"/>

<!-- See http://kafka.apache.org/documentation.html#consumerconfigs -->
<!-- or "high-level consumer" on http://kafka.apache.org/07/configuration.html 
or https://kafka.apache.org/08/configuration.html -->
<!-- 
http://grokbase.com/t/kafka/users/12b9bmsy7k/question-about-resetting-offsets-and-the-high-level-consumer
 -->
  <!-- Extra Kafka consumer properties passed to the high-level consumer.
       Kafka 0.8.x names the auto-commit switch "auto.commit.enable"; "autocommit.enable" is the 0.7 name.
       Auto-commit must be off so that offsets are committed only by the poller's after-returning advice. -->
  <bean id="kafkaConsumerProperties"
         class="org.springframework.beans.factory.config.PropertiesFactoryBean">
         <property name="properties">
                <props>
                       <prop key="auto.commit.enable">false</prop>
                       <prop key="auto.offset.reset">largest</prop>
                </props>
         </property>
  </bean>

<!-- Consumer context: a single consumer configuration for group "consumerGroupG" (the same group id
     given to kafkaConsumerAfterAdvice), reading topic "test-multi" with 3 streams. -->
<int-kafka:consumer-context
       id="kafkaConsumerContext"
       consumer-timeout="4000"
       zookeeper-connect="zookeeperConnect"
       consumer-properties="kafkaConsumerProperties">
       <int-kafka:consumer-configurations>
              <int-kafka:consumer-configuration group-id="consumerGroupG" max-messages="20000">
                     <int-kafka:topic id="test-multi" streams="3"/>
              </int-kafka:consumer-configuration>
       </int-kafka:consumer-configurations>
</int-kafka:consumer-context>




Java Advice Class

public class KafkaConsumerAfterAdvice implements AfterReturningAdvice, 
InitializingBean {
       private KafkaConsumerContext consumerContext;
       private String consumerGroupId;

       public void setConsumerContext(KafkaConsumerContext consumerContext) {
              this.consumerContext = consumerContext;
       }

       public void setConsumerGroupId(String consumerGroupId) {
              this.consumerGroupId = consumerGroupId;
       }

       /**
       * Spring calls this after the bean has be initialized within the 
ApplicationContext.
       */
       @Override
       public void afterPropertiesSet() throws Exception {
              Assert.notNull(consumerContext, "[consumerContext] cannot be 
null");
              Assert.notNull(consumerGroupId, "[consumerGroupId] cannot be 
null");
       }

       @Override
       public void afterReturning(Object returnValue, Method method, Object[] 
args, Object target) throws Throwable {
              // If there were messages then returnValue=true otherwise 
returnValue=false.
              // Only if true do we need to take the hit to commit the offsets.
              if (returnValue.equals(true)) {
                     Iterator<ConsumerConfiguration<String, Object>> 
consumerConfigIterator = consumerContext.getConsumerConfigurations().iterator();
                     while (consumerConfigIterator.hasNext()) {
                           ConsumerConfiguration<String, Object> consumerConfig 
= consumerConfigIterator.next();
                           if 
(consumerGroupId.equals(consumerConfig.getConsumerMetadata().getGroupId())) {
                                  
consumerConfig.getConsumerConnector().commitOffsets();
                           }
                     }
              }
       }
}

Reply via email to