junrao commented on code in PR #15516: URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273
########## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GzipCompressionTest { + + @Test + public void testCompressionDecompression() throws IOException { + GzipCompression.Builder builder = Compression.gzip(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); Review Comment: Should we test something bigger, e.g. more than 512 bytes, so that it covers the out.flush() logic? 
########## clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GzipCompressionTest { + + @Test + public void testCompressionDecompression() throws IOException { + GzipCompression.Builder builder = Compression.gzip(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { + for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, 
GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) { + GzipCompression compression = builder.level(level).build(); + ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); + try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) { + out.write(data); + out.flush(); + } + bufferStream.buffer().flip(); + + try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) { + byte[] result = new byte[data.length]; + int read = inputStream.read(result); + assertEquals(data.length, read); + assertArrayEquals(data, result); + } + } + } + } + + @Test + public void testCompressionLevels() { + GzipCompression.Builder builder = Compression.gzip(); + + assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1)); + assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1)); + + builder.level(GzipCompression.MIN_LEVEL); + builder.level(GzipCompression.MAX_LEVEL); + } + + @Test + public void testLevelValidator() { + GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator(); + for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) { + validator.ensureValid("", level); + } + validator.ensureValid("", GzipCompression.DEFAULT_LEVEL); + assertThrows(ConfigException.class, () -> validator.ensureValid("", 0)); Review Comment: This is the same as GzipCompression.MIN_LEVEL - 1, right? 
########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -711,6 +710,30 @@ class KafkaConfigTest { assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } + @Test + def testInvalidGzipCompressionLevel(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.setProperty(KafkaConfig.CompressionTypeProp, "gzip") + props.setProperty(KafkaConfig.CompressionGzipLevelProp, (GzipCompression.MAX_LEVEL+1).toString) Review Comment: space before and after + ########## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ########## @@ -1587,7 +1612,7 @@ class LogValidatorTest { private def createTwoBatchedRecords(magicValue: Byte, timestamp: Long, - codec: CompressionType): MemoryRecords = { + codec: Compression): MemoryRecords = { Review Comment: This is an existing issue, but timestamp seems unused. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -791,7 +790,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, val offset = PrimitiveRef.ofLong(localLog.logEndOffset) appendInfo.setFirstOffset(offset.value) val validateAndOffsetAssignResult = try { - val targetCompression = BrokerCompressionType.forName(config.compressionType).targetCompressionType(appendInfo.sourceCompression) + val targetCompression = config.compressionType.targetCompression(config.compression, appendInfo.sourceCompression()) Review Comment: `config.compressionType` is not really being used. Could we make `targetCompression` a static method and independent of compressionType? 
########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -917,6 +940,10 @@ class KafkaConfigTest { case MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") case MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG => // ignore string case MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG => // ignore string + + case KafkaConfig.CompressionGzipLevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") Review Comment: Should we test LZ4 too? ########## clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java: ########## @@ -60,20 +61,20 @@ public class MemoryRecordsBuilderTest { private static class Args { final int bufferOffset; - final CompressionType compressionType; + final Compression compression; final byte magic; - public Args(int bufferOffset, CompressionType compressionType, byte magic) { + public Args(int bufferOffset, Compression compression, byte magic) { this.bufferOffset = bufferOffset; - this.compressionType = compressionType; + this.compression = compression; this.magic = magic; } @Override public String toString() { return "magic=" + magic + ", bufferOffset=" + bufferOffset + - ", compressionType=" + compressionType; + ", compressionType=" + compression; Review Comment: compressionType => compression? ########## clients/src/test/java/org/apache/kafka/common/compress/NoCompressionTest.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class NoCompressionTest { + + @Test + public void testCompressionDecompression() throws IOException { + NoCompression compression = Compression.NONE; + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { + ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4); + try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) { Review Comment: `RecordBatch.CURRENT_MAGIC_VALUE` should be `magic`, right? Ditto in SnappyCompressionTest, GzipCompressionTest and ZstdCompressionTest. 
########## core/src/test/scala/unit/kafka/log/LogConfigTest.scala: ########## @@ -95,6 +95,8 @@ class LogConfigTest { case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean") case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3") case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3") + case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2") + case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1") Review Comment: Should we test LZ4 too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org