[
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chengfeng Mao updated TWILL-199:
--------------------------------
Summary: Return next offset and handle offset error in
KafkaConsumer.MessageCallback (was: Support inconsistent Kafka offset
correction in KafkaConsumer.MessageCallback)
> Return next offset and handle offset error in KafkaConsumer.MessageCallback
> ---------------------------------------------------------------------------
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
> Issue Type: Improvement
> Reporter: Chengfeng Mao
>
> Offsets of Kafka messages may not always be consistent. They can change when
> switching to a new Kafka instance while the messages' content remains the
> same. Therefore, we need a way to check the inconsistent offsets and correct
> them. An instance of a new class {{KafkaOffsetProvider}} can be added to the
> parameter of {{KafkaConsumer.MessageCallback}} for this purpose. For backward
> compatibility, if this instance is not provided, its default value will be
> null and no check for inconsistent offset will be done.
> The new class {{KafkaOffsetProvider}} requires the timestamp of the message
> to be processed. It provides a method to verify whether the message fetched
> with the given offset matches the intended one by checking whether the
> fetched message's timestamp is equal to the expected timestamp. If not, this
> method returns the offset of the first message found with the expected
> timestamp.
> Method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> KafkaConsumer.MessageCallback should be changed to {{Long
> onReceived(Iterator<FetchedMessage> messages)}} so that it can call call the
> method in {{KafkaOffsetProvider}} to get the correct offset to start fetching
> messages with, when the original starting offset does not exist or the
> message given doesn't match the timestamp stored in KafkaOffsetProvider . If
> neither of the offset mismatches happen or the {{KafkaOffsetProvider}}
> instance is null, there's no check for inconsistent offset and this method
> will just return null.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)