This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1bb29b93a2 MINOR: migrate BrokerCompressionTest to storage module
(#19277)
f1bb29b93a2 is described below
commit f1bb29b93a2a0d74cf393b7d6a7fa80b3945c105
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Apr 3 22:43:42 2025 +0800
MINOR: migrate BrokerCompressionTest to storage module (#19277)
There are two change for this PR.
1. Move `BrokerCompressionTest ` from core to storage
2. Rewrite `BrokerCompressionTest ` from scala to java
Reviewers: TengYao Chi <[email protected]>, PoAn Yang
<[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
build.gradle | 1 +
.../unit/kafka/log/BrokerCompressionTest.scala | 103 -------------------
.../internals/log/BrokerCompressionTest.java | 111 +++++++++++++++++++++
3 files changed, 112 insertions(+), 103 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7027d640029..4753be6ccc6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2248,6 +2248,7 @@ project(':storage') {
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
+ testImplementation project(':transaction-coordinator')
testImplementation libs.hamcrest
testImplementation libs.jacksonDatabindYaml
testImplementation libs.junitJupiter
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
deleted file mode 100755
index c74f6f6c1d5..00000000000
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.utils._
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
RecordBatch, SimpleRecord}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.storage.log.FetchIsolation
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.util.{Optional, Properties}
-
-class BrokerCompressionTest {
-
- val tmpDir = TestUtils.tempDir()
- val logDir = TestUtils.randomPartitionLogDir(tmpDir)
- val time = new MockTime(0, 0)
- val logConfig = new LogConfig(new Properties)
-
- @AfterEach
- def tearDown(): Unit = {
- Utils.delete(tmpDir)
- }
-
- /**
- * Test broker-side compression configuration
- */
- @ParameterizedTest
- @MethodSource(Array("parameters"))
- def testBrokerSideCompression(messageCompressionType: CompressionType,
brokerCompressionType: BrokerCompressionType): Unit = {
- val messageCompression = Compression.of(messageCompressionType).build()
- val logProps = new Properties()
- logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
brokerCompressionType.name)
- /*configure broker-side compression */
- val log = UnifiedLog.create(
- logDir,
- new LogConfig(logProps),
- 0L,
- 0L,
- time.scheduler,
- new BrokerTopicStats,
- time,
- 5 * 60 * 1000,
- new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
- new LogDirFailureChannel(10),
- true,
- Optional.empty)
-
- /* append two messages */
- log.appendAsLeader(MemoryRecords.withRecords(messageCompression, 0,
- new SimpleRecord("hello".getBytes), new
SimpleRecord("there".getBytes)), 0)
-
- def readBatch(offset: Int): RecordBatch = {
- val fetchInfo = log.read(offset, 4096, FetchIsolation.LOG_END, true)
- fetchInfo.records.batches.iterator.next()
- }
-
- if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
- val batches = readBatch(0)
- val targetCompression =
BrokerCompressionType.targetCompression(log.config.compression, null)
- assertEquals(targetCompression.`type`(), batches.compressionType,
"Compression at offset 0 should produce " + brokerCompressionType)
- }
- else
- assertEquals(messageCompressionType, readBatch(0).compressionType,
"Compression at offset 0 should produce " + messageCompressionType)
- }
-
-}
-
-object BrokerCompressionTest {
- def parameters: java.util.stream.Stream[Arguments] = {
- java.util.Arrays.stream(
- for (brokerCompression <- BrokerCompressionType.values;
- messageCompression <- CompressionType.values
- ) yield Arguments.of(messageCompression, brokerCompression)
- )
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java
new file mode 100644
index 00000000000..aa3ec174c19
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+ private final File tmpDir = TestUtils.tempDirectory();
+ private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+ private final MockTime time = new MockTime(0, 0);
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ Utils.delete(tmpDir);
+ }
+
+ /**
+ * Test broker-side compression configuration
+ */
+ @ParameterizedTest
+ @MethodSource("allCompressionParameters")
+ public void testBrokerSideCompression(CompressionType
messageCompressionType, BrokerCompressionType brokerCompressionType) throws
IOException {
+ Compression messageCompression =
Compression.of(messageCompressionType).build();
+
+ /* Configure broker-side compression */
+ try (UnifiedLog log = UnifiedLog.create(
+ logDir,
+ new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG,
brokerCompressionType.name)),
+ 0L,
+ 0L,
+ time.scheduler,
+ new BrokerTopicStats(),
+ time,
+ 5 * 60 * 1000,
+ new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ new LogDirFailureChannel(10),
+ true,
+ Optional.empty()
+ )) {
+ /* Append two messages */
+ log.appendAsLeader(
+ MemoryRecords.withRecords(messageCompression, 0,
+ new SimpleRecord("hello".getBytes()),
+ new SimpleRecord("there".getBytes())
+ ), 0
+ );
+
+ RecordBatch firstBatch = readFirstBatch(log);
+
+ if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
+ Compression targetCompression =
BrokerCompressionType.targetCompression(log.config().compression, null);
+ assertEquals(targetCompression.type(),
firstBatch.compressionType(), "Compression at offset 0 should produce " +
brokerCompressionType);
+ } else {
+ assertEquals(messageCompressionType,
firstBatch.compressionType(), "Compression at offset 0 should produce " +
messageCompressionType);
+ }
+ }
+ }
+
+ private static RecordBatch readFirstBatch(UnifiedLog log) throws
IOException {
+ FetchDataInfo fetchInfo = log.read(0, 4096, FetchIsolation.LOG_END,
true);
+ return fetchInfo.records.batches().iterator().next();
+ }
+
+ private static Stream<Arguments> allCompressionParameters() {
+ return Arrays.stream(BrokerCompressionType.values())
+ .flatMap(brokerCompression ->
Arrays.stream(CompressionType.values())
+ .map(messageCompression -> Arguments.of(messageCompression,
brokerCompression)));
+ }
+}