wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r934354937


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +111,34 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            MemorySegment memorySegment = buffer.getMemorySegment();
+            // If buffer is in-heap, manipulate the underlying array directly. There are two main
+            // reasons why NIO buffer is not directly used here: One is that some compression
+            // libraries will use the underlying array for heap buffer, but our input buffer may be
+            // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+            // is that for the buffer in the heap directly operates the underlying array can reduce
+            // additional overhead compared to generating NIO buffer.

Review Comment:
   ```suggestion
               // If buffer is on-heap, manipulate the underlying array directly. There are two main
               // reasons why NIO buffer is not directly used here: One is that some compression
               // libraries will use the underlying array for heap buffer, but our input buffer may be
               // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
               // is that for the on-heap buffer, directly operating the underlying array can reduce
               // additional overhead compared to generating a NIO buffer.
   ```
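   For context, the surrounding change presumably branches on whether the buffer is backed by heap memory, roughly like the sketch below (illustrative only, not the exact diff; `isOffHeap()`/`getArray()` are `MemorySegment` methods and `internalBufferArray` is the backup array this PR introduces):
   ```java
   int compressedLen;
   if (!memorySegment.isOffHeap()) {
       // on-heap case: hand the backing byte[] arrays to the compressor directly
       compressedLen =
               blockCompressor.compress(
                       memorySegment.getArray(),
                       buffer.getMemorySegmentOffset(),
                       length,
                       internalBufferArray,
                       0);
   } else {
       // off-heap case: fall back to wrapping the segments as NIO buffers
       compressedLen =
               blockCompressor.compress(
                       buffer.getNioBuffer(0, length),
                       0,
                       length,
                       internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
                       0);
   }
   ```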



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+
+/** Flink decompressor that wraps {@link Decompressor}. */
+public class AirBlockDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
   This field should be declared `private final`.
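   That is, the declaration would become:
   ```java
   private final Decompressor internalDecompressor;
   ```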



##########
flink-dist/src/main/resources/META-INF/NOTICE:
##########
@@ -12,6 +12,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.11.0
+- io.airlift:aircompressor:0.21

Review Comment:
   According to the notice check result, we should remove this.



##########
docs/layouts/shortcodes/generated/all_taskmanager_network_section.html:
##########
@@ -20,6 +20,12 @@
             <td>String</td>
             <td>The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the property type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once memory exceeding some threshold. Also note that this option is experimental and might be changed future.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.compression.codec</h5></td>
+            <td style="word-wrap: break-word;">"LZ4"</td>
+            <td>String</td>
+            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "Z_STD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression rate is the lowest. "Z_STD"has the highest compression rate, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed future.</td>

Review Comment:
   ```suggestion
               <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.</td>
   ```
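   As a side note for users reading this description, the documented key can also be set programmatically, e.g. (a minimal sketch; choosing "ZSTD" here is only for illustration):
   ```java
   import org.apache.flink.configuration.Configuration;

   Configuration config = new Configuration();
   // trade-off described above: "LZ4" is fastest, "ZSTD" compresses best, "LZO" sits in between
   config.setString("taskmanager.network.compression.codec", "ZSTD");
   ```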



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {

Review Comment:
   We may also add a length check here, just like in the byte[] `compress` method below.
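   Something like the following, mirroring the byte[] overload (a sketch; it assumes `BufferCompressionException` offers a message constructor, as the old `InsufficientBufferException` did):
   ```java
   // reject destinations that cannot hold the worst-case compressed output plus header
   if (dst.capacity() < dst.position() + dstOff + getMaxCompressedSize(srcLen)) {
       throw new BufferCompressionException("Insufficient space in the destination buffer.");
   }
   ```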



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java:
##########
@@ -19,24 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * A {@code DataCorruptionException} is thrown when the decompressed data is corrupted and cannot be
- * decompressed.
+ * A {@code BufferCompressionException} is thrown when the compression data cannot be compressed,

Review Comment:
   ```suggestion
    * A {@code BufferCompressionException} is thrown when the target data cannot be compressed,
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -37,15 +37,19 @@ public class BufferDecompressor {
     /** The intermediate buffer for the decompressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferDecompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
 
         // the decompressed data size should be never larger than the configured buffer size
-        final byte[] heapBuffer = new byte[bufferSize];
+        internalBufferArray = new byte[bufferSize];

Review Comment:
   nit: internalBufferArray -> this.internalBufferArray
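   That is, simply qualifying the assignment for readability:
   ```java
   this.internalBufferArray = new byte[bufferSize];
   ```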



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java:
##########
@@ -19,25 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * An {@code InsufficientBufferException} is thrown when there is no enough buffer to serialize or
- * deserialize a buffer to another buffer. When such exception being caught, user may enlarge the
- * output buffer and try again.
+ * A {@code BufferDecompressionException} is thrown when the decompressed data cannot be

Review Comment:
   ```suggestion
    * A {@code BufferDecompressionException} is thrown when the target data cannot be
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            if (dst.length < dstOff + getMaxCompressedSize(srcLen) - 1) {

Review Comment:
   Remove the `- 1`; as written, the check accepts a destination array that is one byte too small for the worst-case compressed size.
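   The corrected guard would then read (a sketch; the thrown exception and message are illustrative):
   ```java
   if (dst.length < dstOff + getMaxCompressedSize(srcLen)) {
       throw new BufferCompressionException("Insufficient space in the destination buffer.");
   }
   ```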



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -36,16 +36,20 @@ public class BufferCompressor {
     /** The intermediate buffer for the compressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferCompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
         // the size of this intermediate heap buffer will be gotten from the
         // plugin configuration in the future, and currently, double size of
-        // the input buffer is enough for lz4-java compression library.
-        final byte[] heapBuffer = new byte[2 * bufferSize];
+        // the input buffer is enough for compression libraries we used.

Review Comment:
   ```suggestion
           // the input buffer is enough for the compression libraries used.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +107,28 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        MemorySegment memorySegment = buffer.getMemorySegment();
+        // If buffer is in-heap, manipulate the underlying array directly. There are two main
+        // reasons why NIO buffer is not directly used here: One is that some compression
+        // libraries will use the underlying array for heap buffer, but our input buffer may be
+        // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+        // is that for the buffer in the heap directly operates the underlying array can reduce
+        // additional overhead compared to generating NIO buffer.

Review Comment:
   ```suggestion
           // If buffer is on-heap, manipulate the underlying array directly. There are two main
           // reasons why NIO buffer is not directly used here: One is that some compression
           // libraries will use the underlying array for heap buffer, but our input buffer may be
           // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
           // is that for the on-heap buffer, directly operating the underlying array can reduce
           // additional overhead compared to generating a NIO buffer.
   ```
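   Analogous to the compressor side, the decompression path presumably branches roughly like this (a sketch only; it assumes `BlockDecompressor` has a byte[] overload mirroring `BlockCompressor`, and `isOffHeap()`/`getArray()` are `MemorySegment` methods):
   ```java
   if (!memorySegment.isOffHeap()) {
       // on-heap case: decompress directly between the backing byte[] arrays
       return blockDecompressor.decompress(
               memorySegment.getArray(),
               buffer.getMemorySegmentOffset(),
               length,
               internalBufferArray,
               0);
   } else {
       // off-heap case: fall back to NIO buffers
       return blockDecompressor.decompress(
               buffer.getNioBuffer(0, length),
               0,
               length,
               internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
               0);
   }
   ```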



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
