chia7712 commented on code in PR #17579:
URL: https://github.com/apache/kafka/pull/17579#discussion_r1817940100


##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {
+        return records(records,
+            magicValue,
+            codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String 
topic,
+                                                                   int 
numPartitions,
+                                                                   
Collection<B> brokers) throws Exception {
+        List<TopicPartition> topicPartitions = IntStream.range(0, 
numPartitions)
+            .mapToObj(partition -> new TopicPartition(topic, partition))
+            .collect(Collectors.toList());
+
+        // Ensure that the topic-partition has been deleted from all brokers' 
replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> 
broker.replicaManager().onlinePartition(tp).isEmpty())),
+            "Replica manager's should have deleted all of this topic's 
partitions");
+
+        // Ensure that logs from all replicas are deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> 
broker.logManager().getLog(tp, false).isEmpty())),
+            "Replica logs not deleted after delete topic is complete");
+
+        // Ensure that the topic is removed from all cleaner offsets
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> {
+                    List<File> liveLogDirs = 
CollectionConverters.asJava(broker.logManager().liveLogDirs());
+                    return liveLogDirs.stream().allMatch(logDir -> {
+                        OffsetCheckpointFile checkpointFile;
+                        try {
+                            checkpointFile = new OffsetCheckpointFile(new 
File(logDir, "cleaner-offset-checkpoint"), null);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return !checkpointFile.read().containsKey(tp);
+                    });
+                })),
+            "Cleaner offset for deleted partition should have been removed");
+
+        // Ensure that the topic directories are soft-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir 
->
+                    topicPartitions.stream().noneMatch(tp ->
+                        new File(logDir, tp.topic() + "-" + 
tp.partition()).exists()))),
+            "Failed to soft-delete the data to a delete directory");
+
+        // Ensure that the topic directories are hard-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+            
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir 
->
+                topicPartitions.stream().allMatch(tp ->
+                    Arrays.stream(Objects.requireNonNull(new 
File(logDir).list())).noneMatch(partitionDirectoryName ->
+                        partitionDirectoryName.startsWith(tp.topic() + "-" + 
tp.partition()) &&
+                            
partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
+            )
+        ), "Failed to hard-delete the delete directory");
+    }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int 
partitionNumber,
+                                                                 long 
timeoutMs,
+                                                                 
Optional<Integer> oldLeaderOpt,

Review Comment:
   `oldLeaderOpt` and `newLeaderOpt` are never assigned



##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {
+        return records(records,
+            magicValue,
+            codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String 
topic,

Review Comment:
   Could you move this method to `ClusterInstance`?



##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {
+        return records(records,
+            magicValue,
+            codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String 
topic,
+                                                                   int 
numPartitions,
+                                                                   
Collection<B> brokers) throws Exception {
+        List<TopicPartition> topicPartitions = IntStream.range(0, 
numPartitions)
+            .mapToObj(partition -> new TopicPartition(topic, partition))
+            .collect(Collectors.toList());
+
+        // Ensure that the topic-partition has been deleted from all brokers' 
replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> 
broker.replicaManager().onlinePartition(tp).isEmpty())),
+            "Replica manager's should have deleted all of this topic's 
partitions");
+
+        // Ensure that logs from all replicas are deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> 
broker.logManager().getLog(tp, false).isEmpty())),
+            "Replica logs not deleted after delete topic is complete");
+
+        // Ensure that the topic is removed from all cleaner offsets
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> {
+                    List<File> liveLogDirs = 
CollectionConverters.asJava(broker.logManager().liveLogDirs());
+                    return liveLogDirs.stream().allMatch(logDir -> {
+                        OffsetCheckpointFile checkpointFile;
+                        try {
+                            checkpointFile = new OffsetCheckpointFile(new 
File(logDir, "cleaner-offset-checkpoint"), null);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return !checkpointFile.read().containsKey(tp);
+                    });
+                })),
+            "Cleaner offset for deleted partition should have been removed");
+
+        // Ensure that the topic directories are soft-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir 
->
+                    topicPartitions.stream().noneMatch(tp ->
+                        new File(logDir, tp.topic() + "-" + 
tp.partition()).exists()))),
+            "Failed to soft-delete the data to a delete directory");
+
+        // Ensure that the topic directories are hard-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+            
CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir 
->
+                topicPartitions.stream().allMatch(tp ->
+                    Arrays.stream(Objects.requireNonNull(new 
File(logDir).list())).noneMatch(partitionDirectoryName ->
+                        partitionDirectoryName.startsWith(tp.topic() + "-" + 
tp.partition()) &&
+                            
partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
+            )
+        ), "Failed to hard-delete the delete directory");
+    }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int 
partitionNumber,
+                                                                 long 
timeoutMs,
+                                                                 
Optional<Integer> oldLeaderOpt,
+                                                                 
Optional<Integer> newLeaderOpt) throws Exception {
+        if (oldLeaderOpt.isPresent() && newLeaderOpt.isPresent()) {
+            throw new IllegalArgumentException("Can't define both the old and 
the new leader");
+        }
+
+        GetPartitionLeader getPartitionLeader = (t, p) -> {
+            TopicDescription topicDescription = 
admin.describeTopics(Collections.singletonList(t)).allTopicNames().get().get(t);
+            return topicDescription.partitions().stream()
+                    .filter(partitionInfo -> partitionInfo.partition() == p)
+                    .findFirst()
+                    .map(partitionInfo -> partitionInfo.leader().id() == 
Node.noNode().id() ? null : partitionInfo.leader().id());
+        };
+
+        return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, 
partitionNumber, timeoutMs, oldLeaderOpt, newLeaderOpt);
+    }
+
+    private static int doWaitUntilLeaderIsElectedOrChanged(GetPartitionLeader 
getPartitionLeader,

Review Comment:
   Could you please try to simplify `waitUntilLeaderIsElectedOrChangedWithAdmin` 
and `doWaitUntilLeaderIsElectedOrChanged`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to