This is an automated email from the ASF dual-hosted git repository.

william pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 5f601ef1e ORC-1966: [C++] Fix ZSTD compress/decompress to propagate errors
5f601ef1e is described below

commit 5f601ef1e8bb97fbd218a3b6a9dc74b4659fcc2f
Author: Kevin Cai <[email protected]>
AuthorDate: Thu Jul 31 13:38:55 2025 -0700

    ORC-1966: [C++] Fix ZSTD compress/decompress to propagate errors
    
    ### What changes were proposed in this pull request?
    
    Check the return code after each call to the ZSTD_XXXX interface, and make sure errors are properly handled.
    
    ### Why are the changes needed?
    
    To be able to detect ZSTD compression and decompression failures (e.g. corrupted data) and handle the errors properly.
    
    ### How was this patch tested?
    
    Covered by unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    NO
    
    Closes #2344 from kevincai/ORC-1966-zstd-error-handling.
    
    Authored-by: Kevin Cai <[email protected]>
    Signed-off-by: William Hyun <[email protected]>
    (cherry picked from commit 84c238a03032fcda6f1cdc77b12d3f16934a9794)
    Signed-off-by: William Hyun <[email protected]>
---
 c++/src/Compression.cc      | 17 +++++++++++++----
 c++/test/TestCompression.cc | 28 ++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index f373a75bf..b552324ce 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -1156,8 +1156,13 @@ namespace orc {
   };
 
   uint64_t ZSTDCompressionStream::doBlockCompression() {
-    return ZSTD_compressCCtx(cctx_, compressorBuffer.data(), 
compressorBuffer.size(),
-                             rawInputBuffer.data(), 
static_cast<size_t>(bufferSize), level);
+    auto ret = ZSTD_compressCCtx(cctx_, compressorBuffer.data(), 
compressorBuffer.size(),
+                                 rawInputBuffer.data(), 
static_cast<size_t>(bufferSize), level);
+    if (ZSTD_isError(ret)) {
+      throw CompressionError(std::string("Error while calling 
ZSTD_compressCCtx(), error: ") +
+                             ZSTD_getErrorName(ret));
+    }
+    return ret;
   }
 
   DIAGNOSTIC_PUSH
@@ -1213,8 +1218,12 @@ namespace orc {
 
   uint64_t ZSTDDecompressionStream::decompress(const char* inputPtr, uint64_t 
length, char* output,
                                                size_t maxOutputLength) {
-    return static_cast<uint64_t>(
-        ZSTD_decompressDCtx(dctx_, output, maxOutputLength, inputPtr, length));
+    auto ret = ZSTD_decompressDCtx(dctx_, output, maxOutputLength, inputPtr, 
length);
+    if (ZSTD_isError(ret)) {
+      throw CompressionError(std::string("Error while calling 
ZSTD_decompressDCtx(), error: ") +
+                             ZSTD_getErrorName(ret));
+    }
+    return static_cast<uint64_t>(ret);
   }
 
   DIAGNOSTIC_PUSH
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index e95a6f016..6a5c4a856 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -60,6 +60,7 @@ namespace orc {
         ++pos;
       }
     }
+    EXPECT_EQ(size, pos);
   }
 
   void compressAndVerify(CompressionKind kind, OutputStream* outStream,
@@ -369,4 +370,31 @@ namespace orc {
     testSeekDecompressionStream(CompressionKind_LZ4);
     testSeekDecompressionStream(CompressionKind_SNAPPY);
   }
+
+  TEST(Compression, ZstdDecompressStreamCorrupted) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    CompressionKind kind = CompressionKind_ZSTD;
+
+    uint64_t capacity = 1024;
+    uint64_t block = 128;
+
+    char testData[] = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+    // generate valid compressed data from testData
+    compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
+                      sizeof(testData));
+
+    // Corrupt the compressed data by flipping the 2nd byte counting from the 
end
+    std::string corruptedData(memStream.getData(), memStream.getLength());
+    size_t corruptedPos = corruptedData.size() - 2;
+    corruptedData.at(corruptedPos) ^= 0x1;
+
+    // create a new memStream with the corrupted data
+    MemoryOutputStream memStream2(DEFAULT_MEM_STREAM_SIZE);
+    memStream2.write(corruptedData.data(), corruptedData.size());
+
+    // The corruption shall be detected correctly.
+    EXPECT_THROW(decompressAndVerify(memStream2, kind, testData, 
sizeof(testData), *pool, capacity),
+                 CompressionError);
+  }
 }  // namespace orc

Reply via email to