This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a6aba30bd [multistage] Reset GRPC connection backoff when server is re-enabled (#17466)
d0a6aba30bd is described below

commit d0a6aba30bdad4dc0179ecd189d42ff2913b71a9
Author: dang-stripe <[email protected]>
AuthorDate: Wed Jan 14 13:08:14 2026 -0800

    [multistage] Reset GRPC connection backoff when server is re-enabled (#17466)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   3 +
 .../MultiStageBrokerRequestHandler.java            |   4 +
 .../routing/manager/BaseBrokerRoutingManager.java  |  17 ++
 .../routing/manager/BrokerRoutingManagerTest.java  | 220 +++++++++++++++++++++
 .../query/service/dispatch/QueryDispatcher.java    |  26 ++-
 5 files changed, 266 insertions(+), 4 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index e73efb92134..8ab0498d5a7 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -415,6 +415,9 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
           new MultiStageBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_multiStageQueryThrottler, _failureDetector,
               _threadAccountant, multiClusterRoutingContext);
+      MultiStageBrokerRequestHandler finalHandler = 
multiStageBrokerRequestHandler;
+      _routingManager.setServerReenableCallback(
+          serverInstance -> 
finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
     }
     TimeSeriesRequestHandler timeSeriesRequestHandler = null;
     if 
(StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
 {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 0db5addd53e..717b8fb1430 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -855,4 +855,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         return false;
     }
   }
+
+  public QueryDispatcher getQueryDispatcher() {
+    return _queryDispatcher;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index e2ecdf59033..320fc2eebbb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
@@ -129,6 +130,8 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
   private final PinotConfiguration _pinotConfig;
   private final boolean _enablePartitionMetadataManager;
   private final ExecutorService _executorService;
+  @Nullable
+  private Consumer<ServerInstance> _serverReenableCallback;
 
   // Global read-write lock for protecting the global data structures such as 
_enabledServerInstanceMap,
   // _excludedServers, and _routableServers. Write lock must be held if any of 
these are modified, read lock must be
@@ -180,6 +183,14 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
     _propertyStore = helixManager.getHelixPropertyStore();
   }
 
+  /**
+   * Sets a callback to be invoked when a server is re-enabled after being 
excluded.
+   * This is useful for resetting gRPC channel state to avoid exponential 
backoff delays.
+   */
+  public void setServerReenableCallback(Consumer<ServerInstance> callback) {
+    _serverReenableCallback = callback;
+  }
+
   private Object getRoutingTableBuildLock(String tableNameWithType) {
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
     return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new 
Object());
@@ -387,6 +398,12 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
             // NOTE: Remove new enabled server from excluded servers because 
the server is likely being restarted
             if (_excludedServers.remove(instanceId)) {
               LOGGER.info("Got excluded server: {} re-enabled, including it 
into the routing", instanceId);
+              // We clear any GRPC channel reconnection backoff in this 
callback when a server is re-enabled. Otherwise,
+              // MSE queries to this server may fail fast until the next 
backoff retry succeeds.
+              if (_serverReenableCallback != null) {
+                LOGGER.info("Calling server re-enable callback for server: 
{}", instanceId);
+                _serverReenableCallback.accept(serverInstance);
+              }
             }
           }
         }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
new file mode 100644
index 00000000000..e03153e2f08
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.manager;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.transport.ServerInstance;
+import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class BrokerRoutingManagerTest {
+  private static final String SERVER_INSTANCE_ID = "Server_localhost_8000";
+  private static final String SERVER_HOST = "localhost";
+  private static final int SERVER_PORT = 8000;
+  private static final String INSTANCE_CONFIGS_PATH = "/CONFIGS/PARTICIPANT";
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private BrokerMetrics _brokerMetrics;
+
+  @Mock
+  private ServerRoutingStatsManager _serverRoutingStatsManager;
+
+  @Mock
+  private HelixManager _helixManager;
+
+  @Mock
+  private HelixDataAccessor _helixDataAccessor;
+
+  @Mock
+  private BaseDataAccessor<ZNRecord> _zkDataAccessor;
+
+  @Mock
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  @Mock
+  private PropertyKey.Builder _keyBuilder;
+
+  @Mock
+  private PropertyKey _instanceConfigsKey;
+
+  @Mock
+  private Consumer<ServerInstance> _serverReenableCallback;
+
+  private BrokerRoutingManager _routingManager;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+
+    // Set up Helix mocks
+    when(_helixManager.getHelixDataAccessor()).thenReturn(_helixDataAccessor);
+    when(_helixManager.getHelixPropertyStore()).thenReturn(_propertyStore);
+    when(_helixDataAccessor.getBaseDataAccessor()).thenReturn(_zkDataAccessor);
+    when(_helixDataAccessor.keyBuilder()).thenReturn(_keyBuilder);
+    when(_keyBuilder.instanceConfigs()).thenReturn(_instanceConfigsKey);
+    when(_keyBuilder.externalViews()).thenReturn(mock(PropertyKey.class));
+    when(_keyBuilder.idealStates()).thenReturn(mock(PropertyKey.class));
+    when(_instanceConfigsKey.getPath()).thenReturn(INSTANCE_CONFIGS_PATH);
+
+    // Mock paths for external views and ideal states
+    PropertyKey evKey = mock(PropertyKey.class);
+    PropertyKey isKey = mock(PropertyKey.class);
+    when(_keyBuilder.externalViews()).thenReturn(evKey);
+    when(_keyBuilder.idealStates()).thenReturn(isKey);
+    when(evKey.getPath()).thenReturn("/EXTERNALVIEW");
+    when(isKey.getPath()).thenReturn("/IDEALSTATES");
+
+    // Create routing manager
+    _routingManager = new BrokerRoutingManager(_brokerMetrics, 
_serverRoutingStatsManager, new PinotConfiguration());
+    _routingManager.init(_helixManager);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testNoErrorWhenCallbackNotSet() {
+    // Don't set callback
+
+    // Enable server
+    List<ZNRecord> instanceConfigs = 
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+            anyInt(), anyInt())).thenReturn(instanceConfigs);
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Exclude server
+    _routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
+
+    // Disable then re-enable
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+            anyInt(), anyInt())).thenReturn(Collections.emptyList());
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+            anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+    // Should not throw NPE
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Server should be re-enabled in the map
+    
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
+  }
+
+  @Test
+  public void testServerReenableCallbackInvokedWhenExcludedServerReenabled() {
+    // Set up callback
+    _routingManager.setServerReenableCallback(_serverReenableCallback);
+
+    // First, enable the server by processing instance config change
+    List<ZNRecord> instanceConfigs = 
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+        anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Verify server is now in enabled map
+    
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
+
+    // Exclude the server (simulating failure detector marking it unhealthy)
+    _routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
+
+    // Now simulate server being disabled then re-enabled (e.g., restart)
+    // First, disable
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+        anyInt(), anyInt())).thenReturn(Collections.emptyList());
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Then re-enable
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+        anyInt(), anyInt())).thenReturn(instanceConfigs);
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Verify callback was invoked with correct ServerInstance
+    ArgumentCaptor<ServerInstance> captor = 
ArgumentCaptor.forClass(ServerInstance.class);
+    verify(_serverReenableCallback).accept(captor.capture());
+
+    ServerInstance capturedInstance = captor.getValue();
+    assertEquals(capturedInstance.getHostname(), SERVER_HOST);
+    assertEquals(capturedInstance.getPort(), SERVER_PORT);
+  }
+
+  @Test
+  public void testServerReenableCallbackNotInvokedForNewServer() {
+    // Set up callback
+    _routingManager.setServerReenableCallback(_serverReenableCallback);
+
+    // Enable a new server (never excluded)
+    List<ZNRecord> instanceConfigs = 
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+    when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(), 
eq(AccessOption.PERSISTENT),
+        anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+    _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+    // Verify callback was NOT invoked (server was never excluded)
+    verify(_serverReenableCallback, never()).accept(any());
+  }
+
+  /**
+   * Creates a ZNRecord representing an enabled server instance.
+   */
+  private ZNRecord createEnabledServerZNRecord(String instanceId) {
+    ZNRecord record = new ZNRecord(instanceId);
+    
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(),
 "true");
