This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b41488d7e86d94703c40196894727d63ab02ddff Author: Wenzhi Feng <[email protected]> AuthorDate: Fri Jan 23 17:42:32 2026 +0800 [fix][client] ControlledClusterFailover avoid unnecessary reconnection. (#25178) Co-authored-by: fengwenzhi <[email protected]> (cherry picked from commit f0ec07b3d8c5cfe36942957fc0ad32e40d69320d) --- .../apache/pulsar/client/impl/ControlledClusterFailover.java | 10 ++++++++++ .../pulsar/client/impl/ControlledClusterFailoverTest.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java index c52a70c6cd7..1d4740b8472 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; @@ -109,6 +110,15 @@ public class ControlledClusterFailover implements ServiceUrlProvider { public void initialize(PulsarClient client) { this.pulsarClient = (PulsarClientImpl) client; + // Initialize currentControlledConfiguration from client's current configuration + // to avoid unnecessary reconnection on first scheduled check when the configuration hasn't changed + ClientConfigurationData conf = pulsarClient.getConfiguration(); + this.currentControlledConfiguration = new ControlledConfiguration(); + this.currentControlledConfiguration.setServiceUrl(currentPulsarServiceUrl); + this.currentControlledConfiguration.setTlsTrustCertsFilePath(conf.getTlsTrustCertsFilePath()); + this.currentControlledConfiguration.setAuthPluginClassName(conf.getAuthPluginClassName()); + this.currentControlledConfiguration.setAuthParamsString(conf.getAuthParams()); + // start to check service url every 30 seconds this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { ControlledConfiguration controlledConfiguration = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index 265af6dd23c..86b2fa7cb4f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.asynchttpclient.Request; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -54,6 +55,14 @@ public class ControlledClusterFailoverTest { .build(); ControlledClusterFailover controlledClusterFailover = (ControlledClusterFailover) provider; + + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); + when(pulsarClient.getCnxPool()).thenReturn(connectionPool); + when(pulsarClient.getConfiguration()).thenReturn(clientConf); + controlledClusterFailover.initialize(pulsarClient); + Request request = controlledClusterFailover.getRequestBuilder().build(); Assert.assertTrue(provider instanceof ControlledClusterFailover); @@ -91,7 +100,9 @@ public class ControlledClusterFailoverTest { ControlledClusterFailover controlledClusterFailover = Mockito.spy((ControlledClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); when(pulsarClient.getCnxPool()).thenReturn(connectionPool); + when(pulsarClient.getConfiguration()).thenReturn(clientConf); controlledClusterFailover.initialize(pulsarClient);
