junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1600483273


##########
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);

Review Comment:
   Should we test something bigger, like more than 512 bytes, so that it covers the `out.flush()` logic?
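   For example, the payload could be something comfortably above that size (a sketch only; the exact length is arbitrary):

   ```java
   // Hypothetical test data: a fixed-seed payload well over 512 bytes, so the
   // compressed output goes past the internal buffer and out.flush() is exercised.
   byte[] data = new byte[4096];
   new java.util.Random(42).nextBytes(data);
   ```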



##########
clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GzipCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        GzipCompression.Builder builder = Compression.gzip();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+                GzipCompression compression = builder.level(level).build();
+                ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+                try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(data);
+                    out.flush();
+                }
+                bufferStream.buffer().flip();
+
+                try (InputStream inputStream = compression.wrapForInput(bufferStream.buffer(), magic, BufferSupplier.create())) {
+                    byte[] result = new byte[data.length];
+                    int read = inputStream.read(result);
+                    assertEquals(data.length, read);
+                    assertArrayEquals(data, result);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionLevels() {
+        GzipCompression.Builder builder = Compression.gzip();
+
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MIN_LEVEL - 1));
+        assertThrows(IllegalArgumentException.class, () -> builder.level(GzipCompression.MAX_LEVEL + 1));
+
+        builder.level(GzipCompression.MIN_LEVEL);
+        builder.level(GzipCompression.MAX_LEVEL);
+    }
+
+    @Test
+    public void testLevelValidator() {
+        GzipCompression.LevelValidator validator = new GzipCompression.LevelValidator();
+        for (int level = GzipCompression.MIN_LEVEL; level <= GzipCompression.MAX_LEVEL; level++) {
+            validator.ensureValid("", level);
+        }
+        validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
+        assertThrows(ConfigException.class, () -> validator.ensureValid("", 0));

Review Comment:
   This is the same as GzipCompression.MIN_LEVEL - 1, right?
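   If so, using the constant would make the intent explicit, e.g.:

   ```java
   // Same value as the literal 0 above (assuming MIN_LEVEL is 1), but tied to the constant.
   assertThrows(ConfigException.class, () -> validator.ensureValid("", GzipCompression.MIN_LEVEL - 1));
   ```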



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -711,6 +710,30 @@ class KafkaConfigTest {
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
+  @Test
+  def testInvalidGzipCompressionLevel(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.setProperty(KafkaConfig.CompressionTypeProp, "gzip")
+    props.setProperty(KafkaConfig.CompressionGzipLevelProp, (GzipCompression.MAX_LEVEL+1).toString)

Review Comment:
   Nit: add a space before and after the `+`.



##########
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##########
@@ -1587,7 +1612,7 @@ class LogValidatorTest {
 
   private def createTwoBatchedRecords(magicValue: Byte,
                                       timestamp: Long,
-                                      codec: CompressionType): MemoryRecords = {
+                                      codec: Compression): MemoryRecords = {

Review Comment:
   This is an existing issue, but `timestamp` seems unused.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -791,7 +790,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
             val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
             appendInfo.setFirstOffset(offset.value)
             val validateAndOffsetAssignResult = try {
-              val targetCompression = BrokerCompressionType.forName(config.compressionType).targetCompressionType(appendInfo.sourceCompression)
+              val targetCompression = config.compressionType.targetCompression(config.compression, appendInfo.sourceCompression())

Review Comment:
   `config.compressionType` is not really being used. Could we make `targetCompression` a static method and independent of `compressionType`?
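   A rough sketch of that shape (illustrative only; the signature and the `Compression.of(...)` factory are assumptions, not the actual API, and imports are omitted):

   ```java
   // Hypothetical static helper, decoupled from config.compressionType: use the
   // broker/topic-level compression override when one is configured, otherwise
   // keep the producer's codec. Names and types are illustrative.
   public static Compression targetCompression(Optional<Compression> logCompression,
                                               CompressionType sourceCompression) {
       return logCompression.orElseGet(() -> Compression.of(sourceCompression).build());
   }
   ```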



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -917,6 +940,10 @@ class KafkaConfigTest {
        case MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
         case MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG => // ignore string
         case MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG => // ignore string
+
+        case KafkaConfig.CompressionGzipLevelProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

Review Comment:
   Should we test LZ4 too?



##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java:
##########
@@ -60,20 +61,20 @@ public class MemoryRecordsBuilderTest {
 
     private static class Args {
         final int bufferOffset;
-        final CompressionType compressionType;
+        final Compression compression;
         final byte magic;
 
-        public Args(int bufferOffset, CompressionType compressionType, byte magic) {
+        public Args(int bufferOffset, Compression compression, byte magic) {
             this.bufferOffset = bufferOffset;
-            this.compressionType = compressionType;
+            this.compression = compression;
             this.magic = magic;
         }
 
         @Override
         public String toString() {
             return "magic=" + magic +
                 ", bufferOffset=" + bufferOffset +
-                ", compressionType=" + compressionType;
+                ", compressionType=" + compression;

Review Comment:
   compressionType => compression?
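   i.e., the `toString` label could follow the field rename:

   ```java
   return "magic=" + magic +
       ", bufferOffset=" + bufferOffset +
       ", compression=" + compression;
   ```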



##########
clients/src/test/java/org/apache/kafka/common/compress/NoCompressionTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NoCompressionTest {
+
+    @Test
+    public void testCompressionDecompression() throws IOException {
+        NoCompression compression = Compression.NONE;
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(4);
+            try (OutputStream out = compression.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE)) {

Review Comment:
   `RecordBatch.CURRENT_MAGIC_VALUE` should be `magic`, right? Ditto in SnappyCompressionTest, GzipCompressionTest and ZstdCompressionTest.
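   If so, the write side would compress with the loop's `magic` rather than the hard-coded constant, e.g. (adapted from the Gzip test above):

   ```java
   try (OutputStream out = compression.wrapForOutput(bufferStream, magic)) {
       out.write(data);
       out.flush();
   }
   ```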



##########
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##########
@@ -95,6 +95,8 @@ class LogConfigTest {
      case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
      case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
      case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
+      case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
+      case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")

Review Comment:
   Should we test LZ4 too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
