Copilot commented on code in PR #547:
URL: https://github.com/apache/pulsar-client-cpp/pull/547#discussion_r2944160087


##########
include/pulsar/AutoClusterFailover.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
+#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <chrono>
+#include <cstdint>

Review Comment:
   This public header uses `std::vector` (`Config::secondary`) and
`std::shared_ptr` (the `impl_` member) but doesn’t include `<vector>` or
`<memory>`; the `Config` constructor also calls `std::move`, which needs
`<utility>`. Relying on transitive includes can break consumers; please add
the direct includes here.
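
For illustration, a minimal sketch of what the top of the header could look like with direct includes. `ServiceInfoStub` and `ConfigSketch` are hypothetical stand-ins so the snippet compiles on its own; the real header would keep `#include <pulsar/ServiceInfoProvider.h>` and the real `ServiceInfo` type.

```cpp
#include <chrono>
#include <cstdint>
#include <memory>   // std::shared_ptr (for the impl_ member in the real header)
#include <string>
#include <utility>  // std::move
#include <vector>   // std::vector

// Hypothetical stand-in for pulsar::ServiceInfo, only here to keep the sketch
// self-contained.
struct ServiceInfoStub {
    std::string serviceUrl;
};

// Mirrors the shape of AutoClusterFailover::Config from the diff.
struct ConfigSketch {
    const ServiceInfoStub primary;
    const std::vector<ServiceInfoStub> secondary;
    std::chrono::milliseconds checkInterval{5000};
    std::uint32_t failoverThreshold{1};
    std::uint32_t switchBackThreshold{1};

    ConfigSketch(ServiceInfoStub primary, std::vector<ServiceInfoStub> secondary)
        : primary(std::move(primary)), secondary(std::move(secondary)) {}
};
```

With these includes the header no longer depends on what `ServiceInfoProvider.h` happens to pull in.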
   



##########
include/pulsar/AutoClusterFailover.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
+#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <chrono>
+#include <cstdint>
+
+namespace pulsar {
+
+class Client;
+class AutoClusterFailoverImpl;
+
+class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider {
+   public:
+    struct Config {
+        const ServiceInfo primary;
+        const std::vector<ServiceInfo> secondary;
+        std::chrono::milliseconds checkInterval{5000};  // 5 seconds
+        uint32_t failoverThreshold{1};
+        uint32_t switchBackThreshold{1};
+
+        Config(ServiceInfo primary, std::vector<ServiceInfo> secondary)
+            : primary(std::move(primary)), secondary(std::move(secondary)) {}
+    };
+
+    /**
+     * Builder helps create an AutoClusterFailover configuration.
+     *
+     * Example:
+     *   ServiceInfo primary{...};
+     *   std::vector<ServiceInfo> secondaries{...};
+     *   AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries)
+     *       .withCheckInterval(std::chrono::seconds(5))
+     *       .withFailoverThreshold(3)
+     *       .withSwitchBackThreshold(3)
+     *       .build();
+     *
+     * Notes:
+     * - primary: the preferred cluster to use when available.
+     * - secondary: ordered list of fallback clusters.
+     * - checkInterval: frequency of health probes.
+     * - failoverThreshold: the number of consecutive failed probes required before switching away from
+     *   the current cluster.
+     * - switchBackThreshold: the number of consecutive successful probes to the primary required before
+     *   switching back from a secondary.

Review Comment:
   The Builder docs state that `switchBackThreshold` is “the number of 
consecutive successful probes to the primary required before switching back 
from a secondary”, but the current implementation can switch back immediately 
when the active secondary is unavailable and the primary is available. Either 
clarify this exception in the docs or adjust the logic to honor the threshold 
consistently.
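
To make the two options concrete, here is a simplified model of the switch-back decision. It is a sketch, not the PR's code: `SwitchBackState`, `shouldSwitchBack`, the `secondaryFailedOver` flag and the `strict` flag are all hypothetical names. With `strict == true` the threshold is always honored; with `strict == false` the function models the exception that would need documenting.

```cpp
#include <cstdint>

// Hypothetical state holder; mirrors consecutivePrimaryRecoveryCount_ in the diff.
struct SwitchBackState {
    std::uint32_t consecutivePrimaryRecoveryCount{0};
};

// Returns true when the provider should switch back to the primary.
// - secondaryFailedOver: the active secondary has already reached failoverThreshold.
// - strict: honor switchBackThreshold even when the secondary is down.
inline bool shouldSwitchBack(SwitchBackState& state, bool primaryAvailable,
                             bool secondaryFailedOver, std::uint32_t switchBackThreshold,
                             bool strict) {
    if (!primaryAvailable) {
        // Any failed probe to the primary breaks the run of successes.
        state.consecutivePrimaryRecoveryCount = 0;
        return false;
    }
    ++state.consecutivePrimaryRecoveryCount;
    if (!strict && secondaryFailedOver) {
        // The exception: leave a dead secondary immediately.
        return true;
    }
    return state.consecutivePrimaryRecoveryCount >= switchBackThreshold;
}
```

Either variant is defensible; the point is that the Builder docs and the code should describe the same one.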



##########
lib/AutoClusterFailover.cc:
##########
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/AutoClusterFailover.h>
+
+#include <chrono>
+#include <future>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+class AutoClusterFailoverImpl : public 
std::enable_shared_from_this<AutoClusterFailoverImpl> {
+   public:
+    AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable() || !future_.valid()) {
+            return;
+        }
+
+        cancelTimer(*timer_);
+        workGuard_.reset();
+        ioContext_.stop();
+
+        if (future_.wait_for(1s) != std::future_status::ready) {
+            LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, 
skip it");
+            thread_.detach();
+        } else {
+            thread_.join();
+        }
+    }
+
+    auto primary() const noexcept { return config_.primary; }
+
+    void initialize(std::function<void(ServiceInfo)>&& onServiceInfoUpdate) {
+        onServiceInfoUpdate_ = std::move(onServiceInfoUpdate);
+        workGuard_.emplace(ASIO::make_work_guard(ioContext_));
+        timer_.emplace(ioContext_);
+
+        auto weakSelf = weak_from_this();
+        ASIO::post(ioContext_, [weakSelf] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        });
+
+        // Capturing `this` is safe because the thread will be joined in the 
destructor
+        std::promise<void> promise;
+        future_ = promise.get_future();
+        thread_ = std::thread([this, promise{std::move(promise)}]() mutable {
+            ioContext_.run();
+            promise.set_value();
+        });
+    }
+
+   private:
+    static constexpr std::chrono::milliseconds probeTimeout_{30000};
+    using CompletionCallback = std::function<void()>;
+    using ProbeCallback = std::function<void(bool)>;
+
+    struct ProbeContext {
+        ASIO::ip::tcp::resolver resolver;
+        ASIO::ip::tcp::socket socket;
+        ASIO::steady_timer timer;
+        ProbeCallback callback;
+        bool done{false};
+        std::string hostUrl;
+
+        ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, 
ProbeCallback callback)
+            : resolver(ioContext),
+              socket(ioContext),
+              timer(ioContext),
+              callback(std::move(callback)),
+              hostUrl(std::move(hostUrl)) {}
+    };
+
+    AutoClusterFailover::Config config_;
+    const ServiceInfo* currentServiceInfo_;
+    uint32_t consecutiveFailureCount_{0};
+    uint32_t consecutivePrimaryRecoveryCount_{0};
+
+    std::thread thread_;
+    std::future<void> future_;
+
+    ASIO::io_context ioContext_;
+    std::function<void(ServiceInfo)> onServiceInfoUpdate_;
+
+    std::optional<ASIO::executor_work_guard<ASIO::io_context::executor_type>> 
workGuard_;
+    std::optional<ASIO::steady_timer> timer_;
+
+    bool isUsingPrimary() const noexcept { return currentServiceInfo_ == 
&config_.primary; }
+
+    const ServiceInfo& current() const noexcept { return *currentServiceInfo_; 
}
+
+    void scheduleFailoverCheck() {
+        timer_->expires_after(config_.checkInterval);
+        auto weakSelf = weak_from_this();
+        timer_->async_wait([weakSelf](ASIO_ERROR error) {
+            if (error) {
+                return;
+            }
+            if (auto self = weakSelf.lock()) {
+                self->executeFailoverCheck();
+            }
+        });
+    }
+
+    void executeFailoverCheck() {
+        auto done = [weakSelf = weak_from_this()] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        };
+
+        if (isUsingPrimary()) {
+            checkAndFailoverToSecondaryAsync(std::move(done));
+        } else {
+            checkSecondaryAndPrimaryAsync(std::move(done));
+        }
+    }
+
+    static void completeProbe(const std::shared_ptr<ProbeContext>& context, 
bool success,
+                              const ASIO_ERROR& error = ASIO_SUCCESS) {
+        if (context->done) {
+            return;
+        }
+
+        context->done = true;
+        ASIO_ERROR ignored;
+        context->resolver.cancel();
+        context->socket.close(ignored);
+        context->timer.cancel(ignored);
+
+        context->callback(success);
+    }
+
+    void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) {
+        Url parsedUrl;
+        if (!Url::parse(hostUrl, parsedUrl)) {
+            LOG_WARN("Failed to parse service URL for probing: " << hostUrl);
+            callback(false);
+            return;
+        }
+
+        auto context = std::make_shared<ProbeContext>(ioContext_, hostUrl, 
std::move(callback));
+        context->timer.expires_after(probeTimeout_);
+        context->timer.async_wait([context](const ASIO_ERROR& error) {
+            if (!error) {
+                completeProbe(context, false, ASIO::error::timed_out);
+            }
+        });
+
+        context->resolver.async_resolve(
+            parsedUrl.host(), std::to_string(parsedUrl.port()),
+            [context](const ASIO_ERROR& error, const 
ASIO::ip::tcp::resolver::results_type& endpoints) {
+                if (error) {
+                    completeProbe(context, false, error);
+                    return;
+                }
+
+                ASIO::async_connect(
+                    context->socket, endpoints,
+                    [context](const ASIO_ERROR& connectError, const 
ASIO::ip::tcp::endpoint&) {
+                        completeProbe(context, !connectError, connectError);
+                    });
+            });
+    }
+
+    void probeHostsAsync(const std::shared_ptr<std::vector<std::string>>& 
hosts, size_t index,
+                         ProbeCallback callback) {
+        if (index >= hosts->size()) {
+            callback(false);
+            return;
+        }
+
+        auto hostUrl = (*hosts)[index];
+        auto weakSelf = weak_from_this();
+        probeHostAsync(hostUrl,
+                       [weakSelf, hosts, index, callback = 
std::move(callback)](bool available) mutable {
+                           if (available) {
+                               callback(true);
+                               return;
+                           }
+                           if (auto self = weakSelf.lock()) {
+                               self->probeHostsAsync(hosts, index + 1, 
std::move(callback));
+                           }
+                       });
+    }
+
+    void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback 
callback) {
+        try {
+            ServiceURI serviceUri{serviceInfo.serviceUrl()};
+            auto hosts = 
std::make_shared<std::vector<std::string>>(serviceUri.getServiceHosts());
+            if (hosts->empty()) {
+                callback(false);
+                return;
+            }
+            probeHostsAsync(hosts, 0, std::move(callback));
+        } catch (const std::exception& e) {
+            LOG_WARN("Failed to probe service URL " << 
serviceInfo.serviceUrl() << ": " << e.what());
+            callback(false);
+        }
+    }
+
+    void switchTo(const ServiceInfo* serviceInfo) {
+        if (currentServiceInfo_ == serviceInfo) {
+            return;
+        }
+
+        LOG_INFO("Switch service URL from " << current().serviceUrl() << " to 
" << serviceInfo->serviceUrl());
+        currentServiceInfo_ = serviceInfo;
+        consecutiveFailureCount_ = 0;
+        consecutivePrimaryRecoveryCount_ = 0;
+        onServiceInfoUpdate_(current());
+    }
+
+    void probeSecondaryFrom(size_t index, CompletionCallback done) {
+        if (index >= config_.secondary.size()) {
+            done();
+            return;
+        }
+
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(config_.secondary[index],
+                            [weakSelf, index, done = std::move(done)](bool 
available) mutable {
+                                auto self = weakSelf.lock();
+                                if (!self) {
+                                    return;
+                                }
+
+                                LOG_DEBUG("Detected secondary " << 
self->config_.secondary[index].serviceUrl()
+                                                                << " 
availability: " << available);
+                                if (available) {
+                                    
self->switchTo(&self->config_.secondary[index]);
+                                    done();
+                                    return;
+                                }
+
+                                self->probeSecondaryFrom(index + 1, 
std::move(done));
+                            });
+    }
+
+    void checkAndFailoverToSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool 
primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected primary " << self->current().serviceUrl()
+                                          << " availability: " << 
primaryAvailable);
+            if (primaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < 
self->config_.failoverThreshold) {
+                done();
+                return;
+            }
+
+            self->probeSecondaryFrom(0, std::move(done));
+        });
+    }
+
+    void checkSwitchBackToPrimaryAsync(CompletionCallback done, 
std::optional<bool> primaryAvailableHint) {
+        auto handlePrimaryAvailable = [weakSelf = weak_from_this(),
+                                       done = std::move(done)](bool 
primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            if (!primaryAvailable) {
+                self->consecutivePrimaryRecoveryCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutivePrimaryRecoveryCount_ >= 
self->config_.switchBackThreshold) {
+                self->switchTo(&self->config_.primary);
+            }
+            done();
+        };
+
+        if (primaryAvailableHint.has_value()) {
+            handlePrimaryAvailable(*primaryAvailableHint);
+            return;
+        }
+
+        probeAvailableAsync(config_.primary, 
std::move(handlePrimaryAvailable));
+    }
+
+    void checkSecondaryAndPrimaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool 
secondaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected secondary " << self->current().serviceUrl()
+                                            << " availability: " << 
secondaryAvailable);
+            if (secondaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                self->checkSwitchBackToPrimaryAsync(std::move(done), 
std::nullopt);
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < 
self->config_.failoverThreshold) {
+                self->checkSwitchBackToPrimaryAsync(std::move(done), 
std::nullopt);
+                return;
+            }
+
+            self->probeAvailableAsync(
+                self->config_.primary, [weakSelf, done = std::move(done)](bool 
primaryAvailable) mutable {
+                    auto self = weakSelf.lock();
+                    if (!self) {
+                        return;
+                    }
+
+                    LOG_DEBUG("Detected primary after secondary is available "

Review Comment:
   This debug log message says “after secondary is available”, but it’s 
executed in the branch where `secondaryAvailable` is false. Consider correcting 
the message to avoid misleading diagnostics.
   



##########
lib/AutoClusterFailover.cc:
##########
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/AutoClusterFailover.h>
+
+#include <chrono>
+#include <future>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+class AutoClusterFailoverImpl : public 
std::enable_shared_from_this<AutoClusterFailoverImpl> {
+   public:
+    AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable() || !future_.valid()) {
+            return;
+        }
+
+        cancelTimer(*timer_);
+        workGuard_.reset();
+        ioContext_.stop();
+
+        if (future_.wait_for(1s) != std::future_status::ready) {
+            LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, skip it");
+            thread_.detach();
+        } else {
+            thread_.join();
+        }

Review Comment:
   Detaching `thread_` in the destructor is unsafe here because the thread 
lambda captures `this` and continues running `ioContext_.run()` on members that 
will be destroyed when the destructor returns (use-after-free). Also, the 
warning says “within 3 seconds” but `wait_for(1s)` only waits 1 second. Prefer 
a safe shutdown that guarantees the thread has fully exited (or keep the state 
alive independently of `this` if you truly must detach), and keep the 
timeout/message consistent.
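
A minimal sketch of the "always join" option, assuming the worker can be woken promptly on shutdown. It uses a plain `std::thread` and a condition variable rather than an ASIO `io_context` so that it compiles on its own; `PeriodicWorkerSketch` and `ticks()` are hypothetical names. In the PR the equivalent wake-up would presumably be `ioContext_.stop()` followed by an unconditional `thread_.join()`, which is bounded only if no handler blocks indefinitely.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for AutoClusterFailoverImpl.
class PeriodicWorkerSketch {
   public:
    explicit PeriodicWorkerSketch(std::chrono::milliseconds interval) : interval_(interval) {
        // Capturing `this` is only safe because the destructor always joins.
        thread_ = std::thread([this] { run(); });
    }

    ~PeriodicWorkerSketch() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_all();
        thread_.join();  // never detach: the thread touches members of *this
    }

    PeriodicWorkerSketch(const PeriodicWorkerSketch&) = delete;
    PeriodicWorkerSketch& operator=(const PeriodicWorkerSketch&) = delete;

    int ticks() const { return ticks_.load(); }

   private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for returns true as soon as stopped_ is set, so shutdown does
        // not wait for the rest of the interval.
        while (!cond_.wait_for(lock, interval_, [this] { return stopped_; })) {
            ++ticks_;  // stands in for one health-check round
        }
    }

    const std::chrono::milliseconds interval_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool stopped_{false};
    std::atomic<int> ticks_{0};
    std::thread thread_;  // assigned in the constructor body, after all other members
};
```

If detaching really is required, the alternative is to move the `io_context` and related state into a `std::shared_ptr` that the thread lambda captures by value instead of `this`.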
   



##########
lib/AutoClusterFailover.cc:
##########
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/AutoClusterFailover.h>
+
+#include <chrono>
+#include <future>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+class AutoClusterFailoverImpl : public 
std::enable_shared_from_this<AutoClusterFailoverImpl> {
+   public:
+    AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable() || !future_.valid()) {
+            return;
+        }
+
+        cancelTimer(*timer_);

Review Comment:
   `cancelTimer(*timer_)` dereferences an `std::optional<steady_timer>` without 
checking `timer_.has_value()`. It’s currently only safe if `initialize()` 
always completes fully before the thread becomes joinable; adding a defensive 
`if (timer_)` (like other code in the repo) would avoid UB if that assumption 
changes.
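
A small sketch of the defensive check. `FakeTimer` and `cancelIfPresent` are hypothetical; `FakeTimer` stands in for `ASIO::steady_timer` so the snippet has no ASIO dependency, and the repo's own `cancelTimer` helper is not reproduced here.

```cpp
#include <optional>

// Hypothetical stand-in for ASIO::steady_timer.
struct FakeTimer {
    bool cancelled{false};
    void cancel() { cancelled = true; }
};

// Cancels the timer only if it has been emplaced; returns whether it did so.
// Dereferencing an empty std::optional is undefined behavior, so check first.
inline bool cancelIfPresent(std::optional<FakeTimer>& timer) {
    if (!timer) {
        return false;
    }
    timer->cancel();
    return true;
}
```

In the destructor this would amount to wrapping the existing call in `if (timer_) { ... }`.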
   



##########
tests/ServiceInfoProviderTest.cc:
##########
@@ -93,6 +204,75 @@ class TestServiceInfoProvider : public ServiceInfoProvider {
     mutable std::mutex mutex_;
 };
 
+TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) {
+    ProbeTcpServer availableSecondary;
+    ProbeTcpServer unavailableSecondary;
+    const auto primaryUrl = unavailableSecondary.getServiceUrl();
+    unavailableSecondary.stop();
+
+    ProbeTcpServer skippedSecondary;
+    const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl();
+    skippedSecondary.stop();

Review Comment:
   Current tests cover primary->secondary failover and secondary->primary 
switchback, but there’s no coverage for the case where the selected secondary 
later becomes unavailable while another secondary is available (the provider 
should typically move to the next available secondary instead of staying on a 
down one). Adding a test for that scenario would help prevent regressions in 
multi-secondary setups.
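
As a rough statement of the outcome such a test could assert, here is a pure-function model of the selection. `nextAvailableSecondary` is a hypothetical helper, not part of the PR; a real test would drive the provider with the existing `ProbeTcpServer` fixtures and check the reported service URL instead.

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper: returns the index of the first reachable secondary in
// configured order, skipping `current` (the secondary that just went down).
// Returns std::nullopt when no other secondary is reachable.
inline std::optional<std::size_t> nextAvailableSecondary(const std::vector<bool>& reachable,
                                                         std::size_t current) {
    for (std::size_t i = 0; i < reachable.size(); ++i) {
        if (i != current && reachable[i]) {
            return i;
        }
    }
    return std::nullopt;
}
```

For example, with three secondaries where the active one (index 0) and index 1 are down, the model picks index 2; the integration test would expect the provider to report that secondary's URL.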



##########
lib/AutoClusterFailover.cc:
##########
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/AutoClusterFailover.h>
+
+#include <chrono>
+#include <future>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+class AutoClusterFailoverImpl : public 
std::enable_shared_from_this<AutoClusterFailoverImpl> {
+   public:
+    AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable() || !future_.valid()) {
+            return;
+        }
+
+        cancelTimer(*timer_);
+        workGuard_.reset();
+        ioContext_.stop();
+
+        if (future_.wait_for(1s) != std::future_status::ready) {
+            LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, 
skip it");
+            thread_.detach();
+        } else {
+            thread_.join();
+        }
+    }
+
+    auto primary() const noexcept { return config_.primary; }
+
+    void initialize(std::function<void(ServiceInfo)>&& onServiceInfoUpdate) {
+        onServiceInfoUpdate_ = std::move(onServiceInfoUpdate);
+        workGuard_.emplace(ASIO::make_work_guard(ioContext_));
+        timer_.emplace(ioContext_);
+
+        auto weakSelf = weak_from_this();
+        ASIO::post(ioContext_, [weakSelf] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        });
+
+        // Capturing `this` is safe because the thread will be joined in the 
destructor
+        std::promise<void> promise;
+        future_ = promise.get_future();
+        thread_ = std::thread([this, promise{std::move(promise)}]() mutable {
+            ioContext_.run();
+            promise.set_value();
+        });
+    }
+
+   private:
+    static constexpr std::chrono::milliseconds probeTimeout_{30000};
+    using CompletionCallback = std::function<void()>;
+    using ProbeCallback = std::function<void(bool)>;
+
+    struct ProbeContext {
+        ASIO::ip::tcp::resolver resolver;
+        ASIO::ip::tcp::socket socket;
+        ASIO::steady_timer timer;
+        ProbeCallback callback;
+        bool done{false};
+        std::string hostUrl;
+
+        ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, 
ProbeCallback callback)
+            : resolver(ioContext),
+              socket(ioContext),
+              timer(ioContext),
+              callback(std::move(callback)),
+              hostUrl(std::move(hostUrl)) {}
+    };
+
+    AutoClusterFailover::Config config_;
+    const ServiceInfo* currentServiceInfo_;
+    uint32_t consecutiveFailureCount_{0};
+    uint32_t consecutivePrimaryRecoveryCount_{0};
+
+    std::thread thread_;
+    std::future<void> future_;
+
+    ASIO::io_context ioContext_;
+    std::function<void(ServiceInfo)> onServiceInfoUpdate_;
+
+    std::optional<ASIO::executor_work_guard<ASIO::io_context::executor_type>> 
workGuard_;
+    std::optional<ASIO::steady_timer> timer_;
+
+    bool isUsingPrimary() const noexcept { return currentServiceInfo_ == 
&config_.primary; }
+
+    const ServiceInfo& current() const noexcept { return *currentServiceInfo_; 
}
+
+    void scheduleFailoverCheck() {
+        timer_->expires_after(config_.checkInterval);
+        auto weakSelf = weak_from_this();
+        timer_->async_wait([weakSelf](ASIO_ERROR error) {
+            if (error) {
+                return;
+            }
+            if (auto self = weakSelf.lock()) {
+                self->executeFailoverCheck();
+            }
+        });
+    }
+
+    void executeFailoverCheck() {
+        auto done = [weakSelf = weak_from_this()] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        };
+
+        if (isUsingPrimary()) {
+            checkAndFailoverToSecondaryAsync(std::move(done));
+        } else {
+            checkSecondaryAndPrimaryAsync(std::move(done));
+        }
+    }
+
+    static void completeProbe(const std::shared_ptr<ProbeContext>& context, 
bool success,
+                              const ASIO_ERROR& error = ASIO_SUCCESS) {
+        if (context->done) {
+            return;
+        }
+
+        context->done = true;
+        ASIO_ERROR ignored;
+        context->resolver.cancel();
+        context->socket.close(ignored);
+        context->timer.cancel(ignored);
+
+        context->callback(success);
+    }
+
+    void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) {
+        Url parsedUrl;
+        if (!Url::parse(hostUrl, parsedUrl)) {
+            LOG_WARN("Failed to parse service URL for probing: " << hostUrl);
+            callback(false);
+            return;
+        }
+
+        auto context = std::make_shared<ProbeContext>(ioContext_, hostUrl, 
std::move(callback));
+        context->timer.expires_after(probeTimeout_);
+        context->timer.async_wait([context](const ASIO_ERROR& error) {
+            if (!error) {
+                completeProbe(context, false, ASIO::error::timed_out);
+            }
+        });
+
+        context->resolver.async_resolve(
+            parsedUrl.host(), std::to_string(parsedUrl.port()),
+            [context](const ASIO_ERROR& error, const 
ASIO::ip::tcp::resolver::results_type& endpoints) {
+                if (error) {
+                    completeProbe(context, false, error);
+                    return;
+                }
+
+                ASIO::async_connect(
+                    context->socket, endpoints,
+                    [context](const ASIO_ERROR& connectError, const 
ASIO::ip::tcp::endpoint&) {
+                        completeProbe(context, !connectError, connectError);
+                    });
+            });
+    }
+
+    void probeHostsAsync(const std::shared_ptr<std::vector<std::string>>& hosts, size_t index,
+                         ProbeCallback callback) {
+        if (index >= hosts->size()) {
+            callback(false);
+            return;
+        }
+
+        auto hostUrl = (*hosts)[index];
+        auto weakSelf = weak_from_this();
+        probeHostAsync(hostUrl,
+                       [weakSelf, hosts, index, callback = std::move(callback)](bool available) mutable {
+                           if (available) {
+                               callback(true);
+                               return;
+                           }
+                           if (auto self = weakSelf.lock()) {
+                               self->probeHostsAsync(hosts, index + 1, std::move(callback));
+                           }
+                       });
+    }
+
+    void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) {
+        try {
+            ServiceURI serviceUri{serviceInfo.serviceUrl()};
+            auto hosts = std::make_shared<std::vector<std::string>>(serviceUri.getServiceHosts());
+            if (hosts->empty()) {
+                callback(false);
+                return;
+            }
+            probeHostsAsync(hosts, 0, std::move(callback));
+        } catch (const std::exception& e) {
+            LOG_WARN("Failed to probe service URL " << serviceInfo.serviceUrl() << ": " << e.what());
+            callback(false);
+        }
+    }
+
+    void switchTo(const ServiceInfo* serviceInfo) {
+        if (currentServiceInfo_ == serviceInfo) {
+            return;
+        }
+
+        LOG_INFO("Switch service URL from " << current().serviceUrl() << " to " << serviceInfo->serviceUrl());
+        currentServiceInfo_ = serviceInfo;
+        consecutiveFailureCount_ = 0;
+        consecutivePrimaryRecoveryCount_ = 0;
+        onServiceInfoUpdate_(current());
+    }
+
+    void probeSecondaryFrom(size_t index, CompletionCallback done) {
+        if (index >= config_.secondary.size()) {
+            done();
+            return;
+        }
+
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(config_.secondary[index],
+                            [weakSelf, index, done = std::move(done)](bool available) mutable {
+                                auto self = weakSelf.lock();
+                                if (!self) {
+                                    return;
+                                }
+
+                                LOG_DEBUG("Detected secondary " << self->config_.secondary[index].serviceUrl()
+                                                                << " availability: " << available);
+                                if (available) {
+                                    self->switchTo(&self->config_.secondary[index]);
+                                    done();
+                                    return;
+                                }
+
+                                self->probeSecondaryFrom(index + 1, std::move(done));
+                            });
+    }
+
+    void checkAndFailoverToSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected primary " << self->current().serviceUrl()
+                                          << " availability: " << primaryAvailable);
+            if (primaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) {
+                done();
+                return;
+            }
+
+            self->probeSecondaryFrom(0, std::move(done));
+        });
+    }
+
+    void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional<bool> primaryAvailableHint) {
+        auto handlePrimaryAvailable = [weakSelf = weak_from_this(),
+                                       done = std::move(done)](bool primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            if (!primaryAvailable) {
+                self->consecutivePrimaryRecoveryCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutivePrimaryRecoveryCount_ >= self->config_.switchBackThreshold) {
+                self->switchTo(&self->config_.primary);
+            }
+            done();
+        };
+
+        if (primaryAvailableHint.has_value()) {
+            handlePrimaryAvailable(*primaryAvailableHint);
+            return;
+        }
+
+        probeAvailableAsync(config_.primary, std::move(handlePrimaryAvailable));
+    }
+
+    void checkSecondaryAndPrimaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool secondaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected secondary " << self->current().serviceUrl()
+                                            << " availability: " << secondaryAvailable);
+            if (secondaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt);
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) {
+                self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt);
+                return;
+            }
+
+            self->probeAvailableAsync(
+                self->config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable {
+                    auto self = weakSelf.lock();
+                    if (!self) {
+                        return;
+                    }
+
+                    LOG_DEBUG("Detected primary after secondary is available "
+                              << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable);
+                    if (primaryAvailable) {
+                        self->switchTo(&self->config_.primary);
+                        done();
+                        return;
+                    }
+
+                    self->checkSwitchBackToPrimaryAsync(std::move(done), false);
+                });

Review Comment:
   When running on a secondary and that secondary becomes unavailable (and 
`failoverThreshold` is reached), this path only probes the primary and 
otherwise stays on the same (unavailable) secondary. If there are multiple 
configured secondaries, the provider never attempts to switch to another 
available secondary, which can leave the client stuck on a down cluster. 
Consider probing the remaining `config_.secondary` entries (skipping the 
current one) before/while checking the primary.
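
A minimal sketch of the suggested selection order, not the PR's implementation. It is synchronous for brevity, whereas the code under review probes asynchronously, and `Probe` and `findOtherSecondary` are hypothetical names introduced only for illustration. It scans the remaining secondaries in wrap-around order starting just after the current one and never returns the current index:

```cpp
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical synchronous stand-in for the asynchronous probe in the PR.
using Probe = std::function<bool(const std::string&)>;

// Returns the index of the first reachable secondary other than `current`,
// visiting candidates in wrap-around order starting right after `current`.
// Returns std::nullopt when no other secondary is reachable.
std::optional<std::size_t> findOtherSecondary(const std::vector<std::string>& secondaries,
                                              std::size_t current, const Probe& isAvailable) {
    const std::size_t n = secondaries.size();
    // step runs from 1 to n - 1, so the current entry itself is never probed.
    for (std::size_t step = 1; step < n; ++step) {
        const std::size_t candidate = (current + step) % n;
        if (isAvailable(secondaries[candidate])) {
            return candidate;
        }
    }
    return std::nullopt;
}
```

In the asynchronous code, the same ordering could presumably be obtained by chaining probes the way `probeSecondaryFrom` already does, with the current entry skipped; whether that happens before or alongside the primary probe is a design choice for the PR author.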
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


