This is an automated email from the ASF dual-hosted git repository. brandonwilliams pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f80a2ce Prevent the JVM from crashing due to corrupt LZ4 streams Patch by David Capwell; reviewed by Jordan West and brandonwilliams for CASSANDRA-15556 f80a2ce is described below commit f80a2ce10ca18e43fc8767cc8801701231e68d29 Author: David Capwell <dcapw...@gmail.com> AuthorDate: Thu Feb 6 13:34:37 2020 -0800 Prevent the JVM from crashing due to corrupt LZ4 streams Patch by David Capwell; reviewed by Jordan West and brandonwilliams for CASSANDRA-15556 --- build.xml | 2 +- lib/lz4-java-1.4.0.jar | Bin 370119 -> 0 bytes lib/lz4-java-1.7.1.jar | Bin 0 -> 649950 bytes .../transport/frame/compress/LZ4Compressor.java | 10 ++- .../data/CASSANDRA-15313/lz4-jvm-crash-failure.txt | 1 + .../microbench/ChecksummingTransformerLz4.java | 95 +++++++++++++++++++++ .../checksum/ChecksummingTransformerTest.java | 4 +- ...ecksummingWithCorruptedLZ4DoesNotCrashTest.java | 80 +++++++++++++++++ 8 files changed, 185 insertions(+), 7 deletions(-) diff --git a/build.xml b/build.xml index 8ea7740..ecc60f3 100644 --- a/build.xml +++ b/build.xml @@ -509,7 +509,7 @@ <scm connection="${scm.connection}" developerConnection="${scm.developerConnection}" url="${scm.url}"/> <dependencyManagement> <dependency groupId="org.xerial.snappy" artifactId="snappy-java" version="1.1.2.6"/> - <dependency groupId="org.lz4" artifactId="lz4-java" version="1.4.0"/> + <dependency groupId="org.lz4" artifactId="lz4-java" version="1.7.1"/> <dependency groupId="com.ning" artifactId="compress-lzf" version="0.8.4"/> <dependency groupId="com.github.luben" artifactId="zstd-jni" version="1.3.8-5"/> <dependency groupId="com.google.guava" artifactId="guava" version="27.0-jre"/> diff --git a/lib/lz4-java-1.4.0.jar b/lib/lz4-java-1.4.0.jar deleted file mode 100644 index 301908b..0000000 Binary files a/lib/lz4-java-1.4.0.jar and /dev/null differ diff --git a/lib/lz4-java-1.7.1.jar b/lib/lz4-java-1.7.1.jar new file mode 100644 index 0000000..95f57ca Binary files /dev/null and b/lib/lz4-java-1.7.1.jar differ diff --git a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java index 8ac42e2..88633c6 100644 --- a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java +++ b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java @@ -21,20 +21,20 @@ package org.apache.cassandra.transport.frame.compress; import java.io.IOException; import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.lz4.LZ4SafeDecompressor; public class LZ4Compressor implements Compressor { public static final LZ4Compressor INSTANCE = new LZ4Compressor(); private final net.jpountz.lz4.LZ4Compressor compressor; - private final LZ4FastDecompressor decompressor; + private final LZ4SafeDecompressor decompressor; private LZ4Compressor() { final LZ4Factory lz4Factory = LZ4Factory.fastestInstance(); compressor = lz4Factory.fastCompressor(); - decompressor = lz4Factory.fastDecompressor(); + decompressor = lz4Factory.safeDecompressor(); } public int maxCompressedLength(int length) @@ -58,7 +58,9 @@ public class LZ4Compressor implements Compressor { try { - return decompressor.decompress(src, offset, expectedDecompressedLength); + byte[] decompressed = new byte[expectedDecompressedLength]; + decompressor.decompress(src, offset, length, decompressed, 0, expectedDecompressedLength); + return decompressed; } catch (Throwable t) { diff --git a/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt b/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt new file mode 100644 index 0000000..0f64d84 --- /dev/null +++ b/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt @@ -0,0 +1 @@ +656465656364656365656464636364636563646563646365636563646465656564656363646364656565646563636463656465646464646565656365646365636564646465636563656464636563656363646364646465656363636364646364636464656464646565646363656563636365646363646365636563646464646463656564646365656565656365646465656463656564646563646365656563656465656465646463636563656364646364656565656464636563656363646363656363656563636464656365646364656565636463656465646565646563646363636464656563646565646464646563656565636365 [...] diff --git a/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java b/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java new file mode 100644 index 0000000..7ac62fe --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.EnumSet; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.transport.Frame; +import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; +import org.apache.cassandra.utils.ChecksumType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@Warmup(iterations = 4, time = 1) +@Measurement(iterations = 8, time = 2) +@Fork(value = 2) +@State(Scope.Benchmark) +public class ChecksummingTransformerLz4 +{ + private static final EnumSet<Frame.Header.Flag> FLAGS = EnumSet.of(Frame.Header.Flag.COMPRESSED, Frame.Header.Flag.CHECKSUMMED); + + static { + DatabaseDescriptor.clientInitialization(); + } + + private final ChecksummingTransformer transformer = ChecksummingTransformer.getTransformer(ChecksumType.CRC32, LZ4Compressor.INSTANCE); + private ByteBuf smallEnglishASCIICompressed; + private ByteBuf smallEnglishUtf8Compressed; + private ByteBuf largeBlobCompressed; + + @Setup + public void setup() throws IOException + { + byte[] smallEnglishASCII = "this is small".getBytes(StandardCharsets.US_ASCII); + this.smallEnglishASCIICompressed = transformer.transformOutbound(Unpooled.wrappedBuffer(smallEnglishASCII)); + byte[] smallEnglishUtf8 = "this is small".getBytes(StandardCharsets.UTF_8); + this.smallEnglishUtf8Compressed = transformer.transformOutbound(Unpooled.wrappedBuffer(smallEnglishUtf8)); + + String failureHex; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt"), StandardCharsets.UTF_8))) { + failureHex = reader.readLine().trim(); + } + byte[] failure = ByteBufUtil.decodeHexDump(failureHex); + this.largeBlobCompressed = transformer.transformOutbound(Unpooled.wrappedBuffer(failure)); + } + + @Benchmark + public ByteBuf decompresSsmallEnglishASCII() { + smallEnglishASCIICompressed.readerIndex(0); + return transformer.transformInbound(smallEnglishASCIICompressed, FLAGS); + } + + @Benchmark + public ByteBuf decompresSsmallEnglishUtf8() { + smallEnglishUtf8Compressed.readerIndex(0); + return transformer.transformInbound(smallEnglishUtf8Compressed, FLAGS); + } + + @Benchmark + public ByteBuf decompresLargeBlob() { + largeBlobCompressed.readerIndex(0); + return transformer.transformInbound(largeBlobCompressed, FLAGS); + } +} diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java index d678044..5f5b10d 100644 --- a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java +++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java @@ -86,10 +86,10 @@ public class ChecksummingTransformerTest integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue), compressors(), checksumTypes()) - .checkAssert(this::roundTripWithCorruption); + .checkAssert(ChecksummingTransformerTest::roundTripWithCorruption); } - private void roundTripWithCorruption(Pair<ReusableBuffer, Integer> inputAndCorruptablePosition, + static void roundTripWithCorruption(Pair<ReusableBuffer, Integer> inputAndCorruptablePosition, byte corruptionValue, Compressor compressor, ChecksumType checksum) diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java new file mode 100644 index 0000000..4028bfd --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.checksum; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBufUtil; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; +import org.apache.cassandra.utils.ChecksumType; +import org.apache.cassandra.utils.Pair; + +/** + * When we use LZ4 with "fast" functions its unsafe in the case the stream is corrupt; for networking checksuming is + * after lz4 decompresses (see CASSANDRA-15299) which means that lz4 can crash the process. + * + * This test is stand alone for the reason that this test is known to cause the JVM to crash. Given the way we run tests + * in CI this will kill the runner which means the file will be marked as failed; if this test was embedded into another + * test file then all the other tests would be ignored if this crashes. + */ +public class ChecksummingWithCorruptedLZ4DoesNotCrashTest +{ + @BeforeClass + public static void init() + { + // required as static ChecksummingTransformer instances read default block size from config + DatabaseDescriptor.clientInitialization(); + } + + @Test + public void shouldNotCrash() throws IOException + { + // We found lz4 caused the JVM to crash, so used the input (bytes and byteToCorrupt) to the test which crashed + // to reproduce. + // It was found that the same input does not cause lz4 to crash by it self but needed repeated calls with this + // input produce such a failure. + String failureHex; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt"), StandardCharsets.UTF_8))) { + failureHex = reader.readLine().trim(); + } + byte[] failure = ByteBufUtil.decodeHexDump(failureHex); + ReusableBuffer buffer = new ReusableBuffer(failure); + int byteToCorrupt = 52997; + // corrupting these values causes the exception. + byte[] corruptionValues = new byte[] { 21, 57, 79, (byte) 179 }; + // 5k was chosen as the largest number of iterations seen needed to crash. + for (int i = 0; i < 5_000 ; i++) { + for (byte corruptionValue : corruptionValues) { + try { + ChecksummingTransformerTest.roundTripWithCorruption(Pair.create(buffer, byteToCorrupt), corruptionValue, LZ4Compressor.INSTANCE, ChecksumType.ADLER32); + } catch (AssertionError e) { + // ignore + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org