This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2460efd95 [CELEBORN-2222][CIP-14] Support Retrying when createReader failed for CelebornInputStream in CppClient
2460efd95 is described below

commit 2460efd95e59dfa58a53f334fe2d25c3073cc013
Author: afterincomparableyum <[email protected]>
AuthorDate: Thu Feb 26 11:18:48 2026 +0800

    [CELEBORN-2222][CIP-14] Support Retrying when createReader failed for CelebornInputStream in CppClient
    
    This PR implements retry support for createReader failures in the C++
    client, matching the behavior of the Java implementation. The
    implementation includes:
    
    - Added configuration properties:
      * celeborn.client.fetch.maxRetriesForEachReplica (default: 3)
      * celeborn.data.io.retryWait (default: 5s)
      * celeborn.client.push.replicate.enabled (default: false)
      * celeborn.client.fetch.excludeWorkerOnFailure.enabled (default: false)
      * celeborn.client.fetch.excludedWorker.expireTimeout (default: 60s)
      * celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled
        (default: false)
    
    - Added peer location support methods to PartitionLocation:
      * hasPeer() - Check if location has a peer replica
      * getPeer() - Get the peer location
      * hostAndFetchPort() - Get the host:fetchPort string, used for logging
        and as the key of the fetch-exclusion list
    
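    - Implemented retry logic in createReaderWithRetry():
      * Retries up to fetchChunkMaxRetry_ times: the value of
        celeborn.client.fetch.maxRetriesForEachReplica, doubled when push
        replication is enabled (hence the new replication property)
      * Switches to the peer location on failure when one is available and
        the read is not a skewed-partition read without a map range
      * Sleeps before retrying: after every second failure while switching
        between replicas, and after every failure when there is no peer
      * Resets the retry counter before creating the reader for each new
        partition location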
    
    - Added unit tests for new functionality
    
    ### How was this patch tested?
    
    Unit tests and a successful build.
    
    Closes #3583 from afterincomparableyum/cpp-client/celeborn-2222.
    
    Authored-by: afterincomparableyum <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 cpp/celeborn/client/ShuffleClient.cpp              |  22 +-
 cpp/celeborn/client/ShuffleClient.h                |  14 +
 cpp/celeborn/client/reader/CelebornInputStream.cpp | 112 +++++-
 cpp/celeborn/client/reader/CelebornInputStream.h   |  19 +-
 cpp/celeborn/client/tests/CMakeLists.txt           |   1 +
 .../client/tests/CelebornInputStreamRetryTest.cpp  | 411 +++++++++++++++++++++
 cpp/celeborn/conf/CelebornConf.cpp                 |  38 +-
 cpp/celeborn/conf/CelebornConf.h                   |  31 ++
 cpp/celeborn/conf/tests/CelebornConfTest.cpp       |  30 ++
 cpp/celeborn/protocol/PartitionLocation.cpp        |  12 +
 cpp/celeborn/protocol/PartitionLocation.h          |   6 +
 .../protocol/tests/PartitionLocationTest.cpp       |  52 +++
 cpp/celeborn/utils/CelebornUtils.cpp               |  15 +
 cpp/celeborn/utils/CelebornUtils.h                 |   6 +
 cpp/celeborn/utils/tests/CelebornUtilsTest.cpp     |  55 +++
 15 files changed, 814 insertions(+), 10 deletions(-)

diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp
index 35887135e..5f1e82dce 100644
--- a/cpp/celeborn/client/ShuffleClient.cpp
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -67,7 +67,11 @@ ShuffleClientImpl::ShuffleClientImpl(
                     [conf]() {
                       return compress::Compressor::createCompressor(*conf);
                     })
-              : std::function<std::unique_ptr<compress::Compressor>()>()) {
+              : std::function<std::unique_ptr<compress::Compressor>()>()),
+      pushReplicateEnabled_(conf->clientPushReplicateEnabled()),
+      fetchExcludeWorkerOnFailureEnabled_(
+          conf->clientFetchExcludeWorkerOnFailureEnabled()),
+      fetchExcludedWorkers_(std::make_shared<FetchExcludedWorkers>()) {
   CELEBORN_CHECK_NOT_NULL(clientFactory_);
   CELEBORN_CHECK_NOT_NULL(pushDataRetryPool_);
 }
@@ -348,7 +352,28 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
       attemptNumber,
       startMapIndex,
       endMapIndex,
-      needCompression);
+      needCompression,
+      fetchExcludedWorkers_,
+      this);
+}
+
+// Records `hostAndFetchPort` in fetchExcludedWorkers_ with the current
+// steady_clock time in milliseconds. This only happens when push replication
+// and fetch-side exclusion are both enabled and utils::isCriticalCauseForFetch
+// classifies `e` as critical; every other failure is ignored.
+void ShuffleClientImpl::excludeFailedFetchLocation(
+    const std::string& hostAndFetchPort,
+    const std::exception& e) {
+  if (!pushReplicateEnabled_ || !fetchExcludeWorkerOnFailureEnabled_ ||
+      !utils::isCriticalCauseForFetch(e)) {
+    return;
+  }
+  // CelebornInputStream::isExcluded compares these entries against
+  // steady_clock milliseconds, so the same clock must be used here.
+  const auto nowMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+                         std::chrono::steady_clock::now().time_since_epoch())
+                         .count();
+  fetchExcludedWorkers_->set(hostAndFetchPort, nowMs);
 }
 
 void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h
index e899ba0cd..0a7946d12 100644
--- a/cpp/celeborn/client/ShuffleClient.h
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -55,6 +55,8 @@ class ShuffleClient {
 
   virtual void updateReducerFileGroup(int shuffleId) = 0;
 
+  using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;
+
   virtual std::unique_ptr<CelebornInputStream> readPartition(
       int shuffleId,
       int partitionId,
@@ -70,6 +72,10 @@ class ShuffleClient {
       int endMapIndex,
       bool needCompression) = 0;
 
+  virtual void excludeFailedFetchLocation(
+      const std::string& hostAndFetchPort,
+      const std::exception& e) = 0;
+
   virtual bool cleanupShuffle(int shuffleId) = 0;
 
   virtual void shutdown() = 0;
@@ -163,6 +169,10 @@ class ShuffleClientImpl
       int endMapIndex,
       bool needCompression) override;
 
+  void excludeFailedFetchLocation(
+      const std::string& hostAndFetchPort,
+      const std::exception& e) override;
+
   void updateReducerFileGroup(int shuffleId) override;
 
   bool cleanupShuffle(int shuffleId) override;
@@ -272,6 +282,10 @@ class ShuffleClientImpl
   // Factory for creating compressor instances on demand to avoid sharing a
   // single non-thread-safe compressor across concurrent operations.
   std::function<std::unique_ptr<compress::Compressor>()> compressorFactory_;
+  bool pushReplicateEnabled_;
+  bool fetchExcludeWorkerOnFailureEnabled_;
+  std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
+
   // TODO: pushExcludedWorker is not supported yet
 };
 } // namespace client
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp b/cpp/celeborn/client/reader/CelebornInputStream.cpp
index d75b705e5..f81152321 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.cpp
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -17,7 +17,12 @@
 
 #include "celeborn/client/reader/CelebornInputStream.h"
 #include <lz4.h>
+#include <chrono>
+#include <exception>
+#include <thread>
+#include "celeborn/client/ShuffleClient.h"
 #include "celeborn/client/compress/Decompressor.h"
+#include "celeborn/utils/CelebornUtils.h"
 
 namespace celeborn {
 namespace client {
@@ -30,7 +34,9 @@ CelebornInputStream::CelebornInputStream(
     int attemptNumber,
     int startMapIndex,
     int endMapIndex,
-    bool needCompression)
+    bool needCompression,
+    const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
+    ShuffleClient* shuffleClient)
     : shuffleKey_(shuffleKey),
       conf_(conf),
       clientFactory_(clientFactory),
@@ -45,7 +51,22 @@ CelebornInputStream::CelebornInputStream(
       shouldDecompress_(
           conf_->shuffleCompressionCodec() !=
               protocol::CompressionCodec::NONE &&
-          needCompression) {
+          needCompression),
+      fetchChunkRetryCnt_(0),
+      fetchChunkMaxRetry_(
+          conf_->clientPushReplicateEnabled()
+              ? conf_->clientFetchMaxRetriesForEachReplica() * 2
+              : conf_->clientFetchMaxRetriesForEachReplica()),
+      retryWait_(conf_->networkIoRetryWait()),
+      fetchExcludedWorkers_(fetchExcludedWorkers),
+      fetchExcludedWorkerExpireTimeoutMs_(
+          std::chrono::duration_cast<std::chrono::milliseconds>(
+              conf_->clientFetchExcludedWorkerExpireTimeout())
+              .count()),
+      readSkewPartitionWithoutMapRange_(
+          conf_->clientAdaptiveOptimizeSkewedPartitionReadEnabled() &&
+          startMapIndex > endMapIndex),
+      shuffleClient_(shuffleClient) {
   if (shouldDecompress_) {
     decompressor_ = compress::Decompressor::createDecompressor(
         conf_->shuffleCompressionCodec());
@@ -178,6 +199,7 @@ void CelebornInputStream::moveToNextReader() {
   if (!location) {
     return;
   }
+  fetchChunkRetryCnt_ = 0;
   currReader_ = createReaderWithRetry(*location);
   currLocationIndex_++;
   if (currReader_->hasNext()) {
@@ -189,9 +211,87 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  // Opens a reader for `location`, retrying on failure. Attempts are counted
+  // in fetchChunkRetryCnt_ (reset by moveToNextReader before each location)
+  // and capped by fetchChunkMaxRetry_. Every failure is reported to the
+  // shuffle client, which may add the worker to the fetch-exclusion list.
+  // The next attempt targets the peer replica when one exists (unless this is
+  // a skewed-partition read without a map range); otherwise the same location
+  // is retried after sleeping retryWait_. Once attempts are exhausted, the
+  // last failure is rethrown wrapped in utils::CelebornRuntimeError.
+  //
+  // TODO (Investigate): When a location is already in the exclusion list, the
+  // code throws and then retries/sleeps (especially in the no-peer branch).
+  // Since isExcluded(*currentLocation) will keep returning true until the
+  // exclusion expires, these retries are guaranteed to fail and just add
+  // delay. Consider failing fast (no sleep/retry) or skipping to another
+  // available location/peer when the current location is excluded.
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+
+  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+    try {
+      VLOG(1) << "Create reader for location "
+              << currentLocation->hostAndFetchPort();
+      if (isExcluded(*currentLocation)) {
+        CELEBORN_FAIL(
+            "Fetch data from excluded worker! {}",
+            currentLocation->hostAndFetchPort());
+      }
+      return createReader(*currentLocation);
+    } catch (const std::exception& e) {
+      lastException = std::current_exception();
+      shuffleClient_->excludeFailedFetchLocation(
+          currentLocation->hostAndFetchPort(), e);
+      fetchChunkRetryCnt_++;
+
+      if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+        // While alternating between replicas, an even count means both have
+        // been tried in this round, so back off before starting the next.
+        if (fetchChunkRetryCnt_ % 2 == 0) {
+          std::this_thread::sleep_for(retryWait_);
+        }
+        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+                     << "/" << fetchChunkMaxRetry_ << " times for location "
+                     << currentLocation->hostAndFetchPort()
+                     << ", change to peer. Error: " << e.what();
+        // TODO: When stream handlers are supported, send BUFFER_STREAM_END
+        // to close the active stream before switching to peer, matching the
+        // Java CelebornInputStream behavior. Currently, the C++ client does
+        // not have pre-opened stream handlers, so stream cleanup is handled
+        // by WorkerPartitionReader's destructor.
+        // NOTE(review): retries only keep alternating if the peer's own
+        // hasPeer()/getPeer() lead back here; otherwise later attempts stay
+        // on the peer via the branch below. Confirm PartitionLocation wiring.
+        currentLocation = currentLocation->getPeer();
+      } else {
+        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+                     << "/" << fetchChunkMaxRetry_ << " times for location "
+                     << currentLocation->hostAndFetchPort()
+                     << ", retry the same location. Error: " << e.what();
+        std::this_thread::sleep_for(retryWait_);
+      }
+    }
+  }
+
+  if (!lastException) {
+    // No attempt was made (fetchChunkMaxRetry_ <= 0, or the counter was not
+    // reset before this call), so there is no failure to wrap. Fail here
+    // instead of handing an empty exception_ptr to CelebornRuntimeError.
+    CELEBORN_FAIL(
+        "createPartitionReader made no attempt for location {}: "
+        "retry count {} already reached max {}",
+        location.hostAndFetchPort(),
+        fetchChunkRetryCnt_,
+        fetchChunkMaxRetry_);
+  }
+
+  // Max retries exceeded, rethrow the last exception wrapped with context
+  const std::string errorMessage = "createPartitionReader failed after " +
+      std::to_string(fetchChunkRetryCnt_) +
+      " retries. Original location: " + location.hostAndFetchPort() +
+      ", last attempted location: " + currentLocation->hostAndFetchPort();
+  throw utils::CelebornRuntimeError(lastException, errorMessage, false);
 }
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
@@ -236,6 +311,42 @@ std::unordered_set<int>& CelebornInputStream::getBatchRecord(int mapId) {
   return *batchRecords_[mapId];
 }
 
+// Returns true when `location` should be skipped because its worker is in
+// the shared fetch-exclusion list. Entries hold steady_clock milliseconds;
+// an entry older than fetchExcludedWorkerExpireTimeoutMs_ is erased and
+// ignored. A location with a peer is reported as excluded only when the peer
+// is not excluded or was excluded earlier, so the caller moves on to the peer
+// rather than skipping both replicas.
+//
+// NOTE(review): get() followed by erase() is not atomic. A concurrent set()
+// for the same worker in between would be erased, dropping a fresh exclusion.
+bool CelebornInputStream::isExcluded(
+    const protocol::PartitionLocation& location) {
+  const auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
+                       std::chrono::steady_clock::now().time_since_epoch())
+                       .count();
+  // Build the lookup key once; it is reused by erase() below.
+  const std::string key = location.hostAndFetchPort();
+  const auto timestamp = fetchExcludedWorkers_->get(key);
+  if (!timestamp.has_value()) {
+    return false;
+  }
+  if (now - timestamp.value() > fetchExcludedWorkerExpireTimeoutMs_) {
+    fetchExcludedWorkers_->erase(key);
+    return false;
+  }
+  if (!location.hasPeer()) {
+    return true;
+  }
+  // To avoid both replicate locations being excluded, if the peer was added
+  // to the excluded list earlier, report this location as excluded so the
+  // caller changes to try the peer.
+  const auto peerTimestamp =
+      fetchExcludedWorkers_->get(location.getPeer()->hostAndFetchPort());
+  return !peerTimestamp.has_value() ||
+      peerTimestamp.value() < timestamp.value();
+}
+
 void CelebornInputStream::cleanupReader() {
   currReader_ = nullptr;
 }
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h b/cpp/celeborn/client/reader/CelebornInputStream.h
index 5dd9c4f76..735690be0 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.h
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -20,11 +20,16 @@
 #include "celeborn/client/compress/Decompressor.h"
 #include "celeborn/client/reader/WorkerPartitionReader.h"
 #include "celeborn/conf/CelebornConf.h"
+#include "celeborn/utils/CelebornUtils.h"
 
 namespace celeborn {
 namespace client {
+class ShuffleClient;
+
 class CelebornInputStream {
  public:
+  using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;
+
   CelebornInputStream(
       const std::string& shuffleKey,
       const std::shared_ptr<const conf::CelebornConf>& conf,
@@ -35,7 +40,9 @@ class CelebornInputStream {
       int attemptNumber,
       int startMapIndex,
       int endMapIndex,
-      bool needCompression);
+      bool needCompression,
+      const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
+      ShuffleClient* shuffleClient);
 
   int read(uint8_t* buffer, size_t offset, size_t len);
 
@@ -56,6 +63,8 @@ class CelebornInputStream {
   std::shared_ptr<PartitionReader> createReader(
       const protocol::PartitionLocation& location);
 
+  bool isExcluded(const protocol::PartitionLocation& location);
+
   std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();
 
   std::unordered_set<int>& getBatchRecord(int mapId);
@@ -81,6 +90,14 @@ class CelebornInputStream {
   size_t currBatchSize_;
   std::shared_ptr<PartitionReader> currReader_;
   std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;
+
+  int fetchChunkRetryCnt_;
+  int fetchChunkMaxRetry_;
+  utils::Timeout retryWait_;
+  std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
+  int64_t fetchExcludedWorkerExpireTimeoutMs_;
+  bool readSkewPartitionWithoutMapRange_;
+  ShuffleClient* shuffleClient_;
 };
 } // namespace client
 } // namespace celeborn
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt b/cpp/celeborn/client/tests/CMakeLists.txt
index 63c37c6e1..1402982e0 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -17,6 +17,7 @@ add_executable(
         celeborn_client_test
         WorkerPartitionReaderTest.cpp
         PushDataCallbackTest.cpp
+        CelebornInputStreamRetryTest.cpp
         PushStateTest.cpp
         ReviveManagerTest.cpp
         Lz4DecompressorTest.cpp
diff --git a/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp b/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
new file mode 100644
index 000000000..564860459
--- /dev/null
+++ b/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <chrono>
+#include <exception>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <system_error>
+#include <vector>
+
+#include "celeborn/client/ShuffleClient.h"
+#include "celeborn/client/reader/CelebornInputStream.h"
+#include "celeborn/conf/CelebornConf.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::network;
+using namespace celeborn::protocol;
+using namespace celeborn::conf;
+
+namespace {
+using MS = std::chrono::milliseconds;
+
+// ShuffleClient test double. Every operation is a no-op except
+// excludeFailedFetchLocation, which applies the same rule as
+// ShuffleClientImpl (record the worker with a steady_clock millisecond
+// timestamp only when push replication and fetch exclusion are enabled and
+// the failure is critical) so tests can inspect the shared exclusion map.
+class StubShuffleClient : public ShuffleClient {
+ public:
+  explicit StubShuffleClient(
+      std::shared_ptr<CelebornConf> conf,
+      std::shared_ptr<ShuffleClient::FetchExcludedWorkers> excludedWorkers)
+      : conf_(std::move(conf)), excludedWorkers_(std::move(excludedWorkers)) {}
+
+  void setupLifecycleManagerRef(std::string&, int) override {}
+  void setupLifecycleManagerRef(
+      std::shared_ptr<network::NettyRpcEndpointRef>&) override {}
+  int pushData(int, int, int, int, const uint8_t*, size_t, size_t, int, int)
+      override {
+    return 0;
+  }
+  void mapperEnd(int, int, int, int) override {}
+  void cleanup(int, int, int) override {}
+  void updateReducerFileGroup(int) override {}
+  std::unique_ptr<CelebornInputStream> readPartition(int, int, int, int, int)
+      override {
+    return nullptr;
+  }
+  std::unique_ptr<CelebornInputStream>
+  readPartition(int, int, int, int, int, bool) override {
+    return nullptr;
+  }
+  bool cleanupShuffle(int) override {
+    return true;
+  }
+  void shutdown() override {}
+
+  void excludeFailedFetchLocation(
+      const std::string& hostAndFetchPort,
+      const std::exception& e) override {
+    if (conf_->clientPushReplicateEnabled() &&
+        conf_->clientFetchExcludeWorkerOnFailureEnabled() &&
+        utils::isCriticalCauseForFetch(e)) {
+      auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
+                     std::chrono::steady_clock::now().time_since_epoch())
+                     .count();
+      excludedWorkers_->set(hostAndFetchPort, now);
+    }
+  }
+
+ private:
+  std::shared_ptr<CelebornConf> conf_;
+  std::shared_ptr<ShuffleClient::FetchExcludedWorkers> excludedWorkers_;
+};
+
+// A TransportClient whose sendRpcRequestSync always rethrows the configured
+// exception, so reader creation through it fails and retries are exercised.
+class FailingTransportClient : public TransportClient {
+ public:
+  explicit FailingTransportClient(std::exception_ptr exceptionToThrow)
+      : TransportClient(nullptr, nullptr, MS(100)),
+        exceptionToThrow_(std::move(exceptionToThrow)) {}
+
+  RpcResponse sendRpcRequestSync(const RpcRequest& request, Timeout timeout)
+      override {
+    std::rethrow_exception(exceptionToThrow_);
+  }
+
+  void sendRpcRequestWithoutResponse(const RpcRequest& request) override {}
+
+  void fetchChunkAsync(
+      const StreamChunkSlice& streamChunkSlice,
+      const RpcRequest& request,
+      FetchChunkSuccessCallback onSuccess,
+      FetchChunkFailureCallback onFailure) override {}
+
+ private:
+  std::exception_ptr exceptionToThrow_;
+};
+
+// TransportClientFactory that always hands out the same failing client and
+// records, in call order, the host passed to each createClient() call.
+class TrackingTransportClientFactory : public TransportClientFactory {
+ public:
+  explicit TrackingTransportClientFactory(
+      std::shared_ptr<FailingTransportClient> client)
+      : TransportClientFactory(std::make_shared<CelebornConf>()),
+        client_(std::move(client)) {}
+
+  std::shared_ptr<TransportClient> createClient(
+      const std::string& host,
+      uint16_t port) override {
+    hosts_.push_back(host);
+    return client_;
+  }
+
+  const std::vector<std::string>& hosts() const {
+    return hosts_;
+  }
+
+ private:
+  std::shared_ptr<FailingTransportClient> client_;
+  std::vector<std::string> hosts_;
+};
+
+// PRIMARY location on "primary-host" (fetch port 1002) whose replicaPeer is a
+// REPLICA on "replica-host" (fetch port 2002).
+std::shared_ptr<PartitionLocation> makeLocationWithPeer() {
+  auto primary = std::make_shared<PartitionLocation>();
+  primary->id = 0;
+  primary->epoch = 0;
+  primary->host = "primary-host";
+  primary->pushPort = 1001;
+  primary->fetchPort = 1002;
+  primary->replicatePort = 1003;
+  primary->mode = PartitionLocation::PRIMARY;
+  primary->storageInfo = std::make_unique<StorageInfo>();
+  primary->storageInfo->type = StorageInfo::HDD;
+
+  auto replica = std::make_unique<PartitionLocation>();
+  replica->id = 0;
+  replica->epoch = 0;
+  replica->host = "replica-host";
+  replica->pushPort = 2001;
+  replica->fetchPort = 2002;
+  replica->replicatePort = 2003;
+  replica->mode = PartitionLocation::REPLICA;
+  replica->storageInfo = std::make_unique<StorageInfo>();
+  replica->storageInfo->type = StorageInfo::HDD;
+
+  primary->replicaPeer = std::move(replica);
+  return primary;
+}
+
+// PRIMARY location on "solo-host" (fetch port 3002) with no peer.
+std::shared_ptr<PartitionLocation> makeLocationWithoutPeer() {
+  auto location = std::make_shared<PartitionLocation>();
+  location->id = 0;
+  location->epoch = 0;
+  location->host = "solo-host";
+  location->pushPort = 3001;
+  location->fetchPort = 3002;
+  location->replicatePort = 3003;
+  location->mode = PartitionLocation::PRIMARY;
+  location->storageInfo = std::make_unique<StorageInfo>();
+  location->storageInfo->type = StorageInfo::HDD;
+  return location;
+}
+
+// Test conf: 1ms retry wait, 2 attempts per replica, fetch exclusion on, and
+// push replication controlled by `replicateEnabled`.
+std::shared_ptr<CelebornConf> makeTestConf(bool replicateEnabled = true) {
+  auto conf = std::make_shared<CelebornConf>();
+  conf->registerProperty(CelebornConf::kNetworkIoRetryWait, "1ms");
+  conf->registerProperty(
+      CelebornConf::kClientFetchMaxRetriesForEachReplica, "2");
+  conf->registerProperty(
+      CelebornConf::kClientPushReplicateEnabled,
+      replicateEnabled ? "true" : "false");
+  conf->registerProperty(
+      CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "true");
+  return conf;
+}
+} // namespace
+
+// With every attempt failing, constructing the stream must eventually throw.
+TEST(CelebornInputStreamRetryTest, allRetriesExhaustedThrows) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::system_error(
+          std::make_error_code(std::errc::connection_refused))));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  auto conf = makeTestConf();
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  EXPECT_THROW(
+      CelebornInputStream(
+          "test-shuffle-key",
+          conf,
+          factory,
+          std::move(locations),
+          attempts,
+          0,
+          0,
+          100,
+          false,
+          excludedWorkers,
+          &shuffleClient),
+      std::exception);
+}
+
+// After the first attempt on the primary fails, the next one targets its peer.
+TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::system_error(
+          std::make_error_code(std::errc::connection_refused))));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  auto conf = makeTestConf();
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  try {
+    CelebornInputStream(
+        "test-shuffle-key",
+        conf,
+        factory,
+        std::move(locations),
+        attempts,
+        0,
+        0,
+        100,
+        false,
+        excludedWorkers,
+        &shuffleClient);
+  } catch (...) {
+    // Construction is expected to throw once retries are exhausted.
+  }
+
+  auto& hosts = factory->hosts();
+  // createClient() order: the primary first, then its replica peer.
+  ASSERT_GE(hosts.size(), 2u);
+  EXPECT_EQ(hosts[0], "primary-host");
+  EXPECT_EQ(hosts[1], "replica-host");
+}
+
+// A critical failure (std::errc::connection_refused) with replication and
+// exclusion enabled must put both the primary and the replica on the list.
+TEST(CelebornInputStreamRetryTest, excludesCriticalFailures) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::system_error(
+          std::make_error_code(std::errc::connection_refused))));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  auto conf = makeTestConf(true);
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  try {
+    CelebornInputStream(
+        "test-shuffle-key",
+        conf,
+        factory,
+        std::move(locations),
+        attempts,
+        0,
+        0,
+        100,
+        false,
+        excludedWorkers,
+        &shuffleClient);
+  } catch (...) {
+  }
+
+  // Both replicas failed critically, so both host:fetchPort keys are present.
+  EXPECT_TRUE(excludedWorkers->get("primary-host:1002").has_value());
+  EXPECT_TRUE(excludedWorkers->get("replica-host:2002").has_value());
+}
+
+// A non-critical failure (a plain std::runtime_error) must not exclude any
+// worker, because utils::isCriticalCauseForFetch filters it out.
+TEST(CelebornInputStreamRetryTest, doesNotExcludeNonCriticalFailures) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::runtime_error("LZ4 decompression failed")));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  auto conf = makeTestConf(true);
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  try {
+    CelebornInputStream(
+        "test-shuffle-key",
+        conf,
+        factory,
+        std::move(locations),
+        attempts,
+        0,
+        0,
+        100,
+        false,
+        excludedWorkers,
+        &shuffleClient);
+  } catch (...) {
+  }
+
+  // Neither replica may appear in the exclusion list.
+  EXPECT_FALSE(excludedWorkers->get("primary-host:1002").has_value());
+  EXPECT_FALSE(excludedWorkers->get("replica-host:2002").has_value());
+}
+
+// Without a peer, every retry must go back to the same (only) location.
+TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::system_error(
+          std::make_error_code(std::errc::connection_refused))));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  // Replication disabled: maxRetry = maxRetriesForEachReplica = 2
+  auto conf = makeTestConf(false);
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithoutPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  try {
+    CelebornInputStream(
+        "test-shuffle-key",
+        conf,
+        factory,
+        std::move(locations),
+        attempts,
+        0,
+        0,
+        100,
+        false,
+        excludedWorkers,
+        &shuffleClient);
+  } catch (...) {
+  }
+
+  // Every recorded createClient() call must target solo-host
+  for (const auto& host : factory->hosts()) {
+    EXPECT_EQ(host, "solo-host");
+  }
+  // One createClient() call per attempt: maxRetry = 2
+  EXPECT_EQ(factory->hosts().size(), 2u);
+}
+
+// With replication enabled, the retry budget is doubled (per replica x 2).
+TEST(CelebornInputStreamRetryTest, replicationDoublesMaxRetries) {
+  auto client = std::make_shared<FailingTransportClient>(
+      std::make_exception_ptr(std::system_error(
+          std::make_error_code(std::errc::connection_refused))));
+  auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+  // maxRetry = 2 * 2 = 4; exclusion off so every attempt reaches createClient
+  auto conf = makeTestConf(true);
+  conf->registerProperty(
+      CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "false");
+  auto excludedWorkers =
+      std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+  StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+  auto location = makeLocationWithPeer();
+  std::vector<std::shared_ptr<const PartitionLocation>> locations;
+  locations.push_back(std::move(location));
+  std::vector<int> attempts = {0};
+
+  try {
+    CelebornInputStream(
+        "test-shuffle-key",
+        conf,
+        factory,
+        std::move(locations),
+        attempts,
+        0,
+        0,
+        100,
+        false,
+        excludedWorkers,
+        &shuffleClient);
+  } catch (...) {
+  }
+
+  // With maxRetriesForEachReplica=2 and replication enabled,
+  // fetchChunkMaxRetry = 2 * 2 = 4 attempts, each recorded by the factory
+  EXPECT_EQ(factory->hosts().size(), 4u);
+}
\ No newline at end of file
diff --git a/cpp/celeborn/conf/CelebornConf.cpp b/cpp/celeborn/conf/CelebornConf.cpp
index b6da2f702..ed00cd871 100644
--- a/cpp/celeborn/conf/CelebornConf.cpp
+++ b/cpp/celeborn/conf/CelebornConf.cpp
@@ -166,8 +166,12 @@ CelebornConf::defaultProperties() {
           BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false),
           NONE_PROP(kClientPushMaxBytesSizeInFlightTotal),
           NONE_PROP(kClientPushMaxBytesSizeInFlightPerWorker),
-          // NUM_PROP(kNumExample, 50'000),
-          // BOOL_PROP(kBoolExample, false),
+          NUM_PROP(kClientFetchMaxRetriesForEachReplica, 3),
+          STR_PROP(kNetworkIoRetryWait, "5s"),
+          BOOL_PROP(kClientPushReplicateEnabled, false),
+          BOOL_PROP(kClientFetchExcludeWorkerOnFailureEnabled, false),
+          STR_PROP(kClientFetchExcludedWorkerExpireTimeout, "60s"),
+          BOOL_PROP(kClientAdaptiveOptimizeSkewedPartitionReadEnabled, false),
       };
   return defaultProp;
 }
@@ -349,5 +353,41 @@ int CelebornConf::shuffleCompressionZstdCompressLevel() const {
   return std::stoi(
       optionalProperty(kShuffleCompressionZstdCompressLevel).value());
 }
+
+// Max reader-creation attempts per replica, parsed with std::stoi.
+int CelebornConf::clientFetchMaxRetriesForEachReplica() const {
+  return std::stoi(
+      optionalProperty(kClientFetchMaxRetriesForEachReplica).value());
+}
+
+// Wait between fetch retries, parsed from a duration string such as "5s".
+Timeout CelebornConf::networkIoRetryWait() const {
+  return utils::toTimeout(
+      toDuration(optionalProperty(kNetworkIoRetryWait).value()));
+}
+
+// Whether push replication is enabled, parsed with folly::to<bool>.
+bool CelebornConf::clientPushReplicateEnabled() const {
+  return folly::to<bool>(optionalProperty(kClientPushReplicateEnabled).value());
+}
+
+// Whether workers may be excluded after a critical fetch failure.
+bool CelebornConf::clientFetchExcludeWorkerOnFailureEnabled() const {
+  return folly::to<bool>(
+      optionalProperty(kClientFetchExcludeWorkerOnFailureEnabled).value());
+}
+
+// How long a fetch exclusion stays in effect, e.g. "60s".
+Timeout CelebornConf::clientFetchExcludedWorkerExpireTimeout() const {
+  return utils::toTimeout(toDuration(
+      optionalProperty(kClientFetchExcludedWorkerExpireTimeout).value()));
+}
+
+// Whether the skewed-partition read optimization is enabled.
+bool CelebornConf::clientAdaptiveOptimizeSkewedPartitionReadEnabled() const {
+  return folly::to<bool>(
+      optionalProperty(kClientAdaptiveOptimizeSkewedPartitionReadEnabled)
+          .value());
+}
 } // namespace conf
 } // namespace celeborn
diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h
index e4299a482..c94f55bdb 100644
--- a/cpp/celeborn/conf/CelebornConf.h
+++ b/cpp/celeborn/conf/CelebornConf.h
@@ -129,6 +129,25 @@ class CelebornConf : public BaseConf {
   static constexpr std::string_view kShuffleCompressionZstdCompressLevel{
       "celeborn.client.shuffle.compression.zstd.level"};
 
+  static constexpr std::string_view kClientFetchMaxRetriesForEachReplica{
+      "celeborn.client.fetch.maxRetriesForEachReplica"};
+
+  static constexpr std::string_view kNetworkIoRetryWait{
+      "celeborn.data.io.retryWait"};
+
+  static constexpr std::string_view kClientPushReplicateEnabled{
+      "celeborn.client.push.replicate.enabled"};
+
+  static constexpr std::string_view kClientFetchExcludeWorkerOnFailureEnabled{
+      "celeborn.client.fetch.excludeWorkerOnFailure.enabled"};
+
+  static constexpr std::string_view kClientFetchExcludedWorkerExpireTimeout{
+      "celeborn.client.fetch.excludedWorker.expireTimeout"};
+
+  static constexpr std::string_view
+      kClientAdaptiveOptimizeSkewedPartitionReadEnabled{
+          "celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled"};
+
   CelebornConf();
 
   CelebornConf(const std::string& filename);
@@ -196,6 +215,18 @@ class CelebornConf : public BaseConf {
   protocol::CompressionCodec shuffleCompressionCodec() const;
 
   int shuffleCompressionZstdCompressLevel() const;
+
+  int clientFetchMaxRetriesForEachReplica() const;
+
+  Timeout networkIoRetryWait() const;
+
+  bool clientPushReplicateEnabled() const;
+
+  bool clientFetchExcludeWorkerOnFailureEnabled() const;
+
+  Timeout clientFetchExcludedWorkerExpireTimeout() const;
+
+  bool clientAdaptiveOptimizeSkewedPartitionReadEnabled() const;
 };
 } // namespace conf
 } // namespace celeborn
diff --git a/cpp/celeborn/conf/tests/CelebornConfTest.cpp b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
index 79619d888..368666033 100644
--- a/cpp/celeborn/conf/tests/CelebornConfTest.cpp
+++ b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
@@ -51,6 +51,12 @@ void testDefaultValues(CelebornConf* conf) {
   EXPECT_EQ(conf->clientFetchMaxReqsInFlight(), 3);
   EXPECT_EQ(conf->shuffleCompressionCodec(), CompressionCodec::NONE);
   EXPECT_EQ(conf->shuffleCompressionZstdCompressLevel(), 1);
+  EXPECT_EQ(conf->clientFetchMaxRetriesForEachReplica(), 3);
+  EXPECT_EQ(conf->networkIoRetryWait(), SECOND(5));
+  EXPECT_FALSE(conf->clientPushReplicateEnabled());
+  EXPECT_FALSE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+  EXPECT_EQ(conf->clientFetchExcludedWorkerExpireTimeout(), SECOND(60));
+  EXPECT_FALSE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
 }
 
 TEST(CelebornConfTest, defaultValues) {
@@ -86,6 +92,30 @@ TEST(CelebornConfTest, setValues) {
   conf->registerProperty(
       CelebornConf::kShuffleCompressionZstdCompressLevel, "5");
   EXPECT_EQ(conf->shuffleCompressionZstdCompressLevel(), 5);
+  conf->registerProperty(
+      CelebornConf::kClientFetchMaxRetriesForEachReplica, "5");
+  EXPECT_EQ(conf->clientFetchMaxRetriesForEachReplica(), 5);
+  conf->registerProperty(CelebornConf::kNetworkIoRetryWait, "10s");
+  EXPECT_EQ(conf->networkIoRetryWait(), SECOND(10));
+  conf->registerProperty(CelebornConf::kClientPushReplicateEnabled, "true");
+  EXPECT_TRUE(conf->clientPushReplicateEnabled());
+  conf->registerProperty(CelebornConf::kClientPushReplicateEnabled, "false");
+  EXPECT_FALSE(conf->clientPushReplicateEnabled());
+  conf->registerProperty(
+      CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "true");
+  EXPECT_TRUE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+  conf->registerProperty(
+      CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "false");
+  EXPECT_FALSE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+  conf->registerProperty(
+      CelebornConf::kClientFetchExcludedWorkerExpireTimeout, "30s");
+  EXPECT_EQ(conf->clientFetchExcludedWorkerExpireTimeout(), SECOND(30));
+  conf->registerProperty(
+      CelebornConf::kClientAdaptiveOptimizeSkewedPartitionReadEnabled, "true");
+  EXPECT_TRUE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
+  conf->registerProperty(
+      CelebornConf::kClientAdaptiveOptimizeSkewedPartitionReadEnabled, "false");
+  EXPECT_FALSE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
 
   EXPECT_THROW(
       conf->registerProperty("non-exist-key", "non-exist-value"),
diff --git a/cpp/celeborn/protocol/PartitionLocation.cpp b/cpp/celeborn/protocol/PartitionLocation.cpp
index 333a4ef13..54d7101f6 100644
--- a/cpp/celeborn/protocol/PartitionLocation.cpp
+++ b/cpp/celeborn/protocol/PartitionLocation.cpp
@@ -160,6 +160,18 @@ std::string PartitionLocation::hostAndPushPort() const {
   return fmt::format("{}:{}", host, pushPort);
 }
 
+std::string PartitionLocation::hostAndFetchPort() const {
+  return fmt::format("{}:{}", host, fetchPort);
+}
+
+bool PartitionLocation::hasPeer() const {
+  return replicaPeer != nullptr;
+}
+
+const PartitionLocation* PartitionLocation::getPeer() const {
+  return replicaPeer.get();
+}
+
 StatusCode toStatusCode(int32_t code) {
   CELEBORN_CHECK(code >= 0);
   CELEBORN_CHECK(code <= StatusCode::TAIL);
diff --git a/cpp/celeborn/protocol/PartitionLocation.h b/cpp/celeborn/protocol/PartitionLocation.h
index 16a535bd0..f62478e03 100644
--- a/cpp/celeborn/protocol/PartitionLocation.h
+++ b/cpp/celeborn/protocol/PartitionLocation.h
@@ -98,6 +98,12 @@ struct PartitionLocation {
 
   std::string hostAndPushPort() const;
 
+  std::string hostAndFetchPort() const;
+
+  bool hasPeer() const;
+
+  const PartitionLocation* getPeer() const;
+
  private:
   static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
       const PbPartitionLocation& pb);
diff --git a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
index edcb56520..e21bff8e2 100644
--- a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
+++ b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
@@ -193,3 +193,55 @@ TEST(PartitionLocationTest, toProtoWithPeer) {
       pbPartitionLocationReplica->mode(), PbPartitionLocation_Mode_Replica);
   verifyStorageInfoPb(&pbPartitionLocationReplica->storageinfo());
 }
+
+TEST(PartitionLocationTest, hasPeer) {
+  auto partitionLocationWithoutPeer = generateBasicPartitionLocation();
+  partitionLocationWithoutPeer->mode = PartitionLocation::PRIMARY;
+  partitionLocationWithoutPeer->storageInfo = generateStorageInfo();
+
+  EXPECT_FALSE(partitionLocationWithoutPeer->hasPeer());
+
+  auto partitionLocationWithPeer = generateBasicPartitionLocation();
+  partitionLocationWithPeer->mode = PartitionLocation::PRIMARY;
+  partitionLocationWithPeer->storageInfo = generateStorageInfo();
+
+  auto partitionLocationReplica = generateBasicPartitionLocation();
+  partitionLocationReplica->mode = PartitionLocation::REPLICA;
+  partitionLocationReplica->storageInfo = generateStorageInfo();
+
+  partitionLocationWithPeer->replicaPeer = std::move(partitionLocationReplica);
+
+  EXPECT_TRUE(partitionLocationWithPeer->hasPeer());
+}
+
+TEST(PartitionLocationTest, getPeer) {
+  auto partitionLocationPrimary = generateBasicPartitionLocation();
+  partitionLocationPrimary->mode = PartitionLocation::PRIMARY;
+  partitionLocationPrimary->storageInfo = generateStorageInfo();
+
+  auto partitionLocationReplica = generateBasicPartitionLocation();
+  partitionLocationReplica->mode = PartitionLocation::REPLICA;
+  partitionLocationReplica->storageInfo = generateStorageInfo();
+
+  partitionLocationPrimary->replicaPeer = std::move(partitionLocationReplica);
+
+  const auto* peer = partitionLocationPrimary->getPeer();
+  EXPECT_NE(peer, nullptr);
+  EXPECT_EQ(peer->mode, PartitionLocation::Mode::REPLICA);
+  verifyBasicPartitionLocation(peer);
+
+  auto partitionLocationWithoutPeer = generateBasicPartitionLocation();
+  partitionLocationWithoutPeer->mode = PartitionLocation::PRIMARY;
+  partitionLocationWithoutPeer->storageInfo = generateStorageInfo();
+
+  EXPECT_EQ(partitionLocationWithoutPeer->getPeer(), nullptr);
+}
+
+TEST(PartitionLocationTest, hostAndFetchPort) {
+  auto partitionLocation = generateBasicPartitionLocation();
+  partitionLocation->host = "test_host";
+  partitionLocation->fetchPort = 1003;
+
+  std::string expected = "test_host:1003";
+  EXPECT_EQ(partitionLocation->hostAndFetchPort(), expected);
+}
diff --git a/cpp/celeborn/utils/CelebornUtils.cpp b/cpp/celeborn/utils/CelebornUtils.cpp
index 675850445..7460e0a6e 100644
--- a/cpp/celeborn/utils/CelebornUtils.cpp
+++ b/cpp/celeborn/utils/CelebornUtils.cpp
@@ -19,6 +19,21 @@
 
 namespace celeborn {
 namespace utils {
+
+bool isCriticalCauseForFetch(const std::exception& e) {
+  if (dynamic_cast<const std::system_error*>(&e)) {
+    return true;
+  }
+
+  std::string msg = e.what();
+  if (msg.rfind("Connecting to", 0) == 0 || msg.rfind("Failed to", 0) == 0 ||
+      msg.find("imeout") != std::string::npos) {
+    return true;
+  }
+
+  return false;
+}
+
 std::string makeShuffleKey(const std::string& appId, const int shuffleId) {
   return appId + "-" + std::to_string(shuffleId);
 }
diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h
index 6d39226ce..4883c81f4 100644
--- a/cpp/celeborn/utils/CelebornUtils.h
+++ b/cpp/celeborn/utils/CelebornUtils.h
@@ -22,6 +22,7 @@
 #include <charconv>
 #include <chrono>
 #include <set>
+#include <system_error>
 #include <vector>
 
 #include "celeborn/memory/ByteBuffer.h"
@@ -37,6 +38,11 @@ namespace utils {
 #define CELEBORN_SHUTDOWN_LOG(severity) \
   LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
 
+/// Checks if an exception represents a critical cause for fetch failure,
+/// matching the behavior of Java's Utils.isCriticalCauseForFetch.
+/// Critical causes include: connection errors, timeouts, and IO failures.
+bool isCriticalCauseForFetch(const std::exception& e);
+
 template <typename T>
 std::vector<T> toVector(const std::set<T>& in) {
   std::vector<T> out{};
diff --git a/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp b/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
index 8517074ec..4e9e891f4 100644
--- a/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
+++ b/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
@@ -18,6 +18,7 @@
 #include <gtest/gtest.h>
 #include <atomic>
 #include <string>
+#include <system_error>
 #include <thread>
 #include <vector>
 
@@ -36,6 +37,60 @@ class CelebornUtilsTest : public testing::Test {
   std::unique_ptr<ConcurrentHashSet<int>> set_;
 };
 
+// Tests for isCriticalCauseForFetch, matching Java's
+// Utils.isCriticalCauseForFetch
+TEST(IsCriticalCauseForFetchTest, systemErrorIsCritical) {
+  std::system_error e(
+      std::make_error_code(std::errc::connection_refused),
+      "Connection refused");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, connectionRefusedByMessage) {
+  std::runtime_error e("Connecting to worker1:9097 failed");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, failedToConnectByMessage) {
+  std::runtime_error e("Failed to open stream for shuffle 1");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, timeoutByMessage) {
+  std::runtime_error e("RPC request Timeout after 120s");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, lowercaseTimeoutByMessage) {
+  std::runtime_error e("Fetch chunk timeout for streamId 42");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, genericErrorIsNotCritical) {
+  std::runtime_error e("Invalid batch header checksum");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, decompressionErrorIsNotCritical) {
+  std::runtime_error e("LZ4 decompression failed");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, emptyMessageIsNotCritical) {
+  std::runtime_error e("");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, substringConnectingToIsNotCritical) {
+  std::runtime_error e("Error while Connecting to worker1:9097");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, substringFailedToIsNotCritical) {
+  std::runtime_error e("Worker unexpectedly Failed to respond");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
 TEST_F(CelebornUtilsTest, mapBasicInsertAndRetrieve) {
   map_->set("apple", 10);
   auto result = map_->get("apple");


Reply via email to