This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 0882db926 [CELEBORN-2096] Support Lz4 Compression in CppClient
0882db926 is described below

commit 0882db926dbb488725dc7578fe4a94a0516e7e78
Author: Jray <[email protected]>
AuthorDate: Tue Aug 19 10:05:13 2025 +0800

    [CELEBORN-2096] Support Lz4 Compression in CppClient
    
    ### What changes were proposed in this pull request?
    This PR adds support for lz4 compression in CppClient.
    
    ### Why are the changes needed?
    To support writing to Celeborn with CppClient.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By compilation and UTs.
    
    Closes #3412 from Jraaay/feat/cpp_client_lz4_compression.
    
    Authored-by: Jray <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 cpp/celeborn/client/CMakeLists.txt                 |  4 +-
 .../compress/{Lz4Decompressor.h => Compressor.cpp} | 34 +++++----
 .../compress/{Lz4Decompressor.h => Compressor.h}   | 40 +++++++----
 cpp/celeborn/client/compress/Lz4Compressor.cpp     | 84 ++++++++++++++++++++++
 .../{Lz4Decompressor.h => Lz4Compressor.h}         | 25 ++++---
 cpp/celeborn/client/compress/Lz4Decompressor.cpp   | 16 ++---
 cpp/celeborn/client/compress/Lz4Decompressor.h     |  2 +-
 cpp/celeborn/client/tests/CMakeLists.txt           |  3 +-
 cpp/celeborn/client/tests/Lz4CompressorTest.cpp    | 79 ++++++++++++++++++++
 9 files changed, 234 insertions(+), 53 deletions(-)

diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt
index b3d2cd331..c5534a3a8 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -19,7 +19,9 @@ add_library(
         ShuffleClient.cpp
         compress/Decompressor.cpp
         compress/Lz4Decompressor.cpp
-        compress/ZstdDecompressor.cpp)
+        compress/ZstdDecompressor.cpp
+        compress/Compressor.cpp
+        compress/Lz4Compressor.cpp)
 
 target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
 
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h b/cpp/celeborn/client/compress/Compressor.cpp
similarity index 62%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Compressor.cpp
index 3d34a6876..849b1ff0d 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Compressor.cpp
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-#pragma once
+#include <stdexcept>
 
-#include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
-#include "celeborn/client/compress/Lz4Trait.h"
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/utils/Exceptions.h"
 
 namespace celeborn {
 namespace client {
 namespace compress {
 
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
- public:
-  Lz4Decompressor();
-  ~Lz4Decompressor() override;
-
-  int getOriginalLen(const uint8_t* src) override;
-  int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
-
-  Lz4Decompressor(const Lz4Decompressor&) = delete;
-  Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
-
- private:
-  XXH32_state_t* xxhash_state_;
-};
+std::unique_ptr<Compressor> Compressor::createCompressor(
+    const conf::CelebornConf& conf) {
+  // Map the configured shuffle codec onto a concrete Compressor.
+  const auto selected = conf.shuffleCompressionCodec();
+  if (selected == protocol::CompressionCodec::LZ4) {
+    return std::make_unique<Lz4Compressor>();
+  }
+  if (selected == protocol::CompressionCodec::ZSTD) {
+    // TODO: impl zstd
+    CELEBORN_FAIL("Compression codec ZSTD is not supported.");
+  }
+  CELEBORN_FAIL("Unknown compression codec.");
+}
 
 } // namespace compress
 } // namespace client
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h b/cpp/celeborn/client/compress/Compressor.h
similarity index 59%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Compressor.h
index 3d34a6876..711c7342b 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Compressor.h
@@ -17,27 +17,54 @@
 
 #pragma once
 
