This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 822abe4 MINOR: WorkerUtils#topicDescriptions must unwrap exceptions properly (#6937) 822abe4 is described below commit 822abe47db07fcf92e8aa24e920a54239a90348d Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Wed Jul 3 16:08:39 2019 -0700 MINOR: WorkerUtils#topicDescriptions must unwrap exceptions properly (#6937) Reviewers: Ismael Juma <ism...@juma.me.uk>, Stanislav Kozlovski <stanislav_kozlov...@outlook.com> --- .../kafka/clients/admin/MockAdminClient.java | 32 +++++++++++++++----- .../apache/kafka/trogdor/common/WorkerUtils.java | 12 +++++--- .../kafka/trogdor/common/WorkerUtilsTest.java | 35 +++++++++++++++------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 19f9eae..7ca9ce4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -205,7 +205,11 @@ public class MockAdminClient extends AdminClient { for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) { String topicName = topicDescription.getKey(); - topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic)); + if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { + topicDescription.getValue().fetchesRemainingUntilVisible--; + } else { + topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic)); + } } KafkaFutureImpl<Map<String, TopicListing>> future = new KafkaFutureImpl<>(); @@ -232,12 +236,16 @@ public class MockAdminClient extends AdminClient { for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) { String topicName = topicDescription.getKey(); if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) { - TopicMetadata topicMetadata = topicDescription.getValue(); - KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); - future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, - Collections.emptySet())); - topicDescriptions.put(topicName, future); - break; + if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { + topicDescription.getValue().fetchesRemainingUntilVisible--; + } else { + TopicMetadata topicMetadata = topicDescription.getValue(); + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, + Collections.emptySet())); + topicDescriptions.put(topicName, future); + break; + } } } if (!topicDescriptions.containsKey(requestedTopic)) { @@ -420,6 +428,7 @@ public class MockAdminClient extends AdminClient { final boolean isInternalTopic; final List<TopicPartitionInfo> partitions; final Map<String, String> configs; + int fetchesRemainingUntilVisible; public boolean markedForDeletion; @@ -430,6 +439,7 @@ public class MockAdminClient extends AdminClient { this.partitions = partitions; this.configs = configs != null ? configs : Collections.emptyMap(); this.markedForDeletion = false; + this.fetchesRemainingUntilVisible = 0; } } @@ -441,4 +451,12 @@ public class MockAdminClient extends AdminClient { public Map<MetricName, ? extends Metric> metrics() { return mockMetrics; } + + public void setFetchesRemainingUntilVisible(String topicName, int fetchesRemainingUntilVisible) { + TopicMetadata metadata = allTopics.get(topicName); + if (metadata == null) { + throw new RuntimeException("No such topic as " + topicName); + } + metadata.fetchesRemainingUntilVisible = fetchesRemainingUntilVisible; + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index adce304..cb765cc 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -248,7 +248,7 @@ public final class WorkerUtils { * @throws RuntimeException If one or more topics have different number of partitions than * described in 'topicsInfo' */ - private static void verifyTopics( + static void verifyTopics( Logger log, AdminClient adminClient, Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable { @@ -279,9 +279,13 @@ public final class WorkerUtils { DescribeTopicsResult topicsResult = adminClient.describeTopics( topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); return topicsResult.all().get(); - } catch (UnknownTopicOrPartitionException exception) { - lastException = exception; - Thread.sleep(retryBackoffMs); + } catch (ExecutionException exception) { + if (exception.getCause() instanceof UnknownTopicOrPartitionException) { + lastException = (UnknownTopicOrPartitionException) exception.getCause(); + Thread.sleep(retryBackoffMs); + } else { + throw exception; + } } } throw lastException; diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index a35efe1..275589b 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -17,24 +17,23 @@ package org.apache.kafka.trogdor.common; +import static org.junit.Assert.assertEquals; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; - -import org.apache.kafka.common.Node; -import org.apache.kafka.clients.admin.MockAdminClient; - import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.kafka.clients.admin.NewTopic; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -46,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; - public class WorkerUtilsTest { private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class); @@ -318,4 +316,21 @@ public class WorkerUtilsTest { tpInfo, null); } + + @Test + public void testVerifyTopics() throws Throwable { + Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC); + WorkerUtils.createTopics(log, adminClient, newTopics, true); + adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 2); + WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC), + Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 3, 1); + adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 100); + try { + WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC), + Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 2, 1); + Assert.fail("expected to get UnknownTopicOrPartitionException"); + } catch (UnknownTopicOrPartitionException e) { + // expected + } + } }