+    
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_HOST.name(),
+        instanceId.split("_")[1]); // Extract host from Server_host_port
+    
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.name(),
+        instanceId.split("_")[2]); // Extract port from Server_host_port
+    // Don't set IS_SHUTDOWN_IN_PROGRESS or QUERIES_DISABLED (they default to 
false)
+    return record;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 4695f336196..aa5d55d68b0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -255,9 +255,8 @@ public class QueryDispatcher {
   public FailureDetector.ServerState 
checkConnectivityToInstance(ServerInstance serverInstance) {
     String hostname = serverInstance.getHostname();
     int port = serverInstance.getQueryServicePort();
-    String hostnamePort = String.format("%s_%d", hostname, port);
 
-    DispatchClient client = _dispatchClientMap.get(hostnamePort);
+    DispatchClient client = _dispatchClientMap.get(toHostnamePortKey(hostname, 
port));
     // Could occur if the cluster is only serving single-stage queries
     if (client == null) {
       LOGGER.debug("No DispatchClient found for server with instanceId: {}", 
serverInstance.getInstanceId());
@@ -510,8 +509,27 @@ public class QueryDispatcher {
   private DispatchClient getOrCreateDispatchClient(QueryServerInstance 
queryServerInstance) {
     String hostname = queryServerInstance.getHostname();
     int port = queryServerInstance.getQueryServicePort();
-    String hostnamePort = String.format("%s_%d", hostname, port);
-    return _dispatchClientMap.computeIfAbsent(hostnamePort, k -> new 
DispatchClient(hostname, port, _tlsConfig));
+    return _dispatchClientMap.computeIfAbsent(toHostnamePortKey(hostname, 
port),
+        k -> new DispatchClient(hostname, port, _tlsConfig));
+  }
+
+  /**
+   * Reset the connection backoff for a server. When the GRPC channel enters a 
TRANSIENT_FAILURE state from
+   * connection failures, it will fast fail requests and reconnect with 
exponential backoff. This method
+   * resets the backoff so servers that have recovered can be reconnected to 
immediately.
+   */
+  public void resetClientConnectionBackoff(ServerInstance serverInstance) {
+    String hostname = serverInstance.getHostname();
+    int port = serverInstance.getQueryServicePort();
+    DispatchClient dispatchClient = 
_dispatchClientMap.get(toHostnamePortKey(hostname, port));
+    if (dispatchClient != null) {
+      LOGGER.info("Resetting connection backoff for server: {}", 
serverInstance.getInstanceId());
+      dispatchClient.getChannel().resetConnectBackoff();
+    }
+  }
+
+  private static String toHostnamePortKey(String hostname, int port) {
+    return String.format("%s_%d", hostname, port);
   }
 
   /// Concatenates the results of the sub-plan and returns a [QueryResult] 
with the concatenated result.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to