chia7712 commented on code in PR #17579:
URL: https://github.com/apache/kafka/pull/17579#discussion_r1835089796


##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +148,131 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {
+        return records(records,
+            magicValue,
+            codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int 
partitionNumber,
+                                                                 long 
timeoutMs) throws Exception {
+        GetPartitionLeader getPartitionLeader = (t, p) -> 
Optional.ofNullable(getLeaderFromAdmin(admin, t, p));
+        return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, 
partitionNumber, timeoutMs);
+    }
+
+    private static Integer getLeaderFromAdmin(Admin admin, String topic, int 
partition) throws Exception {
+        TopicDescription topicDescription = 
admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic);
+        return topicDescription.partitions().stream()
+            .filter(partitionInfo -> partitionInfo.partition() == partition)
+            .findFirst()
+            .map(partitionInfo -> partitionInfo.leader().id() == 
Node.noNode().id() ? null : partitionInfo.leader().id())
+            .orElse(null);
+    }
+
+    private static int doWaitUntilLeaderIsElectedOrChanged(GetPartitionLeader 
getPartitionLeader,

Review Comment:
   Could you please replace `GetPartitionLeader` with `BiFunction<String, 
Integer, Optional<Integer>>`?



##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -225,4 +235,53 @@ default void waitAcls(AclBindingFilter filter, 
Collection<AccessControlEntry> en
         }
     }
 
+    default void waitForTopicDeletion(String topic, int numPartitions) throws 
Exception {

Review Comment:
   Could you please move all checks to `ClusterInstance#waitForTopic`? Those 
checks should be enabled when the input `partitions` is equal to 0.



##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +148,131 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {

Review Comment:
   This method is equivalent to `MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord(key, value));`, so we don't need to add this helper, right?



##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +148,131 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {

Review Comment:
   ditto



##########
test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java:
##########
@@ -125,4 +148,131 @@ static void retryOnExceptionWithTimeout(final Runnable 
runnable) throws Interrup
             Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
         }
     }
+
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value,
+                                                 byte[] key,
+                                                 Compression codec,
+                                                 long timestamp,
+                                                 byte magicValue) {
+        return records(Collections.singletonList(new SimpleRecord(timestamp, 
key, value)), magicValue, codec);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        int sizeInBytes = DefaultRecordBatch.sizeInBytes(records);
+        ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
+            buffer,
+            magicValue,
+            codec,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            System.currentTimeMillis(),
+            producerId,
+            producerEpoch,
+            sequence,
+            false,
+            partitionLeaderEpoch
+        )) {
+            records.forEach(builder::append);
+            return builder.build();
+        }
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression codec) {
+        return records(records,
+            magicValue,
+            codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                                 String topic,
+                                                                 int 
partitionNumber,
+                                                                 long 
timeoutMs) throws Exception {
+        GetPartitionLeader getPartitionLeader = (t, p) -> 
Optional.ofNullable(getLeaderFromAdmin(admin, t, p));
+        return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, 
partitionNumber, timeoutMs);
+    }
+
+    private static Integer getLeaderFromAdmin(Admin admin, String topic, int 
partition) throws Exception {
+        TopicDescription topicDescription = 
admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic);
+        return topicDescription.partitions().stream()
+            .filter(partitionInfo -> partitionInfo.partition() == partition)
+            .findFirst()
+            .map(partitionInfo -> partitionInfo.leader().id() == 
Node.noNode().id() ? null : partitionInfo.leader().id())
+            .orElse(null);
+    }
+
+    private static int doWaitUntilLeaderIsElectedOrChanged(GetPartitionLeader 
getPartitionLeader,
+                                                           String topic,
+                                                           int partition,
+                                                           long timeoutMs) 
throws Exception {
+        long startTime = System.currentTimeMillis();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        Optional<Integer> electedLeader = Optional.empty();
+
+        while (electedLeader.isEmpty() && System.currentTimeMillis() < 
startTime + timeoutMs) {
+            Optional<Integer> leader = 
getPartitionLeader.getPartitionLeader(topic, partition);
+            if (leader.isPresent()) {
+                log.trace("Leader {} is elected for partition {}", 
leader.get(), topicPartition);
+                electedLeader = leader;
+            } else {
+                log.trace("Leader for partition {} is not elected yet", 
topicPartition);
+            }
+            Thread.sleep(Math.min(timeoutMs, 100L));
+        }
+
+        Optional<Integer> finalLeader = electedLeader;
+        return electedLeader.orElseThrow(() -> new AssertionError("Timing out 
after " + timeoutMs
+            + " ms since a leader was not elected for partition " + 
topicPartition + ", leader is " + finalLeader));
+    }
+
+    public static <B extends KafkaBroker> Map<TopicPartition, 
UpdateMetadataPartitionState> waitForAllPartitionsMetadata(

Review Comment:
   Can this be replaced by `ClusterInstance#waitForTopic`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to