[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15797861#comment-15797861 ]
ASF GitHub Bot commented on TWILL-199: -------------------------------------- Github user maochf commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94558457 --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java --- @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback, final AtomicBoolean stopped = new AtomicBoolean(); return new MessageCallback() { @Override - public void onReceived(final Iterator<FetchedMessage> messages) { + public long onReceived(final Iterator<FetchedMessage> messages) { if (stopped.get()) { - return; + return Long.MIN_VALUE; --- End diff -- Maybe to pass an `AtomicLong` to this method, and it contains the initial offset and can be set to the next offset? Just realized this is probably an implementation detail in `SimpleKafkaConsumer` and is not general enough to be done in the interface. > Get next offset and handle offset error in KafkaConsumer.MessageCallback > ------------------------------------------------------------------------ > > Key: TWILL-199 > URL: https://issues.apache.org/jira/browse/TWILL-199 > Project: Apache Twill > Issue Type: Improvement > Reporter: Chengfeng Mao > > The method {{void onReceived(Iterator<FetchedMessage> messages)}} in > {{KafkaConsumer.MessageCallback}} can be more flexible with the change to > {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide > additional functionalities: > 1. To return the next offset to be fetched > 2. To handle offset non-existence or offset mismatch error and take action on > the error > This method will return null for backward compatibility when it doesn't need > to provide the next offset. > In concrete implementation, a class of a new interface > {{KafkaOffsetProvider}} can be added as a member in > {{KafkaConsumer.MessageCallback}} to perform the offset error handling and > provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to > provide the following functionalities: > 1. To fetch earliest/latest offset in Kafka > 2. To find the offset of a message with timestamp equal to the given > timestamp in Kafka > For backward compatibility, if {{KafkaOffsetProvider}} instance is not > provided, its default value will be null and none of its methods will be > called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)