This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 822abe4  MINOR: WorkerUtils#topicDescriptions must unwrap exceptions properly (#6937)
822abe4 is described below

commit 822abe47db07fcf92e8aa24e920a54239a90348d
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Wed Jul 3 16:08:39 2019 -0700

    MINOR: WorkerUtils#topicDescriptions must unwrap exceptions properly (#6937)
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
---
 .../kafka/clients/admin/MockAdminClient.java       | 32 +++++++++++++++-----
 .../apache/kafka/trogdor/common/WorkerUtils.java   | 12 +++++---
 .../kafka/trogdor/common/WorkerUtilsTest.java      | 35 +++++++++++++++-------
 3 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 19f9eae..7ca9ce4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -205,7 +205,11 @@ public class MockAdminClient extends AdminClient {
 
         for (Map.Entry<String, TopicMetadata> topicDescription : 
allTopics.entrySet()) {
             String topicName = topicDescription.getKey();
-            topicListings.put(topicName, new TopicListing(topicName, 
topicDescription.getValue().isInternalTopic));
+            if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+                topicDescription.getValue().fetchesRemainingUntilVisible--;
+            } else {
+                topicListings.put(topicName, new TopicListing(topicName, 
topicDescription.getValue().isInternalTopic));
+            }
         }
 
         KafkaFutureImpl<Map<String, TopicListing>> future = new 
KafkaFutureImpl<>();
@@ -232,12 +236,16 @@ public class MockAdminClient extends AdminClient {
             for (Map.Entry<String, TopicMetadata> topicDescription : 
allTopics.entrySet()) {
                 String topicName = topicDescription.getKey();
                 if (topicName.equals(requestedTopic) && 
!topicDescription.getValue().markedForDeletion) {
-                    TopicMetadata topicMetadata = topicDescription.getValue();
-                    KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
-                    future.complete(new TopicDescription(topicName, 
topicMetadata.isInternalTopic, topicMetadata.partitions,
-                            Collections.emptySet()));
-                    topicDescriptions.put(topicName, future);
-                    break;
+                    if 
(topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+                        
topicDescription.getValue().fetchesRemainingUntilVisible--;
+                    } else {
+                        TopicMetadata topicMetadata = 
topicDescription.getValue();
+                        KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                        future.complete(new TopicDescription(topicName, 
topicMetadata.isInternalTopic, topicMetadata.partitions,
+                                Collections.emptySet()));
+                        topicDescriptions.put(topicName, future);
+                        break;
+                    }
                 }
             }
             if (!topicDescriptions.containsKey(requestedTopic)) {
@@ -420,6 +428,7 @@ public class MockAdminClient extends AdminClient {
         final boolean isInternalTopic;
         final List<TopicPartitionInfo> partitions;
         final Map<String, String> configs;
+        int fetchesRemainingUntilVisible;
 
         public boolean markedForDeletion;
 
@@ -430,6 +439,7 @@ public class MockAdminClient extends AdminClient {
             this.partitions = partitions;
             this.configs = configs != null ? configs : Collections.emptyMap();
             this.markedForDeletion = false;
+            this.fetchesRemainingUntilVisible = 0;
         }
     }
 
@@ -441,4 +451,12 @@ public class MockAdminClient extends AdminClient {
     public Map<MetricName, ? extends Metric> metrics() {
         return mockMetrics;
     }
+
+    public void setFetchesRemainingUntilVisible(String topicName, int 
fetchesRemainingUntilVisible) {
+        TopicMetadata metadata = allTopics.get(topicName);
+        if (metadata == null) {
+            throw new RuntimeException("No such topic as " + topicName);
+        }
+        metadata.fetchesRemainingUntilVisible = fetchesRemainingUntilVisible;
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index adce304..cb765cc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -248,7 +248,7 @@ public final class WorkerUtils {
      * @throws RuntimeException  If one or more topics have different number 
of partitions than
      * described in 'topicsInfo'
      */
-    private static void verifyTopics(
+    static void verifyTopics(
         Logger log, AdminClient adminClient,
         Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, 
int retryCount, long retryBackoffMs) throws Throwable {
 
@@ -279,9 +279,13 @@ public final class WorkerUtils {
                 DescribeTopicsResult topicsResult = adminClient.describeTopics(
                         topicsToVerify, new 
DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
                 return topicsResult.all().get();
-            } catch (UnknownTopicOrPartitionException exception) {
-                lastException = exception;
-                Thread.sleep(retryBackoffMs);
+            } catch (ExecutionException exception) {
+                if (exception.getCause() instanceof 
UnknownTopicOrPartitionException) {
+                    lastException = (UnknownTopicOrPartitionException) 
exception.getCause();
+                    Thread.sleep(retryBackoffMs);
+                } else {
+                    throw exception;
+                }
             }
         }
         throw lastException;
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index a35efe1..275589b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -17,24 +17,23 @@
 
 package org.apache.kafka.trogdor.common;
 
+import static org.junit.Assert.assertEquals;
 
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.clients.admin.MockAdminClient;
-
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,7 +45,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-
 public class WorkerUtilsTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(WorkerUtilsTest.class);
@@ -318,4 +316,21 @@ public class WorkerUtilsTest {
             tpInfo,
             null);
     }
+
+    @Test
+    public void testVerifyTopics() throws Throwable {
+        Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, 
NEW_TEST_TOPIC);
+        WorkerUtils.createTopics(log, adminClient, newTopics, true);
+        adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 2);
+        WorkerUtils.verifyTopics(log, adminClient, 
Collections.singleton(TEST_TOPIC),
+            Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 3, 1);
+        adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 100);
+        try {
+            WorkerUtils.verifyTopics(log, adminClient, 
Collections.singleton(TEST_TOPIC),
+                    Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 2, 
1);
+            Assert.fail("expected to get UnknownTopicOrPartitionException");
+        } catch (UnknownTopicOrPartitionException e) {
+            // expected
+        }
+    }
 }

Reply via email to