viktorsomogyi commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1135376859
########## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ########## @@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } + /** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException Review Comment: nit: add "or TimeoutException" ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ########## @@ -70,6 +74,9 @@ public class TopicAdmin implements AutoCloseable { public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet()); + private static final List<Class<? extends Exception>> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList( + InvalidReplicationFactorException.class, + TimeoutException.class); Review Comment: Since TimeoutException is a RetriableException I was wondering whether we could refactor InvalidReplicationFactorException because in some circumstances it can be retriable and I think even if there won't be more brokers, it doesn't hurt too much to retry a few times with backoff. What do you think @mimaison, would this be considered an API change? ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ########## @@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } + /** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} Review Comment: nit: please fill out parameters and return value if you add javadocs. ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ########## @@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } + /** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ + public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { Review Comment: I think it would be a bit more robust to add an overload to {{org.apache.kafka.connect.util.RetryUtil#retryUntilTimeout}} that specifies a set of exceptions or a condition to retry on and use that here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org