This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 2345bbe [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime. 2345bbe is described below commit 2345bbe0c8cfc1af75b8faef243dcde11777d5b7 Author: Kurt Young <ykt...@gmail.com> AuthorDate: Fri Mar 8 13:59:02 2019 +0800 [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime. This closes #7941 --- flink-table/flink-table-runtime-blink/pom.xml | 7 + .../compression/BlockCompressionFactory.java | 30 ++++ .../table/runtime/compression/BlockCompressor.java | 66 +++++++ .../runtime/compression/BlockDecompressor.java | 64 +++++++ .../compression/DataCorruptionException.java | 42 +++++ .../compression/InsufficientBufferException.java | 43 +++++ .../compression/Lz4BlockCompressionFactory.java | 42 +++++ .../runtime/compression/Lz4BlockCompressor.java | 106 ++++++++++++ .../runtime/compression/Lz4BlockDecompressor.java | 126 ++++++++++++++ .../runtime/compression/BlockCompressionTest.java | 192 +++++++++++++++++++++ 10 files changed, 718 insertions(+) diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml index e8b1623..114b4ea 100644 --- a/flink-table/flink-table-runtime-blink/pom.xml +++ b/flink-table/flink-table-runtime-blink/pom.xml @@ -61,6 +61,13 @@ under the License. <version>${janino.version}</version> </dependency> + <!-- Lz4 compression library --> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>1.5.0</version> + </dependency> + <!-- test dependencies --> <dependency> diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressionFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressionFactory.java new file mode 100644 index 0000000..a3c1261 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressionFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +/** + * Each compression codec has a implementation of {@link BlockCompressionFactory} + * to create compressors and decompressors. + */ +public interface BlockCompressionFactory { + + BlockCompressor getCompressor(); + + BlockDecompressor getDecompressor(); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressor.java new file mode 100644 index 0000000..7c1e5b2 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +import java.nio.ByteBuffer; + +/** + * A compressor which compresses a whole byte array each time. + * It will read from and write to byte arrays given from the outside, reducing copy time. + */ +public interface BlockCompressor { + + /** + * Get the max compressed size for a given original size. + */ + int getMaxCompressedSize(int srcSize); + + /** + * Compress source data read from ({@link ByteBuffer#position()} + {@code srcOff}), + * and write the compressed data to dst. + * + * @param src Uncompressed data to read from + * @param srcOff The start offset of uncompressed data + * @param srcLen The length of data which want to be compressed + * @param dst The target to write compressed data + * @param dstOff The start offset to write the compressed data + * + * @return Length of compressed data + * + * @throws InsufficientBufferException if the target does not have sufficient space + */ + int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff) + throws InsufficientBufferException; + + /** + * Compress data read from src, and write the compressed data to dst. + * + * @param src Uncompressed data to read from + * @param srcOff The start offset of uncompressed data + * @param srcLen The length of data which want to be compressed + * @param dst The target to write compressed data + * @param dstOff The start offset to write the compressed data + * + * @return Length of compressed data + * + * @throws InsufficientBufferException if the target does not have sufficient space + */ + int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff) + throws InsufficientBufferException; +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockDecompressor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockDecompressor.java new file mode 100644 index 0000000..b1c7ecf --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockDecompressor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +import java.nio.ByteBuffer; + +/** + * A decompressor which decompresses a block each time. + */ +public interface BlockDecompressor { + + /** + * Decompress source data read from ({@link ByteBuffer#position()} + {@code srcOff}), + * and write the decompressed data to dst. + * + * @param src Compressed data to read from + * @param srcOff The start offset of compressed data + * @param srcLen The length of data which want to be decompressed + * @param dst The target to write decompressed data + * @param dstOff The start offset to write the decompressed data + * + * @return Length of decompressed data + * + * @throws DataCorruptionException if data corruption found when decompressing + * @throws InsufficientBufferException if the target does not have sufficient space + */ + int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff) + throws DataCorruptionException, InsufficientBufferException; + + + /** + * Decompress source data read from src and write the decompressed data to dst. + * + * @param src Compressed data to read from + * @param srcOff The start offset of compressed data + * @param srcLen The length of data which want to be decompressed + * @param dst The target to write decompressed data + * @param dstOff The start offset to write the decompressed data + * + * @return Length of decompressed data + * + * @throws DataCorruptionException if data corruption found when decompressing + * @throws InsufficientBufferException if the target does not have sufficient space + */ + int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff) + throws DataCorruptionException, InsufficientBufferException; + +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/DataCorruptionException.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/DataCorruptionException.java new file mode 100644 index 0000000..f1a40c3 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/DataCorruptionException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +/** + * A {@code DataCorruptionException} is thrown when the decompressed data is corrupted and cannot be + * decompressed. + */ +public class DataCorruptionException extends RuntimeException { + + public DataCorruptionException() { + super(); + } + + public DataCorruptionException(String message) { + super(message); + } + + public DataCorruptionException(String message, Throwable e) { + super(message, e); + } + + public DataCorruptionException(Throwable e) { + super(e); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/InsufficientBufferException.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/InsufficientBufferException.java new file mode 100644 index 0000000..98a8125 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/InsufficientBufferException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +/** + * An {@code InsufficientBufferException} is thrown when there is no enough buffer to + * serialize or deserialize a buffer to another buffer. When such exception being caught, + * user may enlarge the output buffer and try again. + */ +public class InsufficientBufferException extends RuntimeException { + + public InsufficientBufferException() { + super(); + } + + public InsufficientBufferException(String message) { + super(message); + } + + public InsufficientBufferException(String message, Throwable e) { + super(message, e); + } + + public InsufficientBufferException(Throwable e) { + super(e); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressionFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressionFactory.java new file mode 100644 index 0000000..0c69589 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressionFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +/** + * Implementation of {@link BlockCompressionFactory} for Lz4 codec. + */ +public class Lz4BlockCompressionFactory implements BlockCompressionFactory { + + + /** + * We put two integers before each compressed block, the first integer represents the compressed + * length of the block, and the second one represents the original length of the block. + */ + static final int HEADER_LENGTH = 8; + + @Override + public BlockCompressor getCompressor() { + return new Lz4BlockCompressor(); + } + + @Override + public BlockDecompressor getDecompressor() { + return new Lz4BlockDecompressor(); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressor.java new file mode 100644 index 0000000..d908b04 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Exception; +import net.jpountz.lz4.LZ4Factory; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import static org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory.HEADER_LENGTH; + +/** + * Encode data into LZ4 format (not compatible with the LZ4 Frame format). + * It reads from and writes to byte arrays provided from the outside, thus reducing copy time. + * + * <p>This class is copied and modified from {@link net.jpountz.lz4.LZ4BlockOutputStream}. + */ +public class Lz4BlockCompressor implements BlockCompressor { + + private final LZ4Compressor compressor; + + public Lz4BlockCompressor() { + this.compressor = LZ4Factory.fastestInstance().fastCompressor(); + } + + @Override + public int getMaxCompressedSize(int srcSize) { + return HEADER_LENGTH + compressor.maxCompressedLength(srcSize); + } + + @Override + public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff) + throws InsufficientBufferException { + try { + final int prevSrcOff = src.position() + srcOff; + final int prevDstOff = dst.position() + dstOff; + + int maxCompressedSize = compressor.maxCompressedLength(srcLen); + int compressedLength = compressor.compress( + src, + prevSrcOff, + srcLen, + dst, + prevDstOff + HEADER_LENGTH, + maxCompressedSize + ); + + src.position(prevSrcOff + srcLen); + + dst.position(prevDstOff); + dst.putInt(compressedLength); + dst.putInt(srcLen); + dst.position(prevDstOff + compressedLength + HEADER_LENGTH); + + return HEADER_LENGTH + compressedLength; + } + catch (LZ4Exception | ArrayIndexOutOfBoundsException | BufferOverflowException e) { + throw new InsufficientBufferException(e); + } + } + + @Override + public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff) + throws InsufficientBufferException { + try { + int compressedLength = compressor.compress( + src, + srcOff, + srcLen, + dst, + dstOff + HEADER_LENGTH + ); + writeIntLE(compressedLength, dst, dstOff); + writeIntLE(srcLen, dst, dstOff + 4); + return HEADER_LENGTH + compressedLength; + } + catch (LZ4Exception | BufferOverflowException | ArrayIndexOutOfBoundsException e) { + throw new InsufficientBufferException(e); + } + } + + private static void writeIntLE(int i, byte[] buf, int offset) { + buf[offset++] = (byte) i; + buf[offset++] = (byte) (i >>> 8); + buf[offset++] = (byte) (i >>> 16); + buf[offset] = (byte) (i >>> 24); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockDecompressor.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockDecompressor.java new file mode 100644 index 0000000..80ab179 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockDecompressor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +import net.jpountz.lz4.LZ4Exception; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.util.SafeUtils; + +import java.nio.ByteBuffer; + +import static org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory.HEADER_LENGTH; + +/** + * Decode data written with {@link Lz4BlockCompressor}. + * It reads from and writes to byte arrays provided from the outside, thus reducing copy time. + * + * <p>This class is copied and modified from {@link net.jpountz.lz4.LZ4BlockInputStream}. + */ +public class Lz4BlockDecompressor implements BlockDecompressor { + + private final LZ4FastDecompressor decompressor; + + public Lz4BlockDecompressor() { + this.decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + } + + @Override + public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff) + throws DataCorruptionException { + final int prevSrcOff = src.position() + srcOff; + final int prevDstOff = dst.position() + dstOff; + + final int compressedLen = src.getInt(prevSrcOff); + final int originalLen = src.getInt(prevSrcOff + 4); + validateLength(compressedLen, originalLen); + + if (dst.capacity() - prevDstOff < originalLen) { + throw new InsufficientBufferException("Buffer length too small"); + } + + if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) { + throw new DataCorruptionException("Source data is not integral for decompression."); + } + + try { + final int compressedLen2 = decompressor.decompress( + src, + prevSrcOff + HEADER_LENGTH, + dst, + prevDstOff, + originalLen + ); + if (compressedLen != compressedLen2) { + throw new DataCorruptionException( + "Input is corrupted, unexpected compressed length."); + } + src.position(prevSrcOff + compressedLen + HEADER_LENGTH); + dst.position(prevDstOff + originalLen); + } + catch (LZ4Exception e) { + throw new DataCorruptionException("Input is corrupted", e); + } + + return originalLen; + } + + @Override + public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff) + throws InsufficientBufferException, DataCorruptionException { + final int compressedLen = SafeUtils.readIntLE(src, srcOff); + final int originalLen = SafeUtils.readIntLE(src, srcOff + 4); + validateLength(compressedLen, originalLen); + + if (dst.length - dstOff < originalLen) { + throw new InsufficientBufferException("Buffer length too small"); + } + + if (src.length - srcOff - HEADER_LENGTH < compressedLen) { + throw new DataCorruptionException("Source data is not integral for decompression."); + } + + try { + final int compressedLen2 = decompressor.decompress( + src, + srcOff + HEADER_LENGTH, + dst, + dstOff, + originalLen + ); + if (compressedLen != compressedLen2) { + throw new DataCorruptionException("Input is corrupted"); + } + } + catch (LZ4Exception e) { + throw new DataCorruptionException("Input is corrupted", e); + } + + return originalLen; + } + + private void validateLength(int compressedLen, int originalLen) throws DataCorruptionException { + if (originalLen < 0 + || compressedLen < 0 + || (originalLen == 0 && compressedLen != 0) + || (originalLen != 0 && compressedLen == 0)) { + throw new DataCorruptionException("Input is corrupted, invalid length."); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/compression/BlockCompressionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/compression/BlockCompressionTest.java new file mode 100644 index 0000000..ebcfec7 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/compression/BlockCompressionTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.compression; + +import org.junit.Assert; +import org.junit.Test; +import sun.misc.Cleaner; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; + +import static org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory.HEADER_LENGTH; +import static org.junit.Assert.assertEquals; + +/** + * Tests for block compression. + */ +public class BlockCompressionTest { + + @Test + public void testLz4() { + BlockCompressionFactory factory = new Lz4BlockCompressionFactory(); + runArrayTest(factory, 32768); + runArrayTest(factory, 16); + + runByteBufferTest(factory, false, 32768); + runByteBufferTest(factory, false, 16); + runByteBufferTest(factory, true, 32768); + runByteBufferTest(factory, true, 16); + } + + private void runArrayTest(BlockCompressionFactory factory, int originalLen) { + BlockCompressor compressor = factory.getCompressor(); + BlockDecompressor decompressor = factory.getDecompressor(); + + int originalOff = 64; + byte[] data = new byte[originalOff + originalLen]; + for (int i = 0; i < originalLen; i++) { + data[originalOff + i] = (byte) i; + } + + int compressedOff = 32; + + // 1. test compress with insufficient target + byte[] insufficientArray = new byte[compressedOff + HEADER_LENGTH + 1]; + try { + compressor.compress(data, originalOff, originalLen, insufficientArray, compressedOff); + Assert.fail("expect exception here"); + } catch (InsufficientBufferException ex) {} + + // 2. test normal compress + byte[] compressedData = + new byte[compressedOff + compressor.getMaxCompressedSize(originalLen)]; + int compressedLen = compressor.compress( + data, + originalOff, + originalLen, + compressedData, + compressedOff + ); + + int decompressedOff = 16; + + // 3. test decompress with insufficient target + insufficientArray = new byte[decompressedOff + originalLen - 1]; + try { + decompressor.decompress( + compressedData, + compressedOff, + compressedLen, + insufficientArray, + decompressedOff + ); + Assert.fail("expect exception here"); + } catch (InsufficientBufferException ex) {} + + // 4. test normal decompress + byte[] decompressedData = new byte[decompressedOff + originalLen]; + int decompressedLen = decompressor.decompress( + compressedData, + compressedOff, + compressedLen, + decompressedData, + decompressedOff + ); + assertEquals(originalLen, decompressedLen); + + for (int i = 0; i < originalLen; i++) { + assertEquals(data[originalOff + i], decompressedData[decompressedOff + i]); + } + } + + private void runByteBufferTest( + BlockCompressionFactory factory, + boolean isDirect, + int originalLen) { + BlockCompressor compressor = factory.getCompressor(); + BlockDecompressor decompressor = factory.getDecompressor(); + + int originalOff = 64; + ByteBuffer data; + if (isDirect) { + data = ByteBuffer.allocateDirect(originalOff + originalLen); + } else { + data = ByteBuffer.allocate(originalOff + originalLen); + } + + // Useless data + for (int i = 0; i < originalOff; i++) { + data.put((byte) 0x5a); + } + + for (int i = 0; i < originalLen; i++) { + data.put((byte) i); + } + data.flip(); + + ByteBuffer compressedData; + int maxCompressedLen = compressor.getMaxCompressedSize(originalLen); + if (isDirect) { + compressedData = ByteBuffer.allocateDirect(maxCompressedLen); + } else { + compressedData = ByteBuffer.allocate(maxCompressedLen); + } + int compressedLen = compressor.compress(data, originalOff, originalLen, compressedData, 0); + assertEquals(compressedLen, compressedData.position()); + compressedData.flip(); + + int compressedOff = 32; + ByteBuffer copiedCompressedData; + if (isDirect) { + copiedCompressedData = ByteBuffer.allocateDirect(compressedOff + compressedLen); + } else { + copiedCompressedData = ByteBuffer.allocate(compressedOff + compressedLen); + } + + // Useless data + for (int i = 0; i < compressedOff; i++) { + copiedCompressedData.put((byte) 0x5a); + } + + byte[] compressedByteArray = new byte[compressedLen]; + compressedData.get(compressedByteArray, 0, compressedLen); + copiedCompressedData.put(compressedByteArray); + copiedCompressedData.flip(); + + ByteBuffer decompressedData; + if (isDirect) { + decompressedData = ByteBuffer.allocateDirect(originalLen); + } else { + decompressedData = ByteBuffer.allocate(originalLen); + } + int decompressedLen = decompressor.decompress( + copiedCompressedData, compressedOff, compressedLen, decompressedData, 0); + assertEquals(decompressedLen, decompressedData.position()); + decompressedData.flip(); + + for (int i = 0; i < decompressedLen; i++) { + assertEquals((byte) i, decompressedData.get()); + } + + if (isDirect) { + cleanDirectBuffer(data); + cleanDirectBuffer(compressedData); + cleanDirectBuffer(copiedCompressedData); + cleanDirectBuffer(decompressedData); + } + } + + private void cleanDirectBuffer(ByteBuffer buffer) { + Cleaner cleaner = ((DirectBuffer) buffer).cleaner(); + if (cleaner != null) { + cleaner.clean(); + } + } +}