This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d6df794ae [CELEBORN-2115][CIP-14] Support PushData in cppClient
d6df794ae is described below

commit d6df794ae70d188f73838db2ceeeb6343591b55b
Author: HolyLow <[email protected]>
AuthorDate: Mon Aug 25 15:03:33 2025 +0800

    [CELEBORN-2115][CIP-14] Support PushData in cppClient
    
    ### What changes were proposed in this pull request?
    Support PushData network message in cppClient.
    
    ### Why are the changes needed?
    PushData is the network message of writing to cppClient.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and UTs.
    
    Closes #3434 from 
HolyLow/issue/celeborn-2115-support-push-date-in-cpp-client.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/network/Message.cpp             | 32 +++++++++++++++++++
 cpp/celeborn/network/Message.h               | 27 ++++++++++++++++
 cpp/celeborn/network/tests/MessageTest.cpp   | 31 ++++++++++++++++++
 cpp/celeborn/protocol/CMakeLists.txt         |  1 +
 cpp/celeborn/protocol/Encoders.cpp           | 37 +++++++++++++++++++++
 cpp/celeborn/protocol/Encoders.h             | 32 +++++++++++++++++++
 cpp/celeborn/protocol/tests/CMakeLists.txt   |  3 +-
 cpp/celeborn/protocol/tests/EncodersTest.cpp | 48 ++++++++++++++++++++++++++++
 8 files changed, 210 insertions(+), 1 deletion(-)

diff --git a/cpp/celeborn/network/Message.cpp b/cpp/celeborn/network/Message.cpp
index f4ad4baca..f1d69d04d 100644
--- a/cpp/celeborn/network/Message.cpp
+++ b/cpp/celeborn/network/Message.cpp
@@ -16,6 +16,7 @@
  */
 
 #include "celeborn/network/Message.h"
+#include "celeborn/protocol/Encoders.h"
 
 namespace celeborn {
 namespace network {
@@ -149,5 +150,36 @@ std::unique_ptr<ChunkFetchFailure> 
ChunkFetchFailure::decodeFrom(
   return std::make_unique<ChunkFetchFailure>(
       streamChunkSlice, std::move(errorString));
 }
+
+PushData::PushData(
+    long requestId,
+    uint8_t mode,
+    const std::string& shuffleKey,
+    const std::string& partitionUniqueId,
+    std::unique_ptr<memory::ReadOnlyByteBuffer> body)
+    : Message(PUSH_DATA, std::move(body)),
+      requestId_(requestId),
+      mode_(mode),
+      shuffleKey_(shuffleKey),
+      partitionUniqueId_(partitionUniqueId) {}
+
+PushData::PushData(const PushData& other)
+    : Message(PUSH_DATA, other.body()),
+      requestId_(other.requestId_),
+      mode_(other.mode_),
+      shuffleKey_(other.shuffleKey_),
+      partitionUniqueId_(other.partitionUniqueId_) {}
+
+int PushData::internalEncodedLength() const {
+  return sizeof(long) + sizeof(uint8_t) + protocol::encodedLength(shuffleKey_) 
+
+      protocol::encodedLength(partitionUniqueId_);
+}
+
+void PushData::internalEncodeTo(memory::WriteOnlyByteBuffer& buffer) const {
+  buffer.write<long>(requestId_);
+  buffer.write<uint8_t>(mode_);
+  protocol::encode(buffer, shuffleKey_);
+  protocol::encode(buffer, partitionUniqueId_);
+}
 } // namespace network
 } // namespace celeborn
diff --git a/cpp/celeborn/network/Message.h b/cpp/celeborn/network/Message.h
index 8375e3dbe..8aeca60c9 100644
--- a/cpp/celeborn/network/Message.h
+++ b/cpp/celeborn/network/Message.h
@@ -236,5 +236,32 @@ class ChunkFetchFailure : public Message {
   protocol::StreamChunkSlice streamChunkSlice_;
   std::string errorString_;
 };
+
+class PushData : public Message {
+ public:
+  PushData(
+      long requestId,
+      uint8_t mode,
+      const std::string& shuffleKey,
+      const std::string& partitionUniqueId,
+      std::unique_ptr<memory::ReadOnlyByteBuffer> body);
+
+  PushData(const PushData& other);
+
+  long requestId() const {
+    return requestId_;
+  }
+
+ private:
+  int internalEncodedLength() const override;
+
+  void internalEncodeTo(memory::WriteOnlyByteBuffer& buffer) const override;
+
+  long requestId_;
+  // 0 for primary, 1 for replica. Ref to PartitionLocation::Mode.
+  uint8_t mode_;
+  std::string shuffleKey_;
+  std::string partitionUniqueId_;
+};
 } // namespace network
 } // namespace celeborn
diff --git a/cpp/celeborn/network/tests/MessageTest.cpp 
b/cpp/celeborn/network/tests/MessageTest.cpp
index 0604f08a6..d421b5126 100644
--- a/cpp/celeborn/network/tests/MessageTest.cpp
+++ b/cpp/celeborn/network/tests/MessageTest.cpp
@@ -153,3 +153,34 @@ TEST(MessageTest, decodeChunkFetchFailure) {
   EXPECT_EQ(streamChunkSlice.len, len);
   EXPECT_EQ(chunkFetchFailure->errorMsg(), failureMsg);
 }
