This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0882db926 [CELEBORN-2096] Support Lz4 Compression in CppClient
0882db926 is described below
commit 0882db926dbb488725dc7578fe4a94a0516e7e78
Author: Jray <[email protected]>
AuthorDate: Tue Aug 19 10:05:13 2025 +0800
[CELEBORN-2096] Support Lz4 Compression in CppClient
### What changes were proposed in this pull request?
This PR adds LZ4 compression support to CppClient.
### Why are the changes needed?
CppClient previously shipped only decompressors (LZ4 and ZSTD); it needs a compressor to write shuffle data to Celeborn.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and unit tests (the new Lz4CompressorTest).
Closes #3412 from Jraaay/feat/cpp_client_lz4_compression.
Authored-by: Jray <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
cpp/celeborn/client/CMakeLists.txt | 4 +-
.../compress/{Lz4Decompressor.h => Compressor.cpp} | 34 +++++----
.../compress/{Lz4Decompressor.h => Compressor.h} | 40 +++++++----
cpp/celeborn/client/compress/Lz4Compressor.cpp | 84 ++++++++++++++++++++++
.../{Lz4Decompressor.h => Lz4Compressor.h} | 25 ++++---
cpp/celeborn/client/compress/Lz4Decompressor.cpp | 16 ++---
cpp/celeborn/client/compress/Lz4Decompressor.h | 2 +-
cpp/celeborn/client/tests/CMakeLists.txt | 3 +-
cpp/celeborn/client/tests/Lz4CompressorTest.cpp | 79 ++++++++++++++++++++
9 files changed, 234 insertions(+), 53 deletions(-)
diff --git a/cpp/celeborn/client/CMakeLists.txt
b/cpp/celeborn/client/CMakeLists.txt
index b3d2cd331..c5534a3a8 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -19,7 +19,9 @@ add_library(
ShuffleClient.cpp
compress/Decompressor.cpp
compress/Lz4Decompressor.cpp
- compress/ZstdDecompressor.cpp)
+ compress/ZstdDecompressor.cpp
+ compress/Compressor.cpp
+ compress/Lz4Compressor.cpp)
target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h
b/cpp/celeborn/client/compress/Compressor.cpp
similarity index 62%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Compressor.cpp
index 3d34a6876..849b1ff0d 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Compressor.cpp
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-#pragma once
+#include <stdexcept>
-#include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
-#include "celeborn/client/compress/Lz4Trait.h"
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/utils/Exceptions.h"
namespace celeborn {
namespace client {
namespace compress {
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
- public:
- Lz4Decompressor();
- ~Lz4Decompressor() override;
-
- int getOriginalLen(const uint8_t* src) override;
- int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
-
- Lz4Decompressor(const Lz4Decompressor&) = delete;
- Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
-
- private:
- XXH32_state_t* xxhash_state_;
-};
+// Builds the compressor for the shuffle compression codec configured in
+// `conf`. Only LZ4 is implemented; ZSTD and any other codec fail via
+// CELEBORN_FAIL.
+std::unique_ptr<Compressor> Compressor::createCompressor(
+    const conf::CelebornConf& conf) {
+  const auto selectedCodec = conf.shuffleCompressionCodec();
+  if (selectedCodec == protocol::CompressionCodec::LZ4) {
+    return std::make_unique<Lz4Compressor>();
+  }
+  if (selectedCodec == protocol::CompressionCodec::ZSTD) {
+    // TODO: impl zstd
+    CELEBORN_FAIL("Compression codec ZSTD is not supported.");
+  }
+  CELEBORN_FAIL("Unknown compression codec.");
+}
} // namespace compress
} // namespace client
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h
b/cpp/celeborn/client/compress/Compressor.h
similarity index 59%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Compressor.h
index 3d34a6876..711c7342b 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Compressor.h
@@ -17,27 +17,37 @@
#pragma once
-#include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
-#include "celeborn/client/compress/Lz4Trait.h"
+#include <memory>
+
+#include "celeborn/conf/CelebornConf.h"
namespace celeborn {
namespace client {
namespace compress {
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
+/// Abstract interface for client-side compression of shuffle data.
+/// createCompressor() returns the implementation selected by the configured
+/// shuffle compression codec.
+class Compressor {
public:
- Lz4Decompressor();
- ~Lz4Decompressor() override;
-
- int getOriginalLen(const uint8_t* src) override;
- int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
-
- Lz4Decompressor(const Lz4Decompressor&) = delete;
- Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
-
- private:
- XXH32_state_t* xxhash_state_;
+ virtual ~Compressor() = default;
+
+ /// Compresses srcLength bytes starting at src + srcOffset into dst starting
+ /// at dstOffset, and returns the number of bytes written to dst.
+ /// Callers size dst with getDstCapacity(srcLength) (bytes available from
+ /// dstOffset onward).
+ virtual size_t compress(
+ const uint8_t* src,
+ int srcOffset,
+ int srcLength,
+ uint8_t* dst,
+ int dstOffset) = 0;
+
+ /// Returns an upper bound on the number of bytes compress() may write for
+ /// `length` input bytes.
+ virtual size_t getDstCapacity(int length) = 0;
+
+ /// Creates the compressor for conf.shuffleCompressionCodec(); fails for
+ /// codecs that have no implementation.
+ static std::unique_ptr<Compressor> createCompressor(
+ const conf::CelebornConf& conf);
+
+ protected:
+ /// Writes the four bytes of `i` to buf[off .. off + 3], least-significant
+ /// byte first (little-endian).
+ static void writeIntLE(const int i, uint8_t* buf, int off) {
+ buf[off++] = static_cast<uint8_t>(i);
+ buf[off++] = static_cast<uint8_t>(i >> 8);
+ buf[off++] = static_cast<uint8_t>(i >> 16);
+ buf[off] = static_cast<uint8_t>(i >> 24);
+ }
};
} // namespace compress
diff --git a/cpp/celeborn/client/compress/Lz4Compressor.cpp
b/cpp/celeborn/client/compress/Lz4Compressor.cpp
new file mode 100644
index 000000000..be21ed4a8
--- /dev/null
+++ b/cpp/celeborn/client/compress/Lz4Compressor.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <lz4.h>
+#include <algorithm>
+#include <cstring>
+#include <string>
+
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace client {
+namespace compress {
+
+// Allocates the XXH32 streaming state that compress() reuses for every block.
+Lz4Compressor::Lz4Compressor() {
+  xxhashState_ = XXH32_createState();
+  if (!xxhashState_) {
+    CELEBORN_FAIL("Failed to create XXH32 state.");
+  }
+  XXH32_reset(xxhashState_, kDefaultSeed);
+}
+
+Lz4Compressor::~Lz4Compressor() {
+  if (xxhashState_) {
+    XXH32_freeState(xxhashState_);
+  }
+}
+
+// Encodes src[srcOffset, srcOffset + srcLength) as one block written to dst
+// starting at dstOffset, and returns the total number of bytes written
+// (header plus payload).
+//
+// Block layout (offsets relative to dst + dstOffset):
+//   [0, kMagicLength)            magic bytes (kMagic)
+//   kMagicLength                 compression method (LZ4 or raw)
+//   kMagicLength + 1 .. + 4      payload length, little-endian int
+//   kMagicLength + 5 .. + 8      original length, little-endian int
+//   kMagicLength + 9 .. + 12     checksum of the uncompressed bytes
+//   kHeaderLength onward         payload
+//
+// The caller must reserve getDstCapacity(srcLength) bytes at dst + dstOffset.
+// Fails if srcLength is negative or larger than LZ4_MAX_INPUT_SIZE.
+size_t Lz4Compressor::compress(
+    const uint8_t* src,
+    const int srcOffset,
+    const int srcLength,
+    uint8_t* dst,
+    const int dstOffset) {
+  // LZ4_compressBound() returns 0 for such lengths, so getDstCapacity() would
+  // under-size dst and the raw fallback copy below would overflow it.
+  if (srcLength < 0 || srcLength > LZ4_MAX_INPUT_SIZE) {
+    CELEBORN_FAIL(
+        "Invalid LZ4 compression input length: " + std::to_string(srcLength));
+  }
+
+  const auto srcPtr = reinterpret_cast<const char*>(src + srcOffset);
+  const auto dstPtr = dst + dstOffset;
+  const auto dstDataPtr = reinterpret_cast<char*>(dstPtr + kHeaderLength);
+
+  // The checksum is deliberately masked to 28 bits (0xFFFFFFF, seven Fs):
+  // Lz4Decompressor applies the same mask before comparing, so both sides must
+  // agree. Presumably this mirrors the masked checksum of the Java client's
+  // lz4-java implementation -- confirm before changing.
+  XXH32_reset(xxhashState_, kDefaultSeed);
+  XXH32_update(xxhashState_, srcPtr, srcLength);
+  const uint32_t check = XXH32_digest(xxhashState_) & 0xFFFFFFFL;
+
+  std::copy_n(kMagic, kMagicLength, dstPtr);
+
+  int compressedLength = LZ4_compress_default(
+      srcPtr, dstDataPtr, srcLength, LZ4_compressBound(srcLength));
+
+  // Store the input verbatim when LZ4 fails (returns <= 0) or does not shrink
+  // it, so the payload never exceeds srcLength bytes.
+  int compressionMethod = kCompressionMethodLZ4;
+  if (compressedLength <= 0 || compressedLength >= srcLength) {
+    compressionMethod = kCompressionMethodRaw;
+    compressedLength = srcLength;
+    std::copy_n(srcPtr, srcLength, dstDataPtr);
+  }
+
+  dstPtr[kMagicLength] = static_cast<uint8_t>(compressionMethod);
+  writeIntLE(compressedLength, dstPtr, kMagicLength + 1);
+  writeIntLE(srcLength, dstPtr, kMagicLength + 5);
+  writeIntLE(static_cast<int>(check), dstPtr, kMagicLength + 9);
+
+  return kHeaderLength + compressedLength;
+}
+
+// Worst-case size of one compress() output for `length` input bytes: LZ4's
+// payload bound plus the fixed block header. For lengths LZ4 cannot encode,
+// LZ4_compressBound() yields 0; compress() rejects those lengths.
+size_t Lz4Compressor::getDstCapacity(const int length) {
+  return LZ4_compressBound(length) + kHeaderLength;
+}
+
+} // namespace compress
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h
b/cpp/celeborn/client/compress/Lz4Compressor.h
similarity index 68%
copy from cpp/celeborn/client/compress/Lz4Decompressor.h
copy to cpp/celeborn/client/compress/Lz4Compressor.h
index 3d34a6876..e2d316cf9 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Lz4Compressor.h
@@ -18,26 +18,33 @@
#pragma once
#include <xxhash.h>
-#include "celeborn/client/compress/Decompressor.h"
+
+#include "celeborn/client/compress/Compressor.h"
#include "celeborn/client/compress/Lz4Trait.h"
namespace celeborn {
namespace client {
namespace compress {
-class Lz4Decompressor final : public Decompressor, Lz4Trait {
+/// Compressor producing LZ4 blocks. Each block has a magic prefix, a
+/// compression-method byte (LZ4, or raw when LZ4 does not shrink the input),
+/// the payload length, the original length and a checksum of the uncompressed
+/// bytes, followed by the payload. Lz4Decompressor reads this format back.
+class Lz4Compressor final : public Compressor, Lz4Trait {
public:
- Lz4Decompressor();
- ~Lz4Decompressor() override;
+ /// Allocates the XXH32 state; fails if it cannot be created.
+ explicit Lz4Compressor();
+ /// Frees the XXH32 state.
+ ~Lz4Compressor() override;
+
+ /// Writes one block for src[srcOffset, srcOffset + srcLength) at
+ /// dst + dstOffset and returns its total size in bytes. compress() reuses
+ /// the member hash state, so one instance should not be used from several
+ /// threads concurrently without external synchronization.
+ size_t compress(
+ const uint8_t* src,
+ int srcOffset,
+ int srcLength,
+ uint8_t* dst,
+ int dstOffset) override;
- int getOriginalLen(const uint8_t* src) override;
- int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override;
+ /// LZ4_compressBound(length) plus the block header length.
+ size_t getDstCapacity(int length) override;
- Lz4Decompressor(const Lz4Decompressor&) = delete;
- Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
+ // Non-copyable: each instance owns its xxhashState_.
+ Lz4Compressor(const Lz4Compressor&) = delete;
+ Lz4Compressor& operator=(const Lz4Compressor&) = delete;
private:
- XXH32_state_t* xxhash_state_;
+ // Owned XXH32 streaming state, reset and reused by every compress() call.
+ XXH32_state_t* xxhashState_;
};
} // namespace compress
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.cpp
b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
index f85ced0d2..c221e9c62 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.cpp
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.cpp
@@ -25,16 +25,16 @@ namespace celeborn {
namespace client {
namespace compress {
+// Allocates the XXH32 streaming state that decompress() reuses to verify
+// block checksums; fails if the state cannot be created.
Lz4Decompressor::Lz4Decompressor() {
- xxhash_state_ = XXH32_createState();
- if (!xxhash_state_) {
+ xxhashState_ = XXH32_createState();
+ if (!xxhashState_) {
CELEBORN_FAIL("Failed to create XXH32 state.")
}
- XXH32_reset(xxhash_state_, kDefaultSeed);
+ XXH32_reset(xxhashState_, kDefaultSeed);
}
+// Releases the XXH32 state allocated by the constructor, if any.
Lz4Decompressor::~Lz4Decompressor() {
- if (xxhash_state_) {
- XXH32_freeState(xxhash_state_);
+ if (xxhashState_) {
+ XXH32_freeState(xxhashState_);
}
}
@@ -79,9 +79,9 @@ int Lz4Decompressor::decompress(
std::to_string(compressionMethod));
}
- XXH32_reset(xxhash_state_, kDefaultSeed);
- XXH32_update(xxhash_state_, dstPtr, originalLen);
- const uint32_t actualCheck = XXH32_digest(xxhash_state_) & 0xFFFFFFFL;
+ XXH32_reset(xxhashState_, kDefaultSeed);
+ XXH32_update(xxhashState_, dstPtr, originalLen);
+ const uint32_t actualCheck = XXH32_digest(xxhashState_) & 0xFFFFFFFL;
if (static_cast<uint32_t>(expectedCheck) != actualCheck) {
CELEBORN_FAIL(
diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.h
b/cpp/celeborn/client/compress/Lz4Decompressor.h
index 3d34a6876..d574a0b3c 100644
--- a/cpp/celeborn/client/compress/Lz4Decompressor.h
+++ b/cpp/celeborn/client/compress/Lz4Decompressor.h
@@ -37,7 +37,7 @@ class Lz4Decompressor final : public Decompressor, Lz4Trait {
Lz4Decompressor& operator=(const Lz4Decompressor&) = delete;
private:
- XXH32_state_t* xxhash_state_;
+ XXH32_state_t* xxhashState_;
};
} // namespace compress
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt
b/cpp/celeborn/client/tests/CMakeLists.txt
index 05d682576..9ac740474 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -17,7 +17,8 @@ add_executable(
celeborn_client_test
WorkerPartitionReaderTest.cpp
Lz4DecompressorTest.cpp
- ZstdDecompressorTest.cpp)
+ ZstdDecompressorTest.cpp
+ Lz4CompressorTest.cpp)
add_test(NAME celeborn_client_test COMMAND celeborn_client_test)
diff --git a/cpp/celeborn/client/tests/Lz4CompressorTest.cpp
b/cpp/celeborn/client/tests/Lz4CompressorTest.cpp
new file mode 100644
index 000000000..57ac9f22b
--- /dev/null
+++ b/cpp/celeborn/client/tests/Lz4CompressorTest.cpp
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "celeborn/client/compress/Lz4Compressor.h"
+#include "celeborn/client/compress/Lz4Decompressor.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::protocol;
+
+namespace {
+
+// Compresses `input` with Lz4Compressor, decompresses the result with
+// Lz4Decompressor and checks that the round trip reproduces `input` exactly.
+void expectLz4RoundTrip(const std::string& input) {
+  const int inputLength = static_cast<int>(input.size());
+
+  compress::Lz4Compressor compressor;
+  const size_t capacity = compressor.getDstCapacity(inputLength);
+  std::vector<uint8_t> compressedData(capacity);
+  const size_t compressedSize = compressor.compress(
+      reinterpret_cast<const uint8_t*>(input.data()),
+      0,
+      inputLength,
+      compressedData.data(),
+      0);
+  // compress() must never write past the capacity it advertises.
+  ASSERT_LE(compressedSize, capacity);
+
+  compress::Lz4Decompressor decompressor;
+  const int originalLen = decompressor.getOriginalLen(compressedData.data());
+  // Validate the header before using it to size the output buffer.
+  ASSERT_EQ(originalLen, inputLength);
+
+  std::vector<uint8_t> decompressedData(originalLen);
+  const int decompressedLen = decompressor.decompress(
+      compressedData.data(), decompressedData.data(), 0);
+  // Compare the returned length exactly; converting it to bool would accept
+  // any non-zero value.
+  EXPECT_EQ(decompressedLen, originalLen);
+  EXPECT_EQ(
+      std::string(decompressedData.begin(), decompressedData.end()), input);
+}
+
+} // namespace
+
+// Repetitive input that LZ4 is expected to shrink, exercising the LZ4 method.
+TEST(Lz4CompressorTest, CompressWithLz4) {
+  expectLz4RoundTrip("Helloooooooo Celeborn!!!!!!!!!!");
+}
+
+// Short input without repeats that LZ4 is not expected to shrink, exercising
+// the raw fallback.
+TEST(Lz4CompressorTest, CompressWithRaw) {
+  expectLz4RoundTrip("Hello Celeborn!");
+}