This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2460efd95 [CELEBORN-2222][CIP-14] Support Retrying when createReader
failed for CelebornInputStream in CppClient
2460efd95 is described below
commit 2460efd95e59dfa58a53f334fe2d25c3073cc013
Author: afterincomparableyum
<[email protected]>
AuthorDate: Thu Feb 26 11:18:48 2026 +0800
[CELEBORN-2222][CIP-14] Support Retrying when createReader failed for
CelebornInputStream in CppClient
This PR implements retry support for createReader failures in the C++
client, matching the behavior of the Java implementation. The implementation
includes:
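- Added configuration properties (accessor name, config key, default):
* clientFetchMaxRetriesForEachReplica (celeborn.client.fetch.maxRetriesForEachReplica, default: 3)
* networkIoRetryWait (celeborn.data.io.retryWait, default: 5s)
* clientPushReplicateEnabled (celeborn.client.push.replicate.enabled, default: false)
* clientFetchExcludeWorkerOnFailureEnabled (celeborn.client.fetch.excludeWorkerOnFailure.enabled, default: false)
* clientFetchExcludedWorkerExpireTimeout (celeborn.client.fetch.excludedWorker.expireTimeout, default: 60s)
* clientAdaptiveOptimizeSkewedPartitionReadEnabled (celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled, default: false)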
- Added peer location support methods to PartitionLocation:
* hasPeer() - Check if location has a peer replica
* getPeer() - Get the peer location
* hostAndFetchPort() - Get the host:fetchPort string, used for logging and as the key of the fetch-excluded-workers map
- Implemented retry logic in createReaderWithRetry():
* Retries up to fetchChunkMaxRetry_ times (doubled when push replication
is enabled, which is why clientPushReplicateEnabled is added in this PR)
* Switches to peer location on failure when available
* Sleeps between retries once both replicas have been tried, and on every
retry when no peer exists
* Resets the retry counter before creating a reader for each new location
- Added unit tests for new functionality
### How was this patch tested?
Added unit tests and verified that the code compiles.
Closes #3583 from afterincomparableyum/cpp-client/celeborn-2222.
Authored-by: afterincomparableyum
<[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
cpp/celeborn/client/ShuffleClient.cpp | 22 +-
cpp/celeborn/client/ShuffleClient.h | 14 +
cpp/celeborn/client/reader/CelebornInputStream.cpp | 112 +++++-
cpp/celeborn/client/reader/CelebornInputStream.h | 19 +-
cpp/celeborn/client/tests/CMakeLists.txt | 1 +
.../client/tests/CelebornInputStreamRetryTest.cpp | 411 +++++++++++++++++++++
cpp/celeborn/conf/CelebornConf.cpp | 38 +-
cpp/celeborn/conf/CelebornConf.h | 31 ++
cpp/celeborn/conf/tests/CelebornConfTest.cpp | 30 ++
cpp/celeborn/protocol/PartitionLocation.cpp | 12 +
cpp/celeborn/protocol/PartitionLocation.h | 6 +
.../protocol/tests/PartitionLocationTest.cpp | 52 +++
cpp/celeborn/utils/CelebornUtils.cpp | 15 +
cpp/celeborn/utils/CelebornUtils.h | 6 +
cpp/celeborn/utils/tests/CelebornUtilsTest.cpp | 55 +++
15 files changed, 814 insertions(+), 10 deletions(-)
diff --git a/cpp/celeborn/client/ShuffleClient.cpp
b/cpp/celeborn/client/ShuffleClient.cpp
index 35887135e..5f1e82dce 100644
--- a/cpp/celeborn/client/ShuffleClient.cpp
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -67,7 +67,11 @@ ShuffleClientImpl::ShuffleClientImpl(
[conf]() {
return compress::Compressor::createCompressor(*conf);
})
- : std::function<std::unique_ptr<compress::Compressor>()>()) {
+ : std::function<std::unique_ptr<compress::Compressor>()>()),
+ pushReplicateEnabled_(conf->clientPushReplicateEnabled()),
+ fetchExcludeWorkerOnFailureEnabled_(
+ conf->clientFetchExcludeWorkerOnFailureEnabled()),
+ fetchExcludedWorkers_(std::make_shared<FetchExcludedWorkers>()) {
CELEBORN_CHECK_NOT_NULL(clientFactory_);
CELEBORN_CHECK_NOT_NULL(pushDataRetryPool_);
}
@@ -348,7 +352,21 @@ std::unique_ptr<CelebornInputStream>
ShuffleClientImpl::readPartition(
attemptNumber,
startMapIndex,
endMapIndex,
- needCompression);
+ needCompression,
+ fetchExcludedWorkers_,
+ this);
+}
+
+void ShuffleClientImpl::excludeFailedFetchLocation(
+ const std::string& hostAndFetchPort,
+ const std::exception& e) {
+ if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_ &&
+ utils::isCriticalCauseForFetch(e)) {
+ auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ fetchExcludedWorkers_->set(hostAndFetchPort, now);
+ }
}
void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
diff --git a/cpp/celeborn/client/ShuffleClient.h
b/cpp/celeborn/client/ShuffleClient.h
index e899ba0cd..0a7946d12 100644
--- a/cpp/celeborn/client/ShuffleClient.h
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -55,6 +55,8 @@ class ShuffleClient {
virtual void updateReducerFileGroup(int shuffleId) = 0;
+ using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;
+
virtual std::unique_ptr<CelebornInputStream> readPartition(
int shuffleId,
int partitionId,
@@ -70,6 +72,10 @@ class ShuffleClient {
int endMapIndex,
bool needCompression) = 0;
+ virtual void excludeFailedFetchLocation(
+ const std::string& hostAndFetchPort,
+ const std::exception& e) = 0;
+
virtual bool cleanupShuffle(int shuffleId) = 0;
virtual void shutdown() = 0;
@@ -163,6 +169,10 @@ class ShuffleClientImpl
int endMapIndex,
bool needCompression) override;
+ void excludeFailedFetchLocation(
+ const std::string& hostAndFetchPort,
+ const std::exception& e) override;
+
void updateReducerFileGroup(int shuffleId) override;
bool cleanupShuffle(int shuffleId) override;
@@ -272,6 +282,10 @@ class ShuffleClientImpl
// Factory for creating compressor instances on demand to avoid sharing a
// single non-thread-safe compressor across concurrent operations.
std::function<std::unique_ptr<compress::Compressor>()> compressorFactory_;
+ bool pushReplicateEnabled_;
+ bool fetchExcludeWorkerOnFailureEnabled_;
+ std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
+
// TODO: pushExcludedWorker is not supported yet
};
} // namespace client
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp
b/cpp/celeborn/client/reader/CelebornInputStream.cpp
index d75b705e5..f81152321 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.cpp
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -17,7 +17,11 @@
#include "celeborn/client/reader/CelebornInputStream.h"
#include <lz4.h>
+#include <thread>
+#include "CelebornInputStream.h"
+#include "celeborn/client/ShuffleClient.h"
#include "celeborn/client/compress/Decompressor.h"
+#include "celeborn/utils/CelebornUtils.h"
namespace celeborn {
namespace client {
@@ -30,7 +34,9 @@ CelebornInputStream::CelebornInputStream(
int attemptNumber,
int startMapIndex,
int endMapIndex,
- bool needCompression)
+ bool needCompression,
+ const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
+ ShuffleClient* shuffleClient)
: shuffleKey_(shuffleKey),
conf_(conf),
clientFactory_(clientFactory),
@@ -45,7 +51,22 @@ CelebornInputStream::CelebornInputStream(
shouldDecompress_(
conf_->shuffleCompressionCodec() !=
protocol::CompressionCodec::NONE &&
- needCompression) {
+ needCompression),
+ fetchChunkRetryCnt_(0),
+ fetchChunkMaxRetry_(
+ conf_->clientPushReplicateEnabled()
+ ? conf_->clientFetchMaxRetriesForEachReplica() * 2
+ : conf_->clientFetchMaxRetriesForEachReplica()),
+ retryWait_(conf_->networkIoRetryWait()),
+ fetchExcludedWorkers_(fetchExcludedWorkers),
+ fetchExcludedWorkerExpireTimeoutMs_(
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ conf_->clientFetchExcludedWorkerExpireTimeout())
+ .count()),
+ readSkewPartitionWithoutMapRange_(
+ conf_->clientAdaptiveOptimizeSkewedPartitionReadEnabled() &&
+ startMapIndex > endMapIndex),
+ shuffleClient_(shuffleClient) {
if (shouldDecompress_) {
decompressor_ = compress::Decompressor::createDecompressor(
conf_->shuffleCompressionCodec());
@@ -178,6 +199,7 @@ void CelebornInputStream::moveToNextReader() {
if (!location) {
return;
}
+ fetchChunkRetryCnt_ = 0;
currReader_ = createReaderWithRetry(*location);
currLocationIndex_++;
if (currReader_->hasNext()) {
@@ -189,9 +211,62 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
+ while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+ // TODO (Investigate): When a location is already in the exclusion list,
the
+ // code throws and then retries/sleeps (especially in the no-peer branch).
+ // Since isExcluded(*currentLocation) will keep returning true until the
+ // exclusion expires, these retries are guaranteed to fail and just add
+ // delay. Consider failing fast (no sleep/retry) or skipping to another
+ // available location/peer when the current location is excluded.
+ try {
+ VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+ << currentLocation->fetchPort;
+ if (isExcluded(*currentLocation)) {
+ CELEBORN_FAIL(
+ "Fetch data from excluded worker! {}",
+ currentLocation->hostAndFetchPort());
+ }
+ auto reader = createReader(*currentLocation);
+ return reader;
+ } catch (const std::exception& e) {
+ lastException = std::current_exception();
+ shuffleClient_->excludeFailedFetchLocation(
+ currentLocation->hostAndFetchPort(), e);
+ fetchChunkRetryCnt_++;
+
+ if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+ if (fetchChunkRetryCnt_ % 2 == 0) {
+ std::this_thread::sleep_for(retryWait_);
+ }
+ LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+ << "/" << fetchChunkMaxRetry_ << " times for location "
+ << currentLocation->hostAndFetchPort()
+ << ", change to peer. Error: " << e.what();
+ // TODO: When stream handlers are supported, send BUFFER_STREAM_END
+ // to close the active stream before switching to peer, matching the
+ // Java CelebornInputStream behavior. Currently, the C++ client does
+ // not have pre-opened stream handlers, so stream cleanup is handled
+ // by WorkerPartitionReader's destructor.
+ currentLocation = currentLocation->getPeer();
+ } else {
+ LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+ << "/" << fetchChunkMaxRetry_ << " times for location "
+ << currentLocation->hostAndFetchPort()
+ << ", retry the same location. Error: " << e.what();
+ std::this_thread::sleep_for(retryWait_);
+ }
+ }
+ }
+
+ // Max retries exceeded, rethrow the last exception wrapped with context
+ std::string errorMessage = "createPartitionReader failed after " +
+ std::to_string(fetchChunkRetryCnt_) +
+ " retries. Original location: " + location.hostAndFetchPort() +
+ ", last attempted location: " + currentLocation->hostAndFetchPort();
+ throw utils::CelebornRuntimeError(lastException, errorMessage, false);
}
std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
@@ -236,6 +311,33 @@ std::unordered_set<int>&
CelebornInputStream::getBatchRecord(int mapId) {
return *batchRecords_[mapId];
}
+bool CelebornInputStream::isExcluded(
+ const protocol::PartitionLocation& location) {
+ auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ auto timestamp = fetchExcludedWorkers_->get(location.hostAndFetchPort());
+ if (!timestamp.has_value()) {
+ return false;
+ }
+ if (now - timestamp.value() > fetchExcludedWorkerExpireTimeoutMs_) {
+ fetchExcludedWorkers_->erase(location.hostAndFetchPort());
+ return false;
+ }
+ if (location.hasPeer()) {
+ auto peerTimestamp =
+ fetchExcludedWorkers_->get(location.getPeer()->hostAndFetchPort());
+ // To avoid both replicate locations being excluded, if peer was added to
+ // excluded list earlier, change to try peer.
+ if (!peerTimestamp.has_value() ||
+ peerTimestamp.value() < timestamp.value()) {
+ return true;
+ }
+ return false;
+ }
+ return true;
+}
+
void CelebornInputStream::cleanupReader() {
currReader_ = nullptr;
}
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h
b/cpp/celeborn/client/reader/CelebornInputStream.h
index 5dd9c4f76..735690be0 100644
--- a/cpp/celeborn/client/reader/CelebornInputStream.h
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -20,11 +20,16 @@
#include "celeborn/client/compress/Decompressor.h"
#include "celeborn/client/reader/WorkerPartitionReader.h"
#include "celeborn/conf/CelebornConf.h"
+#include "celeborn/utils/CelebornUtils.h"
namespace celeborn {
namespace client {
+class ShuffleClient;
+
class CelebornInputStream {
public:
+ using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;
+
CelebornInputStream(
const std::string& shuffleKey,
const std::shared_ptr<const conf::CelebornConf>& conf,
@@ -35,7 +40,9 @@ class CelebornInputStream {
int attemptNumber,
int startMapIndex,
int endMapIndex,
- bool needCompression);
+ bool needCompression,
+ const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
+ ShuffleClient* shuffleClient);
int read(uint8_t* buffer, size_t offset, size_t len);
@@ -56,6 +63,8 @@ class CelebornInputStream {
std::shared_ptr<PartitionReader> createReader(
const protocol::PartitionLocation& location);
+ bool isExcluded(const protocol::PartitionLocation& location);
+
std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();
std::unordered_set<int>& getBatchRecord(int mapId);
@@ -81,6 +90,14 @@ class CelebornInputStream {
size_t currBatchSize_;
std::shared_ptr<PartitionReader> currReader_;
std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;
+
+ int fetchChunkRetryCnt_;
+ int fetchChunkMaxRetry_;
+ utils::Timeout retryWait_;
+ std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
+ int64_t fetchExcludedWorkerExpireTimeoutMs_;
+ bool readSkewPartitionWithoutMapRange_;
+ ShuffleClient* shuffleClient_;
};
} // namespace client
} // namespace celeborn
diff --git a/cpp/celeborn/client/tests/CMakeLists.txt
b/cpp/celeborn/client/tests/CMakeLists.txt
index 63c37c6e1..1402982e0 100644
--- a/cpp/celeborn/client/tests/CMakeLists.txt
+++ b/cpp/celeborn/client/tests/CMakeLists.txt
@@ -17,6 +17,7 @@ add_executable(
celeborn_client_test
WorkerPartitionReaderTest.cpp
PushDataCallbackTest.cpp
+ CelebornInputStreamRetryTest.cpp
PushStateTest.cpp
ReviveManagerTest.cpp
Lz4DecompressorTest.cpp
diff --git a/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
b/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
new file mode 100644
index 000000000..564860459
--- /dev/null
+++ b/cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <system_error>
+
+#include "celeborn/client/ShuffleClient.h"
+#include "celeborn/client/reader/CelebornInputStream.h"
+#include "celeborn/conf/CelebornConf.h"
+
+using namespace celeborn;
+using namespace celeborn::client;
+using namespace celeborn::network;
+using namespace celeborn::protocol;
+using namespace celeborn::conf;
+
+namespace {
+using MS = std::chrono::milliseconds;
+
+class StubShuffleClient : public ShuffleClient {
+ public:
+ explicit StubShuffleClient(
+ std::shared_ptr<CelebornConf> conf,
+ std::shared_ptr<ShuffleClient::FetchExcludedWorkers> excludedWorkers)
+ : conf_(std::move(conf)), excludedWorkers_(std::move(excludedWorkers)) {}
+
+ void setupLifecycleManagerRef(std::string&, int) override {}
+ void setupLifecycleManagerRef(
+ std::shared_ptr<network::NettyRpcEndpointRef>&) override {}
+ int pushData(int, int, int, int, const uint8_t*, size_t, size_t, int, int)
+ override {
+ return 0;
+ }
+ void mapperEnd(int, int, int, int) override {}
+ void cleanup(int, int, int) override {}
+ void updateReducerFileGroup(int) override {}
+ std::unique_ptr<CelebornInputStream> readPartition(int, int, int, int, int)
+ override {
+ return nullptr;
+ }
+ std::unique_ptr<CelebornInputStream>
+ readPartition(int, int, int, int, int, bool) override {
+ return nullptr;
+ }
+ bool cleanupShuffle(int) override {
+ return true;
+ }
+ void shutdown() override {}
+
+ void excludeFailedFetchLocation(
+ const std::string& hostAndFetchPort,
+ const std::exception& e) override {
+ if (conf_->clientPushReplicateEnabled() &&
+ conf_->clientFetchExcludeWorkerOnFailureEnabled() &&
+ utils::isCriticalCauseForFetch(e)) {
+ auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ excludedWorkers_->set(hostAndFetchPort, now);
+ }
+ }
+
+ private:
+ std::shared_ptr<CelebornConf> conf_;
+ std::shared_ptr<ShuffleClient::FetchExcludedWorkers> excludedWorkers_;
+};
+
+// A TransportClient that always throws a configurable exception
+// from sendRpcRequestSync, allowing us to exercise retry logic.
+class FailingTransportClient : public TransportClient {
+ public:
+ explicit FailingTransportClient(std::exception_ptr exceptionToThrow)
+ : TransportClient(nullptr, nullptr, MS(100)),
+ exceptionToThrow_(std::move(exceptionToThrow)) {}
+
+ RpcResponse sendRpcRequestSync(const RpcRequest& request, Timeout timeout)
+ override {
+ std::rethrow_exception(exceptionToThrow_);
+ }
+
+ void sendRpcRequestWithoutResponse(const RpcRequest& request) override {}
+
+ void fetchChunkAsync(
+ const StreamChunkSlice& streamChunkSlice,
+ const RpcRequest& request,
+ FetchChunkSuccessCallback onSuccess,
+ FetchChunkFailureCallback onFailure) override {}
+
+ private:
+ std::exception_ptr exceptionToThrow_;
+};
+
+class TrackingTransportClientFactory : public TransportClientFactory {
+ public:
+ explicit TrackingTransportClientFactory(
+ std::shared_ptr<FailingTransportClient> client)
+ : TransportClientFactory(std::make_shared<CelebornConf>()),
+ client_(std::move(client)) {}
+
+ std::shared_ptr<TransportClient> createClient(
+ const std::string& host,
+ uint16_t port) override {
+ hosts_.push_back(host);
+ return client_;
+ }
+
+ const std::vector<std::string>& hosts() const {
+ return hosts_;
+ }
+
+ private:
+ std::shared_ptr<FailingTransportClient> client_;
+ std::vector<std::string> hosts_;
+};
+
+std::shared_ptr<PartitionLocation> makeLocationWithPeer() {
+ auto primary = std::make_shared<PartitionLocation>();
+ primary->id = 0;
+ primary->epoch = 0;
+ primary->host = "primary-host";
+ primary->pushPort = 1001;
+ primary->fetchPort = 1002;
+ primary->replicatePort = 1003;
+ primary->mode = PartitionLocation::PRIMARY;
+ primary->storageInfo = std::make_unique<StorageInfo>();
+ primary->storageInfo->type = StorageInfo::HDD;
+
+ auto replica = std::make_unique<PartitionLocation>();
+ replica->id = 0;
+ replica->epoch = 0;
+ replica->host = "replica-host";
+ replica->pushPort = 2001;
+ replica->fetchPort = 2002;
+ replica->replicatePort = 2003;
+ replica->mode = PartitionLocation::REPLICA;
+ replica->storageInfo = std::make_unique<StorageInfo>();
+ replica->storageInfo->type = StorageInfo::HDD;
+
+ primary->replicaPeer = std::move(replica);
+ return primary;
+}
+
+std::shared_ptr<PartitionLocation> makeLocationWithoutPeer() {
+ auto location = std::make_shared<PartitionLocation>();
+ location->id = 0;
+ location->epoch = 0;
+ location->host = "solo-host";
+ location->pushPort = 3001;
+ location->fetchPort = 3002;
+ location->replicatePort = 3003;
+ location->mode = PartitionLocation::PRIMARY;
+ location->storageInfo = std::make_unique<StorageInfo>();
+ location->storageInfo->type = StorageInfo::HDD;
+ return location;
+}
+
+std::shared_ptr<CelebornConf> makeTestConf(bool replicateEnabled = true) {
+ auto conf = std::make_shared<CelebornConf>();
+ conf->registerProperty(CelebornConf::kNetworkIoRetryWait, "1ms");
+ conf->registerProperty(
+ CelebornConf::kClientFetchMaxRetriesForEachReplica, "2");
+ conf->registerProperty(
+ CelebornConf::kClientPushReplicateEnabled,
+ replicateEnabled ? "true" : "false");
+ conf->registerProperty(
+ CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "true");
+ return conf;
+}
+} // namespace
+
+// Verifies that createReaderWithRetry exhausts all retries and throws.
+TEST(CelebornInputStreamRetryTest, allRetriesExhaustedThrows) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::system_error(
+ std::make_error_code(std::errc::connection_refused))));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ auto conf = makeTestConf();
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ EXPECT_THROW(
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient),
+ std::exception);
+}
+
+// Verifies that on failure, the retry logic switches from primary to replica.
+TEST(CelebornInputStreamRetryTest, switchesToPeerOnFailure) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::system_error(
+ std::make_error_code(std::errc::connection_refused))));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ auto conf = makeTestConf();
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ try {
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient);
+ } catch (...) {
+ // Expected to throw after exhausting retries
+ }
+
+ auto& hosts = factory->hosts();
+ // First attempt on primary, then switches to replica
+ ASSERT_GE(hosts.size(), 2u);
+ EXPECT_EQ(hosts[0], "primary-host");
+ EXPECT_EQ(hosts[1], "replica-host");
+}
+
+// Verifies that critical failures cause workers to be added to the
+// exclusion list.
+TEST(CelebornInputStreamRetryTest, excludesCriticalFailures) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::system_error(
+ std::make_error_code(std::errc::connection_refused))));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ auto conf = makeTestConf(true);
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ try {
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient);
+ } catch (...) {
+ }
+
+ // Both primary and replica should have been excluded (critical failure)
+ EXPECT_TRUE(excludedWorkers->get("primary-host:1002").has_value());
+ EXPECT_TRUE(excludedWorkers->get("replica-host:2002").has_value());
+}
+
+// Verifies that non-critical failures do NOT cause worker exclusion,
+// matching the isCriticalCauseForFetch filtering behavior.
+TEST(CelebornInputStreamRetryTest, doesNotExcludeNonCriticalFailures) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::runtime_error("LZ4 decompression failed")));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ auto conf = makeTestConf(true);
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ try {
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient);
+ } catch (...) {
+ }
+
+ // Non-critical failure should NOT exclude workers
+ EXPECT_FALSE(excludedWorkers->get("primary-host:1002").has_value());
+ EXPECT_FALSE(excludedWorkers->get("replica-host:2002").has_value());
+}
+
+// Verifies that without a peer, all retries target the same location.
+TEST(CelebornInputStreamRetryTest, noPeerRetriesSameLocation) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::system_error(
+ std::make_error_code(std::errc::connection_refused))));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ // Replication disabled: maxRetry = 2
+ auto conf = makeTestConf(false);
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithoutPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ try {
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient);
+ } catch (...) {
+ }
+
+ // All retries should target the same host
+ for (const auto& host : factory->hosts()) {
+ EXPECT_EQ(host, "solo-host");
+ }
+ // maxRetry = 2
+ EXPECT_EQ(factory->hosts().size(), 2u);
+}
+
+// Verifies that with replication enabled, maxRetry is doubled.
+TEST(CelebornInputStreamRetryTest, replicationDoublesMaxRetries) {
+ auto client = std::make_shared<FailingTransportClient>(
+ std::make_exception_ptr(std::system_error(
+ std::make_error_code(std::errc::connection_refused))));
+ auto factory = std::make_shared<TrackingTransportClientFactory>(client);
+ // Replication enabled: maxRetry = 2 * 2 = 4
+ auto conf = makeTestConf(true);
+ conf->registerProperty(
+ CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "false");
+ auto excludedWorkers =
+ std::make_shared<CelebornInputStream::FetchExcludedWorkers>();
+ StubShuffleClient shuffleClient(conf, excludedWorkers);
+
+ auto location = makeLocationWithPeer();
+ std::vector<std::shared_ptr<const PartitionLocation>> locations;
+ locations.push_back(std::move(location));
+ std::vector<int> attempts = {0};
+
+ try {
+ CelebornInputStream(
+ "test-shuffle-key",
+ conf,
+ factory,
+ std::move(locations),
+ attempts,
+ 0,
+ 0,
+ 100,
+ false,
+ excludedWorkers,
+ &shuffleClient);
+ } catch (...) {
+ }
+
+ // With maxRetriesForEachReplica=2 and replication enabled,
+ // fetchChunkMaxRetry = 2 * 2 = 4 total attempts
+ EXPECT_EQ(factory->hosts().size(), 4u);
+}
\ No newline at end of file
diff --git a/cpp/celeborn/conf/CelebornConf.cpp
b/cpp/celeborn/conf/CelebornConf.cpp
index b6da2f702..ed00cd871 100644
--- a/cpp/celeborn/conf/CelebornConf.cpp
+++ b/cpp/celeborn/conf/CelebornConf.cpp
@@ -166,8 +166,12 @@ CelebornConf::defaultProperties() {
BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false),
NONE_PROP(kClientPushMaxBytesSizeInFlightTotal),
NONE_PROP(kClientPushMaxBytesSizeInFlightPerWorker),
- // NUM_PROP(kNumExample, 50'000),
- // BOOL_PROP(kBoolExample, false),
+ NUM_PROP(kClientFetchMaxRetriesForEachReplica, 3),
+ STR_PROP(kNetworkIoRetryWait, "5s"),
+ BOOL_PROP(kClientPushReplicateEnabled, false),
+ BOOL_PROP(kClientFetchExcludeWorkerOnFailureEnabled, false),
+ STR_PROP(kClientFetchExcludedWorkerExpireTimeout, "60s"),
+ BOOL_PROP(kClientAdaptiveOptimizeSkewedPartitionReadEnabled, false),
};
return defaultProp;
}
@@ -349,5 +353,35 @@ int CelebornConf::shuffleCompressionZstdCompressLevel()
const {
return std::stoi(
optionalProperty(kShuffleCompressionZstdCompressLevel).value());
}
+
+int CelebornConf::clientFetchMaxRetriesForEachReplica() const {
+ return std::stoi(
+ optionalProperty(kClientFetchMaxRetriesForEachReplica).value());
+}
+
+Timeout CelebornConf::networkIoRetryWait() const {
+ return utils::toTimeout(
+ toDuration(optionalProperty(kNetworkIoRetryWait).value()));
+}
+
+bool CelebornConf::clientPushReplicateEnabled() const {
+ return
folly::to<bool>(optionalProperty(kClientPushReplicateEnabled).value());
+}
+
+bool CelebornConf::clientFetchExcludeWorkerOnFailureEnabled() const {
+ return folly::to<bool>(
+ optionalProperty(kClientFetchExcludeWorkerOnFailureEnabled).value());
+}
+
+Timeout CelebornConf::clientFetchExcludedWorkerExpireTimeout() const {
+ return utils::toTimeout(toDuration(
+ optionalProperty(kClientFetchExcludedWorkerExpireTimeout).value()));
+}
+
+bool CelebornConf::clientAdaptiveOptimizeSkewedPartitionReadEnabled() const {
+ return folly::to<bool>(
+ optionalProperty(kClientAdaptiveOptimizeSkewedPartitionReadEnabled)
+ .value());
+}
} // namespace conf
} // namespace celeborn
diff --git a/cpp/celeborn/conf/CelebornConf.h b/cpp/celeborn/conf/CelebornConf.h
index e4299a482..c94f55bdb 100644
--- a/cpp/celeborn/conf/CelebornConf.h
+++ b/cpp/celeborn/conf/CelebornConf.h
@@ -129,6 +129,25 @@ class CelebornConf : public BaseConf {
static constexpr std::string_view kShuffleCompressionZstdCompressLevel{
"celeborn.client.shuffle.compression.zstd.level"};
+ static constexpr std::string_view kClientFetchMaxRetriesForEachReplica{
+ "celeborn.client.fetch.maxRetriesForEachReplica"};
+
+ static constexpr std::string_view kNetworkIoRetryWait{
+ "celeborn.data.io.retryWait"};
+
+ static constexpr std::string_view kClientPushReplicateEnabled{
+ "celeborn.client.push.replicate.enabled"};
+
+ static constexpr std::string_view kClientFetchExcludeWorkerOnFailureEnabled{
+ "celeborn.client.fetch.excludeWorkerOnFailure.enabled"};
+
+ static constexpr std::string_view kClientFetchExcludedWorkerExpireTimeout{
+ "celeborn.client.fetch.excludedWorker.expireTimeout"};
+
+ static constexpr std::string_view
+ kClientAdaptiveOptimizeSkewedPartitionReadEnabled{
+ "celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled"};
+
CelebornConf();
CelebornConf(const std::string& filename);
@@ -196,6 +215,18 @@ class CelebornConf : public BaseConf {
protocol::CompressionCodec shuffleCompressionCodec() const;
int shuffleCompressionZstdCompressLevel() const;
+
+ int clientFetchMaxRetriesForEachReplica() const;
+
+ Timeout networkIoRetryWait() const;
+
+ bool clientPushReplicateEnabled() const;
+
+ bool clientFetchExcludeWorkerOnFailureEnabled() const;
+
+ Timeout clientFetchExcludedWorkerExpireTimeout() const;
+
+ bool clientAdaptiveOptimizeSkewedPartitionReadEnabled() const;
};
} // namespace conf
} // namespace celeborn
diff --git a/cpp/celeborn/conf/tests/CelebornConfTest.cpp
b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
index 79619d888..368666033 100644
--- a/cpp/celeborn/conf/tests/CelebornConfTest.cpp
+++ b/cpp/celeborn/conf/tests/CelebornConfTest.cpp
@@ -51,6 +51,12 @@ void testDefaultValues(CelebornConf* conf) {
EXPECT_EQ(conf->clientFetchMaxReqsInFlight(), 3);
EXPECT_EQ(conf->shuffleCompressionCodec(), CompressionCodec::NONE);
EXPECT_EQ(conf->shuffleCompressionZstdCompressLevel(), 1);
+ EXPECT_EQ(conf->clientFetchMaxRetriesForEachReplica(), 3);
+ EXPECT_EQ(conf->networkIoRetryWait(), SECOND(5));
+ EXPECT_FALSE(conf->clientPushReplicateEnabled());
+ EXPECT_FALSE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+ EXPECT_EQ(conf->clientFetchExcludedWorkerExpireTimeout(), SECOND(60));
+ EXPECT_FALSE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
}
TEST(CelebornConfTest, defaultValues) {
@@ -86,6 +92,30 @@ TEST(CelebornConfTest, setValues) {
conf->registerProperty(
CelebornConf::kShuffleCompressionZstdCompressLevel, "5");
EXPECT_EQ(conf->shuffleCompressionZstdCompressLevel(), 5);
+ conf->registerProperty(
+ CelebornConf::kClientFetchMaxRetriesForEachReplica, "5");
+ EXPECT_EQ(conf->clientFetchMaxRetriesForEachReplica(), 5);
+ conf->registerProperty(CelebornConf::kNetworkIoRetryWait, "10s");
+ EXPECT_EQ(conf->networkIoRetryWait(), SECOND(10));
+ conf->registerProperty(CelebornConf::kClientPushReplicateEnabled, "true");
+ EXPECT_TRUE(conf->clientPushReplicateEnabled());
+ conf->registerProperty(CelebornConf::kClientPushReplicateEnabled, "false");
+ EXPECT_FALSE(conf->clientPushReplicateEnabled());
+ conf->registerProperty(
+ CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "true");
+ EXPECT_TRUE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+ conf->registerProperty(
+ CelebornConf::kClientFetchExcludeWorkerOnFailureEnabled, "false");
+ EXPECT_FALSE(conf->clientFetchExcludeWorkerOnFailureEnabled());
+ conf->registerProperty(
+ CelebornConf::kClientFetchExcludedWorkerExpireTimeout, "30s");
+ EXPECT_EQ(conf->clientFetchExcludedWorkerExpireTimeout(), SECOND(30));
+ conf->registerProperty(
+ CelebornConf::kClientAdaptiveOptimizeSkewedPartitionReadEnabled, "true");
+ EXPECT_TRUE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
+ conf->registerProperty(
+ CelebornConf::kClientAdaptiveOptimizeSkewedPartitionReadEnabled,
"false");
+ EXPECT_FALSE(conf->clientAdaptiveOptimizeSkewedPartitionReadEnabled());
EXPECT_THROW(
conf->registerProperty("non-exist-key", "non-exist-value"),
diff --git a/cpp/celeborn/protocol/PartitionLocation.cpp b/cpp/celeborn/protocol/PartitionLocation.cpp
index 333a4ef13..54d7101f6 100644
--- a/cpp/celeborn/protocol/PartitionLocation.cpp
+++ b/cpp/celeborn/protocol/PartitionLocation.cpp
@@ -160,6 +160,18 @@ std::string PartitionLocation::hostAndPushPort() const {
return fmt::format("{}:{}", host, pushPort);
}
+/// Formats the fetch endpoint of this location as "<host>:<fetchPort>".
+std::string PartitionLocation::hostAndFetchPort() const {
+  return fmt::format("{}:{}", this->host, this->fetchPort);
+}
+
+/// Reports whether a replica peer is attached to this location.
+bool PartitionLocation::hasPeer() const {
+  return getPeer() != nullptr;
+}
+
+/// Non-owning view of the replica peer; nullptr when none is attached.
+const PartitionLocation* PartitionLocation::getPeer() const {
+  const PartitionLocation* peer = replicaPeer.get();
+  return peer;
+}
+
StatusCode toStatusCode(int32_t code) {
CELEBORN_CHECK(code >= 0);
CELEBORN_CHECK(code <= StatusCode::TAIL);
diff --git a/cpp/celeborn/protocol/PartitionLocation.h b/cpp/celeborn/protocol/PartitionLocation.h
index 16a535bd0..f62478e03 100644
--- a/cpp/celeborn/protocol/PartitionLocation.h
+++ b/cpp/celeborn/protocol/PartitionLocation.h
@@ -98,6 +98,12 @@ struct PartitionLocation {
std::string hostAndPushPort() const;
+ /// Returns the fetch endpoint as "<host>:<fetchPort>" (the fetch-port
+ /// analogue of hostAndPushPort()).
+ std::string hostAndFetchPort() const;
+
+ /// True iff a replica peer location (replicaPeer) is attached.
+ bool hasPeer() const;
+
+ /// Non-owning pointer to the replica peer (the object stays owned by
+ /// replicaPeer); nullptr when hasPeer() is false.
+ const PartitionLocation* getPeer() const;
+
private:
static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
const PbPartitionLocation& pb);
diff --git a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
index edcb56520..e21bff8e2 100644
--- a/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
+++ b/cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
@@ -193,3 +193,55 @@ TEST(PartitionLocationTest, toProtoWithPeer) {
pbPartitionLocationReplica->mode(), PbPartitionLocation_Mode_Replica);
verifyStorageInfoPb(&pbPartitionLocationReplica->storageinfo());
}
+
+TEST(PartitionLocationTest, hasPeer) {
+  // Builds a basic location in the given mode with storage info attached.
+  auto makeLocation = [](PartitionLocation::Mode mode) {
+    auto location = generateBasicPartitionLocation();
+    location->mode = mode;
+    location->storageInfo = generateStorageInfo();
+    return location;
+  };
+
+  // A freshly generated location has no replica attached.
+  const auto standalone = makeLocation(PartitionLocation::PRIMARY);
+  EXPECT_FALSE(standalone->hasPeer());
+
+  // Attaching a replica makes hasPeer() report true.
+  auto primary = makeLocation(PartitionLocation::PRIMARY);
+  primary->replicaPeer = makeLocation(PartitionLocation::REPLICA);
+  EXPECT_TRUE(primary->hasPeer());
+}
+
+TEST(PartitionLocationTest, getPeer) {
+  auto partitionLocationPrimary = generateBasicPartitionLocation();
+  partitionLocationPrimary->mode = PartitionLocation::PRIMARY;
+  partitionLocationPrimary->storageInfo = generateStorageInfo();
+
+  auto partitionLocationReplica = generateBasicPartitionLocation();
+  partitionLocationReplica->mode = PartitionLocation::REPLICA;
+  partitionLocationReplica->storageInfo = generateStorageInfo();
+
+  partitionLocationPrimary->replicaPeer = std::move(partitionLocationReplica);
+
+  // ASSERT (fatal), not EXPECT: the checks below dereference `peer`, so a
+  // null result must stop the test rather than crash the test binary.
+  const auto* peer = partitionLocationPrimary->getPeer();
+  ASSERT_NE(peer, nullptr);
+  EXPECT_EQ(peer->mode, PartitionLocation::Mode::REPLICA);
+  verifyBasicPartitionLocation(peer);
+  // getPeer() must expose the object owned by replicaPeer, not a copy.
+  EXPECT_EQ(peer, partitionLocationPrimary->replicaPeer.get());
+
+  auto partitionLocationWithoutPeer = generateBasicPartitionLocation();
+  partitionLocationWithoutPeer->mode = PartitionLocation::PRIMARY;
+  partitionLocationWithoutPeer->storageInfo = generateStorageInfo();
+
+  EXPECT_EQ(partitionLocationWithoutPeer->getPeer(), nullptr);
+}
+
+TEST(PartitionLocationTest, hostAndFetchPort) {
+  auto location = generateBasicPartitionLocation();
+  location->host = "test_host";
+  location->fetchPort = 1003;
+
+  // The fetch endpoint is rendered as "<host>:<fetchPort>".
+  EXPECT_EQ(location->hostAndFetchPort(), std::string{"test_host:1003"});
+}
diff --git a/cpp/celeborn/utils/CelebornUtils.cpp b/cpp/celeborn/utils/CelebornUtils.cpp
index 675850445..7460e0a6e 100644
--- a/cpp/celeborn/utils/CelebornUtils.cpp
+++ b/cpp/celeborn/utils/CelebornUtils.cpp
@@ -19,6 +19,21 @@
namespace celeborn {
namespace utils {
+
+bool isCriticalCauseForFetch(const std::exception& e) {
+  // OS/socket-level failures surface as std::system_error or a subclass of
+  // it; treat every such exception as critical.
+  if (dynamic_cast<const std::system_error*>(&e) != nullptr) {
+    return true;
+  }
+
+  // what() should return a C string, but guard against a nullptr from a
+  // misbehaving exception type before constructing std::string from it.
+  const char* what = e.what();
+  const std::string msg = what != nullptr ? what : "";
+
+  // Connection failures, recognized by message prefix. Like Java's
+  // startsWith(), these checks are case-sensitive.
+  if (msg.rfind("Connecting to", 0) == 0 || msg.rfind("Failed to", 0) == 0) {
+    return true;
+  }
+
+  // Timeouts: match case-insensitively so "Timeout", "timeout", "TIMEOUT"
+  // and "Timed out" (the message of folly's FutureTimeout) are all
+  // recognized. ASCII-only lowering avoids locale dependence.
+  std::string lowered = msg;
+  for (char& c : lowered) {
+    if (c >= 'A' && c <= 'Z') {
+      c = static_cast<char>(c - 'A' + 'a');
+    }
+  }
+  return lowered.find("timeout") != std::string::npos ||
+      lowered.find("timed out") != std::string::npos;
+}
+
+/// Returns the shuffle key "<appId>-<shuffleId>".
std::string makeShuffleKey(const std::string& appId, const int shuffleId) {
return appId + "-" + std::to_string(shuffleId);
}
diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h
index 6d39226ce..4883c81f4 100644
--- a/cpp/celeborn/utils/CelebornUtils.h
+++ b/cpp/celeborn/utils/CelebornUtils.h
@@ -22,6 +22,7 @@
#include <charconv>
#include <chrono>
#include <set>
+#include <system_error>
#include <vector>
#include "celeborn/memory/ByteBuffer.h"
@@ -37,6 +38,11 @@ namespace utils {
+/// Emits LOG(severity) with CELEBORN_SHUTDOWN_LOG_PREFIX streamed first.
#define CELEBORN_SHUTDOWN_LOG(severity) \
LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
+/// C++ counterpart of Java's Utils.isCriticalCauseForFetch. Classification is
+/// by exception type and message text: returns true for any std::system_error
+/// (including subclasses), for messages starting with "Connecting to" or
+/// "Failed to" (case-sensitive), and for messages containing a timeout marker
+/// such as "timeout"/"Timeout". Every other exception is not critical.
+[[nodiscard]] bool isCriticalCauseForFetch(const std::exception& e);
+
template <typename T>
std::vector<T> toVector(const std::set<T>& in) {
std::vector<T> out{};
diff --git a/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp b/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
index 8517074ec..4e9e891f4 100644
--- a/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
+++ b/cpp/celeborn/utils/tests/CelebornUtilsTest.cpp
@@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include <atomic>
#include <string>
+#include <system_error>
#include <thread>
#include <vector>
@@ -36,6 +37,60 @@ class CelebornUtilsTest : public testing::Test {
std::unique_ptr<ConcurrentHashSet<int>> set_;
};
+// Tests for isCriticalCauseForFetch, the C++ counterpart of Java's
+// Utils.isCriticalCauseForFetch; here classification is by exception type and
+// message text.
+TEST(IsCriticalCauseForFetchTest, systemErrorIsCritical) {
+  std::system_error e(
+      std::make_error_code(std::errc::connection_refused),
+      "Connection refused");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, connectionRefusedByMessage) {
+  std::runtime_error e("Connecting to worker1:9097 failed");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, failedToConnectByMessage) {
+  std::runtime_error e("Failed to open stream for shuffle 1");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, timeoutByMessage) {
+  std::runtime_error e("RPC request Timeout after 120s");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, lowercaseTimeoutByMessage) {
+  std::runtime_error e("Fetch chunk timeout for streamId 42");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, genericErrorIsNotCritical) {
+  std::runtime_error e("Invalid batch header checksum");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, decompressionErrorIsNotCritical) {
+  std::runtime_error e("LZ4 decompression failed");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, emptyMessageIsNotCritical) {
+  std::runtime_error e("");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, substringConnectingToIsNotCritical) {
+  std::runtime_error e("Error while Connecting to worker1:9097");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, substringFailedToIsNotCritical) {
+  std::runtime_error e("Worker unexpectedly Failed to respond");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, systemErrorSubclassIsCritical) {
+  // The check is a dynamic_cast, so types derived from std::system_error
+  // must be critical too, not only std::system_error itself.
+  struct SocketError : std::system_error {
+    using std::system_error::system_error;
+  };
+  SocketError e(
+      std::make_error_code(std::errc::host_unreachable), "unreachable");
+  EXPECT_TRUE(isCriticalCauseForFetch(e));
+}
+
+TEST(IsCriticalCauseForFetchTest, prefixMatchIsCaseSensitive) {
+  // Connection prefixes are matched case-sensitively, like Java's
+  // String.startsWith().
+  std::runtime_error e("connecting to worker1:9097 failed");
+  EXPECT_FALSE(isCriticalCauseForFetch(e));
+}
+
TEST_F(CelebornUtilsTest, mapBasicInsertAndRetrieve) {
map_->set("apple", 10);
auto result = map_->get("apple");