+
+TEST(MessageTest, encodePushData) {
+  const std::string body = "test-body";
+  auto bodyBuffer = memory::ByteBuffer::createWriteOnly(body.size());
+  bodyBuffer->writeFromString(body);
+  const long requestId = 1000;
+  const uint8_t mode = 2;
+  const std::string shuffleKey = "test-shuffle-key";
+  const std::string partitionUniqueId = "test-partition-id";
+  auto pushData = std::make_unique<PushData>(
+      requestId,
+      mode,
+      shuffleKey,
+      partitionUniqueId,
+      memory::ByteBuffer::toReadOnly(std::move(bodyBuffer)));
+
+  auto encodedBuffer = pushData->encode();
+  EXPECT_EQ(
+      encodedBuffer->read<int32_t>(),
+      sizeof(long) + sizeof(uint8_t) + sizeof(int) + shuffleKey.size() +
+          sizeof(int) + partitionUniqueId.size());
+  EXPECT_EQ(encodedBuffer->read<uint8_t>(), Message::Type::PUSH_DATA);
+  EXPECT_EQ(encodedBuffer->read<int32_t>(), body.size());
+  EXPECT_EQ(encodedBuffer->read<long>(), requestId);
+  EXPECT_EQ(encodedBuffer->read<uint8_t>(), mode);
+  EXPECT_EQ(encodedBuffer->read<int32_t>(), shuffleKey.size());
+  EXPECT_EQ(encodedBuffer->readToString(shuffleKey.size()), shuffleKey);
+  EXPECT_EQ(encodedBuffer->read<int32_t>(), partitionUniqueId.size());
+  EXPECT_EQ(
+      encodedBuffer->readToString(partitionUniqueId.size()), 
partitionUniqueId);
+}
diff --git a/cpp/celeborn/protocol/CMakeLists.txt 
b/cpp/celeborn/protocol/CMakeLists.txt
index f0b0bea78..7bce390d6 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -18,6 +18,7 @@ add_library(
         PartitionLocation.cpp
         TransportMessage.cpp
         ControlMessages.cpp
+        Encoders.cpp
         CompressionCodec.cpp)
 
 target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/protocol/Encoders.cpp 
b/cpp/celeborn/protocol/Encoders.cpp
new file mode 100644
index 000000000..7572ca021
--- /dev/null
+++ b/cpp/celeborn/protocol/Encoders.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/protocol/Encoders.h"
+
+namespace celeborn {
+namespace protocol {
+
+int encodedLength(const std::string& msg) {
+  return sizeof(int) + msg.size();
+}
+
+void encode(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) {
+  buffer.write<int>(msg.size());
+  buffer.writeFromString(msg);
+}
+
+std::string decode(memory::ReadOnlyByteBuffer& buffer) {
+  int size = buffer.read<int>();
+  return buffer.readToString(size);
+}
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/Encoders.h b/cpp/celeborn/protocol/Encoders.h
new file mode 100644
index 000000000..5e9f7d1fc
--- /dev/null
+++ b/cpp/celeborn/protocol/Encoders.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/memory/ByteBuffer.h"
+
+namespace celeborn {
+namespace protocol {
+
+int encodedLength(const std::string& msg);
+
+void encode(memory::WriteOnlyByteBuffer& buffer, const std::string& msg);
+
+std::string decode(memory::ReadOnlyByteBuffer& buffer);
+
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/CMakeLists.txt 
b/cpp/celeborn/protocol/tests/CMakeLists.txt
index cb2a2378f..bfbec98bc 100644
--- a/cpp/celeborn/protocol/tests/CMakeLists.txt
+++ b/cpp/celeborn/protocol/tests/CMakeLists.txt
@@ -17,7 +17,8 @@ add_executable(
         celeborn_protocol_test
         PartitionLocationTest.cpp
         TransportMessageTest.cpp
-        ControlMessagesTest.cpp)
+        ControlMessagesTest.cpp
+        EncodersTest.cpp)
 
 add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)
 
diff --git a/cpp/celeborn/protocol/tests/EncodersTest.cpp 
b/cpp/celeborn/protocol/tests/EncodersTest.cpp
new file mode 100644
index 000000000..59eb6d702
--- /dev/null
+++ b/cpp/celeborn/protocol/tests/EncodersTest.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/protocol/Encoders.h"
+
+using namespace celeborn;
+using namespace celeborn::protocol;
+
+TEST(EncodersTest, encodedLength) {
+  std::string testString = "test-string";
+  EXPECT_EQ(encodedLength(testString), sizeof(int) + testString.size());
+}
+
+TEST(EncodersTest, encode) {
+  std::string testString = "test-string";
+  auto writeBuffer =
+      memory::ByteBuffer::createWriteOnly(sizeof(int) + testString.size());
+  encode(*writeBuffer, testString);
+  auto readBuffer = memory::ByteBuffer::toReadOnly(std::move(writeBuffer));
+  EXPECT_EQ(readBuffer->read<int>(), testString.size());
+  EXPECT_EQ(readBuffer->readToString(testString.size()), testString);
+}
+
+TEST(EncodersTest, decode) {
+  std::string testString = "test-string";
+  auto writeBuffer =
+      memory::ByteBuffer::createWriteOnly(sizeof(int) + testString.size());
+  writeBuffer->write<int>(testString.size());
+  writeBuffer->writeFromString(testString);
+  auto readBuffer = memory::ByteBuffer::toReadOnly(std::move(writeBuffer));
+  EXPECT_EQ(decode(*readBuffer), testString);
+}

Reply via email to