[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15707150#comment-15707150
 ] 

ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user albertshau commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r90152039
  
    --- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,8 +35,10 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link 
FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @return A long larger than zero as the offset to restart fetching 
messages when offset error is caught,
    --- End diff --
    
    what happens if the offset returned is out of bounds? Should document the 
expected behavior in such circumstances. Looking at the implementation, it 
seems like we read from the earliest offset. 


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
> {{KafkaConsumer.MessageCallback}} can be more flexible with the change to 
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide 
> additional functionalities:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch error and take action on 
> the error
> This method will return null for backward compatibility when it doesn't need 
> to provide the next offset.
> In concrete implementation,  a class of a new interface 
> {{KafkaOffsetProvider}} can be added as a member in 
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and 
> provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to 
> provide the following functionalities:
> 1. To fetch earliest/latest offset in Kafka
> 2. To find the offset of a message with timestamp equal to the given 
> timestamp in Kafka
> For backward compatibility, if {{KafkaOffsetProvider}} instance is not 
> provided, its default value will be null and none of its methods will be 
> called.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to