This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e13c9934 [CELEBORN-2098][CIP-14] Support Revive/Response in cppClient
7e13c9934 is described below

commit 7e13c9934fdafb26c916fd9f5ee6ea9f47de9e94
Author: HolyLow <[email protected]>
AuthorDate: Sat Aug 16 12:59:32 2025 +0800

    [CELEBORN-2098][CIP-14] Support Revive/Response in cppClient
    
    ### What changes were proposed in this pull request?
    This PR supports Revive/ChangeLocationResponse messages in cppClient.
    
    ### Why are the changes needed?
    These messages are used when a write triggers a revive operation.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and UTs.
    
    Closes #3413 from HolyLow/issue/celeborn-2098-support-revive-changelocationresponse.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 cpp/celeborn/protocol/ControlMessages.cpp          | 65 +++++++++++++++++
 cpp/celeborn/protocol/ControlMessages.h            | 43 +++++++++++
 cpp/celeborn/protocol/PartitionLocation.cpp        | 34 +++++++++
 cpp/celeborn/protocol/PartitionLocation.h          |  6 ++
 .../protocol/tests/ControlMessagesTest.cpp         | 47 ++++++++++++
 .../protocol/tests/PartitionLocationTest.cpp       | 84 ++++++++++++++++++++++
 6 files changed, 279 insertions(+)

diff --git a/cpp/celeborn/protocol/ControlMessages.cpp b/cpp/celeborn/protocol/ControlMessages.cpp
index 6354b46ea..29742ee73 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -123,6 +123,71 @@ std::unique_ptr<MapperEndResponse> MapperEndResponse::fromTransportMessage(
   return std::move(response);
 }
 
+ReviveRequest::ReviveRequest(
+    int _shuffleId,
+    int _mapId,
+    int _attemptId,
+    int _partitionId,
+    int _epoch,
+    std::shared_ptr<const PartitionLocation> _loc,
+    StatusCode _cause)
+    : shuffleId(_shuffleId),
+      mapId(_mapId),
+      attemptId(_attemptId),
+      partitionId(_partitionId),
+      epoch(_epoch),
+      loc(std::move(_loc)),
+      cause(_cause) {}
+
+TransportMessage Revive::toTransportMessage() const {
+  MessageType type = CHANGE_LOCATION;
+  PbRevive pb;
+  pb.set_shuffleid(shuffleId);
+  for (auto mapId : mapIds) {
+    pb.add_mapid(mapId);
+  }
+  for (auto& reviveRequest : reviveRequests) {
+    auto pbRevivePartitionInfo = pb.add_partitioninfo();
+    pbRevivePartitionInfo->set_partitionid(reviveRequest->partitionId);
+    pbRevivePartitionInfo->set_epoch(reviveRequest->epoch);
+    pbRevivePartitionInfo->set_status(reviveRequest->cause);
+    if (reviveRequest->loc) {
+      pbRevivePartitionInfo->set_allocated_partition(
+          reviveRequest->loc->toPb().release());
+    }
+  }
+  std::string payload = pb.SerializeAsString();
+  return TransportMessage(type, std::move(payload));
+}
+
+std::unique_ptr<ChangeLocationResponse>
+ChangeLocationResponse::fromTransportMessage(
+    const TransportMessage& transportMessage) {
+  CELEBORN_CHECK(
+      transportMessage.type() == CHANGE_LOCATION_RESPONSE,
+      "transportMessageType mismatch");
+  auto payload = transportMessage.payload();
+  auto pbChangeLocationResponse = utils::parseProto<PbChangeLocationResponse>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  auto response = std::make_unique<ChangeLocationResponse>();
+  response->endedMapIds.reserve(pbChangeLocationResponse->endedmapid_size());
+  for (auto endedMapId : pbChangeLocationResponse->endedmapid()) {
+    response->endedMapIds.push_back(endedMapId);
+  }
+  int numPartitionInfo = pbChangeLocationResponse->partitioninfo_size();
+  response->partitionInfos.resize(numPartitionInfo);
+  for (int i = 0; i < numPartitionInfo; i++) {
+    auto& partitionInfo = response->partitionInfos[i];
+    auto& pbPartitionInfo = pbChangeLocationResponse->partitioninfo(i);
+    partitionInfo.partitionId = pbPartitionInfo.partitionid();
+    partitionInfo.status = toStatusCode(pbPartitionInfo.status());
+    partitionInfo.partition =
+        PartitionLocation::fromPb(pbPartitionInfo.partition());
+    partitionInfo.oldAvailable = pbPartitionInfo.oldavailable();
+  }
+  return std::move(response);
+}
+
 TransportMessage GetReducerFileGroup::toTransportMessage() const {
   MessageType type = GET_REDUCER_FILE_GROUP;
   PbGetReducerFileGroup pb;
diff --git a/cpp/celeborn/protocol/ControlMessages.h b/cpp/celeborn/protocol/ControlMessages.h
index b3e360a0e..9df08deed 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -60,6 +60,49 @@ struct MapperEndResponse {
       const TransportMessage& transportMessage);
 };
 
