chia7712 commented on code in PR #17579:
URL: https://github.com/apache/kafka/pull/17579#discussion_r1817940100
##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable runnable) throws Interrup
Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
}
}
+
+ /**
+ * Wrap a single record log buffer.
+ */
+ public static MemoryRecords singletonRecords(byte[] value,
+ byte[] key,
+ Compression codec,
+ long timestamp,
+ byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, key, value)), magicValue, codec);
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+ ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer,
+ magicValue,
+ codec,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ System.currentTimeMillis(),
+ producerId,
+ producerEpoch,
+ sequence,
+ false,
+ partitionLeaderEpoch
+ )) {
+ records.forEach(builder::append);
+ return builder.build();
+ }
+ }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression codec) {
+ return records(records,
+ magicValue,
+ codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String topic,
+                                                                   int numPartitions,
+                                                                   Collection<B> brokers) throws Exception {
+        List<TopicPartition> topicPartitions = IntStream.range(0, numPartitions)
+ .mapToObj(partition -> new TopicPartition(topic, partition))
+ .collect(Collectors.toList());
+
+        // Ensure that the topic-partition has been deleted from all brokers' replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> broker.replicaManager().onlinePartition(tp).isEmpty())),
+            "Replica managers should have deleted all of this topic's partitions");
+
+        // Ensure that logs from all replicas are deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> broker.logManager().getLog(tp, false).isEmpty())),
+            "Replica logs not deleted after delete topic is complete");
+
+        // Ensure that the topic is removed from all cleaner offsets
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> {
+                    List<File> liveLogDirs = CollectionConverters.asJava(broker.logManager().liveLogDirs());
+                    return liveLogDirs.stream().allMatch(logDir -> {
+                        OffsetCheckpointFile checkpointFile;
+                        try {
+                            checkpointFile = new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return !checkpointFile.read().containsKey(tp);
+                    });
+                })),
+            "Cleaner offset for deleted partition should have been removed");
+
+        // Ensure that the topic directories are soft-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                    topicPartitions.stream().noneMatch(tp ->
+                        new File(logDir, tp.topic() + "-" + tp.partition()).exists()))),
+            "Failed to soft-delete the data to a delete directory");
+
+        // Ensure that the topic directories are hard-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                    topicPartitions.stream().allMatch(tp ->
+                        Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
+                            partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&
+                                partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
+                )
+            ), "Failed to hard-delete the delete directory");
+ }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int partitionNumber,
+                                                                 long timeoutMs,
+                                                                 Optional<Integer> oldLeaderOpt,
Review Comment:
`oldLeaderOpt` and `newLeaderOpt` are never assigned
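
If no call site ever passes a non-empty value here, the two `Optional` parameters could probably be dropped. A rough sketch of the slimmer overload (assuming every current caller passes `Optional.empty()` for both):

```java
// Hypothetical simplification -- only valid if all callers pass Optional.empty():
public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
                                                             String topic,
                                                             int partitionNumber,
                                                             long timeoutMs) throws Exception {
    return waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, partitionNumber,
        timeoutMs, Optional.empty(), Optional.empty());
}
```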
##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable runnable) throws Interrup
Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
}
}
+
+ /**
+ * Wrap a single record log buffer.
+ */
+ public static MemoryRecords singletonRecords(byte[] value,
+ byte[] key,
+ Compression codec,
+ long timestamp,
+ byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, key, value)), magicValue, codec);
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+ ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer,
+ magicValue,
+ codec,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ System.currentTimeMillis(),
+ producerId,
+ producerEpoch,
+ sequence,
+ false,
+ partitionLeaderEpoch
+ )) {
+ records.forEach(builder::append);
+ return builder.build();
+ }
+ }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression codec) {
+ return records(records,
+ magicValue,
+ codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String topic,
Review Comment:
Could you move this method to `ClusterInstance`?
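
Something like the following, perhaps. This is only a rough sketch -- `brokers()` is a placeholder for however `ClusterInstance` actually exposes its `KafkaBroker` instances:

```java
// Sketch only: assumes ClusterInstance can reach its brokers via an accessor,
// spelled brokers() here as a placeholder.
public interface ClusterInstance {
    Collection<KafkaBroker> brokers();

    default void verifyTopicDeletion(String topic, int numPartitions) throws Exception {
        TestUtils.verifyTopicDeletion(topic, numPartitions, brokers());
    }
}
```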
##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +155,210 @@ static void retryOnExceptionWithTimeout(final Runnable runnable) throws Interrup
Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
}
}
+
+ /**
+ * Wrap a single record log buffer.
+ */
+ public static MemoryRecords singletonRecords(byte[] value,
+ byte[] key,
+ Compression codec,
+ long timestamp,
+ byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, key, value)), magicValue, codec);
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+ ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer,
+ magicValue,
+ codec,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ System.currentTimeMillis(),
+ producerId,
+ producerEpoch,
+ sequence,
+ false,
+ partitionLeaderEpoch
+ )) {
+ records.forEach(builder::append);
+ return builder.build();
+ }
+ }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression codec) {
+ return records(records,
+ magicValue,
+ codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+    public static <B extends KafkaBroker> void verifyTopicDeletion(String topic,
+                                                                   int numPartitions,
+                                                                   Collection<B> brokers) throws Exception {
+        List<TopicPartition> topicPartitions = IntStream.range(0, numPartitions)
+ .mapToObj(partition -> new TopicPartition(topic, partition))
+ .collect(Collectors.toList());
+
+        // Ensure that the topic-partition has been deleted from all brokers' replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> broker.replicaManager().onlinePartition(tp).isEmpty())),
+            "Replica managers should have deleted all of this topic's partitions");
+
+        // Ensure that logs from all replicas are deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> broker.logManager().getLog(tp, false).isEmpty())),
+            "Replica logs not deleted after delete topic is complete");
+
+        // Ensure that the topic is removed from all cleaner offsets
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                topicPartitions.stream().allMatch(tp -> {
+                    List<File> liveLogDirs = CollectionConverters.asJava(broker.logManager().liveLogDirs());
+                    return liveLogDirs.stream().allMatch(logDir -> {
+                        OffsetCheckpointFile checkpointFile;
+                        try {
+                            checkpointFile = new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return !checkpointFile.read().containsKey(tp);
+                    });
+                })),
+            "Cleaner offset for deleted partition should have been removed");
+
+        // Ensure that the topic directories are soft-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                    topicPartitions.stream().noneMatch(tp ->
+                        new File(logDir, tp.topic() + "-" + tp.partition()).exists()))),
+            "Failed to soft-delete the data to a delete directory");
+
+        // Ensure that the topic directories are hard-deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                CollectionConverters.asJava(broker.config().logDirs()).stream().allMatch(logDir ->
+                    topicPartitions.stream().allMatch(tp ->
+                        Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
+                            partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&
+                                partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
+                )
+            ), "Failed to hard-delete the delete directory");
+ }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int partitionNumber,
+                                                                 long timeoutMs,
+                                                                 Optional<Integer> oldLeaderOpt,
+                                                                 Optional<Integer> newLeaderOpt) throws Exception {
+ if (oldLeaderOpt.isPresent() && newLeaderOpt.isPresent()) {
+            throw new IllegalArgumentException("Can't define both the old and the new leader");
+ }
+
+        GetPartitionLeader getPartitionLeader = (t, p) -> {
+            TopicDescription topicDescription = admin.describeTopics(Collections.singletonList(t)).allTopicNames().get().get(t);
+            return topicDescription.partitions().stream()
+                .filter(partitionInfo -> partitionInfo.partition() == p)
+                .findFirst()
+                .map(partitionInfo -> partitionInfo.leader().id() == Node.noNode().id() ? null : partitionInfo.leader().id());
+        };
+
+        return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partitionNumber, timeoutMs, oldLeaderOpt, newLeaderOpt);
+ }
+
+    private static int doWaitUntilLeaderIsElectedOrChanged(GetPartitionLeader getPartitionLeader,
Review Comment:
Could you please try to simplify `waitUntilLeaderIsElectedOrChangedWithAdmin` and `doWaitUntilLeaderIsElectedOrChanged`?
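
If the Admin-based variant is the only caller, the `GetPartitionLeader` indirection could probably be inlined into a single method. A rough sketch of the shape -- the retry loop and failure handling below are guesses, since the body of `doWaitUntilLeaderIsElectedOrChanged` isn't shown in this hunk:

```java
// Sketch: merge the two methods by inlining the leader lookup.
public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
                                                             String topic,
                                                             int partitionNumber,
                                                             long timeoutMs,
                                                             Optional<Integer> oldLeaderOpt,
                                                             Optional<Integer> newLeaderOpt) throws Exception {
    if (oldLeaderOpt.isPresent() && newLeaderOpt.isPresent()) {
        throw new IllegalArgumentException("Can't define both the old and the new leader");
    }
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        // Look up the current leader directly, as in the lambda above.
        TopicDescription description = admin.describeTopics(Collections.singletonList(topic))
            .allTopicNames().get().get(topic);
        Optional<Integer> leader = description.partitions().stream()
            .filter(p -> p.partition() == partitionNumber)
            .findFirst()
            .map(p -> p.leader().id())
            .filter(id -> id != Node.noNode().id());
        // Done once a leader exists, differs from the old one (if given),
        // and matches the expected new one (if given).
        if (leader.isPresent()
                && !leader.equals(oldLeaderOpt)
                && (!newLeaderOpt.isPresent() || newLeaderOpt.equals(leader))) {
            return leader.get();
        }
        Thread.sleep(100);
    }
    throw new AssertionError("Timed out waiting for leader of " + topic + "-" + partitionNumber);
}
```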
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]