rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570652791
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java ########## @@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() { } } + @Test + public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Not authorized to get the end offsets")); + } + } + + @Test + public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + 
assertTrue(e.getMessage().contains("is unsupported on brokers")); + } + } + + @Test + public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + RetriableException e = assertThrows(RetriableException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Timed out while waiting")); + } + } + + @Test + public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = null; // response should use error + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Error while getting end offsets for topic")); + } + } + + @Test + public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() { + String topicName = "myTopic"; + 
Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet()); + assertTrue(offsets.isEmpty()); + } + } + + @Test + public void endOffsetsShouldReturnOffsetsForOnePartition() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + long offset = 1000L; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map<TopicPartition, Long> offsets = admin.endOffsets(tps); + assertEquals(1, offsets.size()); + assertEquals(Long.valueOf(offset), offsets.get(tp1)); + } + } + + @Test + public void endOffsetsShouldReturnOffsetsForMultiplePartitions() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + TopicPartition tp2 = new TopicPartition(topicName, 1); + Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2)); + long offset1 = 1001; + long offset2 = 1002; + Cluster cluster = createCluster(1, topicName, 2); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + Map<TopicPartition, Long> offsets = admin.endOffsets(tps); + 
assertEquals(2, offsets.size()); + assertEquals(Long.valueOf(offset1), offsets.get(tp1)); + assertEquals(Long.valueOf(offset2), offsets.get(tp2)); + } + } + + @Test + public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + long offset = 1000; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + ConnectException e = assertThrows(ConnectException.class, () -> { + admin.endOffsets(tps); + }); + assertTrue(e.getMessage().contains("Not authorized to get the end offsets")); + } + } + private Cluster createCluster(int numNodes) { + return createCluster(numNodes, "unused", 0); + } + + private Cluster createCluster(int numNodes, String topicName, int partitions) { + Node[] nodeArray = new Node[numNodes]; Review comment: Yes, these are just the ISRs for the one partition that we set up the cluster with. The utility method did allow multiple nodes, but we don't really use that much in this class. I think this changed because we now have to define the `PartitionInfo` instances rather than an empty map. Not sure why that's now different, but supplying the empty infos definitely caused problems in these new tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org