-#include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
-#include "celeborn/client/compress/Lz4Trait.h"
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include "celeborn/conf/CelebornConf.h"
 
 namespace celeborn {
 namespace client {
 namespace compress {
 
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
+/// Abstract compressor used by the C++ client to compress data it writes.
+/// Implementations fill a caller-provided buffer (see getDstCapacity()); use
+/// createCompressor() to obtain the one matching the configured codec.
+class Compressor {
  public:
-  Lz4Decompressor();
-  ~Lz4Decompressor() override;
-
-  int getOriginalLen(const uint8_t* src) override;
-  int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
-
-  Lz4Decompressor(const Lz4Decompressor&) = delete;
-  Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
-
- private:
-  XXH32_state_t* xxhash_state_;
+  virtual ~Compressor() = default;
+
+  /// Compresses src[srcOffset, srcOffset + srcLength) into dst, starting at
+  /// dst[dstOffset]. The caller must provide at least
+  /// getDstCapacity(srcLength) writable bytes from dst[dstOffset] on.
+  /// @return The number of bytes written to dst.
+  virtual size_t compress(
+      const uint8_t* src,
+      int srcOffset,
+      int srcLength,
+      uint8_t* dst,
+      int dstOffset) = 0;
+
+  /// Returns the number of output bytes compress() may need for an input of
+  /// `length` bytes.
+  virtual size_t getDstCapacity(int length) = 0;
+
+  /// Creates the compressor for conf.shuffleCompressionCodec(). Unsupported
+  /// codecs are rejected with CELEBORN_FAIL.
+  static std::unique_ptr<Compressor> createCompressor(
+      const conf::CelebornConf& conf);
+
+ protected:
+  /// Writes `i` to buf[off, off + 4) in little-endian byte order.
+  static void writeIntLE(const int i, uint8_t* buf, int off) {
+    // Shift an unsigned copy: right-shifting a negative int is
+    // implementation-defined before C++20.
+    const auto u = static_cast<uint32_t>(i);
+    buf[off++] = static_cast<uint8_t>(u);
+    buf[off++] = static_cast<uint8_t>(u >> 8);
+    buf[off++] = static_cast<uint8_t>(u >> 16);
+    buf[off] = static_cast<uint8_t>(u >> 24);
+  }
 };
 
 } // namespace compress
diff --git a/cpp/celeborn/client/compress/Lz4Compressor.cpp b/cpp/celeborn/client/compress/Lz4Compressor.cpp
new file mode 100644
index 000000000..be21ed4a8
--- /dev/null
+++ b/cpp/celeborn/client/compress/Lz4Compressor.cpp
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <lz4.h>
+#include <algorithm>
+#include <cstring>
+#include <string>
+
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace client {
+namespace compress {
+
+Lz4Compressor::Lz4Compressor() {
+  xxhashState_ = XXH32_createState();
+  if (!xxhashState_) {
+    CELEBORN_FAIL("Failed to create XXH32 state.");
+  }
+  XXH32_reset(xxhashState_, kDefaultSeed);
+}
+
+Lz4Compressor::~Lz4Compressor() {
+  if (xxhashState_) {
+    XXH32_freeState(xxhashState_);
+  }
+}
+
+// Writes one block at dst + dstOffset: a kHeaderLength-byte header followed
+// by the LZ4-compressed payload or, when LZ4 does not shrink the input, the
+// input bytes verbatim. Returns the total number of bytes written.
+size_t Lz4Compressor::compress(
+    const uint8_t* src,
+    const int srcOffset,
+    const int srcLength,
+    uint8_t* dst,
+    const int dstOffset) {
+  // A negative length would reach XXH32_update as a huge size_t.
+  if (srcLength < 0) {
+    CELEBORN_FAIL(
+        "Invalid source length for LZ4 compression: " +
+        std::to_string(srcLength));
+  }
+
+  const auto srcPtr = reinterpret_cast<const char*>(src + srcOffset);
+  const auto dstPtr = dst + dstOffset;
+  const auto dstDataPtr = reinterpret_cast<char*>(dstPtr + kHeaderLength);
+
+  // Checksum of the uncompressed bytes. The 28-bit mask matches the one
+  // Lz4Decompressor applies when it verifies a block.
+  XXH32_reset(xxhashState_, kDefaultSeed);
+  XXH32_update(xxhashState_, srcPtr, srcLength);
+  const uint32_t check = XXH32_digest(xxhashState_) & 0xFFFFFFFL;
+
+  std::copy_n(kMagic, kMagicLength, dstPtr);
+
+  // Relies on dst holding getDstCapacity(srcLength) bytes past dstOffset, so
+  // the full LZ4 bound is available after the header.
+  int compressedLength = LZ4_compress_default(
+      srcPtr, dstDataPtr, srcLength, LZ4_compressBound(srcLength));
+
+  // Store the input verbatim when LZ4 fails (returns <= 0) or does not
+  // shrink it.
+  int compressionMethod;
+  if (compressedLength <= 0 || compressedLength >= srcLength) {
+    compressionMethod = kCompressionMethodRaw;
+    compressedLength = srcLength;
+    std::copy_n(srcPtr, srcLength, dstDataPtr);
+  } else {
+    compressionMethod = kCompressionMethodLZ4;
+  }
+
+  // After the magic: method (1 byte), then compressed length, original
+  // length and checksum, each a 4-byte little-endian int.
+  dstPtr[kMagicLength] = static_cast<uint8_t>(compressionMethod);
+  writeIntLE(compressedLength, dstPtr, kMagicLength + 1);
+  writeIntLE(srcLength, dstPtr, kMagicLength + 5);
+  writeIntLE(static_cast<int>(check), dstPtr, kMagicLength + 9);
+
+  return kHeaderLength + compressedLength;
+}
+
+// Upper bound on the bytes compress() writes for `length` input bytes.
+size_t Lz4Compressor::getDstCapacity(const int length) {
+  // compress() may store `length` raw bytes, and LZ4_compressBound() returns
+  // 0 for inputs above LZ4_MAX_INPUT_SIZE, so reserve the larger of the two.
+  // Widen before adding the header so large lengths cannot overflow int.
+  const int payload = std::max(LZ4_compressBound(length), length);
+  return static_cast<size_t>(payload) + kHeaderLength;
+}
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h b/cpp/celeborn/client/compress/Lz4Compressor.h
similarity index 68%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Lz4Compressor.h
index 3d34a6876..e2d316cf9 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Lz4Compressor.h
@@ -18,26 +18,39 @@
 #pragma once
 
 #include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
+
+#include "celeborn/client/compress/Compressor.h"
 #include "celeborn/client/compress/Lz4Trait.h"
 
 namespace celeborn {
 namespace client {
 namespace compress {
 
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
+/// LZ4 implementation of Compressor. Output blocks (header plus an LZ4 or raw
+/// payload) are read back by Lz4Decompressor. Owns a raw XXH32 state, so
+/// copying is disabled.
+class Lz4Compressor final : public Compressor, Lz4Trait {
  public:
-  Lz4Decompressor();
-  ~Lz4Decompressor() override;
+  /// Allocates the XXH32 state; raises CELEBORN_FAIL if allocation fails.
+  explicit Lz4Compressor();
+  ~Lz4Compressor() override;
+
+  size_t compress(
+      const uint8_t* src,
+      int srcOffset,
+      int srcLength,
+      uint8_t* dst,
+      int dstOffset) override;
 
-  int getOriginalLen(const uint8_t* src) override;
-  int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
+  size_t getDstCapacity(int length) override;
 
-  Lz4Decompressor(const Lz4Decompressor&) = delete;
-  Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
+  Lz4Compressor(const Lz4Compressor&) = delete;
+  Lz4Compressor& operator=(const Lz4Compressor&) = delete;
 
  private:
-  XXH32_state_t* xxhash_state_;
+  // Created in the constructor and freed in the destructor. Default to null
+  // so the destructor's null check never reads an indeterminate pointer.
+  XXH32_state_t* xxhashState_{nullptr};
 };
 
 } // namespace compress
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.cpp b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
index f85ced0d2..c221e9c62 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.cpp
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
@@ -25,16 +25,16 @@ namespace celeborn {
 namespace client {
 namespace compress {
 Lz4Decompressor::Lz4Decompressor() {
-  xxhash_state_ = XXH32_createState();
-  if (!xxhash_state_) {
+  xxhashState_ = XXH32_createState(); // owned; freed in ~Lz4Decompressor()
+  if (!xxhashState_) {
     CELEBORN_FAIL("Failed to create XXH32 state.")
   }
-  XXH32_reset(xxhash_state_, kDefaultSeed);
+  XXH32_reset(xxhashState_, kDefaultSeed); // decompress() resets it again
 }
 
 Lz4Decompressor::~Lz4Decompressor() {
-  if (xxhash_state_) {
-    XXH32_freeState(xxhash_state_);
+  if (xxhashState_) {
+    XXH32_freeState(xxhashState_); // release the state from the constructor
   }
 }
 
@@ -79,9 +79,9 @@ int Lz4Decompressor::decompress(
           std::to_string(compressionMethod));
   }
 
-  XXH32_reset(xxhash_state_, kDefaultSeed);
-  XXH32_update(xxhash_state_, dstPtr, originalLen);
-  const uint32_t actualCheck = XXH32_digest(xxhash_state_) & 0xFFFFFFFL;
+  XXH32_reset(xxhashState_, kDefaultSeed);
+  XXH32_update(xxhashState_, dstPtr, originalLen);
+  const uint32_t actualCheck = XXH32_digest(xxhashState_) & 0xFFFFFFFL;
 
   if (static_cast<uint32_t>(expectedCheck) != actualCheck) {
     CELEBORN_FAIL(
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h b/cpp/celeborn/client/compress/Lz4Decompressor.h
index 3d34a6876..d574a0b3c 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.h
@@ -37,7 +37,7 @@ class Lz4Decompressor final : public Decompressor, Lz4Trait {
   Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
 
  private:
-  XXH32_state_t* xxhash_state_;
+  XXH32_state_t* xxhashState_;
 };
 
 } // namespace compress
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt b/cpp/celeborn/client/tests/CMakeLists.txt
index 05d682576..9ac740474 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -17,7 +17,8 @@ add_executable(
         celeborn_client_test
         WorkerPartitionReaderTest.cpp
         Lz4DecompressorTest.cpp
-        ZstdDecompressorTest.cpp)
+        ZstdDecompressorTest.cpp
+        Lz4CompressorTest.cpp)
 
 add_test(NAME celeborn_client_test COMMAND celeborn_client_test)
 
diff --git a/cpp/celeborn/client/tests/Lz4CompressorTest.cpp b/cpp/celeborn/client/tests/Lz4CompressorTest.cpp
new file mode 100644
index 000000000..57ac9f22b
--- /dev/null
+++ b/cpp/celeborn/client/tests/Lz4CompressorTest.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/client/compress/Lz4Decompressor.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::protocol;
+
+namespace {
+
+// Output of compressing a string with Lz4Compressor and restoring it with
+// Lz4Decompressor.
+struct RoundTripResult {
+  // The compressed block, trimmed to the size compress() reported.
+  std::vector<uint8_t> compressed;
+  std::string decompressed;
+};
+
+// Compresses `input`, placed after `srcOffset` filler bytes, into a buffer at
+// `dstOffset`, then decompresses the block. Also checks that the block fits
+// in getDstCapacity() bytes and that its header reports the input length.
+RoundTripResult roundTrip(
+    const std::string& input,
+    const int srcOffset = 0,
+    const int dstOffset = 0) {
+  compress::Lz4Compressor compressor;
+  const int inputLength = static_cast<int>(input.size());
+
+  std::vector<uint8_t> src(static_cast<size_t>(srcOffset), uint8_t{0xAB});
+  src.insert(src.end(), input.begin(), input.end());
+
+  const size_t capacity = compressor.getDstCapacity(inputLength);
+  std::vector<uint8_t> dst(static_cast<size_t>(dstOffset) + capacity);
+  const size_t compressedSize = compressor.compress(
+      src.data(), srcOffset, inputLength, dst.data(), dstOffset);
+  EXPECT_LE(compressedSize, capacity);
+
+  RoundTripResult result;
+  const auto blockBegin = dst.begin() + dstOffset;
+  result.compressed.assign(blockBegin, blockBegin + compressedSize);
+
+  compress::Lz4Decompressor decompressor;
+  const int originalLen =
+      decompressor.getOriginalLen(result.compressed.data());
+  EXPECT_EQ(originalLen, inputLength);
+
+  std::vector<uint8_t> decompressed(static_cast<size_t>(originalLen));
+  const bool success = decompressor.decompress(
+      result.compressed.data(), decompressed.data(), 0);
+  EXPECT_TRUE(success);
+
+  result.decompressed.assign(decompressed.begin(), decompressed.end());
+  return result;
+}
+
+// A raw-mode block ends with a verbatim copy of the input. An LZ4 payload is
+// shorter than the input, so an LZ4 block ends that way only by chance.
+bool endsWithRawCopy(const RoundTripResult& result, const std::string& input) {
+  if (result.compressed.size() < input.size()) {
+    return false;
+  }
+  return std::equal(
+      input.rbegin(),
+      input.rend(),
+      result.compressed.rbegin(),
+      [](char c, uint8_t b) { return static_cast<uint8_t>(c) == b; });
+}
+
+} // namespace
+
+TEST(Lz4CompressorTest, CompressWithLz4) {
+  const std::string toCompressData = "Helloooooooo Celeborn!!!!!!!!!!";
+  const auto result = roundTrip(toCompressData);
+
+  EXPECT_FALSE(endsWithRawCopy(result, toCompressData));
+  EXPECT_EQ(result.decompressed, toCompressData);
+}
+
+TEST(Lz4CompressorTest, CompressWithRaw) {
+  // Too short and repetition-free for LZ4 to shrink, so it is stored raw.
+  const std::string toCompressData = "Hello Celeborn!";
+  const auto result = roundTrip(toCompressData);
+
+  EXPECT_TRUE(endsWithRawCopy(result, toCompressData));
+  EXPECT_EQ(result.decompressed, toCompressData);
+}
+
+TEST(Lz4CompressorTest, CompressRepetitiveInput) {
+  const std::string toCompressData(4096, 'x');
+  const auto result = roundTrip(toCompressData);
+
+  // Only the LZ4 path can produce a block smaller than the input itself.
+  EXPECT_LT(result.compressed.size(), toCompressData.size());
+  EXPECT_EQ(result.decompressed, toCompressData);
+}
+
+TEST(Lz4CompressorTest, CompressWithOffsets) {
+  // Read the input after 7 filler bytes and write the block at dst[5].
+  const std::string toCompressData = "Helloooooooo Celeborn!!!!!!!!!!";
+  const auto result = roundTrip(toCompressData, 7, 5);
+
+  EXPECT_EQ(result.decompressed, toCompressData);
+}

Reply via email to