This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f3c6f306c [CELEBORN-2070][CIP-14] Support MapperEnd/Response in CppClient
f3c6f306c is described below

commit f3c6f306c18e9fb0e44606017fbe4ed48a997b75
Author: HolyLow <[email protected]>
AuthorDate: Wed Jul 30 14:40:55 2025 +0800

    [CELEBORN-2070][CIP-14] Support MapperEnd/Response in CppClient
    
    ### What changes were proposed in this pull request?
    This PR adds support for MapperEnd/Response in CppClient.
    
    ### Why are the changes needed?
    To support writing to Celeborn with CppClient.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By compilation and UTs.
    
    Closes #3372 from HolyLow/issue/celeborn-2070-support-registershuffle-mapperend.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 cpp/celeborn/protocol/ControlMessages.cpp          | 25 ++++++++++++++++
 cpp/celeborn/protocol/ControlMessages.h            | 17 +++++++++++
 .../protocol/tests/ControlMessagesTest.cpp         | 33 ++++++++++++++++++++++
 3 files changed, 75 insertions(+)

diff --git a/cpp/celeborn/protocol/ControlMessages.cpp b/cpp/celeborn/protocol/ControlMessages.cpp
index 1a8fa6ce3..36a7023eb 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -20,6 +20,31 @@
 
 namespace celeborn {
 namespace protocol {
+// Packs all MapperEnd fields into a PbMapperEnd and tags it MAPPER_END.
+TransportMessage MapperEnd::toTransportMessage() const {
+  PbMapperEnd proto;
+  proto.set_shuffleid(shuffleId);
+  proto.set_mapid(mapId);
+  proto.set_attemptid(attemptId);
+  proto.set_nummappers(numMappers);
+  proto.set_partitionid(partitionId);
+  // The serialized proto bytes become the message payload.
+  return TransportMessage(MAPPER_END, proto.SerializeAsString());
+}
+
+std::unique_ptr<MapperEndResponse> MapperEndResponse::fromTransportMessage(
+    const TransportMessage& transportMessage) { // Decodes MAPPER_END_RESPONSE.
+  CELEBORN_CHECK(
+      transportMessage.type() == MAPPER_END_RESPONSE,
+      "transportMessageType mismatch");
+  auto payload = transportMessage.payload(); // Serialized PbMapperEndResponse.
+  auto pbMapperEndResponse = utils::parseProto<PbMapperEndResponse>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  auto response = std::make_unique<MapperEndResponse>();
+  response->status = toStatusCode(pbMapperEndResponse->status());
+  return response; // Implicit move; std::move would only inhibit copy elision.
+}
+
 TransportMessage GetReducerFileGroup::toTransportMessage() const {
   MessageType type = GET_REDUCER_FILE_GROUP;
   PbGetReducerFileGroup pb;
diff --git a/cpp/celeborn/protocol/ControlMessages.h b/cpp/celeborn/protocol/ControlMessages.h
index 7d74a4900..a7bddc41c 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -26,6 +26,23 @@
 
 namespace celeborn {
 namespace protocol {
+struct MapperEnd { // Converts to a MAPPER_END TransportMessage.
+  long shuffleId{0}; // NOTE(review): GetReducerFileGroup uses int; confirm.
+  int mapId{0}; // Zero defaults: no indeterminate values get serialized.
+  int attemptId{0};
+  int numMappers{0};
+  int partitionId{0};
+
+  TransportMessage toTransportMessage() const; // Serializes via PbMapperEnd.
+};
+
+struct MapperEndResponse { // Decoded form of a MAPPER_END_RESPONSE message.
+  StatusCode status; // Set from the protobuf status via toStatusCode().
+
+  static std::unique_ptr<MapperEndResponse> fromTransportMessage(
+      const TransportMessage& transportMessage);
+};
+
 struct GetReducerFileGroup {
   int shuffleId;
 
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index 36003491a..d1f7dce41 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -62,6 +62,39 @@ void verifyUnpackedPartitionLocation(
 }
 } // namespace
 
+// Round trip: MapperEnd -> TransportMessage -> PbMapperEnd keeps every field.
+TEST(ControlMessagesTest, mapperEnd) {
+  MapperEnd request{};
+  request.shuffleId = 1000;
+  request.mapId = 1001;
+  request.attemptId = 1002;
+  request.numMappers = 1003;
+  request.partitionId = 1004;
+  const auto message = request.toTransportMessage();
+  EXPECT_EQ(message.type(), MAPPER_END);
+  const auto bytes = message.payload();
+  const auto decoded = utils::parseProto<PbMapperEnd>(
+      reinterpret_cast<const uint8_t*>(bytes.c_str()), bytes.size());
+  EXPECT_EQ(decoded->shuffleid(), request.shuffleId);
+  EXPECT_EQ(decoded->mapid(), request.mapId);
+  EXPECT_EQ(decoded->attemptid(), request.attemptId);
+  EXPECT_EQ(decoded->nummappers(), request.numMappers);
+  EXPECT_EQ(decoded->partitionid(), request.partitionId);
+}
+
+// A serialized PbMapperEndResponse must decode to the same status value.
+TEST(ControlMessagesTest, mapperEndResponse) {
+  PbMapperEndResponse proto;
+  proto.set_status(1);
+  TransportMessage message(MAPPER_END_RESPONSE, proto.SerializeAsString());
+
+  // fromTransportMessage runs the raw status through toStatusCode.
+  const auto decoded = MapperEndResponse::fromTransportMessage(message);
+  EXPECT_EQ(decoded->status, 1);
+}
+
+// TEST MapperEnd/Response
+
 TEST(ControlMessagesTest, getReducerFileGroup) {
   auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
   getReducerFileGroup->shuffleId = 1000;

Reply via email to