This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c125cc7dd1c KAFKA-19036 Rewrite LogAppendTimeTest and move it to storage module (#19282)
c125cc7dd1c is described below
commit c125cc7dd1c4aa7ba739bb470392d774086cd150
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Mar 29 03:14:53 2025 +0800
KAFKA-19036 Rewrite LogAppendTimeTest and move it to storage module (#19282)
Use Java to rewrite `LogAppendTimeTest` with the new test infra and move it to the storage module.
Reviewers: Chia-Ping Tsai <[email protected]>
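For context, the "new test infra" referenced above replaces subclassing of IntegrationTestHarness with annotation-driven tests: a @ClusterTest method receives a ClusterInstance handle to a framework-managed cluster. Below is a minimal sketch of that pattern, using only the annotations and accessors that appear in the new file in this commit; the class name and topic name are illustrative, not part of the commit:

    import org.apache.kafka.common.test.ClusterInstance;
    import org.apache.kafka.common.test.api.ClusterConfigProperty;
    import org.apache.kafka.common.test.api.ClusterTest;
    import org.apache.kafka.common.test.api.Type;

    public class ExampleClusterTest {
        // The framework starts a KRaft cluster with the given broker
        // properties and injects a handle to it as the method parameter.
        @ClusterTest(
            types = {Type.KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = "log.message.timestamp.type", value = "LogAppendTime")
            }
        )
        public void exampleTest(ClusterInstance clusterInstance) throws InterruptedException {
            // Create a topic on the running cluster ("example-topic" is illustrative).
            clusterInstance.createTopic("example-topic", 1, (short) 1);
        }
    }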
---
.../integration/kafka/api/LogAppendTimeTest.scala | 80 -------------
.../apache/kafka/server/log/LogAppendTimeTest.java | 132 +++++++++++++++++++++
2 files changed, 132 insertions(+), 80 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
deleted file mode 100644
index f3741d2d77c..00000000000
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-/**
- * Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
- * level configs, see the *ProducerSendTest classes.
- */
-class LogAppendTimeTest extends IntegrationTestHarness {
- val producerCount: Int = 1
- val consumerCount: Int = 1
- val brokerCount: Int = 2
-
- // This will be used for the offsets topic as well
- serverConfig.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name)
-
- serverConfig.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "2")
-
- private val topic = "topic"
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- createTopic(topic)
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
- def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
- val producer = createProducer()
- val now = System.currentTimeMillis()
- val createTime = now - TimeUnit.DAYS.toMillis(1)
- val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, createTime, s"key$i".getBytes,
- s"value$i".getBytes))
- val recordMetadatas = producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
- recordMetadatas.foreach { recordMetadata =>
- assertTrue(recordMetadata.timestamp >= now)
- assertTrue(recordMetadata.timestamp < now + TimeUnit.SECONDS.toMillis(60))
- }
-
- val consumer = createConsumer()
- consumer.subscribe(Collections.singleton(topic))
- val consumerRecords = TestUtils.consumeRecords(consumer, producerRecords.size)
-
- consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
- val producerRecord = producerRecords(index)
- val recordMetadata = recordMetadatas(index)
- assertEquals(new String(producerRecord.key), new String(consumerRecord.key))
- assertEquals(new String(producerRecord.value), new String(consumerRecord.value))
- assertNotEquals(producerRecord.timestamp, consumerRecord.timestamp)
- assertEquals(recordMetadata.timestamp, consumerRecord.timestamp)
- assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType)
- }
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
new file mode 100644
index 00000000000..8df13a3ea35
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogAppendTimeTest {
+ private static final String TOPIC = "log-append-time-topic";
+ private static final int NUM_PARTITION = 1;
+ private static final short NUM_REPLICAS = 1;
+
+ @ClusterTest(
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.message.timestamp.type", value =
"LogAppendTime"),
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ }
+ )
+ public void testProduceConsumeWithConfigOnBroker(ClusterInstance clusterInstance) throws InterruptedException {
+ clusterInstance.createTopic(TOPIC, NUM_PARTITION, NUM_REPLICAS);
+
+ testProduceConsume(clusterInstance);
+ }
+
+ @ClusterTest(
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ }
+ )
+ public void testProduceConsumeWithConfigOnTopic(ClusterInstance clusterInstance) throws InterruptedException {
+ try (Admin admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(
+ new NewTopic(TOPIC, NUM_PARTITION, NUM_REPLICAS).
+ configs(Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"))));
+ clusterInstance.waitForTopic(TOPIC, NUM_PARTITION);
+ }
+
+ testProduceConsume(clusterInstance);
+ }
+
+ public void testProduceConsume(ClusterInstance clusterInstance) throws InterruptedException {
+ long now = System.currentTimeMillis();
+ long createTime = now - TimeUnit.DAYS.toMillis(1);
+ int recordCount = 10;
+ List<ProducerRecord<byte[], byte[]>> producerRecords =
IntStream.range(0, recordCount)
+ .mapToObj(i -> new ProducerRecord<>(TOPIC, null, createTime,
"key".getBytes(), "value".getBytes()))
+ .toList();
+
+ List<RecordMetadata> recordMetadatas = new ArrayList<>();
+ try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
+ producerRecords.stream()
+ .map(record -> assertDoesNotThrow(() -> producer.send(record).get(10, TimeUnit.SECONDS)))
+ .forEach(recordMetadatas::add);
+ }
+ assertEquals(recordCount, recordMetadatas.size());
+ recordMetadatas.forEach(recordMetadata -> {
+ assertTrue(recordMetadata.timestamp() >= now);
+ assertTrue(recordMetadata.timestamp() < now + TimeUnit.SECONDS.toMillis(60));
+ });
+
+ for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+ try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(
+ Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name())
+ )) {
+ consumer.subscribe(Collections.singleton(TOPIC));
+ ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords =
new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ records.forEach(consumerRecords::add);
+ return consumerRecords.size() == producerRecords.size();
+ }, "Consumer with protocol " + groupProtocol.name + " cannot
consume all records");
+
+ consumerRecords.forEach(consumerRecord -> {
+ int index = consumerRecords.indexOf(consumerRecord);
+ ProducerRecord<byte[], byte[]> producerRecord =
producerRecords.get(index);
+ RecordMetadata recordMetadata = recordMetadatas.get(index);
+ assertArrayEquals(producerRecord.key(),
consumerRecord.key(), "Key mismatch for consumer with protocol " +
groupProtocol.name);
+ assertArrayEquals(producerRecord.value(),
consumerRecord.value(), "Key mismatch for consumer with protocol " +
groupProtocol.name);
+ assertNotEquals(producerRecord.timestamp(),
consumerRecord.timestamp(), "Timestamp mismatch with producer record for
consumer with protocol " + groupProtocol.name);
+ assertEquals(recordMetadata.timestamp(),
consumerRecord.timestamp(), "Timestamp mismatch with record metadata for
consumer with protocol " + groupProtocol.name);
+ assertEquals(TimestampType.LOG_APPEND_TIME,
consumerRecord.timestampType(), "Timestamp type mismatch for consumer with
protocol " + groupProtocol.name);
+ });
+ }
+ }
+ }
+}
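With this move, the test lives in the storage module rather than core, so it should run under that module's test task. Assuming Kafka's usual Gradle convention for running a single test class (the exact invocation is not part of this commit), something like `./gradlew storage:test --tests LogAppendTimeTest` would exercise the rewritten test.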