[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15685666#comment-15685666 ]
ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042093

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    +   * continue searching for the message meeting this condition.
    +   * @param message {@link FetchedMessage} to check.
    +   * @return A {@code long} larger than zero as the next offset to continue searching for the message meeting the
    +   *         given condition if the current message doesn't meet the condition. Return {code 0} if the current
    +   *         message meets the given condition. Return the earliest offset {@code -2} if no message meeting the
    +   *         condition can be found.
    +   */
    +  public long getCandidateOffset(FetchedMessage message);
    --- End diff --

    I meant that the returned offset is that of the next message to be checked; that message is not guaranteed to satisfy the given condition.


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} can be more flexible with the change to
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide
> additional functionalities:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch error and take action on
> the error
> This method will return null for backward compatibility when it doesn't need
> to provide the next offset.
> In a concrete implementation, an implementation of a new interface
> {{KafkaOffsetProvider}} can be added as a member in
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to
> provide the following functionalities:
> 1. To fetch earliest/latest offset in Kafka
> 2. To find the offset of a message with timestamp equal to the given
> timestamp in Kafka
> For backward compatibility, if {{KafkaOffsetProvider}} instance is not
> provided, its default value will be null and none of its methods will be
> called.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
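
For readers following the review, here is a minimal sketch of how a caller might interpret the return value of getCandidateOffset as described in the Javadoc under discussion: 0 means the current message matches, -2 means no match can be found, and a positive value is only a candidate offset to check next. Everything in it is an assumption made for illustration, not code from the pull request: the Message class is a simplified stand-in for Twill's FetchedMessage, and the timestamp condition, the in-memory log whose list index equals the offset, and the names OffsetSearchSketch, timestampAtLeast and findOffset are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are the real Twill classes.
final class OffsetSearchSketch {

  /** Simplified stand-in for FetchedMessage (assumption: it carries an offset and a timestamp). */
  static final class Message {
    final long offset;
    final long timestamp;

    Message(long offset, long timestamp) {
      this.offset = offset;
      this.timestamp = timestamp;
    }
  }

  /** Mirrors the return-value contract quoted in the Javadoc under review. */
  interface OffsetProvider {
    long getCandidateOffset(Message message);
  }

  static final long FOUND = 0L;      // current message meets the condition
  static final long EARLIEST = -2L;  // no message meeting the condition can be found

  /** Assumed condition: the message timestamp is at least targetTimestamp. */
  static OffsetProvider timestampAtLeast(long targetTimestamp) {
    // The returned offset is only the next one to check; it may not satisfy the condition.
    return message -> message.timestamp >= targetTimestamp ? FOUND : message.offset + 1;
  }

  /** Follows candidate offsets through an in-memory log whose list index equals the offset. */
  static long findOffset(List<Message> log, OffsetProvider provider) {
    long offset = 0;
    while (offset < log.size()) {
      Message message = log.get((int) offset);
      long candidate = provider.getCandidateOffset(message);
      if (candidate == FOUND) {
        return message.offset;
      }
      if (candidate == EARLIEST) {
        return EARLIEST;
      }
      if (candidate <= offset) {
        // Guard against a provider that would make the search loop forever.
        throw new IllegalStateException("Candidate offset does not advance: " + candidate);
      }
      offset = candidate;
    }
    // Assumption of this sketch: running off the end is treated like "not found".
    return EARLIEST;
  }

  public static void main(String[] args) {
    List<Message> log = Arrays.asList(
        new Message(0, 100), new Message(1, 200), new Message(2, 300));
    System.out.println(findOffset(log, timestampAtLeast(250)));  // prints 2
  }
}
```

The caller re-checks every candidate because, as the comment above says, a positive return value only names the next message to examine.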