+struct ReviveRequest {
+  int shuffleId;
+  int mapId;
+  int attemptId;
+  int partitionId;
+  int epoch;
+  std::shared_ptr<const PartitionLocation> loc;
+  StatusCode cause;
+  std::atomic<int> reviveStatus{StatusCode::REVIVE_INITIALIZED};
+
+  ReviveRequest(
+      int _shuffleId,
+      int _mapId,
+      int _attemptId,
+      int _partitionId,
+      int _epoch,
+      std::shared_ptr<const PartitionLocation> _loc,
+      StatusCode _cause);
+};
+
+struct Revive {
+  int shuffleId;
+  std::unordered_set<int> mapIds;
+  std::unordered_set<std::shared_ptr<ReviveRequest>> reviveRequests;
+
+  TransportMessage toTransportMessage() const;
+};
+
+struct ChangeLocationPartitionInfo {
+  int partitionId;
+  StatusCode status;
+  std::shared_ptr<const PartitionLocation> partition;
+  bool oldAvailable;
+};
+
+struct ChangeLocationResponse {
+  std::vector<int> endedMapIds;
+  std::vector<ChangeLocationPartitionInfo> partitionInfos;
+
+  static std::unique_ptr<ChangeLocationResponse> fromTransportMessage(
+      const TransportMessage& transportMessage);
+};
+
 struct GetReducerFileGroup {
   int shuffleId;
 
diff --git a/cpp/celeborn/protocol/PartitionLocation.cpp b/cpp/celeborn/protocol/PartitionLocation.cpp
index 6573ec2a2..a1820c5df 100644
--- a/cpp/celeborn/protocol/PartitionLocation.cpp
+++ b/cpp/celeborn/protocol/PartitionLocation.cpp
@@ -33,6 +33,16 @@ std::unique_ptr<StorageInfo> StorageInfo::fromPb(const PbStorageInfo& pb) {
   return std::move(result);
 }
 
+std::unique_ptr<PbStorageInfo> StorageInfo::toPb() const {
+  auto pbStorageInfo = std::make_unique<PbStorageInfo>();
+  pbStorageInfo->set_type(type);
+  pbStorageInfo->set_mountpoint(mountPoint);
+  pbStorageInfo->set_finalresult(finalResult);
+  pbStorageInfo->set_filepath(filePath);
+  pbStorageInfo->set_availablestoragetypes(availableStorageTypes);
+  return pbStorageInfo;
+}
+
 std::unique_ptr<const PartitionLocation> PartitionLocation::fromPb(
     const PbPartitionLocation& pb) {
   auto result = fromPbWithoutPeer(pb);
@@ -98,6 +108,15 @@ PartitionLocation::PartitionLocation(const PartitionLocation& other)
               : nullptr),
       storageInfo(std::make_unique<StorageInfo>(*other.storageInfo)) {}
 
+std::unique_ptr<PbPartitionLocation> PartitionLocation::toPb() const {
+  auto pbPartitionLocation = toPbWithoutPeer();
+  if (replicaPeer) {
+    auto pbPeerPartitionLocation = replicaPeer->toPbWithoutPeer();
+    pbPartitionLocation->set_allocated_peer(pbPeerPartitionLocation.release());
+  }
+  return pbPartitionLocation;
+}
+
 std::unique_ptr<PartitionLocation> PartitionLocation::fromPbWithoutPeer(
     const PbPartitionLocation& pb) {
   auto result = std::make_unique<PartitionLocation>();
@@ -114,6 +133,21 @@ std::unique_ptr<PartitionLocation> PartitionLocation::fromPbWithoutPeer(
   return std::move(result);
 }
 
+std::unique_ptr<PbPartitionLocation> PartitionLocation::toPbWithoutPeer()
+    const {
+  auto pbPartitionLocation = std::make_unique<PbPartitionLocation>();
+  pbPartitionLocation->set_id(id);
+  pbPartitionLocation->set_epoch(epoch);
+  pbPartitionLocation->set_host(host);
+  pbPartitionLocation->set_rpcport(rpcPort);
+  pbPartitionLocation->set_pushport(pushPort);
+  pbPartitionLocation->set_fetchport(fetchPort);
+  pbPartitionLocation->set_replicateport(replicatePort);
+  pbPartitionLocation->set_mode(static_cast<PbPartitionLocation_Mode>(mode));
+  pbPartitionLocation->set_allocated_storageinfo(storageInfo->toPb().release());
+  return pbPartitionLocation;
+}
+
 StatusCode toStatusCode(int32_t code) {
   CELEBORN_CHECK(code >= 0);
   CELEBORN_CHECK(code <= StatusCode::TAIL);
diff --git a/cpp/celeborn/protocol/PartitionLocation.h b/cpp/celeborn/protocol/PartitionLocation.h
index 700773cba..4e88d16de 100644
--- a/cpp/celeborn/protocol/PartitionLocation.h
+++ b/cpp/celeborn/protocol/PartitionLocation.h
@@ -31,6 +31,8 @@ struct StorageInfo {
 
   StorageInfo(const StorageInfo& other) = default;
 
+  std::unique_ptr<PbStorageInfo> toPb() const;
+
   enum Type {
     MEMORY = 0,
     HDD = 1,
@@ -88,6 +90,8 @@ struct PartitionLocation {
 
   PartitionLocation(const PartitionLocation& other);
 
+  std::unique_ptr<PbPartitionLocation> toPb() const;
+
   std::string filename() const {
     return std::to_string(id) + "-" + std::to_string(epoch) + "-" +
         std::to_string(mode);
@@ -96,6 +100,8 @@ struct PartitionLocation {
  private:
   static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
       const PbPartitionLocation& pb);
+
+  std::unique_ptr<PbPartitionLocation> toPbWithoutPeer() const;
 };
 } // namespace protocol
 } // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index 0e4688072..bd27d9a6c 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -142,6 +142,53 @@ TEST(ControlMessagesTest, mapperEndResponse) {
   EXPECT_EQ(mapperEndResponse->status, 1);
 }
 
+TEST(ControlMessagesTest, revive) {
+  auto revive = std::make_unique<Revive>();
+  revive->shuffleId = 1000;
+  revive->mapIds.insert(1001);
+  auto reviveRequest = std::make_shared<ReviveRequest>(
+      1000, 1001, 1002, 1003, 1004, nullptr, StatusCode::SLOT_NOT_AVAILABLE);
+  revive->reviveRequests.insert(reviveRequest);
+
+  auto transportMessage = revive->toTransportMessage();
+  EXPECT_EQ(transportMessage.type(), CHANGE_LOCATION);
+  auto payload = transportMessage.payload();
+  auto pbRevive = utils::parseProto<PbRevive>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  EXPECT_EQ(pbRevive->shuffleid(), 1000);
+  EXPECT_EQ(pbRevive->mapid_size(), 1);
+  EXPECT_EQ(pbRevive->mapid(0), 1001);
+  EXPECT_EQ(pbRevive->partitioninfo_size(), 1);
+  auto& pbPartitionInfo = pbRevive->partitioninfo(0);
+  EXPECT_EQ(pbPartitionInfo.partitionid(), reviveRequest->partitionId);
+  EXPECT_EQ(pbPartitionInfo.epoch(), reviveRequest->epoch);
+  EXPECT_EQ(pbPartitionInfo.status(), reviveRequest->cause);
+}
+
+TEST(ControlMessagesTest, changeLocationResponse) {
+  PbChangeLocationResponse pbChangeLocationResponse;
+  pbChangeLocationResponse.add_endedmapid(1001);
+  auto pbPartitionInfo = pbChangeLocationResponse.add_partitioninfo();
+  pbPartitionInfo->set_partitionid(1002);
+  pbPartitionInfo->set_status(1);
+  pbPartitionInfo->set_oldavailable(true);
+  auto pbPartition = pbPartitionInfo->mutable_partition();
+  pbPartition->set_epoch(1003);
+
+  TransportMessage transportMessage(
+      CHANGE_LOCATION_RESPONSE, pbChangeLocationResponse.SerializeAsString());
+  auto changeLocationResponse =
+      ChangeLocationResponse::fromTransportMessage(transportMessage);
+  EXPECT_EQ(changeLocationResponse->endedMapIds.size(), 1);
+  EXPECT_EQ(changeLocationResponse->endedMapIds[0], 1001);
+  EXPECT_EQ(changeLocationResponse->partitionInfos.size(), 1);
+  auto partitionInfo = &changeLocationResponse->partitionInfos[0];
+  EXPECT_EQ(partitionInfo->partitionId, 1002);
+  EXPECT_EQ(partitionInfo->status, 1);
+  EXPECT_EQ(partitionInfo->oldAvailable, true);
+  EXPECT_EQ(partitionInfo->partition->epoch, 1003);
+}
+
 TEST(ControlMessagesTest, getReducerFileGroup) {
   auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
   getReducerFileGroup->shuffleId = 1000;
diff --git a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
index 20c6f40fd..edcb56520 100644
--- a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
+++ b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
@@ -40,6 +40,24 @@ void verifyStorageInfo(const StorageInfo* storageInfo) {
   EXPECT_EQ(storageInfo->availableStorageTypes, 1);
 }
 
+std::unique_ptr<StorageInfo> generateStorageInfo() {
+  auto storageInfo = std::make_unique<StorageInfo>();
+  storageInfo->type = static_cast<StorageInfo::Type>(1);
+  storageInfo->mountPoint = "test_mountpoint";
+  storageInfo->finalResult = true;
+  storageInfo->filePath = "test_filepath";
+  storageInfo->availableStorageTypes = 1;
+  return std::move(storageInfo);
+}
+
+void verifyStorageInfoPb(const PbStorageInfo* pbStorageInfo) {
+  EXPECT_EQ(pbStorageInfo->type(), 1);
+  EXPECT_EQ(pbStorageInfo->mountpoint(), "test_mountpoint");
+  EXPECT_EQ(pbStorageInfo->finalresult(), true);
+  EXPECT_EQ(pbStorageInfo->filepath(), "test_filepath");
+  EXPECT_EQ(pbStorageInfo->availablestoragetypes(), 1);
+}
+
 std::unique_ptr<PbPartitionLocation> generateBasicPartitionLocationPb() {
   auto pbPartitionLocation = std::make_unique<PbPartitionLocation>();
   pbPartitionLocation->set_id(1);
@@ -62,12 +80,41 @@ void verifyBasicPartitionLocation(const PartitionLocation* partitionLocation) {
   EXPECT_EQ(partitionLocation->replicatePort, 1004);
 }
 
+std::unique_ptr<PartitionLocation> generateBasicPartitionLocation() {
+  auto partitionLocation = std::make_unique<PartitionLocation>();
+  partitionLocation->id = 1;
+  partitionLocation->epoch = 101;
+  partitionLocation->host = "test_host";
+  partitionLocation->rpcPort = 1001;
+  partitionLocation->pushPort = 1002;
+  partitionLocation->fetchPort = 1003;
+  partitionLocation->replicatePort = 1004;
+  return std::move(partitionLocation);
+}
+
+void verifyBasicPartitionLocationPb(
+    const PbPartitionLocation* pbPartitionLocation) {
+  EXPECT_EQ(pbPartitionLocation->id(), 1);
+  EXPECT_EQ(pbPartitionLocation->epoch(), 101);
+  EXPECT_EQ(pbPartitionLocation->host(), "test_host");
+  EXPECT_EQ(pbPartitionLocation->rpcport(), 1001);
+  EXPECT_EQ(pbPartitionLocation->pushport(), 1002);
+  EXPECT_EQ(pbPartitionLocation->fetchport(), 1003);
+  EXPECT_EQ(pbPartitionLocation->replicateport(), 1004);
+}
+
 TEST(PartitionLocationTest, storageInfoFromPb) {
   auto pbStorageInfo = generateStorageInfoPb();
   auto storageInfo = StorageInfo::fromPb(*pbStorageInfo);
   verifyStorageInfo(storageInfo.get());
 }
 
+TEST(PartitionLocationTest, storageInfoToProto) {
+  auto storageInfo = generateStorageInfo();
+  auto pbStorageInfo = storageInfo->toPb();
+  verifyStorageInfoPb(pbStorageInfo.get());
+}
+
 TEST(PartitionLocationTest, fromPbWithoutPeer) {
   auto pbPartitionLocation = generateBasicPartitionLocationPb();
   pbPartitionLocation->set_mode(PbPartitionLocation_Mode_Primary);
@@ -109,3 +156,40 @@ TEST(PartitionLocationTest, fromPbWithPeer) {
   EXPECT_EQ(partitionLocationReplica->mode, PartitionLocation::Mode::REPLICA);
   verifyStorageInfo(partitionLocationReplica->storageInfo.get());
 }
+
+TEST(PartitionLocationTest, toProtoWithoutPeer) {
+  auto partitionLocation = generateBasicPartitionLocation();
+  partitionLocation->mode = PartitionLocation::PRIMARY;
+  partitionLocation->storageInfo = generateStorageInfo();
+
+  auto pbPartitionLocation = partitionLocation->toPb();
+
+  verifyBasicPartitionLocationPb(pbPartitionLocation.get());
+  EXPECT_EQ(pbPartitionLocation->mode(), PbPartitionLocation_Mode_Primary);
+  verifyStorageInfoPb(&pbPartitionLocation->storageinfo());
+}
+
+TEST(PartitionLocationTest, toProtoWithPeer) {
+  auto partitionLocationPrimary = generateBasicPartitionLocation();
+  partitionLocationPrimary->mode = PartitionLocation::PRIMARY;
+  partitionLocationPrimary->storageInfo = generateStorageInfo();
+
+  auto partitionLocationReplica = generateBasicPartitionLocation();
+  partitionLocationReplica->mode = PartitionLocation::REPLICA;
+  partitionLocationReplica->storageInfo = generateStorageInfo();
+
+  partitionLocationPrimary->replicaPeer = std::move(partitionLocationReplica);
+
+  auto pbPartitionLocationPrimary = partitionLocationPrimary->toPb();
+
+  verifyBasicPartitionLocationPb(pbPartitionLocationPrimary.get());
+  EXPECT_EQ(
+      pbPartitionLocationPrimary->mode(), PbPartitionLocation_Mode_Primary);
+  verifyStorageInfoPb(&pbPartitionLocationPrimary->storageinfo());
+
+  auto pbPartitionLocationReplica = &pbPartitionLocationPrimary->peer();
+  verifyBasicPartitionLocationPb(pbPartitionLocationReplica);
+  EXPECT_EQ(
+      pbPartitionLocationReplica->mode(), PbPartitionLocation_Mode_Replica);
+  verifyStorageInfoPb(&pbPartitionLocationReplica->storageinfo());
+}

Reply via email to