[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengfeng Mao updated TWILL-199:
--------------------------------
    Description: 
The method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
{{KafkaConsumer.MessageCallback}} would be more flexible if changed to 
{{Long onReceived(Iterator<FetchedMessage> messages)}}, so that it can provide 
additional functionality:
1. Returning the next offset to be fetched
2. Handling a non-existent or mismatched offset and taking action on the error

For backward compatibility, this method returns null when it doesn't need to 
provide the next offset.

In a concrete implementation, an instance of a class implementing a new interface 
{{KafkaOffsetProvider}} can be added as a member of 
{{KafkaConsumer.MessageCallback}} to handle offset errors and provide the next 
offset. {{KafkaOffsetProvider}} also has methods for the following:
1. Fetching the earliest/latest offset in Kafka
2. Finding the offset of the message whose timestamp equals a given timestamp 
in Kafka

For backward compatibility, if no {{KafkaOffsetProvider}} instance is provided, 
the member defaults to null and none of its methods are called.
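
One possible shape for {{KafkaOffsetProvider}} and the nullable member, sketched in Java. The method names, the in-memory {{TreeMap}} standing in for a Kafka partition, and the clamp-to-earliest/latest recovery policy in the hypothetical {{offsetAfterError}} method are all assumptions for illustration. Following Kafka's convention, the "latest" offset here is one past the last stored message.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical shape of the proposed interface; method names are illustrative only.
interface KafkaOffsetProvider {
  long getEarliestOffset();

  long getLatestOffset();

  // Offset of the first message whose timestamp equals the given one, or null if none.
  Long findOffsetByTimestamp(long timestamp);
}

// In-memory stand-in for a Kafka partition: offset -> timestamp, in offset order.
final class InMemoryOffsetProvider implements KafkaOffsetProvider {
  private final TreeMap<Long, Long> log = new TreeMap<>();

  void append(long offset, long timestamp) {
    log.put(offset, timestamp);
  }

  @Override
  public long getEarliestOffset() {
    return log.isEmpty() ? 0L : log.firstKey();
  }

  @Override
  public long getLatestOffset() {
    // Like Kafka's "latest" offset: the offset the next appended message would get.
    return log.isEmpty() ? 0L : log.lastKey() + 1;
  }

  @Override
  public Long findOffsetByTimestamp(long timestamp) {
    for (Map.Entry<Long, Long> e : log.entrySet()) {
      if (e.getValue() == timestamp) {
        return e.getKey();
      }
    }
    return null;
  }
}

// Callback-side member: null by default, in which case the provider is never called.
final class OffsetAwareCallback {
  private final KafkaOffsetProvider offsetProvider;

  OffsetAwareCallback(KafkaOffsetProvider offsetProvider) {
    this.offsetProvider = offsetProvider;
  }

  // Hypothetical recovery hook: clamp an out-of-range offset into [earliest, latest].
  Long offsetAfterError(long requestedOffset) {
    if (offsetProvider == null) {
      return null; // backward compatible: no offset error handling
    }
    long earliest = offsetProvider.getEarliestOffset();
    long latest = offsetProvider.getLatestOffset();
    if (requestedOffset < earliest) {
      return earliest;
    }
    if (requestedOffset > latest) {
      return latest;
    }
    return null; // offset is in range, nothing to correct
  }
}
```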

  was:
Offsets of Kafka messages may not always be consistent: they can change when 
switching to a new Kafka instance even though the messages' content remains the same. 
Therefore, we need a way to detect inconsistent offsets and correct them. An 
instance of a new class {{KafkaOffsetProvider}} can be added as a parameter 
of {{KafkaConsumer.MessageCallback}} for this purpose. For backward 
compatibility, if this instance is not provided, it defaults to null 
and no check for inconsistent offsets is done.

The new class {{KafkaOffsetProvider}} requires the timestamp of the message to 
be processed. It provides a method to verify whether the message fetched with 
the given offset matches the intended one by checking whether the fetched 
message's timestamp is equal to the expected timestamp. If not, this method 
returns the offset of the first message found with the expected timestamp.
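
A sketch of the timestamp check described above, assuming hypothetical {{TimestampedMessage}} and {{TimestampOffsetChecker}} types and a list that stands in for a Kafka lookup. The text doesn't say what the method returns when the timestamps match; returning the fetched offset unchanged is an assumption here.

```java
import java.util.List;

// Hypothetical fetched message carrying the timestamp the original proposal relies on.
final class TimestampedMessage {
  final long offset;
  final long timestamp;

  TimestampedMessage(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }
}

// Sketch of the originally proposed provider (hypothetical name TimestampOffsetChecker):
// it is built with the expected timestamp of the message to be processed.
final class TimestampOffsetChecker {
  private final long expectedTimestamp;
  // Stand-in for looking messages up in Kafka; assumed to be in offset order.
  private final List<TimestampedMessage> partition;

  TimestampOffsetChecker(long expectedTimestamp, List<TimestampedMessage> partition) {
    this.expectedTimestamp = expectedTimestamp;
    this.partition = partition;
  }

  // Keeps the fetched offset when timestamps match; otherwise returns the offset of the
  // first message with the expected timestamp, or null if there is none.
  Long verify(TimestampedMessage fetched) {
    if (fetched.timestamp == expectedTimestamp) {
      return fetched.offset;
    }
    for (TimestampedMessage m : partition) {
      if (m.timestamp == expectedTimestamp) {
        return m.offset;
      }
    }
    return null;
  }
}
```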

Method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
{{KafkaConsumer.MessageCallback}} would be more flexible if changed to 
{{Long onReceived(Iterator<FetchedMessage> messages)}}, so that it can call the 
method in {{KafkaOffsetProvider}} to get the correct offset to start fetching 
messages from when the original starting offset does not exist or the fetched 
message doesn't match the timestamp stored in {{KafkaOffsetProvider}}. If the 
{{KafkaOffsetProvider}} instance is null, no check for inconsistent offsets is 
done; in that case, or when neither kind of offset mismatch occurs, this method 
just returns null.
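
A sketch of how the originally proposed {{Long onReceived(...)}} could use the provider, assuming a reduced two-method {{KafkaOffsetProvider}} and treating "the first fetched message is not at the requested start offset" as the signal that the offset no longer exists. The {{CheckingCallback}} and {{Fetched}} names and that detection rule are illustrative only.

```java
import java.util.Iterator;

// Hypothetical fetched message with an offset and a timestamp.
final class Fetched {
  final long offset;
  final long timestamp;

  Fetched(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }
}

// Hypothetical reduction of the originally proposed KafkaOffsetProvider.
interface KafkaOffsetProvider {
  long expectedTimestamp();

  Long findOffsetByTimestamp(long timestamp);
}

final class CheckingCallback {
  private final long startOffset;
  private final KafkaOffsetProvider provider; // null when not supplied

  CheckingCallback(long startOffset, KafkaOffsetProvider provider) {
    this.startOffset = startOffset;
    this.provider = provider;
  }

  // Returns a corrected start offset on a mismatch, otherwise null.
  Long onReceived(Iterator<Fetched> messages) {
    if (provider == null || !messages.hasNext()) {
      return null; // no provider (or nothing fetched): no consistency check
    }
    Fetched first = messages.next();
    // Assumption: a first message at a different offset means the start offset is gone.
    boolean offsetMissing = first.offset != startOffset;
    boolean timestampMismatch = first.timestamp != provider.expectedTimestamp();
    if (offsetMissing || timestampMismatch) {
      return provider.findOffsetByTimestamp(provider.expectedTimestamp());
    }
    // Offsets are consistent: process the first and remaining messages as usual.
    while (messages.hasNext()) {
      messages.next();
    }
    return null;
  }
}
```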


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
