This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d6df794ae [CELEBORN-2115][CIP-14] Support PushData in cppClient
d6df794ae is described below
commit d6df794ae70d188f73838db2ceeeb6343591b55b
Author: HolyLow <[email protected]>
AuthorDate: Mon Aug 25 15:03:33 2025 +0800
[CELEBORN-2115][CIP-14] Support PushData in cppClient
### What changes were proposed in this pull request?
Support the PushData network message in cppClient.
### Why are the changes needed?
PushData is the network message that cppClient uses for writing data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3434 from
HolyLow/issue/celeborn-2115-support-push-date-in-cpp-client.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/network/Message.cpp | 32 +++++++++++++++++++
cpp/celeborn/network/Message.h | 27 ++++++++++++++++
cpp/celeborn/network/tests/MessageTest.cpp | 31 ++++++++++++++++++
cpp/celeborn/protocol/CMakeLists.txt | 1 +
cpp/celeborn/protocol/Encoders.cpp | 37 +++++++++++++++++++++
cpp/celeborn/protocol/Encoders.h | 32 +++++++++++++++++++
cpp/celeborn/protocol/tests/CMakeLists.txt | 3 +-
cpp/celeborn/protocol/tests/EncodersTest.cpp | 48 ++++++++++++++++++++++++++++
8 files changed, 210 insertions(+), 1 deletion(-)
diff --git a/cpp/celeborn/network/Message.cpp b/cpp/celeborn/network/Message.cpp
index f4ad4baca..f1d69d04d 100644
--- a/cpp/celeborn/network/Message.cpp
+++ b/cpp/celeborn/network/Message.cpp
@@ -16,6 +16,7 @@
*/
#include "celeborn/network/Message.h"
+#include "celeborn/protocol/Encoders.h"
namespace celeborn {
namespace network {
@@ -149,5 +150,36 @@ std::unique_ptr<ChunkFetchFailure>
ChunkFetchFailure::decodeFrom(
return std::make_unique<ChunkFetchFailure>(
streamChunkSlice, std::move(errorString));
}
+
+PushData::PushData(  // Builds a PUSH_DATA message from its header fields and a body buffer.
+ long requestId,
+ uint8_t mode,
+ const std::string& shuffleKey,
+ const std::string& partitionUniqueId,
+ std::unique_ptr<memory::ReadOnlyByteBuffer> body)
+ : Message(PUSH_DATA, std::move(body)),  // body is handed to the Message base via std::move.
+ requestId_(requestId),
+ mode_(mode),
+ shuffleKey_(shuffleKey),  // Both strings are copied into members.
+ partitionUniqueId_(partitionUniqueId) {}
+
+PushData::PushData(const PushData& other)  // Copy constructor: copies every header field of other.
+ : Message(PUSH_DATA, other.body()),  // NOTE(review): presumably body() yields a buffer this copy can own - confirm against Message::body().
+ requestId_(other.requestId_),
+ mode_(other.mode_),
+ shuffleKey_(other.shuffleKey_),
+ partitionUniqueId_(other.partitionUniqueId_) {}
+
+int PushData::internalEncodedLength() const {  // Sums the sizes of the four fields that internalEncodeTo writes.
+ return sizeof(long) + sizeof(uint8_t) + protocol::encodedLength(shuffleKey_)
+
+ protocol::encodedLength(partitionUniqueId_);  // NOTE(review): sizeof(long) is platform-dependent (4 bytes on LLP64) - confirm the peer expects the same width.
+}
+
+void PushData::internalEncodeTo(memory::WriteOnlyByteBuffer& buffer) const {  // Writes requestId, mode, shuffleKey, partitionUniqueId to buffer, in that order.
+ buffer.write<long>(requestId_);
+ buffer.write<uint8_t>(mode_);
+ protocol::encode(buffer, shuffleKey_);  // Strings are written via protocol::encode from Encoders.h.
+ protocol::encode(buffer, partitionUniqueId_);
+}
} // namespace network
} // namespace celeborn
diff --git a/cpp/celeborn/network/Message.h b/cpp/celeborn/network/Message.h
index 8375e3dbe..8aeca60c9 100644
--- a/cpp/celeborn/network/Message.h
+++ b/cpp/celeborn/network/Message.h
@@ -236,5 +236,32 @@ class ChunkFetchFailure : public Message {
protocol::StreamChunkSlice streamChunkSlice_;
std::string errorString_;
};
+
+class PushData : public Message {  // Message subclass holding a request id, mode, shuffle key and partition unique id.
+ public:
+ PushData(  // Constructs the message from its header fields and body buffer.
+ long requestId,
+ uint8_t mode,
+ const std::string& shuffleKey,
+ const std::string& partitionUniqueId,
+ std::unique_ptr<memory::ReadOnlyByteBuffer> body);
+
+ PushData(const PushData& other);  // Copy constructor.
+
+ long requestId() const {  // Returns the request id supplied at construction.
+ return requestId_;
+ }
+
+ private:
+ int internalEncodedLength() const override;  // Overrides the Message hook; encoded size of the fields below.
+
+ void internalEncodeTo(memory::WriteOnlyByteBuffer& buffer) const override;  // Overrides the Message hook; writes the fields below to buffer.
+
+ long requestId_;
+ // 0 for primary, 1 for replica. See PartitionLocation::Mode.
+ uint8_t mode_;
+ std::string shuffleKey_;
+ std::string partitionUniqueId_;
+};
} // namespace network
} // namespace celeborn
diff --git a/cpp/celeborn/network/tests/MessageTest.cpp
b/cpp/celeborn/network/tests/MessageTest.cpp
index 0604f08a6..d421b5126 100644
--- a/cpp/celeborn/network/tests/MessageTest.cpp
+++ b/cpp/celeborn/network/tests/MessageTest.cpp
@@ -153,3 +153,34 @@ TEST(MessageTest, decodeChunkFetchFailure) {
EXPECT_EQ(streamChunkSlice.len, len);
EXPECT_EQ(chunkFetchFailure->errorMsg(), failureMsg);
}
+
+TEST(MessageTest, encodePushData) {  // Checks the byte layout produced by PushData::encode().
+ const std::string body = "test-body";
+ auto bodyBuffer = memory::ByteBuffer::createWriteOnly(body.size());
+ bodyBuffer->writeFromString(body);
+ const long requestId = 1000;
+ const uint8_t mode = 2;  // The test only checks that this byte is read back unchanged.
+ const std::string shuffleKey = "test-shuffle-key";
+ const std::string partitionUniqueId = "test-partition-id";
+ auto pushData = std::make_unique<PushData>(
+ requestId,
+ mode,
+ shuffleKey,
+ partitionUniqueId,
+ memory::ByteBuffer::toReadOnly(std::move(bodyBuffer)));
+
+ auto encodedBuffer = pushData->encode();
+ EXPECT_EQ(  // First int32: combined size of the PushData fields (requestId, mode, two length-prefixed strings).
+ encodedBuffer->read<int32_t>(),
+ sizeof(long) + sizeof(uint8_t) + sizeof(int) + shuffleKey.size() +
+ sizeof(int) + partitionUniqueId.size());
+ EXPECT_EQ(encodedBuffer->read<uint8_t>(), Message::Type::PUSH_DATA);  // Next: the message type byte.
+ EXPECT_EQ(encodedBuffer->read<int32_t>(), body.size());  // Next: the body length.
+ EXPECT_EQ(encodedBuffer->read<long>(), requestId);  // Then the PushData fields in encode order.
+ EXPECT_EQ(encodedBuffer->read<uint8_t>(), mode);
+ EXPECT_EQ(encodedBuffer->read<int32_t>(), shuffleKey.size());
+ EXPECT_EQ(encodedBuffer->readToString(shuffleKey.size()), shuffleKey);
+ EXPECT_EQ(encodedBuffer->read<int32_t>(), partitionUniqueId.size());
+ EXPECT_EQ(
+ encodedBuffer->readToString(partitionUniqueId.size()),
partitionUniqueId);
+}
diff --git a/cpp/celeborn/protocol/CMakeLists.txt
b/cpp/celeborn/protocol/CMakeLists.txt
index f0b0bea78..7bce390d6 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -18,6 +18,7 @@ add_library(
PartitionLocation.cpp
TransportMessage.cpp
ControlMessages.cpp
+ Encoders.cpp
CompressionCodec.cpp)
target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/protocol/Encoders.cpp
b/cpp/celeborn/protocol/Encoders.cpp
new file mode 100644
index 000000000..7572ca021
--- /dev/null
+++ b/cpp/celeborn/protocol/Encoders.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/protocol/Encoders.h"
+
+namespace celeborn {
+namespace protocol {
+
+int encodedLength(const std::string& msg) {  // Bytes used by encode(): an int length prefix plus msg's bytes.
+ return sizeof(int) + msg.size();  // The size_t sum is implicitly narrowed to int on return.
+}
+
+void encode(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) {  // Writes msg to buffer as an int length followed by the string contents.
+ buffer.write<int>(msg.size());  // Length prefix; msg.size() is implicitly narrowed from size_t to int.
+ buffer.writeFromString(msg);
+}
+
+std::string decode(memory::ReadOnlyByteBuffer& buffer) {  // Reads an int length from buffer, then returns that many bytes as a string.
+ int size = buffer.read<int>();  // NOTE(review): size is not validated here (negative or past the end) - confirm readToString guards against that.
+ return buffer.readToString(size);
+}
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/Encoders.h b/cpp/celeborn/protocol/Encoders.h
new file mode 100644
index 000000000..5e9f7d1fc
--- /dev/null
+++ b/cpp/celeborn/protocol/Encoders.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/memory/ByteBuffer.h"
+
+namespace celeborn {
+namespace protocol {
+
+int encodedLength(const std::string& msg);  // Number of bytes encode() writes for msg (int length prefix + contents).
+
+void encode(memory::WriteOnlyByteBuffer& buffer, const std::string& msg);  // Writes msg to buffer as an int length followed by its contents.
+
+std::string decode(memory::ReadOnlyByteBuffer& buffer);  // Reads an int length and then that many bytes from buffer as a string.
+
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/CMakeLists.txt
b/cpp/celeborn/protocol/tests/CMakeLists.txt
index cb2a2378f..bfbec98bc 100644
--- a/cpp/celeborn/protocol/tests/CMakeLists.txt
+++ b/cpp/celeborn/protocol/tests/CMakeLists.txt
@@ -17,7 +17,8 @@ add_executable(
celeborn_protocol_test
PartitionLocationTest.cpp
TransportMessageTest.cpp
- ControlMessagesTest.cpp)
+ ControlMessagesTest.cpp
+ EncodersTest.cpp)
add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)
diff --git a/cpp/celeborn/protocol/tests/EncodersTest.cpp
b/cpp/celeborn/protocol/tests/EncodersTest.cpp
new file mode 100644
index 000000000..59eb6d702
--- /dev/null
+++ b/cpp/celeborn/protocol/tests/EncodersTest.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/protocol/Encoders.h"
+
+using namespace celeborn;
+using namespace celeborn::protocol;
+
+TEST(EncodersTest, encodedLength) {  // encodedLength() must equal sizeof(int) plus the string's size.
+ std::string testString = "test-string";
+ EXPECT_EQ(encodedLength(testString), sizeof(int) + testString.size());
+}
+
+TEST(EncodersTest, encode) {  // encode() must write the int length and then the string contents.
+ std::string testString = "test-string";
+ auto writeBuffer =
+ memory::ByteBuffer::createWriteOnly(sizeof(int) + testString.size());  // Sized for prefix + contents.
+ encode(*writeBuffer, testString);
+ auto readBuffer = memory::ByteBuffer::toReadOnly(std::move(writeBuffer));  // Convert so the written bytes can be read back.
+ EXPECT_EQ(readBuffer->read<int>(), testString.size());
+ EXPECT_EQ(readBuffer->readToString(testString.size()), testString);
+}
+
+TEST(EncodersTest, decode) {  // decode() must return the string from a hand-written length-prefixed buffer.
+ std::string testString = "test-string";
+ auto writeBuffer =
+ memory::ByteBuffer::createWriteOnly(sizeof(int) + testString.size());
+ writeBuffer->write<int>(testString.size());  // Build the input manually so the test does not depend on encode().
+ writeBuffer->writeFromString(testString);
+ auto readBuffer = memory::ByteBuffer::toReadOnly(std::move(writeBuffer));
+ EXPECT_EQ(decode(*readBuffer), testString);
+}