This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f3c6f306c [CELEBORN-2070][CIP-14] Support MapperEnd/Response in
CppClient
f3c6f306c is described below
commit f3c6f306c18e9fb0e44606017fbe4ed48a997b75
Author: HolyLow <[email protected]>
AuthorDate: Wed Jul 30 14:40:55 2025 +0800
[CELEBORN-2070][CIP-14] Support MapperEnd/Response in CppClient
### What changes were proposed in this pull request?
This PR adds support for MapperEnd/Response in CppClient.
### Why are the changes needed?
To support writing to Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3372 from
HolyLow/issue/celeborn-2070-support-registershuffle-mapperend.
Authored-by: HolyLow <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
cpp/celeborn/protocol/ControlMessages.cpp | 25 ++++++++++++++++
cpp/celeborn/protocol/ControlMessages.h | 17 +++++++++++
.../protocol/tests/ControlMessagesTest.cpp | 33 ++++++++++++++++++++++
3 files changed, 75 insertions(+)
diff --git a/cpp/celeborn/protocol/ControlMessages.cpp b/cpp/celeborn/protocol/ControlMessages.cpp
index 1a8fa6ce3..36a7023eb 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -20,6 +20,31 @@
namespace celeborn {
namespace protocol {
+// Builds the MAPPER_END transport message for this request: every field is
+// copied into a PbMapperEnd, whose serialized bytes become the payload.
+TransportMessage MapperEnd::toTransportMessage() const {
+  PbMapperEnd proto;
+  proto.set_shuffleid(shuffleId);
+  proto.set_mapid(mapId);
+  proto.set_attemptid(attemptId);
+  proto.set_nummappers(numMappers);
+  proto.set_partitionid(partitionId);
+  return TransportMessage(MAPPER_END, proto.SerializeAsString());
+}
+
+// Decodes a MapperEndResponse from a transport message.
+//
+// The message type must be MAPPER_END_RESPONSE; otherwise CELEBORN_CHECK fails
+// with "transportMessageType mismatch". The payload is parsed as a
+// PbMapperEndResponse and its status is converted with toStatusCode().
+std::unique_ptr<MapperEndResponse> MapperEndResponse::fromTransportMessage(
+    const TransportMessage& transportMessage) {
+  CELEBORN_CHECK(
+      transportMessage.type() == MAPPER_END_RESPONSE,
+      "transportMessageType mismatch");
+  // Binding by const reference is valid whether payload() returns a reference
+  // or a temporary, and avoids an extra copy in the former case.
+  const auto& payload = transportMessage.payload();
+  auto pbMapperEndResponse = utils::parseProto<PbMapperEndResponse>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  auto response = std::make_unique<MapperEndResponse>();
+  response->status = toStatusCode(pbMapperEndResponse->status());
+  // Return the local by name: wrapping it in std::move() is redundant and
+  // prevents copy elision.
+  return response;
+}
+
TransportMessage GetReducerFileGroup::toTransportMessage() const {
MessageType type = GET_REDUCER_FILE_GROUP;
PbGetReducerFileGroup pb;
diff --git a/cpp/celeborn/protocol/ControlMessages.h b/cpp/celeborn/protocol/ControlMessages.h
index 7d74a4900..a7bddc41c 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -26,6 +26,23 @@
namespace celeborn {
namespace protocol {
+// Request fields for the MAPPER_END control message. Every field defaults to
+// zero so that a default-initialized instance never serializes indeterminate
+// values.
+struct MapperEnd {
+  // NOTE(review): declared long, while GetReducerFileGroup::shuffleId is int;
+  // confirm which width is intended.
+  long shuffleId{0};
+  int mapId{0};
+  int attemptId{0};
+  int numMappers{0};
+  int partitionId{0};
+
+  // Serializes the fields into a MAPPER_END transport message.
+  TransportMessage toTransportMessage() const;
+};
+
+// Decoded form of the MAPPER_END_RESPONSE control message.
+struct MapperEndResponse {
+ // Status carried by the response, converted with toStatusCode() on decode.
+ StatusCode status;
+
+ // Parses transportMessage, which must have type MAPPER_END_RESPONSE, into a
+ // newly allocated MapperEndResponse.
+ static std::unique_ptr<MapperEndResponse> fromTransportMessage(
+ const TransportMessage& transportMessage);
+};
+
struct GetReducerFileGroup {
int shuffleId;
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index 36003491a..d1f7dce41 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -62,6 +62,39 @@ void verifyUnpackedPartitionLocation(
}
} // namespace
+// Serializes a MapperEnd with toTransportMessage(), parses the payload back
+// into a PbMapperEnd, and checks the message type and every field.
+TEST(ControlMessagesTest, mapperEnd) {
+  MapperEnd request{};
+  request.shuffleId = 1000;
+  request.mapId = 1001;
+  request.attemptId = 1002;
+  request.numMappers = 1003;
+  request.partitionId = 1004;
+
+  const auto message = request.toTransportMessage();
+  EXPECT_EQ(message.type(), MAPPER_END);
+
+  const auto bytes = message.payload();
+  const auto decoded = utils::parseProto<PbMapperEnd>(
+      reinterpret_cast<const uint8_t*>(bytes.c_str()), bytes.size());
+  EXPECT_EQ(decoded->shuffleid(), request.shuffleId);
+  EXPECT_EQ(decoded->mapid(), request.mapId);
+  EXPECT_EQ(decoded->attemptid(), request.attemptId);
+  EXPECT_EQ(decoded->nummappers(), request.numMappers);
+  EXPECT_EQ(decoded->partitionid(), request.partitionId);
+}
+
+// Wraps a serialized PbMapperEndResponse in a MAPPER_END_RESPONSE transport
+// message and checks that the decoded response carries the same status.
+TEST(ControlMessagesTest, mapperEndResponse) {
+  PbMapperEndResponse proto;
+  proto.set_status(1);
+  TransportMessage message(MAPPER_END_RESPONSE, proto.SerializeAsString());
+
+  const auto decoded = MapperEndResponse::fromTransportMessage(message);
+  EXPECT_EQ(decoded->status, 1);
+}
+
+// TEST MapperEnd/Response
+
TEST(ControlMessagesTest, getReducerFileGroup) {
auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
getReducerFileGroup->shuffleId = 1000;