[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15796931#comment-15796931 ]
ASF GitHub Bot commented on TWILL-199:
--------------------------------------
Github user maochf commented on a diff in the pull request:
https://github.com/apache/twill/pull/16#discussion_r94521832
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
     final AtomicBoolean stopped = new AtomicBoolean();
     return new MessageCallback() {
       @Override
-      public void onReceived(final Iterator<FetchedMessage> messages) {
+      public long onReceived(final Iterator<FetchedMessage> messages) {
         if (stopped.get()) {
-          return;
+          return Long.MIN_VALUE;
         }
-        Futures.getUnchecked(executor.submit(new Runnable() {
+        return Futures.getUnchecked(executor.submit(new Callable<Long>() {
+          long nextOffset = Long.MIN_VALUE;
           @Override
-          public void run() {
+          public Long call() {
             if (stopped.get()) {
-              return;
+              return nextOffset;
             }
-            callback.onReceived(messages);
+            nextOffset = callback.onReceived(messages);
--- End diff --
Because the stored `nextOffset` is what should be returned at line 286.
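
For readers following the diff, here is a minimal, self-contained sketch of the pattern it introduces: the wrapper submits a `Callable<Long>` to the executor and returns the delegate's next offset, falling back to `Long.MIN_VALUE` once stopped. This is not the code in the pull request. The class name `WrapCallbackSketch`, the simplified `MessageCallback` interface (using `String` in place of `FetchedMessage`), and the use of `Future.get()` instead of Guava's `Futures.getUnchecked` are all assumptions made so the example runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

final class WrapCallbackSketch {

  /** Hypothetical, simplified stand-in for KafkaConsumer.MessageCallback. */
  interface MessageCallback {
    /** Returns the next offset to fetch, or Long.MIN_VALUE if none is provided. */
    long onReceived(Iterator<String> messages);
  }

  /** Wraps a callback so that it runs on the given executor and reports the next offset. */
  static MessageCallback wrapCallback(final MessageCallback callback,
                                      final ExecutorService executor,
                                      final AtomicBoolean stopped) {
    return new MessageCallback() {
      @Override
      public long onReceived(final Iterator<String> messages) {
        if (stopped.get()) {
          return Long.MIN_VALUE;
        }
        try {
          return executor.submit(new Callable<Long>() {
            // Stored so that every return path of call() hands back the same variable.
            long nextOffset = Long.MIN_VALUE;

            @Override
            public Long call() {
              if (stopped.get()) {
                return nextOffset;
              }
              nextOffset = callback.onReceived(messages);
              return nextOffset;
            }
          }).get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return Long.MIN_VALUE;
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }
      }
    };
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean stopped = new AtomicBoolean();
    // Illustrative delegate: pretends the batch started at offset 100.
    MessageCallback delegate = messages -> {
      long count = 0;
      while (messages.hasNext()) {
        messages.next();
        count++;
      }
      return 100L + count;
    };
    MessageCallback wrapped = wrapCallback(delegate, executor, stopped);
    System.out.println(wrapped.onReceived(Arrays.asList("a", "b", "c").iterator()));
    executor.shutdown();
  }
}
```

The `Long.MIN_VALUE` sentinel follows the diff; the issue description below instead proposes returning null from a `Long`-typed method.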
> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
> Issue Type: Improvement
> Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} could be made more flexible by changing it to
> {{Long onReceived(Iterator<FetchedMessage> messages)}}, which would let it:
> 1. Return the next offset to be fetched
> 2. Handle an offset that does not exist or does not match, and act on
> the error
> For backward compatibility, the method returns null when it does not need
> to provide the next offset.
> In a concrete implementation, an instance of a new interface
> {{KafkaOffsetProvider}} can be added as a member of
> {{KafkaConsumer.MessageCallback}} to handle offset errors and
> provide the next offset. {{KafkaOffsetProvider}} also has methods to:
> 1. Fetch the earliest or latest offset in Kafka
> 2. Find the offset of the message whose timestamp equals a given
> timestamp in Kafka
> For backward compatibility, if no {{KafkaOffsetProvider}} instance is
> provided, it defaults to null and none of its methods are
> called.
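
To make the proposed {{KafkaOffsetProvider}} responsibilities concrete, the following is a rough sketch of what such an interface might look like. The issue names only the interface and its duties, so every method name and signature here is hypothetical, as is the in-memory implementation (`InMemoryOffsetProvider`), which stands in for a real Kafka lookup purely for illustration.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class OffsetProviderSketch {

  /** Hypothetical shape of the proposed interface; all method names are assumptions. */
  interface KafkaOffsetProvider {
    /** Offset of the oldest message still available. */
    long getEarliestOffset();

    /** Offset that the next written message would receive. */
    long getLatestOffset();

    /** Offset of the message whose timestamp equals the given one, or -1 if none. */
    long findOffsetByTimestamp(long timestamp);

    /** Chooses where to resume when the requested offset does not exist. */
    long handleInvalidOffset(long requestedOffset);
  }

  /** Toy implementation backed by an offset-to-timestamp map instead of Kafka. */
  static final class InMemoryOffsetProvider implements KafkaOffsetProvider {
    private final NavigableMap<Long, Long> timestampByOffset;

    InMemoryOffsetProvider(Map<Long, Long> timestampByOffset) {
      if (timestampByOffset.isEmpty()) {
        throw new IllegalArgumentException("at least one message is required");
      }
      this.timestampByOffset = new TreeMap<>(timestampByOffset);
    }

    @Override
    public long getEarliestOffset() {
      return timestampByOffset.firstKey();
    }

    @Override
    public long getLatestOffset() {
      return timestampByOffset.lastKey() + 1;
    }

    @Override
    public long findOffsetByTimestamp(long timestamp) {
      for (Map.Entry<Long, Long> entry : timestampByOffset.entrySet()) {
        if (entry.getValue() == timestamp) {
          return entry.getKey();
        }
      }
      return -1L;
    }

    @Override
    public long handleInvalidOffset(long requestedOffset) {
      // Clamp the requested offset into the range that is actually available.
      if (requestedOffset < getEarliestOffset()) {
        return getEarliestOffset();
      }
      if (requestedOffset > getLatestOffset()) {
        return getLatestOffset();
      }
      return requestedOffset;
    }
  }
}
```

Clamping to the nearest valid offset is only one possible error policy; the issue leaves the actual action on an offset error to the implementation.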