[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chengfeng Mao updated TWILL-199:
--------------------------------
    Summary: Return next offset and handle offset error in KafkaConsumer.MessageCallback  (was: Support inconsistent Kafka offset correction in KafkaConsumer.MessageCallback)

> Return next offset and handle offset error in KafkaConsumer.MessageCallback
> ---------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> Offsets of Kafka messages may not always be consistent. They can change when
> switching to a new Kafka instance while the messages' content remains the
> same. Therefore, we need a way to detect inconsistent offsets and correct
> them. An instance of a new class {{KafkaOffsetProvider}} can be added as a
> parameter of {{KafkaConsumer.MessageCallback}} for this purpose. For backward
> compatibility, if this instance is not provided, it defaults to null and no
> check for inconsistent offsets is done.
> The new class {{KafkaOffsetProvider}} requires the timestamp of the message
> to be processed. It provides a method to verify whether the message fetched
> with the given offset matches the intended one by checking whether the
> fetched message's timestamp equals the expected timestamp. If it does not,
> this method returns the offset of the first message found with the expected
> timestamp.
> Method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} should be changed to {{Long
> onReceived(Iterator<FetchedMessage> messages)}} so that it can call the
> method in {{KafkaOffsetProvider}} to get the correct offset to start fetching
> messages from when the original starting offset does not exist or the
> fetched message doesn't match the timestamp stored in {{KafkaOffsetProvider}}.
> If neither offset mismatch occurs, or the {{KafkaOffsetProvider}} instance is
> null, no check for inconsistent offsets is done and this method just returns
> null.
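
The proposal above can be illustrated with a minimal, self-contained sketch. It is not Twill code: {{Message}} is a hypothetical stand-in for {{FetchedMessage}}, the method names {{expectedTimestamp}} and {{findOffset}} on {{KafkaOffsetProvider}} are assumptions (the issue does not name them), and the provider scans an in-memory list instead of querying Kafka. Only the timestamp-mismatch case is covered; the "starting offset does not exist" case is left out.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OffsetCorrectionSketch {

  /** Hypothetical stand-in for a fetched message; not Twill's FetchedMessage. */
  static final class Message {
    final long offset;
    final long timestamp;

    Message(long offset, long timestamp) {
      this.offset = offset;
      this.timestamp = timestamp;
    }
  }

  /** Assumed shape of the proposed KafkaOffsetProvider; method names are illustrative. */
  interface KafkaOffsetProvider {
    /** Timestamp of the message the consumer expects to process next. */
    long expectedTimestamp();

    /** Offset of the first message with the given timestamp, or null if none is found. */
    Long findOffset(long timestamp);
  }

  /** Toy provider that scans an in-memory log instead of querying Kafka. */
  static KafkaOffsetProvider providerOver(final List<Message> log, final long expectedTimestamp) {
    return new KafkaOffsetProvider() {
      @Override
      public long expectedTimestamp() {
        return expectedTimestamp;
      }

      @Override
      public Long findOffset(long timestamp) {
        for (Message m : log) {
          if (m.timestamp == timestamp) {
            return m.offset;
          }
        }
        return null;
      }
    };
  }

  /**
   * Mirrors the proposed Long-returning onReceived: returns null when no provider is
   * given or the first fetched message has the expected timestamp; otherwise returns
   * the offset from which fetching should restart.
   */
  static Long onReceived(Iterator<Message> messages, KafkaOffsetProvider provider) {
    if (provider == null || !messages.hasNext()) {
      return null; // backward-compatible path: no offset check
    }
    Message first = messages.next();
    if (first.timestamp == provider.expectedTimestamp()) {
      return null; // offsets are consistent, nothing to correct
    }
    return provider.findOffset(provider.expectedTimestamp());
  }

  public static void main(String[] args) {
    // Log on the "new" Kafka instance: same content, shifted offsets.
    List<Message> log = Arrays.asList(
        new Message(10, 100), new Message(11, 200), new Message(12, 300));
    KafkaOffsetProvider provider = providerOver(log, 200L);

    // Fetch started at a stale offset, so the first message has the wrong timestamp.
    System.out.println(onReceived(log.iterator(), provider));
    // Fetch started at the right offset, so no correction is returned.
    System.out.println(onReceived(log.subList(1, 3).iterator(), provider));
  }
}
```

Running {{main}} prints 11 for the stale fetch and null for the consistent one. Returning a boxed {{Long}} rather than a primitive follows the issue text, which uses null to signal that no correction is needed.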