This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new f8129f6 KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769) f8129f6 is described below commit f8129f6fa82b8aeaca4d2eb182183a6b86a5598b Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Tue Dec 3 08:26:17 2019 -0800 KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769) This reverts commit 90043d5f as it caused a regression in some cases: > Caused by: java.io.IOException: Stream frame descriptor corrupted > at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) > at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) > at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) I will investigate the root cause afterwards, but I want to get the safe fix into 2.4.0 first. The reporter of KAFKA-9203 has verified that reverting this change makes the problem go away. 
Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> --- .../common/record/KafkaLZ4BlockInputStream.java | 33 +++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java index 9a37833..850b1e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java @@ -77,6 +77,11 @@ public final class KafkaLZ4BlockInputStream extends InputStream { this.bufferSupplier = bufferSupplier; readHeader(); decompressionBuffer = bufferSupplier.get(maxBlockSize); + if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) { + // require array backed decompression buffer with zero offset + // to simplify workaround for https://github.com/lz4/lz4-java/pull/65 + throw new RuntimeException("decompression buffer must have backing array with zero array offset"); + } finished = false; } @@ -126,7 +131,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream { int len = in.position() - in.reset().position(); - int hash = CHECKSUM.hash(in, in.position(), len, 0); + int hash = in.hasArray() ? 
+ // workaround for https://github.com/lz4/lz4-java/pull/65 + CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) : + CHECKSUM.hash(in, in.position(), len, 0); in.position(in.position() + len); if (in.get() != (byte) ((hash >> 8) & 0xFF)) { throw new IOException(DESCRIPTOR_HASH_MISMATCH); @@ -164,8 +172,22 @@ public final class KafkaLZ4BlockInputStream extends InputStream { if (compressed) { try { - final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, - maxBlockSize); + // workaround for https://github.com/lz4/lz4-java/pull/65 + final int bufferSize; + if (in.hasArray()) { + bufferSize = DECOMPRESSOR.decompress( + in.array(), + in.position() + in.arrayOffset(), + blockSize, + decompressionBuffer.array(), + 0, + maxBlockSize + ); + } else { + // decompressionBuffer has zero arrayOffset, so we don't need to worry about + // https://github.com/lz4/lz4-java/pull/65 + bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize); + } decompressionBuffer.position(0); decompressionBuffer.limit(bufferSize); decompressedBuffer = decompressionBuffer; @@ -179,7 +201,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream { // verify checksum if (flg.isBlockChecksumSet()) { - int hash = CHECKSUM.hash(in, in.position(), blockSize, 0); + // workaround for https://github.com/lz4/lz4-java/pull/65 + int hash = in.hasArray() ? + CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) : + CHECKSUM.hash(in, in.position(), blockSize, 0); in.position(in.position() + blockSize); if (hash != in.getInt()) { throw new IOException(BLOCK_HASH_MISMATCH);