This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0a6aba30bd [multistage] Reset GRPC connection backoff when server is re-enabled (#17466)
d0a6aba30bd is described below
commit d0a6aba30bdad4dc0179ecd189d42ff2913b71a9
Author: dang-stripe <[email protected]>
AuthorDate: Wed Jan 14 13:08:14 2026 -0800
[multistage] Reset GRPC connection backoff when server is re-enabled (#17466)
---
.../broker/broker/helix/BaseBrokerStarter.java | 3 +
.../MultiStageBrokerRequestHandler.java | 4 +
.../routing/manager/BaseBrokerRoutingManager.java | 17 ++
.../routing/manager/BrokerRoutingManagerTest.java | 220 +++++++++++++++++++++
.../query/service/dispatch/QueryDispatcher.java | 26 ++-
5 files changed, 266 insertions(+), 4 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index e73efb92134..8ab0498d5a7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -415,6 +415,9 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache,
_multiStageQueryThrottler, _failureDetector,
_threadAccountant, multiClusterRoutingContext);
+ MultiStageBrokerRequestHandler finalHandler =
multiStageBrokerRequestHandler;
+ _routingManager.setServerReenableCallback(
+ serverInstance ->
finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if
(StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
{
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 0db5addd53e..717b8fb1430 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -855,4 +855,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return false;
}
}
+
+ /**
+ * Returns this handler's {@link QueryDispatcher}. Exposed so that the broker starter can reset the gRPC
+ * connection backoff on the dispatcher when a previously excluded server is re-enabled.
+ */
+ public QueryDispatcher getQueryDispatcher() {
+ return _queryDispatcher;
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index e2ecdf59033..320fc2eebbb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
@@ -129,6 +130,8 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
private final PinotConfiguration _pinotConfig;
private final boolean _enablePartitionMetadataManager;
private final ExecutorService _executorService;
+ // Written by setServerReenableCallback() and read while processing instance config changes, possibly on a
+ // different thread; volatile makes the most recently set callback visible to that reader without extra locking.
+ @Nullable
+ private volatile Consumer<ServerInstance> _serverReenableCallback;
// Global read-write lock for protecting the global data structures such as
_enabledServerInstanceMap,
// _excludedServers, and _routableServers. Write lock must be held if any of
these are modified, read lock must be
@@ -180,6 +183,14 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
_propertyStore = helixManager.getHelixPropertyStore();
}
+ /**
+ * Sets a callback that is invoked with the {@link ServerInstance} of a server that was excluded from routing and
+ * has just been re-enabled. This is useful for resetting gRPC channel state so the re-enabled server is not subject
+ * to exponential reconnect backoff delays.
+ *
+ * <p>Only one callback is kept: each call replaces the previous one, and passing {@code null} disables it. The
+ * callback runs synchronously while instance config changes are processed, right after the server is removed from
+ * the excluded servers. Per the locking rule for the excluded servers, this happens while the global write lock is
+ * held, so the callback should be cheap. NOTE(review): the call site does not appear to catch exceptions thrown by
+ * the callback.
+ *
+ * @param callback callback to invoke when an excluded server is re-enabled, or {@code null} for none
+ */
+ public void setServerReenableCallback(Consumer<ServerInstance> callback) {
+ _serverReenableCallback = callback;
+ }
+
private Object getRoutingTableBuildLock(String tableNameWithType) {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new
Object());
@@ -387,6 +398,12 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
// NOTE: Remove new enabled server from excluded servers because
the server is likely being restarted
if (_excludedServers.remove(instanceId)) {
LOGGER.info("Got excluded server: {} re-enabled, including it
into the routing", instanceId);
+ // We clear any GRPC channel reconnection backoff in this
callback when a server is re-enabled. Otherwise,
+ // MSE queries to this server may fail fast until the next
backoff retry succeeds.
+ if (_serverReenableCallback != null) {
+ LOGGER.info("Calling server re-enable callback for server:
{}", instanceId);
+ _serverReenableCallback.accept(serverInstance);
+ }
}
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
new file mode 100644
index 00000000000..e03153e2f08
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.manager;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.transport.ServerInstance;
+import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/** Tests for the server re-enable callback of {@link BrokerRoutingManager}. */
+public class BrokerRoutingManagerTest {
+ private static final String SERVER_INSTANCE_ID = "Server_localhost_8000";
+ private static final String SERVER_HOST = "localhost";
+ private static final int SERVER_PORT = 8000;
+ private static final String INSTANCE_CONFIGS_PATH = "/CONFIGS/PARTICIPANT";
+
+ private AutoCloseable _mocks;
+
+ @Mock
+ private BrokerMetrics _brokerMetrics;
+
+ @Mock
+ private ServerRoutingStatsManager _serverRoutingStatsManager;
+
+ @Mock
+ private HelixManager _helixManager;
+
+ @Mock
+ private HelixDataAccessor _helixDataAccessor;
+
+ @Mock
+ private BaseDataAccessor<ZNRecord> _zkDataAccessor;
+
+ @Mock
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+ @Mock
+ private PropertyKey.Builder _keyBuilder;
+
+ @Mock
+ private PropertyKey _instanceConfigsKey;
+
+ @Mock
+ private Consumer<ServerInstance> _serverReenableCallback;
+
+ private BrokerRoutingManager _routingManager;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+
+ // Set up Helix mocks
+ when(_helixManager.getHelixDataAccessor()).thenReturn(_helixDataAccessor);
+ when(_helixManager.getHelixPropertyStore()).thenReturn(_propertyStore);
+ when(_helixDataAccessor.getBaseDataAccessor()).thenReturn(_zkDataAccessor);
+ when(_helixDataAccessor.keyBuilder()).thenReturn(_keyBuilder);
+ when(_keyBuilder.instanceConfigs()).thenReturn(_instanceConfigsKey);
+ when(_instanceConfigsKey.getPath()).thenReturn(INSTANCE_CONFIGS_PATH);
+
+ // Mock paths for external views and ideal states
+ PropertyKey evKey = mock(PropertyKey.class);
+ PropertyKey isKey = mock(PropertyKey.class);
+ when(_keyBuilder.externalViews()).thenReturn(evKey);
+ when(_keyBuilder.idealStates()).thenReturn(isKey);
+ when(evKey.getPath()).thenReturn("/EXTERNALVIEW");
+ when(isKey.getPath()).thenReturn("/IDEALSTATES");
+
+ // Create routing manager
+ _routingManager = new BrokerRoutingManager(_brokerMetrics,
_serverRoutingStatsManager, new PinotConfiguration());
+ _routingManager.init(_helixManager);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void testNoErrorWhenCallbackNotSet() {
+ // Don't set callback
+
+ // Enable server
+ List<ZNRecord> instanceConfigs =
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(instanceConfigs);
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ // Exclude server
+ _routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
+
+ // Disable then re-enable
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(Collections.emptyList());
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+ // Should not throw NPE
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ // Server should be re-enabled in the map
+
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
+ }
+
+ @Test
+ public void testServerReenableCallbackInvokedWhenExcludedServerReenabled() {
+ // Set up callback
+ _routingManager.setServerReenableCallback(_serverReenableCallback);
+
+ // First, enable the server by processing instance config change
+ List<ZNRecord> instanceConfigs =
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ // Verify server is now in enabled map
+
assertTrue(_routingManager.getEnabledServerInstanceMap().containsKey(SERVER_INSTANCE_ID));
+
+ // Exclude the server (simulating failure detector marking it unhealthy)
+ _routingManager.excludeServerFromRouting(SERVER_INSTANCE_ID);
+
+ // Now simulate server being disabled then re-enabled (e.g., restart)
+ // First, disable
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(Collections.emptyList());
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+ verify(_serverReenableCallback, never()).accept(any()); // Enable, exclude and disable must not fire the callback
+
+ // Then re-enable
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(instanceConfigs);
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ // Verify callback was invoked with correct ServerInstance
+ ArgumentCaptor<ServerInstance> captor =
ArgumentCaptor.forClass(ServerInstance.class);
+ verify(_serverReenableCallback).accept(captor.capture());
+
+ ServerInstance capturedInstance = captor.getValue();
+ assertEquals(capturedInstance.getHostname(), SERVER_HOST);
+ assertEquals(capturedInstance.getPort(), SERVER_PORT);
+ }
+
+ @Test
+ public void testServerReenableCallbackNotInvokedForNewServer() {
+ // Set up callback
+ _routingManager.setServerReenableCallback(_serverReenableCallback);
+
+ // Enable a new server (never excluded)
+ List<ZNRecord> instanceConfigs =
Collections.singletonList(createEnabledServerZNRecord(SERVER_INSTANCE_ID));
+ when(_zkDataAccessor.getChildren(eq(INSTANCE_CONFIGS_PATH), any(),
eq(AccessOption.PERSISTENT),
+ anyInt(), anyInt())).thenReturn(instanceConfigs);
+
+ _routingManager.processClusterChange(ChangeType.INSTANCE_CONFIG);
+
+ // Verify callback was NOT invoked (server was never excluded)
+ verify(_serverReenableCallback, never()).accept(any());
+ }
+
+ /**
+ * Creates a ZNRecord representing an enabled server instance.
+ */
+ private ZNRecord createEnabledServerZNRecord(String instanceId) {
+ ZNRecord record = new ZNRecord(instanceId);
+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name(),
"true");
+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_HOST.name(),
+ instanceId.split("_")[1]); // Extract host from Server_host_port
+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.name(),
+ instanceId.split("_")[2]); // Extract port from Server_host_port
+ // Don't set IS_SHUTDOWN_IN_PROGRESS or QUERIES_DISABLED (they default to
false)
+ return record;
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 4695f336196..aa5d55d68b0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -255,9 +255,8 @@ public class QueryDispatcher {
public FailureDetector.ServerState
checkConnectivityToInstance(ServerInstance serverInstance) {
String hostname = serverInstance.getHostname();
int port = serverInstance.getQueryServicePort();
- String hostnamePort = String.format("%s_%d", hostname, port);
- DispatchClient client = _dispatchClientMap.get(hostnamePort);
+ DispatchClient client = _dispatchClientMap.get(toHostnamePortKey(hostname,
port));
// Could occur if the cluster is only serving single-stage queries
if (client == null) {
LOGGER.debug("No DispatchClient found for server with instanceId: {}",
serverInstance.getInstanceId());
@@ -510,8 +509,27 @@ public class QueryDispatcher {
private DispatchClient getOrCreateDispatchClient(QueryServerInstance
queryServerInstance) {
String hostname = queryServerInstance.getHostname();
int port = queryServerInstance.getQueryServicePort();
- String hostnamePort = String.format("%s_%d", hostname, port);
- return _dispatchClientMap.computeIfAbsent(hostnamePort, k -> new
DispatchClient(hostname, port, _tlsConfig));
+ return _dispatchClientMap.computeIfAbsent(toHostnamePortKey(hostname,
port),
+ k -> new DispatchClient(hostname, port, _tlsConfig));
+ }
+
+ /**
+ * Resets the gRPC connection backoff of the dispatch client for the given server. When the gRPC channel enters the
+ * TRANSIENT_FAILURE state after connection failures, it fails requests fast and reconnects with exponential backoff.
+ * Resetting the backoff lets a server that has recovered be reconnected to immediately.
+ *
+ * <p>The client is looked up by the server's hostname and query service port. Dispatch clients are created lazily
+ * when a query is first dispatched to a server, so if no client exists yet this method does nothing.
+ *
+ * @param serverInstance server whose dispatch channel backoff should be reset
+ */
+ public void resetClientConnectionBackoff(ServerInstance serverInstance) {
+ String hostname = serverInstance.getHostname();
+ int port = serverInstance.getQueryServicePort();
+ DispatchClient dispatchClient =
_dispatchClientMap.get(toHostnamePortKey(hostname, port));
+ // No client means nothing was ever dispatched to this server, so there is no channel backoff to reset.
+ if (dispatchClient != null) {
+ LOGGER.info("Resetting connection backoff for server: {}",
serverInstance.getInstanceId());
+ dispatchClient.getChannel().resetConnectBackoff();
+ }
+ }
+
+ /**
+ * Builds the {@code hostname_port} key for {@code _dispatchClientMap}. Plain concatenation replaces
+ * {@code String.format("%s_%d", ...)}, which parses a format string on every call on the dispatch path and applies
+ * locale-specific digit localization to {@code %d}. The key is identical for standard locales.
+ */
+ private static String toHostnamePortKey(String hostname, int port) {
+ return hostname + "_" + port;
 }
/// Concatenates the results of the sub-plan and returns a [QueryResult]
with the concatenated result.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]