This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1bb29b93a2 MINOR: migrate BrokerCompressionTest to storage module (#19277)
f1bb29b93a2 is described below

commit f1bb29b93a2a0d74cf393b7d6a7fa80b3945c105
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Apr 3 22:43:42 2025 +0800

    MINOR: migrate BrokerCompressionTest to storage module (#19277)
    
    There are two changes in this PR.
    
    1. Move `BrokerCompressionTest` from core to storage
    2. Rewrite `BrokerCompressionTest` from Scala to Java
    
    Reviewers: TengYao Chi <[email protected]>, PoAn Yang
    <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 build.gradle                                       |   1 +
 .../unit/kafka/log/BrokerCompressionTest.scala     | 103 -------------------
 .../internals/log/BrokerCompressionTest.java       | 111 +++++++++++++++++++++
 3 files changed, 112 insertions(+), 103 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7027d640029..4753be6ccc6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2248,6 +2248,7 @@ project(':storage') {
     testImplementation project(':server')
     testImplementation project(':server-common')
     testImplementation project(':server-common').sourceSets.test.output
+    testImplementation project(':transaction-coordinator')
     testImplementation libs.hamcrest
     testImplementation libs.jacksonDatabindYaml
     testImplementation libs.junitJupiter
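
(Side note: the new transaction-coordinator test dependency above is presumably
what lets the relocated Java test resolve its import of
org.apache.kafka.coordinator.transaction.TransactionLogConfig, seen in the new
file below.)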
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
deleted file mode 100755
index c74f6f6c1d5..00000000000
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.utils._
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.storage.log.FetchIsolation
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.util.{Optional, Properties}
-
-class BrokerCompressionTest {
-
-  val tmpDir = TestUtils.tempDir()
-  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime(0, 0)
-  val logConfig = new LogConfig(new Properties)
-
-  @AfterEach
-  def tearDown(): Unit = {
-    Utils.delete(tmpDir)
-  }
-
-  /**
-   * Test broker-side compression configuration
-   */
-  @ParameterizedTest
-  @MethodSource(Array("parameters"))
-  def testBrokerSideCompression(messageCompressionType: CompressionType, brokerCompressionType: BrokerCompressionType): Unit = {
-    val messageCompression = Compression.of(messageCompressionType).build()
-    val logProps = new Properties()
-    logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, brokerCompressionType.name)
-    /*configure broker-side compression  */
-    val log = UnifiedLog.create(
-      logDir,
-      new LogConfig(logProps),
-      0L,
-      0L,
-      time.scheduler,
-      new BrokerTopicStats,
-      time,
-      5 * 60 * 1000,
-      new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
-      TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-      new LogDirFailureChannel(10),
-      true,
-      Optional.empty)
-
-    /* append two messages */
-    log.appendAsLeader(MemoryRecords.withRecords(messageCompression, 0,
-          new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), 0)
-
-    def readBatch(offset: Int): RecordBatch = {
-      val fetchInfo = log.read(offset, 4096, FetchIsolation.LOG_END, true)
-      fetchInfo.records.batches.iterator.next()
-    }
-
-    if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
-      val batches = readBatch(0)
-      val targetCompression = BrokerCompressionType.targetCompression(log.config.compression, null)
-      assertEquals(targetCompression.`type`(), batches.compressionType, "Compression at offset 0 should produce " + brokerCompressionType)
-    }
-    else
-      assertEquals(messageCompressionType, readBatch(0).compressionType, "Compression at offset 0 should produce " + messageCompressionType)
-  }
-
-}
-
-object BrokerCompressionTest {
-  def parameters: java.util.stream.Stream[Arguments] = {
-    java.util.Arrays.stream(
-      for (brokerCompression <- BrokerCompressionType.values;
-           messageCompression <- CompressionType.values
-      ) yield Arguments.of(messageCompression, brokerCompression)
-    )
-  }
-}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java
new file mode 100644
index 00000000000..aa3ec174c19
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+    private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final MockTime time = new MockTime(0, 0);
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        Utils.delete(tmpDir);
+    }
+
+    /**
+     * Test broker-side compression configuration
+     */
+    @ParameterizedTest
+    @MethodSource("allCompressionParameters")
+    public void testBrokerSideCompression(CompressionType messageCompressionType, BrokerCompressionType brokerCompressionType) throws IOException {
+        Compression messageCompression = Compression.of(messageCompressionType).build();
+
+        /* Configure broker-side compression */
+        try (UnifiedLog log = UnifiedLog.create(
+            logDir,
+            new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, brokerCompressionType.name)),
+            0L,
+            0L,
+            time.scheduler,
+            new BrokerTopicStats(),
+            time,
+            5 * 60 * 1000,
+            new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+            TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+            new LogDirFailureChannel(10),
+            true,
+            Optional.empty()
+        )) {
+            /* Append two messages */
+            log.appendAsLeader(
+                    MemoryRecords.withRecords(messageCompression, 0,
+                            new SimpleRecord("hello".getBytes()),
+                            new SimpleRecord("there".getBytes())
+                    ), 0
+            );
+
+            RecordBatch firstBatch = readFirstBatch(log);
+
+            if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
+                Compression targetCompression = BrokerCompressionType.targetCompression(log.config().compression, null);
+                assertEquals(targetCompression.type(), firstBatch.compressionType(), "Compression at offset 0 should produce " + brokerCompressionType);
+            } else {
+                assertEquals(messageCompressionType, firstBatch.compressionType(), "Compression at offset 0 should produce " + messageCompressionType);
+            }
+        }
+    }
+
+    private static RecordBatch readFirstBatch(UnifiedLog log) throws IOException {
+        FetchDataInfo fetchInfo = log.read(0, 4096, FetchIsolation.LOG_END, true);
+        return fetchInfo.records.batches().iterator().next();
+    }
+
+    private static Stream<Arguments> allCompressionParameters() {
+        return Arrays.stream(BrokerCompressionType.values())
+                .flatMap(brokerCompression -> Arrays.stream(CompressionType.values())
+                .map(messageCompression -> Arguments.of(messageCompression, brokerCompression)));
+    }
+}
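
For readers skimming the diff, the rule the test asserts is: with
compression.type=producer the broker keeps whatever codec the producer used,
while any other setting makes the broker's configured codec win. Below is a
minimal, illustrative sketch of that expectation; the expectedCodecName helper
is hypothetical, not Kafka API (the real resolution lives in
BrokerCompressionType.targetCompression):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.server.record.BrokerCompressionType;

    public class CompressionRuleSketch {
        // Hypothetical helper mirroring the test's assertion: PRODUCER keeps
        // the producer codec; any fixed setting wins outright. (The broker
        // setting "uncompressed" corresponds to the batch codec "none".)
        static String expectedCodecName(BrokerCompressionType broker, CompressionType producer) {
            return broker == BrokerCompressionType.PRODUCER ? producer.name : broker.name;
        }

        public static void main(String[] args) {
            // Same cross product that allCompressionParameters() streams.
            for (BrokerCompressionType broker : BrokerCompressionType.values())
                for (CompressionType producer : CompressionType.values())
                    System.out.printf("compression.type=%s, producer sends %s -> stored as %s%n",
                            broker.name, producer.name, expectedCodecName(broker, producer));
        }
    }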
