[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216506#comment-15216506 ]
ASF GitHub Bot commented on STORM-822: -------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r57770291 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; + +public interface KafkaSpoutRetryService extends Serializable { + /** + * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled. + * @param msgId message to schedule for retrial + */ + void schedule(KafkaSpoutMessageId msgId); + + /** + * Removes a message from the list of messages scheduled for retrial + * @param msgId message to remove from retrial + */ + boolean remove(KafkaSpoutMessageId msgId); + + /** + * Removes all the messages whose {@link TopicPartition} does NOT belong to the specified {@code Collection<TopicPartition>}. + * All messages that come from a {@link TopicPartition} existing in the collection will be kept. + * This method is useful to cleanup state following partition rebalance. + * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages + * @return true if at least one message was removed, false otherwise + */ + boolean remove(Collection<TopicPartition> topicPartitions); + + /** + * @return set of topic partitions that have offsets that are ready to be retried, i.e., + * for which a tuple has failed and has retry time less than current time + */ + Set<TopicPartition> retriableTopicPartitions(); + + /** + * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried, + * i.e is scheduled and has retry time that is less than current time. + * @return true if message is ready to be retried, false otherwise + */ + boolean retry(KafkaSpoutMessageId msgId); + + /** + * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried. + * The message may or may not be ready to be retried yet. + * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried. + * Returns false is this message is not scheduled for retrial + */ + boolean scheduled(KafkaSpoutMessageId msgId); --- End diff -- can we rename this to a question to like isScheduled? > As a storm developer I’d like to use the new kafka consumer API (0.8.3) to > reduce dependencies and use long term supported kafka apis > -------------------------------------------------------------------------------------------------------------------------------------- > > Key: STORM-822 > URL: https://issues.apache.org/jira/browse/STORM-822 > Project: Apache Storm > Issue Type: Story > Components: storm-kafka > Reporter: Thomas Becker > Assignee: Hugo Louro > -- This message was sent by Atlassian JIRA (v6.3.4#6332)