rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570652791



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void 
endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1,
 offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end 
offsets"));
+        }
+    }
+
+    @Test
+    public void 
endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, 
offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () 
-> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, 
offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end 
offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = 
admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, 
tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            
env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1,
 null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end 
offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int 
partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       Yes, these are just the ISRs for the one partition that we set up the 
cluster with.
   
   The utility method did allow multiple nodes, but we don't really use that 
much in this class. I think this changed because we now have to define the 
`PartitionInfo` instances rather than an empty map. Not sure why that's now 
different, but supplying the empty infos definitely caused problems in these 
new tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to