[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687570#comment-15687570
 ] 

ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89185930
  
    --- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java 
---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message 
meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching 
for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is 
not met, return the next offset to
    --- End diff --
    
    It seems like this is not a clean contract. This is an interface, why we 
have to assume something passed in the constructor of the implementation class? 
Seem very confusing.


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
> {{KafkaConsumer.MessageCallback}} can be more flexible with the change to 
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide 
> additional functionalities:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch error and take action on 
> the error
> This method will return null for backward compatibility when it doesn't need 
> to provide the next offset.
> In concrete implementation,  a class of a new interface 
> {{KafkaOffsetProvider}} can be added as a member in 
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and 
> provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to 
> provide the following functionalities:
> 1. To fetch earliest/latest offset in Kafka
> 2. To find the offset of a message with timestamp equal to the given 
> timestamp in Kafka
> For backward compatibility, if {{KafkaOffsetProvider}} instance is not 
> provided, its default value will be null and none of its methods will be 
> called.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to