This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7e13c9934 [CELEBORN-2098][CIP-14] Support Revive/Response in cppClient
7e13c9934 is described below
commit 7e13c9934fdafb26c916fd9f5ee6ea9f47de9e94
Author: HolyLow <[email protected]>
AuthorDate: Sat Aug 16 12:59:32 2025 +0800
[CELEBORN-2098][CIP-14] Support Revive/Response in cppClient
### What changes were proposed in this pull request?
This PR supports Revive/ChangeLocationResponse messages in cppClient.
### Why are the changes needed?
These messages are used when a write triggers a revive operation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and unit tests (UTs).
Closes #3413 from
HolyLow/issue/celeborn-2098-support-revive-changelocationresponse.
Authored-by: HolyLow <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
cpp/celeborn/protocol/ControlMessages.cpp | 65 +++++++++++++++++
cpp/celeborn/protocol/ControlMessages.h | 43 +++++++++++
cpp/celeborn/protocol/PartitionLocation.cpp | 34 +++++++++
cpp/celeborn/protocol/PartitionLocation.h | 6 ++
.../protocol/tests/ControlMessagesTest.cpp | 47 ++++++++++++
.../protocol/tests/PartitionLocationTest.cpp | 84 ++++++++++++++++++++++
6 files changed, 279 insertions(+)
diff --git a/cpp/celeborn/protocol/ControlMessages.cpp
b/cpp/celeborn/protocol/ControlMessages.cpp
index 6354b46ea..29742ee73 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -123,6 +123,71 @@ std::unique_ptr<MapperEndResponse>
MapperEndResponse::fromTransportMessage(
return std::move(response);
}
+ReviveRequest::ReviveRequest(
+ int _shuffleId,
+ int _mapId,
+ int _attemptId,
+ int _partitionId,
+ int _epoch,
+ std::shared_ptr<const PartitionLocation> _loc,
+ StatusCode _cause)
+ : shuffleId(_shuffleId),
+ mapId(_mapId),
+ attemptId(_attemptId),
+ partitionId(_partitionId),
+ epoch(_epoch),
+ loc(std::move(_loc)),
+ cause(_cause) {}
+
+// Serializes this batch into a CHANGE_LOCATION transport message whose payload
+// is a PbRevive: the shuffle id, every map id, and one partition-info entry per
+// revive request (partition id, epoch, cause as status, and the current
+// location when the request has one).
+TransportMessage Revive::toTransportMessage() const {
+  PbRevive pbRevive;
+  pbRevive.set_shuffleid(shuffleId);
+  for (const int id : mapIds) {
+    pbRevive.add_mapid(id);
+  }
+  for (const auto& request : reviveRequests) {
+    auto* info = pbRevive.add_partitioninfo();
+    info->set_partitionid(request->partitionId);
+    info->set_epoch(request->epoch);
+    info->set_status(request->cause);
+    if (request->loc != nullptr) {
+      // Hand ownership of the freshly built proto to the parent message.
+      info->set_allocated_partition(request->loc->toPb().release());
+    }
+  }
+  return TransportMessage(CHANGE_LOCATION, pbRevive.SerializeAsString());
+}
+
+// Decodes a CHANGE_LOCATION_RESPONSE transport message into a
+// ChangeLocationResponse: the ended map ids plus one ChangeLocationPartitionInfo
+// per PbChangeLocationPartitionInfo, in message order.
+//
+// The `partition` field of each entry is optional on the wire. When it is
+// absent, `partitionInfo.partition` is left as nullptr rather than being
+// decoded from an empty default message.
+//
+// Fails a CELEBORN_CHECK if the message type is not CHANGE_LOCATION_RESPONSE.
+std::unique_ptr<ChangeLocationResponse>
+ChangeLocationResponse::fromTransportMessage(
+    const TransportMessage& transportMessage) {
+  CELEBORN_CHECK(
+      transportMessage.type() == CHANGE_LOCATION_RESPONSE,
+      "transportMessageType mismatch");
+  // Bind by reference: avoids copying the payload when payload() returns a
+  // reference, and extends the temporary's lifetime when it returns by value.
+  const auto& payload = transportMessage.payload();
+  auto pbChangeLocationResponse = utils::parseProto<PbChangeLocationResponse>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  auto response = std::make_unique<ChangeLocationResponse>();
+  response->endedMapIds.reserve(pbChangeLocationResponse->endedmapid_size());
+  for (auto endedMapId : pbChangeLocationResponse->endedmapid()) {
+    response->endedMapIds.push_back(endedMapId);
+  }
+  const int numPartitionInfo = pbChangeLocationResponse->partitioninfo_size();
+  response->partitionInfos.resize(numPartitionInfo);
+  for (int i = 0; i < numPartitionInfo; i++) {
+    auto& partitionInfo = response->partitionInfos[i];
+    const auto& pbPartitionInfo = pbChangeLocationResponse->partitioninfo(i);
+    partitionInfo.partitionId = pbPartitionInfo.partitionid();
+    partitionInfo.status = toStatusCode(pbPartitionInfo.status());
+    // Only decode a location the sender actually included; decoding the
+    // default message would produce a location built from zero/empty values.
+    if (pbPartitionInfo.has_partition()) {
+      partitionInfo.partition =
+          PartitionLocation::fromPb(pbPartitionInfo.partition());
+    }
+    partitionInfo.oldAvailable = pbPartitionInfo.oldavailable();
+  }
+  return response;
+}
+
TransportMessage GetReducerFileGroup::toTransportMessage() const {
MessageType type = GET_REDUCER_FILE_GROUP;
PbGetReducerFileGroup pb;
diff --git a/cpp/celeborn/protocol/ControlMessages.h
b/cpp/celeborn/protocol/ControlMessages.h
index b3e360a0e..9df08deed 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -60,6 +60,49 @@ struct MapperEndResponse {
const TransportMessage& transportMessage);
};
+// One partition's request to be revived (given a new location).
+// Instances are batched into a Revive message; Revive::toTransportMessage
+// serializes partitionId, epoch, cause and (if non-null) loc of each request.
+struct ReviveRequest {
+ // shuffleId/mapId/attemptId are not serialized per request by
+ // Revive::toTransportMessage; presumably kept for caller bookkeeping.
+ int shuffleId;
+ int mapId;
+ int attemptId;
+ int partitionId;
+ int epoch;
+ // Current location of the partition; may be null (then no partition field
+ // is sent).
+ std::shared_ptr<const PartitionLocation> loc;
+ // Reason for the revive; sent as the partition info's status.
+ StatusCode cause;
+ // Starts at REVIVE_INITIALIZED. Atomic, presumably because it is updated
+ // from another thread while the revive is in flight — TODO confirm.
+ std::atomic<int> reviveStatus{StatusCode::REVIVE_INITIALIZED};
+
+ ReviveRequest(
+ int _shuffleId,
+ int _mapId,
+ int _attemptId,
+ int _partitionId,
+ int _epoch,
+ std::shared_ptr<const PartitionLocation> _loc,
+ StatusCode _cause);
+};
+
+// A batched revive request for one shuffle, sent as a CHANGE_LOCATION message.
+struct Revive {
+  // Brace-initialized so a default-initialized Revive never carries an
+  // indeterminate shuffle id.
+  int shuffleId{0};
+  // Map ids sent in the message's mapId list.
+  std::unordered_set<int> mapIds;
+  // Per-partition requests; each becomes one partition-info entry.
+  std::unordered_set<std::shared_ptr<ReviveRequest>> reviveRequests;
+
+  // Serializes this batch into a CHANGE_LOCATION TransportMessage.
+  TransportMessage toTransportMessage() const;
+};
+
+// One partition's entry in a ChangeLocationResponse.
+// Scalar members are brace-initialized so default-initialized instances are
+// never indeterminate; the values match what value-initialization (e.g.
+// std::vector::resize) already produced.
+struct ChangeLocationPartitionInfo {
+  int partitionId{0};
+  StatusCode status{};
+  // Location carried by this entry; empty until populated.
+  std::shared_ptr<const PartitionLocation> partition;
+  // Copied from the wire's oldAvailable flag; presumably whether the previous
+  // location is still usable — TODO confirm.
+  bool oldAvailable{false};
+};
+
+// Decoded form of a CHANGE_LOCATION_RESPONSE message.
+struct ChangeLocationResponse {
+ // Copied from the message's endedMapId list.
+ std::vector<int> endedMapIds;
+ // One entry per partition info in the message, in message order.
+ std::vector<ChangeLocationPartitionInfo> partitionInfos;
+
+ // Parses `transportMessage`, whose type must be CHANGE_LOCATION_RESPONSE
+ // (enforced with CELEBORN_CHECK).
+ static std::unique_ptr<ChangeLocationResponse> fromTransportMessage(
+ const TransportMessage& transportMessage);
+};
+
struct GetReducerFileGroup {
int shuffleId;
diff --git a/cpp/celeborn/protocol/PartitionLocation.cpp
b/cpp/celeborn/protocol/PartitionLocation.cpp
index 6573ec2a2..a1820c5df 100644
--- a/cpp/celeborn/protocol/PartitionLocation.cpp
+++ b/cpp/celeborn/protocol/PartitionLocation.cpp
@@ -33,6 +33,16 @@ std::unique_ptr<StorageInfo> StorageInfo::fromPb(const
PbStorageInfo& pb) {
return std::move(result);
}
+// Serializes this StorageInfo into a newly allocated PbStorageInfo, copying
+// type, mount point, final-result flag, file path and available storage types.
+std::unique_ptr<PbStorageInfo> StorageInfo::toPb() const {
+  auto result = std::make_unique<PbStorageInfo>();
+  PbStorageInfo& pb = *result;
+  pb.set_type(type);
+  pb.set_mountpoint(mountPoint);
+  pb.set_finalresult(finalResult);
+  pb.set_filepath(filePath);
+  pb.set_availablestoragetypes(availableStorageTypes);
+  return result;
+}
+
std::unique_ptr<const PartitionLocation> PartitionLocation::fromPb(
const PbPartitionLocation& pb) {
auto result = fromPbWithoutPeer(pb);
@@ -98,6 +108,15 @@ PartitionLocation::PartitionLocation(const
PartitionLocation& other)
: nullptr),
storageInfo(std::make_unique<StorageInfo>(*other.storageInfo)) {}
+// Serializes this location into a newly allocated PbPartitionLocation. When a
+// replica peer is set, it is serialized (without its own peer) into `peer`.
+std::unique_ptr<PbPartitionLocation> PartitionLocation::toPb() const {
+  auto result = toPbWithoutPeer();
+  if (!replicaPeer) {
+    return result;
+  }
+  result->set_allocated_peer(replicaPeer->toPbWithoutPeer().release());
+  return result;
+}
+
std::unique_ptr<PartitionLocation> PartitionLocation::fromPbWithoutPeer(
const PbPartitionLocation& pb) {
auto result = std::make_unique<PartitionLocation>();
@@ -114,6 +133,21 @@ std::unique_ptr<PartitionLocation>
PartitionLocation::fromPbWithoutPeer(
return std::move(result);
}
+// Serializes this location's own fields (id, epoch, host, rpc/push/fetch/
+// replicate ports, mode and storage info) into a newly allocated
+// PbPartitionLocation. The replica peer is intentionally not included.
+// A null `storageInfo` leaves the storageinfo field unset instead of
+// dereferencing a null pointer.
+std::unique_ptr<PbPartitionLocation> PartitionLocation::toPbWithoutPeer()
+    const {
+  auto pbPartitionLocation = std::make_unique<PbPartitionLocation>();
+  pbPartitionLocation->set_id(id);
+  pbPartitionLocation->set_epoch(epoch);
+  pbPartitionLocation->set_host(host);
+  pbPartitionLocation->set_rpcport(rpcPort);
+  pbPartitionLocation->set_pushport(pushPort);
+  pbPartitionLocation->set_fetchport(fetchPort);
+  pbPartitionLocation->set_replicateport(replicatePort);
+  pbPartitionLocation->set_mode(static_cast<PbPartitionLocation_Mode>(mode));
+  if (storageInfo) {
+    pbPartitionLocation->set_allocated_storageinfo(
+        storageInfo->toPb().release());
+  }
+  return pbPartitionLocation;
+}
+
StatusCode toStatusCode(int32_t code) {
CELEBORN_CHECK(code >= 0);
CELEBORN_CHECK(code <= StatusCode::TAIL);
diff --git a/cpp/celeborn/protocol/PartitionLocation.h
b/cpp/celeborn/protocol/PartitionLocation.h
index 700773cba..4e88d16de 100644
--- a/cpp/celeborn/protocol/PartitionLocation.h
+++ b/cpp/celeborn/protocol/PartitionLocation.h
@@ -31,6 +31,8 @@ struct StorageInfo {
StorageInfo(const StorageInfo& other) = default;
+ // Serializes this StorageInfo into a newly allocated PbStorageInfo.
+ std::unique_ptr<PbStorageInfo> toPb() const;
+
enum Type {
MEMORY = 0,
HDD = 1,
@@ -88,6 +90,8 @@ struct PartitionLocation {
PartitionLocation(const PartitionLocation& other);
+ // Serializes this location into a newly allocated PbPartitionLocation,
+ // including the replica peer (serialized without its own peer) when set.
+ std::unique_ptr<PbPartitionLocation> toPb() const;
+
std::string filename() const {
return std::to_string(id) + "-" + std::to_string(epoch) + "-" +
std::to_string(mode);
@@ -96,6 +100,8 @@ struct PartitionLocation {
private:
static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
const PbPartitionLocation& pb);
+
+ // Serializes only this location's own fields; the replica peer is omitted.
+ std::unique_ptr<PbPartitionLocation> toPbWithoutPeer() const;
};
} // namespace protocol
} // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index 0e4688072..bd27d9a6c 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -142,6 +142,53 @@ TEST(ControlMessagesTest, mapperEndResponse) {
EXPECT_EQ(mapperEndResponse->status, 1);
}
+// Serializes a Revive holding one location-less request and checks the
+// resulting PbRevive payload, including that no partition field is emitted.
+TEST(ControlMessagesTest, revive) {
+  auto revive = std::make_unique<Revive>();
+  revive->shuffleId = 1000;
+  revive->mapIds.insert(1001);
+  auto reviveRequest = std::make_shared<ReviveRequest>(
+      1000, 1001, 1002, 1003, 1004, nullptr, StatusCode::SLOT_NOT_AVAILABLE);
+  revive->reviveRequests.insert(reviveRequest);
+
+  auto transportMessage = revive->toTransportMessage();
+  EXPECT_EQ(transportMessage.type(), CHANGE_LOCATION);
+  auto payload = transportMessage.payload();
+  auto pbRevive = utils::parseProto<PbRevive>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  EXPECT_EQ(pbRevive->shuffleid(), 1000);
+  EXPECT_EQ(pbRevive->mapid_size(), 1);
+  EXPECT_EQ(pbRevive->mapid(0), 1001);
+  EXPECT_EQ(pbRevive->partitioninfo_size(), 1);
+  auto& pbPartitionInfo = pbRevive->partitioninfo(0);
+  EXPECT_EQ(pbPartitionInfo.partitionid(), reviveRequest->partitionId);
+  EXPECT_EQ(pbPartitionInfo.epoch(), reviveRequest->epoch);
+  EXPECT_EQ(pbPartitionInfo.status(), reviveRequest->cause);
+  // The request has no location, so the null-loc branch must skip `partition`.
+  EXPECT_FALSE(pbPartitionInfo.has_partition());
+}
+
+// Hand-builds a PbChangeLocationResponse and checks that each field survives
+// ChangeLocationResponse::fromTransportMessage.
+TEST(ControlMessagesTest, changeLocationResponse) {
+  PbChangeLocationResponse pb;
+  pb.add_endedmapid(1001);
+  auto* pbInfo = pb.add_partitioninfo();
+  pbInfo->set_partitionid(1002);
+  pbInfo->set_status(1);
+  pbInfo->set_oldavailable(true);
+  pbInfo->mutable_partition()->set_epoch(1003);
+
+  const TransportMessage message(
+      CHANGE_LOCATION_RESPONSE, pb.SerializeAsString());
+  const auto response = ChangeLocationResponse::fromTransportMessage(message);
+  EXPECT_EQ(response->endedMapIds.size(), 1);
+  EXPECT_EQ(response->endedMapIds[0], 1001);
+  EXPECT_EQ(response->partitionInfos.size(), 1);
+  const auto& info = response->partitionInfos[0];
+  EXPECT_EQ(info.partitionId, 1002);
+  EXPECT_EQ(info.status, 1);
+  EXPECT_TRUE(info.oldAvailable);
+  EXPECT_EQ(info.partition->epoch, 1003);
+}
+
TEST(ControlMessagesTest, getReducerFileGroup) {
auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
getReducerFileGroup->shuffleId = 1000;
diff --git a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
index 20c6f40fd..edcb56520 100644
--- a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
+++ b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
@@ -40,6 +40,24 @@ void verifyStorageInfo(const StorageInfo* storageInfo) {
EXPECT_EQ(storageInfo->availableStorageTypes, 1);
}
+// Builds a StorageInfo whose fields match the expectations in
+// verifyStorageInfoPb (type HDD == 1, fixed strings, finalResult true,
+// availableStorageTypes 1).
+std::unique_ptr<StorageInfo> generateStorageInfo() {
+  auto storageInfo = std::make_unique<StorageInfo>();
+  storageInfo->type = StorageInfo::HDD;
+  storageInfo->mountPoint = "test_mountpoint";
+  storageInfo->finalResult = true;
+  storageInfo->filePath = "test_filepath";
+  storageInfo->availableStorageTypes = 1;
+  // Returning the local directly allows copy elision; std::move would block it.
+  return storageInfo;
+}
+
+// Checks that a serialized StorageInfo carries the values set by
+// generateStorageInfo.
+void verifyStorageInfoPb(const PbStorageInfo* pbStorageInfo) {
+  const PbStorageInfo& pb = *pbStorageInfo;
+  EXPECT_EQ(pb.type(), 1);
+  EXPECT_EQ(pb.mountpoint(), "test_mountpoint");
+  EXPECT_TRUE(pb.finalresult());
+  EXPECT_EQ(pb.filepath(), "test_filepath");
+  EXPECT_EQ(pb.availablestoragetypes(), 1);
+}
+
std::unique_ptr<PbPartitionLocation> generateBasicPartitionLocationPb() {
auto pbPartitionLocation = std::make_unique<PbPartitionLocation>();
pbPartitionLocation->set_id(1);
@@ -62,12 +80,41 @@ void verifyBasicPartitionLocation(const PartitionLocation*
partitionLocation) {
EXPECT_EQ(partitionLocation->replicatePort, 1004);
}
+// Builds a PartitionLocation with the basic fields expected by
+// verifyBasicPartitionLocationPb. mode, storageInfo and replicaPeer are left
+// for the caller to set.
+std::unique_ptr<PartitionLocation> generateBasicPartitionLocation() {
+  auto partitionLocation = std::make_unique<PartitionLocation>();
+  partitionLocation->id = 1;
+  partitionLocation->epoch = 101;
+  partitionLocation->host = "test_host";
+  partitionLocation->rpcPort = 1001;
+  partitionLocation->pushPort = 1002;
+  partitionLocation->fetchPort = 1003;
+  partitionLocation->replicatePort = 1004;
+  // Returning the local directly allows copy elision; std::move would block it.
+  return partitionLocation;
+}
+
+// Checks that a serialized location carries the basic fields set by
+// generateBasicPartitionLocation.
+void verifyBasicPartitionLocationPb(
+    const PbPartitionLocation* pbPartitionLocation) {
+  const PbPartitionLocation& pb = *pbPartitionLocation;
+  EXPECT_EQ(pb.id(), 1);
+  EXPECT_EQ(pb.epoch(), 101);
+  EXPECT_EQ(pb.host(), "test_host");
+  EXPECT_EQ(pb.rpcport(), 1001);
+  EXPECT_EQ(pb.pushport(), 1002);
+  EXPECT_EQ(pb.fetchport(), 1003);
+  EXPECT_EQ(pb.replicateport(), 1004);
+}
+
// Parses a generated PbStorageInfo with StorageInfo::fromPb and verifies the
// decoded fields.
TEST(PartitionLocationTest, storageInfoFromPb) {
auto pbStorageInfo = generateStorageInfoPb();
auto storageInfo = StorageInfo::fromPb(*pbStorageInfo);
verifyStorageInfo(storageInfo.get());
}
+// Serializes a generated StorageInfo with toPb() and verifies every field.
+TEST(PartitionLocationTest, storageInfoToProto) {
+  const auto pb = generateStorageInfo()->toPb();
+  verifyStorageInfoPb(pb.get());
+}
+
TEST(PartitionLocationTest, fromPbWithoutPeer) {
auto pbPartitionLocation = generateBasicPartitionLocationPb();
pbPartitionLocation->set_mode(PbPartitionLocation_Mode_Primary);
@@ -109,3 +156,40 @@ TEST(PartitionLocationTest, fromPbWithPeer) {
EXPECT_EQ(partitionLocationReplica->mode, PartitionLocation::Mode::REPLICA);
verifyStorageInfo(partitionLocationReplica->storageInfo.get());
}
+
+// Serializes a primary location that has no replica peer and checks its
+// fields, and that no `peer` message is emitted.
+TEST(PartitionLocationTest, toProtoWithoutPeer) {
+  auto partitionLocation = generateBasicPartitionLocation();
+  partitionLocation->mode = PartitionLocation::PRIMARY;
+  partitionLocation->storageInfo = generateStorageInfo();
+
+  auto pbPartitionLocation = partitionLocation->toPb();
+
+  verifyBasicPartitionLocationPb(pbPartitionLocation.get());
+  EXPECT_EQ(pbPartitionLocation->mode(), PbPartitionLocation_Mode_Primary);
+  verifyStorageInfoPb(&pbPartitionLocation->storageinfo());
+  EXPECT_FALSE(pbPartitionLocation->has_peer());
+}
+
+// Serializes a primary location with a replica peer and checks that both the
+// primary and the nested peer carry the expected fields and modes.
+TEST(PartitionLocationTest, toProtoWithPeer) {
+  // Builds a location with the shared basic fields, the given mode and a
+  // generated StorageInfo.
+  auto makeLocation = [](PartitionLocation::Mode mode) {
+    auto location = generateBasicPartitionLocation();
+    location->mode = mode;
+    location->storageInfo = generateStorageInfo();
+    return location;
+  };
+
+  auto primary = makeLocation(PartitionLocation::PRIMARY);
+  primary->replicaPeer = makeLocation(PartitionLocation::REPLICA);
+
+  const auto pbPrimary = primary->toPb();
+  verifyBasicPartitionLocationPb(pbPrimary.get());
+  EXPECT_EQ(pbPrimary->mode(), PbPartitionLocation_Mode_Primary);
+  verifyStorageInfoPb(&pbPrimary->storageinfo());
+
+  const PbPartitionLocation& pbReplica = pbPrimary->peer();
+  verifyBasicPartitionLocationPb(&pbReplica);
+  EXPECT_EQ(pbReplica.mode(), PbPartitionLocation_Mode_Replica);
+  verifyStorageInfoPb(&pbReplica.storageinfo());
+}