This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit ccb09d80c948b4793defb14ec4278e73eca4c7dd Author: Jray <[email protected]> AuthorDate: Fri Aug 8 18:19:48 2025 +0800 [CELEBORN-2090] Support Lz4 Decompression in CppClient ### What changes were proposed in this pull request? This PR adds support for lz4 decompression in CppClient. ### Why are the changes needed? To support reading from Celeborn with CppClient. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By compilation and UTs. Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression. Authored-by: Jray <[email protected]> Signed-off-by: SteNicholas <[email protected]> (cherry picked from commit cfb490c9380cd5e8e903042de9350b3eea8ce9c8) Signed-off-by: SteNicholas <[email protected]> --- .github/workflows/cpp_integration.yml | 15 +++- LICENSE | 3 + cpp/CMakeLists.txt | 6 ++ cpp/README.md | 2 +- cpp/celeborn/client/CMakeLists.txt | 5 +- cpp/celeborn/client/ShuffleClient.cpp | 14 +++- cpp/celeborn/client/ShuffleClient.h | 16 ++++ .../celeborn/client/compress/Decompressor.cpp | 25 ++++-- .../celeborn/client/compress/Decompressor.h | 33 ++++++-- cpp/celeborn/client/compress/Lz4Decompressor.cpp | 97 ++++++++++++++++++++++ .../celeborn/client/compress/Lz4Decompressor.h | 32 +++++-- .../celeborn/client/compress/Lz4Trait.h | 25 ++++-- cpp/celeborn/client/reader/CelebornInputStream.cpp | 41 +++++++-- cpp/celeborn/client/reader/CelebornInputStream.h | 8 +- cpp/celeborn/client/tests/CMakeLists.txt | 3 +- cpp/celeborn/client/tests/Lz4DecompressorTest.cpp | 70 ++++++++++++++++ cpp/celeborn/conf/CMakeLists.txt | 1 + cpp/celeborn/conf/CelebornConf.cpp | 8 ++ cpp/celeborn/conf/CelebornConf.h | 6 ++ cpp/celeborn/memory/ByteBuffer.cpp | 22 +++++ cpp/celeborn/memory/ByteBuffer.h | 13 ++- cpp/celeborn/memory/tests/ByteBufferTest.cpp | 76 +++++++++++++++++ cpp/celeborn/protocol/CMakeLists.txt | 3 +- .../celeborn/protocol/CompressionCodec.cpp | 30 +++++-- .../celeborn/protocol/CompressionCodec.h | 20 +++-- cpp/celeborn/tests/DataSumWithReaderClient.cpp | 11 ++- cpp/cmake/FindLZ4.cmake | 41 +++++++++ cpp/scripts/setup-ubuntu.sh | 2 + ...stBase.scala => JavaWriteCppReadTestBase.scala} | 23 +++-- ...ONE.scala => JavaWriteCppReadTestWithLZ4.scala} | 4 +- ...NE.scala => JavaWriteCppReadTestWithNONE.scala} | 4 +- 31 files changed, 587 insertions(+), 72 deletions(-) diff --git a/.github/workflows/cpp_integration.yml b/.github/workflows/cpp_integration.yml index 0af4c61e5..0b221aeb9 100644 --- a/.github/workflows/cpp_integration.yml +++ b/.github/workflows/cpp_integration.yml @@ -30,7 +30,7 @@ on: jobs: celeborn_cpp_check_lint: runs-on: ubuntu-22.04 - container: holylow/celeborn-cpp-dev:0.3 + container: jraaaay/celeborn-cpp-dev:0.4 steps: - uses: actions/checkout@v4 with: @@ -43,7 +43,7 @@ jobs: xargs clang-format-15 -style=file:./.clang-format -n --Werror celeborn_cpp_unit_test: runs-on: ubuntu-22.04 - container: holylow/celeborn-cpp-dev:0.3 + container: jraaaay/celeborn-cpp-dev:0.4 steps: - uses: actions/checkout@v4 with: @@ -59,7 +59,7 @@ jobs: run: ctest celeborn_cpp_integration_test: runs-on: ubuntu-22.04 - container: holylow/celeborn-cpp-dev:0.3 + container: jraaaay/celeborn-cpp-dev:0.4 steps: - uses: actions/checkout@v4 with: @@ -90,5 +90,12 @@ jobs: build/mvn -pl worker \ test-compile exec:java \ -Dexec.classpathScope="test" \ - -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaReadCppWriteTestWithNONE" \ + -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithNONE" \ + -Dexec.args="-XX:MaxDirectMemorySize=2G" + - name: Run Java-Cpp Hybrid Integration Test (LZ4 Decompression) + run: | + build/mvn -pl worker \ + test-compile exec:java \ + -Dexec.classpathScope="test" \ + -Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithLZ4" \ -Dexec.args="-XX:MaxDirectMemorySize=2G" diff --git a/LICENSE b/LICENSE index f757490f9..ee0a3f890 100644 --- a/LICENSE +++ b/LICENSE @@ -274,6 +274,9 @@ Meta Velox ./cpp/celeborn/conf/BaseConf.h ./cpp/celeborn/conf/BaseConf.cpp +Meta Folly +./cpp/cmake/FindLz4.cmake + ------------------------------------------------------------------------------------ This product bundles various third-party components under the CC0 license. This section summarizes those components. diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 8552b7ef1..f24a8abff 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -129,6 +129,12 @@ find_package(Sodium REQUIRED) find_library(FIZZ fizz REQUIRED) find_library(WANGLE wangle REQUIRED) +find_package(LZ4 REQUIRED) +set(LZ4_WITH_DEPENDENCIES + ${LZ4_LIBRARY} + xxhash +) + find_library(RE2 re2) find_package(fizz CONFIG REQUIRED) diff --git a/cpp/README.md b/cpp/README.md index 2aa07b4e1..d2c794228 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -21,7 +21,7 @@ docker run \ -w /celeborn \ -it --rm \ --name celeborn-cpp-dev-container \ - holylow/celeborn-cpp-dev:0.3 \ + jraaaay/celeborn-cpp-dev:0.4 \ /bin/bash ``` diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt index 2491b7d7f..94e58795d 100644 --- a/cpp/celeborn/client/CMakeLists.txt +++ b/cpp/celeborn/client/CMakeLists.txt @@ -16,7 +16,9 @@ add_library( client reader/WorkerPartitionReader.cpp reader/CelebornInputStream.cpp - ShuffleClient.cpp) + ShuffleClient.cpp + compress/Decompressor.cpp + compress/Lz4Decompressor.cpp) target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR}) @@ -31,6 +33,7 @@ target_link_libraries( ${FIZZ} ${LIBSODIUM_LIBRARY} ${FOLLY_WITH_DEPENDENCIES} + ${LZ4_WITH_DEPENDENCIES} ${GLOG} ${GFLAGS_LIBRARIES} ) diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp index 8e8ca2121..ccf7c6dcb 100644 --- a/cpp/celeborn/client/ShuffleClient.cpp +++ b/cpp/celeborn/client/ShuffleClient.cpp @@ -48,6 +48,17 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition( int attemptNumber, int startMapIndex, int endMapIndex) { + return ShuffleClientImpl::readPartition( + shuffleId, partitionId, attemptNumber, startMapIndex, endMapIndex, true); +} + +std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex, + bool needCompression) { const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId); std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId); std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations; @@ -64,7 +75,8 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition( reducerFileGroupInfo.attempts, attemptNumber, startMapIndex, - endMapIndex); + endMapIndex, + needCompression); } void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) { diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h index 288e52e9e..284c7ade9 100644 --- a/cpp/celeborn/client/ShuffleClient.h +++ b/cpp/celeborn/client/ShuffleClient.h @@ -38,6 +38,14 @@ class ShuffleClient { int startMapIndex, int endMapIndex) = 0; + virtual std::unique_ptr<CelebornInputStream> readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex, + bool needCompression) = 0; + virtual bool cleanupShuffle(int shuffleId) = 0; virtual void shutdown() = 0; @@ -62,6 +70,14 @@ class ShuffleClientImpl : public ShuffleClient { int startMapIndex, int endMapIndex) override; + std::unique_ptr<CelebornInputStream> readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex, + bool needCompression) override; + void updateReducerFileGroup(int shuffleId) override; bool cleanupShuffle(int shuffleId) override; diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/client/compress/Decompressor.cpp similarity index 55% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/client/compress/Decompressor.cpp index 4aa5b0ed7..999d01822 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/client/compress/Decompressor.cpp @@ -15,13 +15,28 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#include <stdexcept> -import org.apache.celeborn.common.protocol.CompressionCodec +#include "celeborn/client/compress/Lz4Decompressor.h" +#include "celeborn/utils/Exceptions.h" -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +namespace celeborn { +namespace client { +namespace compress { - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) +std::unique_ptr<Decompressor> Decompressor::createDecompressor( + protocol::CompressionCodec codec) { + switch (codec) { + case protocol::CompressionCodec::LZ4: + return std::make_unique<Lz4Decompressor>(); + case protocol::CompressionCodec::ZSTD: + // TODO: impl zstd + CELEBORN_FAIL("Compression codec ZSTD is not supported."); + default: + CELEBORN_FAIL("Unknown compression codec."); } } + +} // namespace compress +} // namespace client +} // namespace celeborn diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/client/compress/Decompressor.h similarity index 53% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/client/compress/Decompressor.h index 4aa5b0ed7..694c07d42 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/client/compress/Decompressor.h @@ -15,13 +15,34 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#pragma once -import org.apache.celeborn.common.protocol.CompressionCodec +#include <memory> -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +#include "celeborn/protocol/CompressionCodec.h" - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) +namespace celeborn { +namespace client { +namespace compress { + +class Decompressor { + public: + virtual ~Decompressor() = default; + + virtual int decompress(const uint8_t* src, uint8_t* dst, int dst_off) = 0; + + virtual int getOriginalLen(const uint8_t* src) = 0; + + static std::unique_ptr<Decompressor> createDecompressor( + protocol::CompressionCodec codec); + + protected: + static int32_t readIntLE(const uint8_t* buf, const int i) { + const auto p = buf; + return (p[i]) | (p[i + 1] << 8) | (p[i + 2] << 16) | (p[i + 3] << 24); } -} +}; + +} // namespace compress +} // namespace client +} // namespace celeborn diff --git a/cpp/celeborn/client/compress/Lz4Decompressor.cpp b/cpp/celeborn/client/compress/Lz4Decompressor.cpp new file mode 100644 index 000000000..f85ced0d2 --- /dev/null +++ b/cpp/celeborn/client/compress/Lz4Decompressor.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "celeborn/client/compress/Lz4Decompressor.h" +#include <lz4.h> +#include <cstring> +#include <iostream> +#include "celeborn/utils/Exceptions.h" + +namespace celeborn { +namespace client { +namespace compress { +Lz4Decompressor::Lz4Decompressor() { + xxhash_state_ = XXH32_createState(); + if (!xxhash_state_) { + CELEBORN_FAIL("Failed to create XXH32 state.") + } + XXH32_reset(xxhash_state_, kDefaultSeed); +} + +Lz4Decompressor::~Lz4Decompressor() { + if (xxhash_state_) { + XXH32_freeState(xxhash_state_); + } +} + +int Lz4Decompressor::getOriginalLen(const uint8_t* src) { + return readIntLE(src, kMagicLength + 5); +} + +int Lz4Decompressor::decompress( + const uint8_t* src, + uint8_t* dst, + const int dstOff) { + const int compressionMethod = static_cast<unsigned char>(src[kMagicLength]); + const int compressedLen = readIntLE(src, kMagicLength + 1); + const int originalLen = readIntLE(src, kMagicLength + 5); + const int expectedCheck = readIntLE(src, kMagicLength + 9); + + const uint8_t* compressedDataPtr = src + kHeaderLength; + uint8_t* dstPtr = dst + dstOff; + + switch (compressionMethod) { + case kCompressionMethodRaw: + std::memcpy(dstPtr, compressedDataPtr, originalLen); + break; + case kCompressionMethodLZ4: { + const int decompressedBytes = LZ4_decompress_safe( + reinterpret_cast<const char*>(compressedDataPtr), + reinterpret_cast<char*>(dstPtr), + compressedLen, + originalLen); + + if (decompressedBytes != originalLen) { + CELEBORN_FAIL( + std::string("Decompression failed! LZ4 error or size mismatch. ") + + "Expected: " + std::to_string(originalLen) + + ", Got: " + std::to_string(decompressedBytes)); + } + break; + } + default: + CELEBORN_FAIL( + std::string("Unsupported compression method: ") + + std::to_string(compressionMethod)); + } + + XXH32_reset(xxhash_state_, kDefaultSeed); + XXH32_update(xxhash_state_, dstPtr, originalLen); + const uint32_t actualCheck = XXH32_digest(xxhash_state_) & 0xFFFFFFFL; + + if (static_cast<uint32_t>(expectedCheck) != actualCheck) { + CELEBORN_FAIL( + std::string("Checksum mismatch! Expected: ") + + std::to_string(expectedCheck) + + ", Actual: " + std::to_string(actualCheck)); + } + + return originalLen; +} +} // namespace compress +} // namespace client +} // namespace celeborn diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/client/compress/Lz4Decompressor.h similarity index 54% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/client/compress/Lz4Decompressor.h index 4aa5b0ed7..3d34a6876 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/client/compress/Lz4Decompressor.h @@ -15,13 +15,31 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#pragma once -import org.apache.celeborn.common.protocol.CompressionCodec +#include <xxhash.h> +#include "celeborn/client/compress/Decompressor.h" +#include "celeborn/client/compress/Lz4Trait.h" -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +namespace celeborn { +namespace client { +namespace compress { - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) - } -} +class Lz4Decompressor final : public Decompressor, Lz4Trait { + public: + Lz4Decompressor(); + ~Lz4Decompressor() override; + + int getOriginalLen(const uint8_t* src) override; + int decompress(const uint8_t* src, uint8_t* dst, int dstOff) override; + + Lz4Decompressor(const Lz4Decompressor&) = delete; + Lz4Decompressor& operator=(const Lz4Decompressor&) = delete; + + private: + XXH32_state_t* xxhash_state_; +}; + +} // namespace compress +} // namespace client +} // namespace celeborn diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/client/compress/Lz4Trait.h similarity index 60% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/client/compress/Lz4Trait.h index 4aa5b0ed7..fc236a1c5 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/client/compress/Lz4Trait.h @@ -15,13 +15,24 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#pragma once -import org.apache.celeborn.common.protocol.CompressionCodec +namespace celeborn { +namespace client { +namespace compress { -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +struct Lz4Trait { + static constexpr char kMagic[] = {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'}; + static constexpr int kMagicLength = sizeof(kMagic); - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) - } -} + static constexpr int kHeaderLength = kMagicLength + 1 + 4 + 4 + 4; + + static constexpr int kCompressionMethodRaw = 0x10; + static constexpr int kCompressionMethodLZ4 = 0x20; + + static constexpr int kDefaultSeed = 0x9747b28c; +}; + +} // namespace compress +} // namespace client +} // namespace celeborn diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp b/cpp/celeborn/client/reader/CelebornInputStream.cpp index a3023d6f3..d75b705e5 100644 --- a/cpp/celeborn/client/reader/CelebornInputStream.cpp +++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp @@ -16,6 +16,8 @@ */ #include "celeborn/client/reader/CelebornInputStream.h" +#include <lz4.h> +#include "celeborn/client/compress/Decompressor.h" namespace celeborn { namespace client { @@ -27,7 +29,8 @@ CelebornInputStream::CelebornInputStream( const std::vector<int>& attempts, int attemptNumber, int startMapIndex, - int endMapIndex) + int endMapIndex, + bool needCompression) : shuffleKey_(shuffleKey), conf_(conf), clientFactory_(clientFactory), @@ -38,7 +41,15 @@ CelebornInputStream::CelebornInputStream( endMapIndex_(endMapIndex), currLocationIndex_(0), currBatchPos_(0), - currBatchSize_(0) { + currBatchSize_(0), + shouldDecompress_( + conf_->shuffleCompressionCodec() != + protocol::CompressionCodec::NONE && + needCompression) { + if (shouldDecompress_) { + decompressor_ = compress::Decompressor::createDecompressor( + conf_->shuffleCompressionCodec()); + } moveToNextReader(); } @@ -57,8 +68,8 @@ int CelebornInputStream::read(uint8_t* buffer, size_t offset, size_t len) { } size_t batchRemainingSize = currBatchSize_ - currBatchPos_; size_t toReadBytes = std::min(len - readBytes, batchRemainingSize); - CELEBORN_CHECK_GE(currChunk_->remainingSize(), toReadBytes); - auto size = currChunk_->readToBuffer(&buf[readBytes], toReadBytes); + CELEBORN_CHECK_GE(decompressedChunk_->remainingSize(), toReadBytes); + auto size = decompressedChunk_->readToBuffer(&buf[readBytes], toReadBytes); CELEBORN_CHECK_EQ(toReadBytes, size); readBytes += toReadBytes; currBatchPos_ += toReadBytes; @@ -83,13 +94,31 @@ bool CelebornInputStream::fillBuffer() { CELEBORN_CHECK_GE(currChunk_->remainingSize(), size); CELEBORN_CHECK_LT(mapId, attempts_.size()); - // TODO: compression is not supported yet! + if (shouldDecompress_) { + if (size > compressedBuf_.size()) { + compressedBuf_.resize(size); + } + currChunk_->readToBuffer(compressedBuf_.data(), size); + } if (attemptId == attempts_[mapId]) { auto& batchRecord = getBatchRecord(mapId); if (batchRecord.count(batchId) <= 0) { batchRecord.insert(batchId); - currBatchSize_ = size; + if (shouldDecompress_) { + const auto originalLength = + decompressor_->getOriginalLen(compressedBuf_.data()); + std::unique_ptr<folly::IOBuf> decompressedBuf_ = + folly::IOBuf::createCombined(originalLength); + decompressedBuf_->append(originalLength); + currBatchSize_ = decompressor_->decompress( + compressedBuf_.data(), decompressedBuf_->writableData(), 0); + decompressedChunk_ = memory::ByteBuffer::createReadOnly( + std::move(decompressedBuf_), false); + } else { + currBatchSize_ = size; + decompressedChunk_ = currChunk_->readToReadOnlyBuffer(size); + } currBatchPos_ = 0; hasData = true; break; diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h b/cpp/celeborn/client/reader/CelebornInputStream.h index af143321c..5dd9c4f76 100644 --- a/cpp/celeborn/client/reader/CelebornInputStream.h +++ b/cpp/celeborn/client/reader/CelebornInputStream.h @@ -17,6 +17,7 @@ #pragma once +#include "celeborn/client/compress/Decompressor.h" #include "celeborn/client/reader/WorkerPartitionReader.h" #include "celeborn/conf/CelebornConf.h" @@ -33,7 +34,8 @@ class CelebornInputStream { const std::vector<int>& attempts, int attemptNumber, int startMapIndex, - int endMapIndex); + int endMapIndex, + bool needCompression); int read(uint8_t* buffer, size_t offset, size_t len); @@ -68,9 +70,13 @@ class CelebornInputStream { int attemptNumber_; int startMapIndex_; int endMapIndex_; + bool shouldDecompress_; + std::unique_ptr<compress::Decompressor> decompressor_; + std::vector<uint8_t> compressedBuf_; int currLocationIndex_; std::unique_ptr<memory::ReadOnlyByteBuffer> currChunk_; + std::unique_ptr<memory::ReadOnlyByteBuffer> decompressedChunk_; size_t currBatchPos_; size_t currBatchSize_; std::shared_ptr<PartitionReader> currReader_; diff --git a/cpp/celeborn/client/tests/CMakeLists.txt b/cpp/celeborn/client/tests/CMakeLists.txt index 6543ca28d..ab1dea923 100644 --- a/cpp/celeborn/client/tests/CMakeLists.txt +++ b/cpp/celeborn/client/tests/CMakeLists.txt @@ -15,7 +15,8 @@ add_executable( celeborn_client_test - WorkerPartitionReaderTest.cpp) + WorkerPartitionReaderTest.cpp + Lz4DecompressorTest.cpp) add_test(NAME celeborn_client_test COMMAND celeborn_client_test) diff --git a/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp new file mode 100644 index 000000000..6cd786367 --- /dev/null +++ b/cpp/celeborn/client/tests/Lz4DecompressorTest.cpp @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <gtest/gtest.h> + +#include "celeborn/client/compress/Lz4Decompressor.h" + +using namespace celeborn; +using namespace celeborn::client; +using namespace celeborn::protocol; + +TEST(Lz4DecompressorTest, DecompressWithLz4) { + compress::Lz4Decompressor decompressor; + + std::vector<uint8_t> compressedData = { + 76, 90, 52, 66, 108, 111, 99, 107, 32, 29, 0, 0, 0, + 31, 0, 0, 0, 116, 18, 177, 8, 83, 72, 101, 108, 108, + 111, 1, 0, 240, 4, 32, 67, 101, 108, 101, 98, 111, 114, + 110, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33}; + + const int originalLen = decompressor.getOriginalLen(compressedData.data()); + + const auto decompressedData = new uint8_t[originalLen + 1]; + decompressedData[originalLen] = '\0'; + + const bool success = + decompressor.decompress(compressedData.data(), decompressedData, 0); + + EXPECT_TRUE(success); + + EXPECT_EQ( + reinterpret_cast<char*>(decompressedData), + std::string("Helloooooooo Celeborn!!!!!!!!!!")); +} + +TEST(Lz4DecompressorTest, DecompressWithRaw) { + compress::Lz4Decompressor decompressor; + + std::vector<uint8_t> compressedData = { + 76, 90, 52, 66, 108, 111, 99, 107, 16, 15, 0, 0, 0, + 15, 0, 0, 0, 188, 66, 58, 13, 72, 101, 108, 108, 111, + 32, 67, 101, 108, 101, 98, 111, 114, 110, 33, 110, 33}; + + const int originalLen = decompressor.getOriginalLen(compressedData.data()); + + const auto decompressedData = new uint8_t[originalLen + 1]; + decompressedData[originalLen] = '\0'; + + const bool success = + decompressor.decompress(compressedData.data(), decompressedData, 0); + + EXPECT_TRUE(success); + + EXPECT_EQ( + reinterpret_cast<char*>(decompressedData), + std::string("Hello Celeborn!")); +} diff --git a/cpp/celeborn/conf/CMakeLists.txt b/cpp/celeborn/conf/CMakeLists.txt index f9fbca886..86dfc7e8d 100644 --- a/cpp/celeborn/conf/CMakeLists.txt +++ b/cpp/celeborn/conf/CMakeLists.txt @@ -20,6 +20,7 @@ add_library( target_link_libraries( conf + protocol utils memory ${FOLLY_WITH_DEPENDENCIES} diff --git a/cpp/celeborn/conf/CelebornConf.cpp b/cpp/celeborn/conf/CelebornConf.cpp index f8d34862e..4f85c19a0 100644 --- a/cpp/celeborn/conf/CelebornConf.cpp +++ b/cpp/celeborn/conf/CelebornConf.cpp @@ -140,6 +140,9 @@ const std::unordered_map<std::string, folly::Optional<std::string>> NUM_PROP(kNetworkIoNumConnectionsPerPeer, "1"), NUM_PROP(kNetworkIoClientThreads, 0), NUM_PROP(kClientFetchMaxReqsInFlight, 3), + STR_PROP( + kShuffleCompressionCodec, + protocol::toString(protocol::CompressionCodec::NONE)), // NUM_PROP(kNumExample, 50'000), // BOOL_PROP(kBoolExample, false), }; @@ -202,5 +205,10 @@ int CelebornConf::networkIoClientThreads() const { int CelebornConf::clientFetchMaxReqsInFlight() const { return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value()); } + +protocol::CompressionCodec CelebornConf::shuffleCompressionCodec() const { + return protocol::toCompressionCodec( + optionalProperty(kShuffleCompressionCodec).value()); +} } // namespace conf } // namespace celeborn diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h index 7f36823fa..783bc96e1 100644 --- a/cpp/celeborn/conf/CelebornConf.h +++ b/cpp/celeborn/conf/CelebornConf.h @@ -18,6 +18,7 @@ #pragma once #include "celeborn/conf/BaseConf.h" +#include "celeborn/protocol/CompressionCodec.h" #include "celeborn/utils/CelebornUtils.h" namespace celeborn { @@ -60,6 +61,9 @@ class CelebornConf : public BaseConf { static constexpr std::string_view kClientFetchMaxReqsInFlight{ "celeborn.client.fetch.maxReqsInFlight"}; + static constexpr std::string_view kShuffleCompressionCodec{ + "celeborn.client.shuffle.compression.codec"}; + CelebornConf(); CelebornConf(const std::string& filename); @@ -83,6 +87,8 @@ class CelebornConf : public BaseConf { int networkIoClientThreads() const; int clientFetchMaxReqsInFlight() const; + + protocol::CompressionCodec shuffleCompressionCodec() const; }; } // namespace conf } // namespace celeborn diff --git a/cpp/celeborn/memory/ByteBuffer.cpp b/cpp/celeborn/memory/ByteBuffer.cpp index 061ca2edf..a44ba92bd 100644 --- a/cpp/celeborn/memory/ByteBuffer.cpp +++ b/cpp/celeborn/memory/ByteBuffer.cpp @@ -74,5 +74,27 @@ std::unique_ptr<folly::IOBuf> ByteBuffer::trimBuffer( } return std::move(data); } + +std::unique_ptr<ReadOnlyByteBuffer> ReadOnlyByteBuffer::readToReadOnlyBuffer( + const size_t len) const { + std::unique_ptr<folly::IOBuf> leftData = folly::IOBuf::create(0); + auto cnt = 0; + while (cnt < len) { + if (this->remainingSize() == 0) { + break; + } + std::unique_ptr<folly::IOBuf> newBlock = + std::move(this->cursor_->currentBuffer()->clone()); + newBlock->pop(); + newBlock->trimStart(this->cursor_->getPositionInCurrentBuffer()); + if (newBlock->length() > len - cnt) { + newBlock->trimEnd(newBlock->length() - (len - cnt)); + } + this->cursor_->skip(newBlock->length()); + cnt += newBlock->length(); + leftData->appendToChain(std::move(newBlock)); + } + return createReadOnly(std::move(leftData), isBigEndian_); +} } // namespace memory } // namespace celeborn diff --git a/cpp/celeborn/memory/ByteBuffer.h b/cpp/celeborn/memory/ByteBuffer.h index 763582d32..0c1027fe2 100644 --- a/cpp/celeborn/memory/ByteBuffer.h +++ b/cpp/celeborn/memory/ByteBuffer.h @@ -48,12 +48,11 @@ class ByteBuffer { assert(data_); } - std::unique_ptr<folly::IOBuf> data_; - bool isBigEndian_; - - private: static std::unique_ptr<folly::IOBuf> trimBuffer( const ReadOnlyByteBuffer& buffer); + + std::unique_ptr<folly::IOBuf> data_; + bool isBigEndian_; }; class ReadOnlyByteBuffer : public ByteBuffer { @@ -126,6 +125,8 @@ class ReadOnlyByteBuffer : public ByteBuffer { return cursor_->pullAtMost(buf, len); } + std::unique_ptr<ReadOnlyByteBuffer> readToReadOnlyBuffer(size_t len) const; + std::unique_ptr<folly::IOBuf> getData() const { return data_->clone(); } @@ -171,6 +172,10 @@ class WriteOnlyByteBuffer : public ByteBuffer { appender_->push(reinterpret_cast<const uint8_t*>(ptr), data.size()); } + void writeFromBuffer(const void* data, const size_t len) const { + appender_->push(static_cast<const uint8_t*>(data), len); + } + size_t size() const { return data_->computeChainDataLength(); } diff --git a/cpp/celeborn/memory/tests/ByteBufferTest.cpp b/cpp/celeborn/memory/tests/ByteBufferTest.cpp index 350b019a6..f6c4c6711 100644 --- a/cpp/celeborn/memory/tests/ByteBufferTest.cpp +++ b/cpp/celeborn/memory/tests/ByteBufferTest.cpp @@ -144,6 +144,75 @@ void testReadData(ReadOnlyByteBuffer* readBuffer, size_t size) { EXPECT_THROW(readBuffer->readLE<int64_t>(), std::exception); } +void testReadOnlyBufferReadData(ReadOnlyByteBuffer* readBuffer, size_t size) { + EXPECT_EQ(size, testSize); + size_t remainingSize = size; + EXPECT_EQ(readBuffer->size(), size); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + + // Test read string. + auto strRead = + readBuffer->readToReadOnlyBuffer(strPayload.size())->readToString(); + EXPECT_EQ(strRead, strPayload); + remainingSize -= strPayload.size(); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + + // Test read BigEndian. + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int16_t))->readBE<int16_t>(), + int16Payload); + remainingSize -= sizeof(int16_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readBE<int32_t>(), + int32Payload); + remainingSize -= sizeof(int32_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readBE<int64_t>(), + int64Payload); + remainingSize -= sizeof(int64_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + + // Test read LittleEndian. + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int16_t))->readLE<int16_t>(), + int16Payload); + remainingSize -= sizeof(int16_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readLE<int32_t>(), + int32Payload); + remainingSize -= sizeof(int32_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readLE<int64_t>(), + int64Payload); + remainingSize -= sizeof(int64_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + + // Test retreat and skip. + const auto retreatSize = sizeof(int32_t) + sizeof(int64_t); + remainingSize += retreatSize; + readBuffer->retreat(retreatSize); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + EXPECT_EQ( + readBuffer->readToReadOnlyBuffer(sizeof(int32_t))->readLE<int32_t>(), + int32Payload); + remainingSize -= sizeof(int32_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + readBuffer->skip(sizeof(int64_t)); + remainingSize -= sizeof(int64_t); + EXPECT_EQ(readBuffer->remainingSize(), remainingSize); + + // Test read end. + EXPECT_EQ(readBuffer->size(), size); + EXPECT_EQ(readBuffer->remainingSize(), 0); + EXPECT_THROW( + readBuffer->readToReadOnlyBuffer(sizeof(int64_t))->readLE<int64_t>(), + std::exception); +} + TEST(ByteBufferTest, continuousBufferRead) { size_t size = 0; auto data = createRawData(size); @@ -189,6 +258,13 @@ TEST(ByteBufferTest, writeBufferAndRead) { testReadData(readBuffer.get(), size); } +TEST(ByteBufferTest, readOnlyBufferRead) { + size_t size = 0; + auto writeBuffer = createWriteOnlyBuffer(size); + auto readBuffer = WriteOnlyByteBuffer::toReadOnly(std::move(writeBuffer)); + testReadOnlyBufferReadData(readBuffer.get(), size); +} + TEST(ByteBufferTest, concatReadBuffer) { size_t size = 0; auto writeBuffer1 = createWriteOnlyBuffer(size); diff --git a/cpp/celeborn/protocol/CMakeLists.txt b/cpp/celeborn/protocol/CMakeLists.txt index a1dc054d3..f0b0bea78 100644 --- a/cpp/celeborn/protocol/CMakeLists.txt +++ b/cpp/celeborn/protocol/CMakeLists.txt @@ -17,7 +17,8 @@ add_library( STATIC PartitionLocation.cpp TransportMessage.cpp - ControlMessages.cpp) + ControlMessages.cpp + CompressionCodec.cpp) target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR}) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/protocol/CompressionCodec.cpp similarity index 56% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/protocol/CompressionCodec.cpp index 4aa5b0ed7..93f90ebd1 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/protocol/CompressionCodec.cpp @@ -15,13 +15,31 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#include "celeborn/protocol/CompressionCodec.h" -import org.apache.celeborn.common.protocol.CompressionCodec - -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +namespace celeborn { +namespace protocol { +CompressionCodec toCompressionCodec(std::string_view code) { + if (code == "LZ4") { + return CompressionCodec::LZ4; + } + if (code == "ZSTD") { + return CompressionCodec::ZSTD; + } + return CompressionCodec::NONE; +} - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) +std::string_view toString(CompressionCodec codec) { + switch (codec) { + case CompressionCodec::LZ4: + return "LZ4"; + case CompressionCodec::ZSTD: + return "ZSTD"; + case CompressionCodec::NONE: + return "NONE"; + default: + return "UNKNOWN"; } } +} // namespace protocol +} // namespace celeborn diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/cpp/celeborn/protocol/CompressionCodec.h similarity index 73% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to cpp/celeborn/protocol/CompressionCodec.h index 4aa5b0ed7..785167c1f 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/cpp/celeborn/protocol/CompressionCodec.h @@ -15,13 +15,19 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.cluster +#pragma once +#include <string_view> -import org.apache.celeborn.common.protocol.CompressionCodec +namespace celeborn { +namespace protocol { +enum class CompressionCodec { + LZ4, + ZSTD, + NONE, +}; -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +CompressionCodec toCompressionCodec(std::string_view code); - def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) - } -} +std::string_view toString(CompressionCodec codec); +} // namespace protocol +} // namespace celeborn diff --git a/cpp/celeborn/tests/DataSumWithReaderClient.cpp b/cpp/celeborn/tests/DataSumWithReaderClient.cpp index d60166e92..8c060fa54 100644 --- a/cpp/celeborn/tests/DataSumWithReaderClient.cpp +++ b/cpp/celeborn/tests/DataSumWithReaderClient.cpp @@ -23,7 +23,7 @@ int main(int argc, char** argv) { // Read the configs. - assert(argc == 8); + assert(argc == 9); std::string lifecycleManagerHost = argv[1]; int lifecycleManagerPort = std::atoi(argv[2]); std::string appUniqueId = argv[3]; @@ -31,15 +31,19 @@ int main(int argc, char** argv) { int attemptId = std::atoi(argv[5]); int numPartitions = std::atoi(argv[6]); std::string resultFile = argv[7]; + std::string compressCodec = argv[8]; std::cout << "lifecycleManagerHost = " << lifecycleManagerHost << ", lifecycleManagerPort = " << lifecycleManagerPort << ", appUniqueId = " << appUniqueId << ", shuffleId = " << shuffleId << ", attemptId = " << attemptId << ", numPartitions = " << numPartitions - << ", resultFile = " << resultFile << std::endl; + << ", resultFile = " << resultFile << std::endl + << ", compressCodec = " << compressCodec << std::endl; // Create shuffleClient and setup. auto conf = std::make_shared<celeborn::conf::CelebornConf>(); + conf->registerProperty( + celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec); auto clientFactory = std::make_shared<celeborn::network::TransportClientFactory>(conf); auto shuffleClient = std::make_unique<celeborn::client::ShuffleClientImpl>( @@ -63,6 +67,9 @@ int main(int argc, char** argv) { dataCnt++; continue; } + if (c == '+') { + continue; + } assert(c >= '0' && c <= '9'); data *= 10; data += c - '0'; diff --git a/cpp/cmake/FindLZ4.cmake b/cpp/cmake/FindLZ4.cmake new file mode 100644 index 000000000..918568b55 --- /dev/null +++ b/cpp/cmake/FindLZ4.cmake @@ -0,0 +1,41 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Finds liblz4. +# +# This module defines: +# LZ4_FOUND +# LZ4_INCLUDE_DIR +# LZ4_LIBRARY +# + +find_path(LZ4_INCLUDE_DIR NAMES lz4.h) + +find_library(LZ4_LIBRARY_DEBUG NAMES lz4d) +find_library(LZ4_LIBRARY_RELEASE NAMES lz4) + +include(SelectLibraryConfigurations) +SELECT_LIBRARY_CONFIGURATIONS(LZ4) + +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS( + LZ4 DEFAULT_MSG + LZ4_LIBRARY LZ4_INCLUDE_DIR +) + +if (LZ4_FOUND) + message(STATUS "Found LZ4: ${LZ4_LIBRARY}") +endif() + +mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY) diff --git a/cpp/scripts/setup-ubuntu.sh b/cpp/scripts/setup-ubuntu.sh index fe137a180..e26ffa4f3 100755 --- a/cpp/scripts/setup-ubuntu.sh +++ b/cpp/scripts/setup-ubuntu.sh @@ -126,6 +126,8 @@ function install_celeborn_cpp_deps_from_apt { libgmock-dev \ libevent-dev \ libsodium-dev \ + libxxhash-dev \ + liblz4-dev \ libzstd-dev \ libre2-dev } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala similarity index 89% rename from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala rename to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala index a57ffebe8..a124cc590 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestBase.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestBase.scala @@ -35,7 +35,7 @@ import org.apache.celeborn.common.protocol.CompressionCodec import org.apache.celeborn.common.util.Utils.runCommand import org.apache.celeborn.service.deploy.MiniClusterFeature -trait JavaReadCppWriteTestBase extends AnyFunSuite +trait JavaWriteCppReadTestBase extends AnyFunSuite with Logging with MiniClusterFeature with BeforeAndAfterAll { var masterPort = 0 @@ -51,16 +51,16 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite shutdownMiniCluster() } - def testJavaReadCppWrite(codec: CompressionCodec): Unit = { + def testJavaWriteCppRead(codec: CompressionCodec): Unit = { beforeAll() try { - runJavaReadCppWrite(codec) + runJavaWriteCppRead(codec) } finally { afterAll() } } - def runJavaReadCppWrite(codec: CompressionCodec): Unit = { + def runJavaWriteCppRead(codec: CompressionCodec): Unit = { val appUniqueId = "test-app" val shuffleId = 0 val attemptId = 0 @@ -87,13 +87,19 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite val numData = 1000 var sums = new util.ArrayList[Long](numPartitions) val rand = new Random() + var prefix = "-" + if (codec != CompressionCodec.NONE) { + // Add duplicate strings to make the compressed length shorter than the original length. + // Will be dropped in cpp test client. + prefix = prefix ++ "++++++++++" + } for (mapId <- 0 until numMappers) { for (partitionId <- 0 until numPartitions) { sums.add(0) for (i <- 0 until numData) { val data = rand.nextInt(maxData) sums.set(partitionId, sums.get(partitionId) + data) - val dataStr = "-" + data.toString + val dataStr = prefix + data.toString shuffleClient.pushOrMergeData( shuffleId, mapId, @@ -105,7 +111,7 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite numMappers, numPartitions, false, - true) + false) } } shuffleClient.pushMergedData(shuffleId, mapId, attemptId) @@ -120,9 +126,10 @@ trait JavaReadCppWriteTestBase extends AnyFunSuite val cppBinRelativeDirectory = "cpp/build/celeborn/tests/" val cppBinFileName = "cppDataSumWithReaderClient" val cppBinFilePath = s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName" - // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numPartitions cppResultFile + val cppCodec = codec.name() + // Execution command: $exec lifecycleManagerHost lifecycleManagerPort appUniqueId shuffleId attemptId numPartitions cppResultFile cppCodec val command = { - s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile" + s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numPartitions $cppResultFile $cppCodec" } println(s"run command: $command") val commandOutput = runCommand(command) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala similarity index 88% copy from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala copy to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala index 4aa5b0ed7..bc1961384 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithLZ4.scala @@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster import org.apache.celeborn.common.protocol.CompressionCodec -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +object JavaWriteCppReadTestWithLZ4 extends JavaWriteCppReadTestBase { def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) + testJavaWriteCppRead(CompressionCodec.LZ4) } } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala similarity index 89% rename from worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala rename to worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala index 4aa5b0ed7..a649f8350 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaReadCppWriteTestWithNONE.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaWriteCppReadTestWithNONE.scala @@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.cluster import org.apache.celeborn.common.protocol.CompressionCodec -object JavaReadCppWriteTestWithNONE extends JavaReadCppWriteTestBase { +object JavaWriteCppReadTestWithNONE extends JavaWriteCppReadTestBase { def main(args: Array[String]) = { - testJavaReadCppWrite(CompressionCodec.NONE) + testJavaWriteCppRead(CompressionCodec.NONE) } }
