wsry commented on code in PR #20216: URL: https://github.com/apache/flink/pull/20216#discussion_r934354937
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java: ##########
@@ -107,15 +111,34 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");

         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            MemorySegment memorySegment = buffer.getMemorySegment();
+            // If buffer is in-heap, manipulate the underlying array directly. There are two main
+            // reasons why NIO buffer is not directly used here: One is that some compression
+            // libraries will use the underlying array for heap buffer, but our input buffer may be
+            // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+            // is that for the buffer in the heap directly operates the underlying array can reduce
+            // additional overhead compared to generating NIO buffer.

Review Comment:
```suggestion
            // If buffer is on-heap, manipulate the underlying array directly. There are two main
            // reasons why NIO buffer is not directly used here: One is that some compression
            // libraries will use the underlying array for heap buffer, but our input buffer may be
            // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
            // is that for the on-heap buffer, directly operating the underlying array can reduce
            // additional overhead compared to generating a NIO buffer.
```

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java: ##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+
+/** Flink decompressor that wraps {@link Decompressor}. */
+public class AirBlockDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
This field should be `private final`.

########## flink-dist/src/main/resources/META-INF/NOTICE: ##########
@@ -12,6 +12,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.11.0
+- io.airlift:aircompressor:0.21

Review Comment:
According to the notice check result, we should remove this.

########## docs/layouts/shortcodes/generated/all_taskmanager_network_section.html: ##########
@@ -20,6 +20,12 @@
             <td>String</td>
             <td>The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the property type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once memory exceeding some threshold. Also note that this option is experimental and might be changed future.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.compression.codec</h5></td>
+            <td style="word-wrap: break-word;">"LZ4"</td>
+            <td>String</td>
+            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "Z_STD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression rate is the lowest. "Z_STD"has the highest compression rate, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed future.</td>

Review Comment:
```suggestion
            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.</td>
```
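As a usage note (not part of the PR itself): selecting the codec programmatically would go through Flink's generic `Configuration` string setter with the option key documented above. A minimal sketch, assuming the option ships under the key and values from the suggested doc text:

```java
import org.apache.flink.configuration.Configuration;

public class ShuffleCompressionCodecExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Option key and legal values ("LZ4", "LZO", "ZSTD") are taken from
        // the generated documentation snippet above.
        conf.setString("taskmanager.network.compression.codec", "ZSTD");
        System.out.println(conf);
    }
}
```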
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java: ##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {

Review Comment:
We may also add a length check here, just like in the compress method below.

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java: ##########
@@ -19,24 +19,24 @@
 package org.apache.flink.runtime.io.compression;

 /**
- * A {@code DataCorruptionException} is thrown when the decompressed data is corrupted and cannot be
- * decompressed.
+ * A {@code BufferCompressionException} is thrown when the compression data cannot be compressed,

Review Comment:
```suggestion
 * A {@code BufferCompressionException} is thrown when the target data cannot be compressed,
```

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java: ##########
@@ -37,15 +37,19 @@ public class BufferDecompressor {
     /** The intermediate buffer for the decompressed data. */
     private final NetworkBuffer internalBuffer;

+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferDecompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);

         // the decompressed data size should be never larger than the configured buffer size
-        final byte[] heapBuffer = new byte[bufferSize];
+        internalBufferArray = new byte[bufferSize];

Review Comment:
nit: internalBufferArray -> this.internalBufferArray

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java: ##########
@@ -19,25 +19,24 @@
 package org.apache.flink.runtime.io.compression;

 /**
- * An {@code InsufficientBufferException} is thrown when there is no enough buffer to serialize or
- * deserialize a buffer to another buffer. When such exception being caught, user may enlarge the
- * output buffer and try again.
+ * A {@code BufferDecompressionException} is thrown when the decompressed data cannot be

Review Comment:
```suggestion
 * A {@code BufferDecompressionException} is thrown when the target data cannot be
```

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java: ##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            if (dst.length < dstOff + getMaxCompressedSize(srcLen) - 1) {

Review Comment:
Remove ```- 1```

########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java: ##########
@@ -36,16 +36,20 @@ public class BufferCompressor {
     /** The intermediate buffer for the compressed data. */
     private final NetworkBuffer internalBuffer;

+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferCompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);

         // the size of this intermediate heap buffer will be gotten from the
         // plugin configuration in the future, and currently, double size of
-        // the input buffer is enough for lz4-java compression library.
-        final byte[] heapBuffer = new byte[2 * bufferSize];
+        // the input buffer is enough for compression libraries we used.

Review Comment:
```suggestion
        // the input buffer is enough for the compression libraries used.
```
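To make the two AirBlockCompressor comments above concrete, here is a minimal sketch of the requested guards. It is only an illustration, not the final patch: it assumes `BufferCompressionException` also offers a `String` constructor (the diff shows only the `Throwable` one), and it reuses `getMaxCompressedSize`, which already includes `HEADER_LENGTH` as the code above shows.

```java
// Sketch of a length check for the ByteBuffer-based compress, mirroring
// the byte[]-based overload as the review asks. The write starts at
// dst.position() + dstOff (prevDstOff in the method above), so the space
// left is measured from there to the buffer's capacity.
if (dst.capacity() - (dst.position() + dstOff) < getMaxCompressedSize(srcLen)) {
    throw new BufferCompressionException("Insufficient destination buffer space.");
}

// The byte[]-based overload's check with the spurious "- 1" removed:
// dst must hold dstOff plus the worst-case compressed size.
if (dst.length < dstOff + getMaxCompressedSize(srcLen)) {
    throw new BufferCompressionException("Insufficient destination buffer space.");
}
```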
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java: ##########
@@ -103,12 +107,28 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");

         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        MemorySegment memorySegment = buffer.getMemorySegment();
+        // If buffer is in-heap, manipulate the underlying array directly. There are two main
+        // reasons why NIO buffer is not directly used here: One is that some compression
+        // libraries will use the underlying array for heap buffer, but our input buffer may be
+        // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+        // is that for the buffer in the heap directly operates the underlying array can reduce
+        // additional overhead compared to generating NIO buffer.

Review Comment:
```suggestion
        // If buffer is on-heap, manipulate the underlying array directly. There are two main
        // reasons why NIO buffer is not directly used here: One is that some compression
        // libraries will use the underlying array for heap buffer, but our input buffer may be
        // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
        // is that for the on-heap buffer, directly operating the underlying array can reduce
        // additional overhead compared to generating a NIO buffer.
```
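For context on what the suggested comment describes, a sketch of the surrounding dispatch might look as follows. This is an illustration under stated assumptions, not the exact patch: it assumes `MemorySegment#isOffHeap()` and `MemorySegment#getArray()` from Flink's MemorySegment API, a byte[]-based `BlockDecompressor#decompress` overload matching the byte[] compress overload added in this PR, and a zero segment offset for simplicity.

```java
// Dispatch sketch: use the backing array for on-heap segments, NIO
// buffers otherwise (names follow the diff above; offsets simplified).
int decompressedLen;
if (!memorySegment.isOffHeap()) {
    // On-heap: operating on the underlying array avoids the read-only
    // ByteBuffer restriction and the cost of materializing NIO views.
    decompressedLen =
            blockDecompressor.decompress(
                    memorySegment.getArray(), 0, length, internalBufferArray, 0);
} else {
    // Off-heap: fall back to the original NIO-buffer code path.
    decompressedLen =
            blockDecompressor.decompress(
                    buffer.getNioBuffer(0, length),
                    0,
                    length,
                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
                    0);
}
return decompressedLen;
```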