[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengfeng Mao updated TWILL-199:
--------------------------------
    Summary: Return next offset and handle offset error in 
KafkaConsumer.MessageCallback  (was: Support inconsistent Kafka offset 
correction in KafkaConsumer.MessageCallback)

> Return next offset and handle offset error in KafkaConsumer.MessageCallback
> ---------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> Offsets of Kafka messages may not always be consistent. They can change when 
> switching to a new Kafka instance while the messages' content remains the 
> same. Therefore, we need a way to detect inconsistent offsets and correct 
> them. An instance of a new class {{KafkaOffsetProvider}} can be added as a 
> parameter of {{KafkaConsumer.MessageCallback}} for this purpose. For backward 
> compatibility, if this instance is not provided, its default value will be 
> null and no check for inconsistent offsets will be done.
> The new class {{KafkaOffsetProvider}} requires the timestamp of the message 
> to be processed. It provides a method to verify whether the message fetched 
> with the given offset matches the intended one by checking whether the 
> fetched message's timestamp is equal to the expected timestamp. If not, this 
> method returns the offset of the first message found with the expected 
> timestamp.
> Method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
> KafkaConsumer.MessageCallback should be changed to {{Long 
> onReceived(Iterator<FetchedMessage> messages)}} so that it can call the 
> method in {{KafkaOffsetProvider}} to get the correct offset to start fetching 
> messages with, when the original starting offset does not exist or the 
> message given doesn't match the timestamp stored in {{KafkaOffsetProvider}}. If 
> neither of the offset mismatches happens or the {{KafkaOffsetProvider}} 
> instance is null, there's no check for inconsistent offset and this method 
> will just return null.
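
A minimal sketch of the flow described in the issue, under stated assumptions. The type StubMessage, the method name correctedOffset, and the classes TimestampOffsetProvider and CheckingCallback are all hypothetical and are not part of Twill. The in-memory list only stands in for a broker log, and the case where the starting offset does not exist at all is not modeled. The sketch covers only the timestamp comparison and the Long/null return convention proposed above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Twill's FetchedMessage, reduced to the two
// fields this sketch needs.
final class StubMessage {
  final long offset;
  final long timestamp;

  StubMessage(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }
}

// Assumed shape of the proposed KafkaOffsetProvider; the method name and
// signature are illustrative only.
interface KafkaOffsetProvider {
  // Returns null when the fetched message carries the expected timestamp,
  // otherwise the offset of the first message found with that timestamp
  // (or null if no such message exists).
  Long correctedOffset(StubMessage fetched);
}

// Looks up the expected timestamp in an in-memory list that stands in for
// the broker's log.
final class TimestampOffsetProvider implements KafkaOffsetProvider {
  private final List<StubMessage> log;
  private final long expectedTimestamp;

  TimestampOffsetProvider(List<StubMessage> log, long expectedTimestamp) {
    this.log = log;
    this.expectedTimestamp = expectedTimestamp;
  }

  @Override
  public Long correctedOffset(StubMessage fetched) {
    if (fetched.timestamp == expectedTimestamp) {
      return null; // offsets are consistent, nothing to correct
    }
    for (StubMessage m : log) {
      if (m.timestamp == expectedTimestamp) {
        return m.offset;
      }
    }
    return null; // expected message not found, no correction available
  }
}

// Callback whose onReceived returns a corrected starting offset, or null
// when no check is configured or no mismatch is detected.
final class CheckingCallback {
  private final KafkaOffsetProvider provider; // null disables the check
  final List<StubMessage> processed = new ArrayList<>();

  CheckingCallback(KafkaOffsetProvider provider) {
    this.provider = provider;
  }

  Long onReceived(Iterator<StubMessage> messages) {
    boolean first = true;
    while (messages.hasNext()) {
      StubMessage m = messages.next();
      if (first && provider != null) {
        // Only the first message of the batch is compared against the
        // expected timestamp.
        Long corrected = provider.correctedOffset(m);
        if (corrected != null) {
          return corrected; // caller should restart fetching from here
        }
      }
      first = false;
      processed.add(m);
    }
    return null;
  }
}
```

In this sketch a non-null return value tells the caller to discard the batch and refetch from the returned offset, while null means the batch was consumed as-is. Passing a null provider keeps the existing behavior, which matches the backward-compatibility note in the description.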



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
