This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 551a1d8a2 [CELEBORN-2200] Throw IOException when compressed data
header corrupted
551a1d8a2 is described below
commit 551a1d8a2f848c02850b61404155ea212af63e36
Author: jiang13021 <[email protected]>
AuthorDate: Wed Nov 12 14:34:46 2025 +0800
[CELEBORN-2200] Throw IOException when compressed data header corrupted
### What changes were proposed in this pull request?
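Make `Decompressor.decompress` and its LZ4 and ZSTD implementations throw an
`IOException` when corrupted compressed data is detected, instead of logging an
error and returning -1. Corruption here means a compressed/original length
mismatch, an unknown compression method, or a checksum mismatch.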
### Why are the changes needed?
We discovered that a corrupted compressed data header may cause data loss,
because the decompressors previously only logged an error and returned -1.
Throwing an IOException instead lets us trigger a stage rerun and avoid the
data loss.
### Does this PR resolve a correctness bug?
Yes
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
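Added unit tests testLz4CodecCorrupted and testZstdCodecCorrupted. Each one
corrupts a byte of the compressed header and asserts that decompression fails
with an IOException whose message reports a checksum mismatch.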
Closes #3534 from jiang13021/celeborn-2200.
Authored-by: jiang13021 <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 79f0d319a17b838e15021b4abbb4040c710254ac)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/client/compress/Decompressor.java | 4 +-
.../celeborn/client/compress/Lz4Decompressor.java | 18 ++++----
.../celeborn/client/compress/ZstdDecompressor.java | 20 +++++----
.../celeborn/client/compress/CodecSuiteJ.java | 49 +++++++++++++++++++++-
4 files changed, 72 insertions(+), 19 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 2918fe1f2..37cead4c2 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -17,6 +17,8 @@
package org.apache.celeborn.client.compress;
+import java.io.IOException;
+
import scala.Option;
import org.apache.celeborn.common.CelebornConf;
@@ -24,7 +26,7 @@ import org.apache.celeborn.common.protocol.CompressionCodec;
public interface Decompressor {
- int decompress(byte[] src, byte[] dst, int dstOff);
+ int decompress(byte[] src, byte[] dst, int dstOff) throws IOException;
int getOriginalLen(byte[] src);
diff --git
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
index 94569f800..8a538ef34 100644
---
a/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
+++
b/client/src/main/java/org/apache/celeborn/client/compress/Lz4Decompressor.java
@@ -17,6 +17,7 @@
package org.apache.celeborn.client.compress;
+import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import java.util.zip.Checksum;
@@ -56,7 +57,7 @@ public class Lz4Decompressor extends Lz4Trait implements
Decompressor {
}
@Override
- public int decompress(byte[] src, byte[] dst, int dstOff) {
+ public int decompress(byte[] src, byte[] dst, int dstOff) throws IOException
{
int compressionMethod = src[MAGIC_LENGTH] & 0xFF;
int compressedLen = readIntLE(src, MAGIC_LENGTH + 1);
int originalLen = readIntLE(src, MAGIC_LENGTH + 5);
@@ -69,19 +70,20 @@ public class Lz4Decompressor extends Lz4Trait implements
Decompressor {
case COMPRESSION_METHOD_LZ4:
int compressedLen2 = decompressor.decompress(src, HEADER_LENGTH, dst,
dstOff, originalLen);
if (compressedLen != compressedLen2) {
- logger.error(
- "Compressed length corrupted! expected: {}, actual: {}.",
- compressedLen,
- compressedLen2);
- return -1;
+ throw new IOException(
+ "Compressed length corrupted! expected: "
+ + compressedLen
+ + ", actual: "
+ + compressedLen2
+ + ".");
}
}
checksum.reset();
checksum.update(dst, dstOff, originalLen);
if ((int) checksum.getValue() != check) {
- logger.error("Checksum not equal! expected: {}, actual: {}.", check,
checksum.getValue());
- return -1;
+ throw new IOException(
+ "Checksum not equal! expected: " + check + ", actual: " +
checksum.getValue() + ".");
}
return originalLen;
diff --git
a/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
b/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
index c35bc9670..1c06a1c72 100644
---
a/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
+++
b/client/src/main/java/org/apache/celeborn/client/compress/ZstdDecompressor.java
@@ -17,6 +17,7 @@
package org.apache.celeborn.client.compress;
+import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@ -38,7 +39,7 @@ public class ZstdDecompressor extends ZstdTrait implements
Decompressor {
}
@Override
- public int decompress(byte[] src, byte[] dst, int dstOff) {
+ public int decompress(byte[] src, byte[] dst, int dstOff) throws IOException
{
int compressionMethod = src[MAGIC_LENGTH] & 0xFF;
int compressedLen = readIntLE(src, MAGIC_LENGTH + 1);
int originalLen = readIntLE(src, MAGIC_LENGTH + 5);
@@ -54,21 +55,24 @@ public class ZstdDecompressor extends ZstdTrait implements
Decompressor {
Zstd.decompressByteArray(
dst, dstOff, originalLen, src, HEADER_LENGTH,
compressedLen);
if (originalLen != originalLen2) {
- logger.error(
- "Original length corrupted! expected: {}, actual: {}.",
originalLen, originalLen2);
- return -1;
+ throw new IOException(
+ "Original length corrupted! expected: "
+ + originalLen
+ + ", actual: "
+ + originalLen2
+ + ".");
}
break;
default:
- logger.error("Unknown compression method whose decimal number is {}
.", compressionMethod);
- return -1;
+ throw new IOException(
+ "Unknown compression method whose decimal number is {" +
compressionMethod + "} .");
}
checksum.reset();
checksum.update(dst, dstOff, originalLen);
if ((int) checksum.getValue() != check) {
- logger.error("Checksum not equal! expected: {}, actual: {}.", check,
checksum.getValue());
- return -1;
+ throw new IOException(
+ "Checksum not equal! expected: " + check + ", actual: " +
checksum.getValue() + ".");
}
return originalLen;
}
diff --git
a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
index 07ba142e4..1b010422e 100644
--- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
@@ -17,6 +17,7 @@
package org.apache.celeborn.client.compress;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import scala.Option;
@@ -30,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf;
public class CodecSuiteJ {
@Test
- public void testLz4Codec() {
+ public void testLz4Codec() throws IOException {
int blockSize = new CelebornConf().clientPushBufferMaxSize();
Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
byte[] data =
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
@@ -47,7 +48,29 @@ public class CodecSuiteJ {
}
@Test
- public void testZstdCodec() {
+ public void testLz4CodecCorrupted() {
+ int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+ Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
+ byte[] data =
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
+ int oriLength = data.length;
+ lz4Compressor.compress(data, 0, oriLength);
+
+ byte[] compressedBuffer = lz4Compressor.getCompressedBuffer().clone();
+ // Manually corrupted data
+ compressedBuffer[Lz4Trait.MAGIC_LENGTH + 9] =
++compressedBuffer[Lz4Trait.MAGIC_LENGTH + 9];
+
+ Lz4Decompressor lz4Decompressor = new Lz4Decompressor(Option.empty());
+ byte[] dst = new byte[oriLength];
+ try {
+ lz4Decompressor.decompress(compressedBuffer, dst, 0);
+ Assert.fail("The compressed data is corrupted, so decompression should
fail.");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("Checksum not equal!"));
+ }
+ }
+
+ @Test
+ public void testZstdCodec() throws IOException {
for (int level = -5; level <= 22; level++) {
int blockSize = new CelebornConf().clientPushBufferMaxSize();
ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, level);
@@ -65,4 +88,26 @@ public class CodecSuiteJ {
Assert.assertArrayEquals(data, dst);
}
}
+
+ @Test
+ public void testZstdCodecCorrupted() {
+ int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+ ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, 1);
+ byte[] data =
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
+ int oriLength = data.length;
+ zstdCompressor.compress(data, 0, oriLength);
+
+ byte[] compressedBuffer = zstdCompressor.getCompressedBuffer().clone();
+ // Manually corrupted data
+ compressedBuffer[ZstdTrait.MAGIC_LENGTH + 9] =
++compressedBuffer[ZstdTrait.MAGIC_LENGTH + 9];
+
+ ZstdDecompressor zstdDecompressor = new ZstdDecompressor();
+ byte[] dst = new byte[oriLength];
+ try {
+ zstdDecompressor.decompress(compressedBuffer, dst, 0);
+ Assert.fail("The compressed data is corrupted, so decompression should
fail.");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("Checksum not equal!"));
+ }
+ }
}