This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1796657af9c Add StateTransitionThreadPoolManager to override 
getExecutor in SegmentOnlineOfflineStateModelFactory (#17453)
1796657af9c is described below

commit 1796657af9ce792b4a1cfba98ebeb0d15396be8b
Author: Jhow <[email protected]>
AuthorDate: Wed Jan 14 17:31:47 2026 -0800

    Add StateTransitionThreadPoolManager to override getExecutor in 
SegmentOnlineOfflineStateModelFactory (#17453)
---
 .../server/starter/helix/BaseServerStarter.java    | 17 ++++++-
 .../SegmentOnlineOfflineStateModelFactory.java     | 50 ++++++++++++++++++-
 .../helix/StateTransitionThreadPoolManager.java    | 57 ++++++++++++++++++++++
 3 files changed, 120 insertions(+), 4 deletions(-)

diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index b7419976f69..35e1927cd35 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -184,6 +184,9 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
   protected volatile boolean _isServerReadyToServeQueries = false;
   protected ScheduledExecutorService _helixMessageCountScheduler;
   protected ServerReloadJobStatusCache _reloadJobStatusCache;
+  // Override this to provide custom thread pool for Helix state transitions. 
Null means using Helix's default
+  @Nullable
+  protected StateTransitionThreadPoolManager _transitionThreadPoolManager;
 
   @Override
   public void init(PinotConfiguration serverConf)
@@ -287,6 +290,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, _zkAddress);
 
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+    initTransitionThreadPoolManager();
+  }
+
+  /**
+   * Override to provide custom transition thread pool manager
+   */
+  protected void initTransitionThreadPoolManager() {
+    _transitionThreadPoolManager = null;
   }
 
   /// Can be overridden to apply custom configs to the server conf.
@@ -737,7 +748,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
 
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
+        new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager, _transitionThreadPoolManager);
     _helixManager.getStateMachineEngine()
         
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
 stateModelFactory);
     // Start the data manager as a pre-connect callback so that it starts 
after connecting to the ZK in order to access
@@ -950,7 +961,9 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _adminApiApplication.startShuttingDown();
     _helixAdmin.setConfig(_instanceConfigScope,
         Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, 
Boolean.toString(true)));
-
+    if (_transitionThreadPoolManager != null) {
+      _transitionThreadPoolManager.shutdown();
+    }
     long endTimeMs =
         startTimeMs + 
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, 
Server.DEFAULT_SHUTDOWN_TIMEOUT_MS);
     if (_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index d547741706b..7f2feff5241 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -40,10 +42,16 @@ public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<Sta
 
   private final String _instanceId;
   private final InstanceDataManager _instanceDataManager;
+  /** Provides custom thread pools for executing Helix state transition 
messages. If this is null, all state
+   * transition message will be executed using the default shared thread pool 
by Helix */
+  @Nullable
+  private final StateTransitionThreadPoolManager 
_stateTransitionThreadPoolManager;
 
-  public SegmentOnlineOfflineStateModelFactory(String instanceId, 
InstanceDataManager instanceDataManager) {
+  public SegmentOnlineOfflineStateModelFactory(String instanceId, 
InstanceDataManager instanceDataManager,
+      @Nullable StateTransitionThreadPoolManager 
stateTransitionThreadPoolManager) {
     _instanceId = instanceId;
     _instanceDataManager = instanceDataManager;
+    _stateTransitionThreadPoolManager = stateTransitionThreadPoolManager;
   }
 
   public static String getStateModelName() {
@@ -162,7 +170,6 @@ public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<Sta
     public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context)
         throws Exception {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() 
: {}", message);
-
       try {
         _instanceDataManager.addOnlineSegment(message.getResourceName(), 
message.getPartitionName());
       } catch (Exception e) {
@@ -247,4 +254,43 @@ public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<Sta
       }
     }
   }
+
+  /**
+   * Get thread pool to handle the given state transition message.
+   * If this method returns null, the threadpool returned from
+   * {@link StateModelFactory#getExecutorService(String resourceName, String 
fromState, String toState)} will be used;
+   * If this method returns null the threadpool returned from
+   * {@link StateModelFactory#getExecutorService(String resourceName)} will be 
used.
+   * If that method return null too, then the default shared threadpool will 
be used.
+   * This method may be called only once for each category of messages,
+   * it will NOT be called during each state transition.
+   * @param messageInfo contains information used to categorize messages to 
use different threadpools
+   * @return An object contains the MessageIdentifierBase and the assigned 
threadpool for the input message
+   */
+  @Override
+  @Nullable
+  public CustomizedExecutorService getExecutorService(Message.MessageInfo 
messageInfo) {
+    if (_stateTransitionThreadPoolManager == null) {
+      return super.getExecutorService(messageInfo);
+    }
+    return _stateTransitionThreadPoolManager.getExecutorService(messageInfo);
+  }
+
+  @Override
+  @Nullable
+  public ExecutorService getExecutorService(String resourceName, String 
fromState, String toState) {
+    if (_stateTransitionThreadPoolManager == null) {
+      return super.getExecutorService(resourceName, fromState, toState);
+    }
+    return _stateTransitionThreadPoolManager.getExecutorService(resourceName, 
fromState, toState);
+  }
+
+  @Override
+  @Nullable
+  public ExecutorService getExecutorService(String resourceName) {
+    if (_stateTransitionThreadPoolManager == null) {
+      return super.getExecutorService(resourceName);
+    }
+    return _stateTransitionThreadPoolManager.getExecutorService(resourceName);
+  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
new file mode 100644
index 00000000000..835ae296181
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/StateTransitionThreadPoolManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+/**
+ * Manages the custom Helix state transition thread pools for Pinot server.
+ * <br/>
+ * The implementation of this interface can be passed to {@link 
SegmentOnlineOfflineStateModelFactory} to provide custom
+ * thread pools for different state transition messages, instead of using 
Helix's managed thread pools.
+ * <br/>
+ * Helix maintains a cache from message identifier to assigned thread pool. 
Message identifiers are:
+ * <ol>
+ *   <li> {@link org.apache.helix.model.Message.MessageInfo} </li>
+ *   <li> (resourceName, fromState, toState) combination </li>
+ *   <li> resourceName </li>
+ * </ol>
+ * For each state transition message, Helix gets the message identifiers by 
the above order, and looks up the cache
+ * to find if it's mapped to a thread pool, executes the state transition on 
it if it's non-null in the cache. If
+ * three of the identifiers all map to null, the default Helix-managed thread 
pool would be used. During the lookup,
+ * it calls {@link SegmentOnlineOfflineStateModelFactory#getExecutorService} 
upon cache misses. The cache is never
+ * cleaned up until the server shuts down, so the method would only be called 
once for each argument combination
+ * (even if it returns null).
+ */
+public interface StateTransitionThreadPoolManager {
+  @Nullable
+  StateModelFactory.CustomizedExecutorService 
getExecutorService(Message.MessageInfo messageInfo);
+
+  @Nullable
+  ExecutorService getExecutorService(String resourceName, String fromState, 
String toState);
+
+  @Nullable
+  ExecutorService getExecutorService(String resourceName);
+
+  void shutdown();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to