Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r57770218 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; + +public interface KafkaSpoutRetryService extends Serializable { + /** + * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled. + * @param msgId message to schedule for retrial + */ + void schedule(KafkaSpoutMessageId msgId); + + /** + * Removes a message from the list of messages scheduled for retrial + * @param msgId message to remove from retrial + */ + boolean remove(KafkaSpoutMessageId msgId); + + /** + * Removes all the messages whose {@link TopicPartition} does NOT belong to the specified {@code Collection<TopicPartition>}. + * All messages that come from a {@link TopicPartition} existing in the collection will be kept. + * This method is useful to cleanup state following partition rebalance. + * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages + * @return true if at least one message was removed, false otherwise + */ + boolean remove(Collection<TopicPartition> topicPartitions); + + /** + * @return set of topic partitions that have offsets that are ready to be retried, i.e., + * for which a tuple has failed and has retry time less than current time + */ + Set<TopicPartition> retriableTopicPartitions(); + + /** + * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried, + * i.e is scheduled and has retry time that is less than current time. + * @return true if message is ready to be retried, false otherwise + */ + boolean retry(KafkaSpoutMessageId msgId); --- End diff -- Can we rename this to a question like canBeRetried, or isReadyForRetry?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---