This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1796657af9c Add StateTransitionThreadPoolManager to override
getExecutor in SegmentOnlineOfflineStateModelFactory (#17453)
1796657af9c is described below
commit 1796657af9ce792b4a1cfba98ebeb0d15396be8b
Author: Jhow <[email protected]>
AuthorDate: Wed Jan 14 17:31:47 2026 -0800
Add StateTransitionThreadPoolManager to override getExecutor in
SegmentOnlineOfflineStateModelFactory (#17453)
---
.../server/starter/helix/BaseServerStarter.java | 17 ++++++-
.../SegmentOnlineOfflineStateModelFactory.java | 50 ++++++++++++++++++-
.../helix/StateTransitionThreadPoolManager.java | 57 ++++++++++++++++++++++
3 files changed, 120 insertions(+), 4 deletions(-)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index b7419976f69..35e1927cd35 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -184,6 +184,9 @@ public abstract class BaseServerStarter implements
ServiceStartable {
protected volatile boolean _isServerReadyToServeQueries = false;
protected ScheduledExecutorService _helixMessageCountScheduler;
protected ServerReloadJobStatusCache _reloadJobStatusCache;
+ // Override this to provide a custom thread pool for Helix state transitions.
Null means Helix's default thread pool is used
+ @Nullable
+ protected StateTransitionThreadPoolManager _transitionThreadPoolManager;
@Override
public void init(PinotConfiguration serverConf)
@@ -287,6 +290,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, _zkAddress);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+ initTransitionThreadPoolManager();
+ }
+
+ /**
+ * Override to provide custom transition thread pool manager
+ */
+ protected void initTransitionThreadPoolManager() {
+ _transitionThreadPoolManager = null;
}
/// Can be overridden to apply custom configs to the server conf.
@@ -737,7 +748,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
- new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
+ new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager, _transitionThreadPoolManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
stateModelFactory);
// Start the data manager as a pre-connect callback so that it starts
after connecting to the ZK in order to access
@@ -950,7 +961,9 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_adminApiApplication.startShuttingDown();
_helixAdmin.setConfig(_instanceConfigScope,
Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS,
Boolean.toString(true)));
-
+ if (_transitionThreadPoolManager != null) {
+ _transitionThreadPoolManager.shutdown();
+ }
long endTimeMs =
startTimeMs +
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS,
Server.DEFAULT_SHUTDOWN_TIMEOUT_MS);
if (_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index d547741706b..7f2feff5241 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.server.starter.helix;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
@@ -40,10 +42,16 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
private final String _instanceId;
private final InstanceDataManager _instanceDataManager;
+ /** Provides custom thread pools for executing Helix state transition
messages. If this is null, all state
+ * transition messages will be executed using the default shared thread pool
provided by Helix. */
+ @Nullable
+ private final StateTransitionThreadPoolManager
_stateTransitionThreadPoolManager;
- public SegmentOnlineOfflineStateModelFactory(String instanceId,
InstanceDataManager instanceDataManager) {
+ public SegmentOnlineOfflineStateModelFactory(String instanceId,
InstanceDataManager instanceDataManager,
+ @Nullable StateTransitionThreadPoolManager
stateTransitionThreadPoolManager) {
_instanceId = instanceId;
_instanceDataManager = instanceDataManager;
+ _stateTransitionThreadPoolManager = stateTransitionThreadPoolManager;
}
public static String getStateModelName() {
@@ -162,7 +170,6 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
public void onBecomeOnlineFromOffline(Message message, NotificationContext
context)
throws Exception {
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline()
: {}", message);
-
try {
_instanceDataManager.addOnlineSegment(message.getResourceName(),
message.getPartitionName());
} catch (Exception e) {
@@ -247,4 +254,43 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
}
}
}
+
+ /**
+ * Get thread pool to handle the given state transition message.
+ * If this method returns null, the threadpool returned from
+ * {@link StateModelFactory#getExecutorService(String resourceName, String
fromState, String toState)} will be used;
+ * if that method returns null too, the threadpool returned from
+ * {@link StateModelFactory#getExecutorService(String resourceName)} will be
used.
+ * If that method returns null as well, then the default shared threadpool will
be used.
+ * This method may be called only once for each category of messages;
+ * it will NOT be called during each state transition.
+ * @param messageInfo contains information used to categorize messages to
use different threadpools
+ * @return An object containing the MessageIdentifierBase and the assigned
threadpool for the input message
+ */
+ @Override
+ @Nullable
+ public CustomizedExecutorService getExecutorService(Message.MessageInfo
messageInfo) {
+ if (_stateTransitionThreadPoolManager == null) {
+ return super.getExecutorService(messageInfo);
+ }
+ return _stateTransitionThreadPoolManager.getExecutorService(messageInfo);
+ }
+
+ @Override
+ @Nullable
+ public ExecutorService getExecutorService(String resourceName, String
fromState, String toState) {
+ if (_stateTransitionThreadPoolManager == null) {
+ return super.getExecutorService(resourceName, fromState, toState);
+ }
+ return _stateTransitionThreadPoolManager.getExecutorService(resourceName,
fromState, toState);
+ }
+
+ @Override
+ @Nullable
+ public ExecutorService getExecutorService(String resourceName) {
+ if (_stateTransitionThreadPoolManager == null) {
+ return super.getExecutorService(resourceName);
+ }
+ return _stateTransitionThreadPoolManager.getExecutorService(resourceName);
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
new file mode 100644
index 00000000000..835ae296181
--- /dev/null
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+/**
+ * Manages the custom Helix state transition thread pools for Pinot server.
+ * <br/>
+ * The implementation of this interface can be passed to {@link
SegmentOnlineOfflineStateModelFactory} to provide custom
+ * thread pools for different state transition messages, instead of using
Helix's managed thread pools.
+ * <br/>
+ * Helix maintains a cache from message identifier to assigned thread pool.
Message identifiers are:
+ * <ol>
+ * <li> {@link org.apache.helix.model.Message.MessageInfo} </li>
+ * <li> (resourceName, fromState, toState) combination </li>
+ * <li> resourceName </li>
+ * </ol>
+ * For each state transition message, Helix derives the message identifiers in
the above order and looks each one up
+ * in the cache; the state transition is executed on the first thread pool that
is non-null in the cache. If
+ * all three identifiers map to null, the default Helix-managed thread
pool is used. During the lookup,
+ * Helix calls {@link SegmentOnlineOfflineStateModelFactory#getExecutorService}
upon cache misses. The cache is never
+ * cleaned up until the server shuts down, so the method is only called
once for each argument combination
+ * (even if it returns null).
+ */
+public interface StateTransitionThreadPoolManager {
+ @Nullable
+ StateModelFactory.CustomizedExecutorService
getExecutorService(Message.MessageInfo messageInfo);
+
+ @Nullable
+ ExecutorService getExecutorService(String resourceName, String fromState,
String toState);
+
+ @Nullable
+ ExecutorService getExecutorService(String resourceName);
+
+ void shutdown();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]