[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r792333746 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r595805905 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; Review comment: I guess no for now. 
Maybe we can have one in the future, so we can remove the dependency on Netty (and other dependencies on Netty as well). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592197707 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java ## @@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) } /** - * Creates the {@link CompressionCodec} given the compression type. + * Process compression by compressing the buffer as is. */ - public static CompressionCodec createCodec(byte compressionType) { -switch (compressionType) { - case NoCompressionCodec.COMPRESSION_TYPE: -return NoCompressionCodec.INSTANCE; - default: -throw new IllegalArgumentException("Compression type not supported: " + compressionType); -} + public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { +ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); +compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); +compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); +compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); +return compressedBuffer; + } + + /** + * Process decompression by decompressing the buffer as is. Review comment: I chose `extractUncompressedBuffer`. Thanks for the good suggestion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592197440 ## File path: java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ## @@ -79,7 +105,12 @@ private void loadBuffers( List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); - ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf)); + // for vectors without nulls, the buffer is empty, so there is no need to decompress it. + ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; + ownBuffers.add(bufferToAdd); + if (decompressionPerformed) { +decompressedBuffers.add(bufferToAdd); + } } try { vector.loadFieldBuffers(fieldNode, ownBuffers); Review comment: Good suggestion. This way, we do not need to change the public interface of `VectorLoader`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592196563 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java ## @@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) } /** - * Creates the {@link CompressionCodec} given the compression type. + * Process compression by compressing the buffer as is. */ - public static CompressionCodec createCodec(byte compressionType) { -switch (compressionType) { - case NoCompressionCodec.COMPRESSION_TYPE: -return NoCompressionCodec.INSTANCE; - default: -throw new IllegalArgumentException("Compression type not supported: " + compressionType); -} + public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { +ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); Review comment: This is a good point. It could be useful in many scenarios. However, so far, I do not have a method to solve it (without changing our APIs significantly). The fundamental reason is that the compression feature is only used in `VectorLoader`, which requires a single compressed buffer for each input buffer. In addition, we do not have an efficient way to combine two `ArrowBuf` objects. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592192366 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. Review comment: Revised. Thanks for your kind reminder. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592192114 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); Review comment: Sure. It sounds better. I have changed the name accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592190683 ## File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; + +/** + * A factory implementation based on Apache Commons library. + */ +public class CommonsCompressionFactory implements CompressionCodec.Factory { + + public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); + + @Override + public CompressionCodec createCodec(byte codecType) { +switch (codecType) { + case NoCompressionCodec.COMPRESSION_TYPE: Review comment: Sounds reasonable. I have revised the code accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r592190402 ## File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; + +/** + * A factory implementation based on Apache Commons library. + */ +public class CommonsCompressionFactory implements CompressionCodec.Factory { + + public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); + + @Override + public CompressionCodec createCodec(byte codecType) { Review comment: Sounds reasonable. I have extracted an enum `CompressionUtil#CodecType` for this purpose. It also makes some other operations easier. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r589202945 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; Review comment: Sure. I have opened ARROW-11901 to track it. I also want to share some thoughts briefly: to use the native memory directly, we need a way to wrap `ByteBuffers` as input/output streams (In Java, the only "standard" way to access the off-heap memory is through the `DirectByteBuffer`). We need some third party library to achieve that. We also need to evaluate the performance thereafter, because the Commons-compress library also uses on-heap data extensively, the copy between on-heap and off-heap data can be difficult to avoid. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r589171170 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r589169372 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { Review comment: Good suggestion. I have opened ARROW-11899 to track it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r588067931 ## File path: java/compression/pom.xml ## @@ -0,0 +1,56 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + 4.0.0 + +org.apache.arrow +arrow-java-root +4.0.0-SNAPSHOT + + arrow-compression + Arrow Compression + (Experimental/Contrib) A library for working with the compression/decompression of Arrow data. + + + + org.apache.arrow + arrow-format + ${project.version} + + + org.apache.arrow + arrow-vector + ${project.version} + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-memory-core + ${project.version} + + + org.apache.arrow + arrow-memory-unsafe + ${project.version} + test + + + org.apache.commons + commons-compress + 1.20 + + + io.netty Review comment: The `vector` module requires this library. If we do not add it in the dependency list, we get an error when building with maven: ``` Used undeclared dependencies found: io.netty:netty-common:jar:4.1.48.Final:compile ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r586185940 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585290009 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195875 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195604 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r585195259 ## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { +// compressed buffer is larger, send the raw buffer +compressedBuffer.close(); +compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { +byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; +PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); +} + +byte[] outBytes = baos.toByteArray(); + +ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); +} +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +compressedBuffer.setLong(0, uncompressedLength); + +PlatformDependent.copyMemory( +outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); +compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRES
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r575091316 ## File path: java/vector/pom.xml ## @@ -74,6 +74,11 @@ org.slf4j slf4j-api + Review comment: @emkornfield Sounds reasonable. I will try to revise the PR accordingly. Thanks for your good suggestion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r558083064 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { +factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { +Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; +} + +// create compressor lazily +if (compressor == null) { + compressor = factory.fastCompressor(); +} + +int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex()); + +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. 
+ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); +long uncompressedLength = uncompressedBuffer.writerIndex(); +if (!LITTLE_ENDIAN) { + uncompressedLength = Long.reverseBytes(uncompressedLength); +} +compressedBuffer.setLong(0, uncompressedLength); + +ByteBuffer uncompressed = +MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex()); +ByteBuffer compressed = +MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength); + +int compressedLength = compressor.compress( +uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); +compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH); + +uncompressedBuffer.close(); +return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { +Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The compressed buffer size exceeds the integer limit"); + +Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH, +"Not enough data to decompress."); + +long decompressedLength = compressedBuffer.getLong(0); +if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); +} + +if (decompressedLength == 0L) { + // shortcut for empty buffer + compressedBuffer.close(); + return allocator.getEmpty(); +} + +// create decompressor lazily +if (decompressor == null) { + decompressor = factory.fastDecompressor(); +} + +ByteBuffer compressed = MemoryUtil.directBuffer( +compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex()); Review comment: Nice catch. Thank you @stczwd This is an automated message from the Apache
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r552335719 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { +factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { +Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +// create compressor lazily Review comment: For some scenarios (e.g. flight sender), we only need the compressor, while for others (e.g. flight receiver), we only need the decompressor. So there is no need to create both eagerly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r552335378 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { +factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { Review comment: Nice catch. Thank you! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r552332372 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; Review comment: @kiszk You are right. I chose this library because our C++ implementation also depends on this repo (https://github.com/lz4/lz4). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4
liyafan82 commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r546486677 ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { +factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { +Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +// create compressor lazily +if (compressor == null) { + compressor = factory.fastCompressor(); +} + +int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex()); + +// first 8 bytes reserved for uncompressed length, to be consistent with the +// C++ implementation. +ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); +compressedBuffer.setLong(0, unCompressedBuffer.writerIndex()); Review comment: Revised accordingly. Thanks for your kind reminder. ## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. + */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { +factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { +Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE, +"The uncompressed buffer size exceeds the integer limit"); + +// create compressor lazily +if (compressor == null) { + compressor = factory.fastCompressor(); +} + +int maxCompressedLength = compressor.maxCompressedLength((int) unCompres