This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c125cc7dd1c KAFKA-19036 Rewrite LogAppendTimeTest and move it to storage module (#19282)
c125cc7dd1c is described below

commit c125cc7dd1c4aa7ba739bb470392d774086cd150
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Mar 29 03:14:53 2025 +0800

    KAFKA-19036 Rewrite LogAppendTimeTest and move it to storage module (#19282)
    
    Use Java to rewrite `LogAppendTimeTest` by new test infra and move it to
    storage module.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/kafka/api/LogAppendTimeTest.scala  |  80 -------------
 .../apache/kafka/server/log/LogAppendTimeTest.java | 132 +++++++++++++++++++++
 2 files changed, 132 insertions(+), 80 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
deleted file mode 100644
index f3741d2d77c..00000000000
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertTrue}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-/**
-  * Tests where the broker is configured to use LogAppendTime. For tests where 
LogAppendTime is configured via topic
-  * level configs, see the *ProducerSendTest classes.
-  */
-class LogAppendTimeTest extends IntegrationTestHarness {
-  val producerCount: Int = 1
-  val consumerCount: Int = 1
-  val brokerCount: Int = 2
-
-  // This will be used for the offsets topic as well
-  serverConfig.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.LOG_APPEND_TIME.name)
-  
serverConfig.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "2")
-
-  private val topic = "topic"
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    createTopic(topic)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
-    val producer = createProducer()
-    val now = System.currentTimeMillis()
-    val createTime = now - TimeUnit.DAYS.toMillis(1)
-    val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, 
createTime, s"key$i".getBytes,
-      s"value$i".getBytes))
-    val recordMetadatas = producerRecords.map(producer.send).map(_.get(10, 
TimeUnit.SECONDS))
-    recordMetadatas.foreach { recordMetadata =>
-      assertTrue(recordMetadata.timestamp >= now)
-      assertTrue(recordMetadata.timestamp < now + 
TimeUnit.SECONDS.toMillis(60))
-    }
-
-    val consumer = createConsumer()
-    consumer.subscribe(Collections.singleton(topic))
-    val consumerRecords = TestUtils.consumeRecords(consumer, 
producerRecords.size)
-
-    consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
-      val producerRecord = producerRecords(index)
-      val recordMetadata = recordMetadatas(index)
-      assertEquals(new String(producerRecord.key), new 
String(consumerRecord.key))
-      assertEquals(new String(producerRecord.value), new 
String(consumerRecord.value))
-      assertNotEquals(producerRecord.timestamp, consumerRecord.timestamp)
-      assertEquals(recordMetadata.timestamp, consumerRecord.timestamp)
-      assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType)
-    }
-  }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
new file mode 100644
index 00000000000..8df13a3ea35
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests checking that consumed records carry a LogAppendTime timestamp rather than the
+ * producer-supplied create time. LogAppendTime is configured either on the broker
+ * ({@code log.message.timestamp.type}) or on the topic ({@link TopicConfig#MESSAGE_TIMESTAMP_TYPE_CONFIG}).
+ */
+public class LogAppendTimeTest {
+    private static final String TOPIC = "log-append-time-topic";
+    private static final int NUM_PARTITION = 1;
+    private static final short NUM_REPLICAS = 1;
+
+    /**
+     * Runs the produce/consume checks with LogAppendTime set through the broker-level
+     * {@code log.message.timestamp.type} property; the topic itself is created with default configs.
+     */
+    @ClusterTest(
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = "log.message.timestamp.type", value = "LogAppendTime"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+        }
+    )
+    public void testProduceConsumeWithConfigOnBroker(ClusterInstance clusterInstance) throws InterruptedException {
+        clusterInstance.createTopic(TOPIC, NUM_PARTITION, NUM_REPLICAS);
+
+        testProduceConsume(clusterInstance);
+    }
+
+    /**
+     * Runs the produce/consume checks with LogAppendTime set only as a topic-level config; the broker
+     * does not set {@code log.message.timestamp.type} in this test.
+     */
+    @ClusterTest(
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+        }
+    )
+    public void testProduceConsumeWithConfigOnTopic(ClusterInstance clusterInstance) throws InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            // createTopics only returns futures, so wait for the topic before producing to it.
+            admin.createTopics(List.of(
+                new NewTopic(TOPIC, NUM_PARTITION, NUM_REPLICAS).
+                    configs(Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"))));
+            clusterInstance.waitForTopic(TOPIC, NUM_PARTITION);
+        }
+
+        testProduceConsume(clusterInstance);
+    }
+
+    /**
+     * Shared scenario for both tests. Produces ten records whose create time lies one day in the past,
+     * asserts that every returned {@link RecordMetadata} timestamp falls in {@code [now, now + 60s)}, and
+     * then, for each group protocol reported by the cluster, consumes the records and asserts that key and
+     * value round-trip, that the consumed timestamp differs from the create time and equals the metadata
+     * timestamp, and that the timestamp type is {@link TimestampType#LOG_APPEND_TIME}.
+     *
+     * @param clusterInstance cluster under test; {@link #TOPIC} must already exist
+     * @throws InterruptedException if interrupted while waiting for the consumer to receive all records
+     */
+    public void testProduceConsume(ClusterInstance clusterInstance) throws InterruptedException {
+        long now = System.currentTimeMillis();
+        // A create time one day in the past cannot be confused with the append time.
+        long createTime = now - TimeUnit.DAYS.toMillis(1);
+        int recordCount = 10;
+        // Distinct payloads per record make the per-index key/value comparisons below meaningful.
+        List<ProducerRecord<byte[], byte[]>> producerRecords = IntStream.range(0, recordCount)
+            .mapToObj(i -> new ProducerRecord<>(TOPIC, null, createTime, ("key" + i).getBytes(), ("value" + i).getBytes()))
+            .toList();
+
+        List<RecordMetadata> recordMetadatas = new ArrayList<>();
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
+            for (ProducerRecord<byte[], byte[]> record : producerRecords) {
+                // Block on every send so recordMetadatas lines up index-for-index with producerRecords.
+                recordMetadatas.add(assertDoesNotThrow(() -> producer.send(record).get(10, TimeUnit.SECONDS)));
+            }
+        }
+        assertEquals(recordCount, recordMetadatas.size());
+        recordMetadatas.forEach(recordMetadata -> {
+            assertTrue(recordMetadata.timestamp() >= now);
+            assertTrue(recordMetadata.timestamp() < now + TimeUnit.SECONDS.toMillis(60));
+        });
+
+        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+            try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(
+                Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name())
+            )) {
+                consumer.subscribe(Collections.singleton(TOPIC));
+                List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                    records.forEach(consumerRecords::add);
+                    return consumerRecords.size() == producerRecords.size();
+                }, "Consumer with protocol " + groupProtocol.name + " cannot consume all records");
+
+                // Walk by index rather than calling indexOf for every record.
+                for (int index = 0; index < consumerRecords.size(); index++) {
+                    ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.get(index);
+                    ProducerRecord<byte[], byte[]> producerRecord = producerRecords.get(index);
+                    RecordMetadata recordMetadata = recordMetadatas.get(index);
+                    assertArrayEquals(producerRecord.key(), consumerRecord.key(), "Key mismatch for consumer with protocol " + groupProtocol.name);
+                    assertArrayEquals(producerRecord.value(), consumerRecord.value(), "Value mismatch for consumer with protocol " + groupProtocol.name);
+                    assertNotEquals(producerRecord.timestamp(), consumerRecord.timestamp(), "Timestamp mismatch with producer record for consumer with protocol " + groupProtocol.name);
+                    assertEquals(recordMetadata.timestamp(), consumerRecord.timestamp(), "Timestamp mismatch with record metadata for consumer with protocol " + groupProtocol.name);
+                    assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType(), "Timestamp type mismatch for consumer with protocol " + groupProtocol.name);
+                }
+            }
+        }
+    }
+}

Reply via email to