mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1223265946
########## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java: ########## @@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); writer.handleSnapshot(image, consumer); assertEquals(1, opCounts.remove("CreateTopic")); - assertEquals(1, opCounts.remove("UpdatePartition")); + assertEquals(1, opCounts.remove("UpdatePartitions")); assertEquals(1, opCounts.remove("UpdateTopic")); assertEquals(0, opCounts.size()); assertEquals("bar", topicClient.createdTopics.get(0)); } + + @Test + public void testDeleteTopicFromSnapshot() { + CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { + @Override + public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) { + visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap()); + } + }; + CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() + .setBrokersInZk(0) + .setTopicMigrationClient(topicClient) + .build(); + + KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer); + assertEquals(1, opCounts.remove("DeleteTopic")); + assertEquals(1, opCounts.remove("DeleteTopicConfig")); + assertEquals(0, opCounts.size()); + assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); + + opCounts.clear(); + topicClient.reset(); + writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer); + assertEquals(1, opCounts.remove("DeleteTopic")); + assertEquals(1, opCounts.remove("DeleteTopicConfig")); + assertEquals(2, opCounts.remove("CreateTopic")); + assertEquals(0, 
opCounts.size()); + assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); + assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics); + } + + @FunctionalInterface + interface TopicVerifier { + void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer); + } + + void setupTopicWithTwoPartitions(TopicVerifier verifier) { + // Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier + Uuid topicId = Uuid.randomUuid(); + Map<Integer, PartitionRegistration> partitionMap = new HashMap<>(); + partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1)); + partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1)); + + CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { + @Override + public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) { + Map<Integer, List<Integer>> assignments = new HashMap<>(); + assignments.put(0, Arrays.asList(2, 3, 4)); + assignments.put(1, Arrays.asList(3, 4, 5)); + visitor.visitTopic("spam", topicId, assignments); + visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0)); + visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1)); + } + }; + + CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() + .setBrokersInZk(0) + .setTopicMigrationClient(topicClient) + .build(); + KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + + TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); + delta.replay(new TopicRecord().setTopicId(topicId).setName("spam")); + delta.replay((PartitionRecord) 
partitionMap.get(0).toRecord(topicId, 0).message()); + delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 1).message()); + TopicsImage image = delta.apply(); + + verifier.verify(topicId, image, topicClient, writer); + } + + @Test + public void testUpdatePartitionsFromSnapshot() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsSnapshot(topicsImage, consumer); + assertEquals(0, opCounts.size(), "No operations expected since the data is the same"); + + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3))); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3)); + topicsImage = topicsDelta.apply(); + + writer.handleTopicsSnapshot(topicsImage, consumer); + assertEquals(1, opCounts.remove("UpdatePartitions")); + assertEquals(0, opCounts.size()); + }); + } + + @Test + public void testTopicReassignmentDelta() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3))); + topicsImage = topicsDelta.apply(); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer); + assertEquals(1, opCounts.remove("UpdatePartitions")); + assertEquals(0, 
opCounts.size()); + + assertEquals(1, topicClient.updatedTopicPartitions.get("spam").size()); + assertEquals(Collections.singleton(0), topicClient.updatedTopicPartitions.get("spam")); + }); + } + + @Test + public void testNewTopicSnapshot() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + Uuid newTopicId = Uuid.randomUuid(); + topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new")); + topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2))); + topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3))); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3))); + topicsImage = topicsDelta.apply(); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsSnapshot(topicsImage, consumer); + assertEquals(1, opCounts.remove("UpdatePartitions")); + assertEquals(1, opCounts.remove("CreateTopic")); + assertEquals(0, opCounts.size()); + }); + } + + @Test + public void testNewTopicDelta() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + Uuid newTopicId = Uuid.randomUuid(); + topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new")); + topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2))); + topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3))); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3))); + 
topicsImage = topicsDelta.apply(); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer); + assertEquals(1, opCounts.remove("UpdatePartitions")); + assertEquals(1, opCounts.remove("CreateTopic")); + assertEquals(0, opCounts.size()); + }); + } + + @Test + public void testNewPartitionDelta() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + topicsDelta.replay(new PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1, 2, 3))); + topicsImage = topicsDelta.apply(); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer); + assertEquals(1, opCounts.remove("UpdatePartitions")); + assertEquals(1, opCounts.remove("UpdateTopic")); + assertEquals(0, opCounts.size()); + }); + } + + @Test + public void testPartitionDelta() { + setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { + TopicsDelta topicsDelta = new TopicsDelta(topicsImage); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3)); + topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1)); + topicsImage = topicsDelta.apply(); + + Map<String, Integer> opCounts = new HashMap<>(); + KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, + (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY)); + writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer); + assertEquals(1, opCounts.remove("UpdateTopic")); + assertEquals(1, opCounts.remove("UpdatePartitions")); Review Comment: TopicMigrationClient#updateTopicPartitions takes a map of all the partition states to update for a topic, so it counts as just one "operation" from the client's perspective (even though it results in multiple ZK writes). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org