This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch upgrade_helix
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit caa8f4ed4aab3c4295f81f0edd469906ab02dbc8
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Feb 13 15:15:41 2025 -0800

    Upgrade Helix to 1.4.3
---
 .../helix/core/PinotHelixResourceManager.java      | 167 ++++++---------------
 .../assignment/segment/SegmentAssignmentUtils.java |  86 +++++++++--
 .../BalancedNumSegmentAssignmentStrategy.java      |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  21 +--
 .../helix/core/relocation/SegmentRelocator.java    |  17 +--
 .../helix/core/util/MessagingServiceUtils.java     |  74 +++++++++
 .../segment/SegmentAssignmentUtilsTest.java        |  27 ++--
 .../core/relocation/SegmentRelocatorTest.java      |   4 +-
 pom.xml                                            |   2 +-
 9 files changed, 218 insertions(+), 187 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f983ce671..908c03b6f0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -63,12 +63,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
@@ -155,6 +153,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
@@ -195,10 +194,8 @@ public class PinotHelixResourceManager {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
   public static final String APPEND = "APPEND";
-  private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
   private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
   private static final String API_REQUEST_ID_PREFIX = "api-";
-  private static final int INFINITE_TIMEOUT = -1;
 
   private enum LineageUpdateType {
     START, END, REVERT
@@ -207,8 +204,6 @@ public class PinotHelixResourceManager {
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 
60_000L; // 10 minutes
   public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 
second
-  public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 
minutes
-  public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 
second
 
   private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
       
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
@@ -2579,17 +2574,9 @@ public class PinotHelixResourceManager {
     }
 
     LOGGER.info("Sending delete table messages for table: {}", 
tableNameWithType);
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    TableDeletionMessage tableDeletionMessage = new 
TableDeletionMessage(tableNameWithType);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
tableDeletionMessage, null, timeoutMs);
+    TableDeletionMessage tableDeletionMessage = new 
TableDeletionMessage(tableNameWithType);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
tableDeletionMessage, tableNameWithType);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} delete table messages for table: {}", 
numMessagesSent, tableNameWithType);
     } else {
@@ -2634,20 +2621,15 @@ public class PinotHelixResourceManager {
       Preconditions.checkArgument(tt == TableType.OFFLINE,
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
+
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
     for (Map.Entry<String, List<String>> entry : 
instanceToSegmentsMap.entrySet()) {
       String targetInstance = entry.getKey();
-      List<String> segments = entry.getValue();
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(targetInstance);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
-      SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
-      ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+      SegmentReloadMessage segmentReloadMessage =
+          new SegmentReloadMessage(tableNameWithType, entry.getValue(), 
forceDownload);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, segmentReloadMessage, 
tableNameWithType, null, targetInstance);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} reload messages to instance: {} for table: {}", 
numMessagesSent, targetInstance,
             tableNameWithType);
@@ -2671,17 +2653,10 @@ public class PinotHelixResourceManager {
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, segmentReloadMessage, 
tableNameWithType, null, targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, 
tableNameWithType);
     } else {
@@ -2704,19 +2679,12 @@ public class PinotHelixResourceManager {
           segmentName);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setPartition(segmentName);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage =
-        new SegmentReloadMessage(tableNameWithType, 
Collections.singletonList(segmentName), forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage segmentReloadMessage =
+        new SegmentReloadMessage(tableNameWithType, List.of(segmentName), 
forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, segmentReloadMessage, 
tableNameWithType, segmentName,
+            targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for segment: {} in table: {}", 
numMessagesSent, segmentName,
           tableNameWithType);
@@ -2810,8 +2778,8 @@ public class PinotHelixResourceManager {
    * However instead of resetting only the ERROR state to its initial state. 
we reset all state regardless.
    */
   private void resetPartitionAllState(String instanceName, String 
resourceName, Set<String> resetPartitionNames) {
-    LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster 
{}.",
-        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName, instanceName, _helixClusterName);
+    LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}", 
resetPartitionNames, resourceName,
+        instanceName);
     HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -2847,7 +2815,7 @@ public class PinotHelixResourceManager {
               resourceName, resetPartitionNames, instanceName, message, 
message.getResourceName()));
     }
 
-    String adminName = null;
+    String adminName;
     try {
       adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
     } catch (UnknownHostException e) {
@@ -2899,21 +2867,13 @@ public class PinotHelixResourceManager {
    */
   public void sendSegmentRefreshMessage(String tableNameWithType, String 
segmentName, boolean refreshServerSegment,
       boolean refreshBrokerRouting) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     SegmentRefreshMessage segmentRefreshMessage = new 
SegmentRefreshMessage(tableNameWithType, segmentName);
 
     // Send segment refresh message to servers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
-    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
     if (refreshServerSegment) {
-      // Send segment refresh message to servers
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setPartition(segmentName);
-      // Send message with no callback and infinite timeout on the recipient
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, segmentRefreshMessage, 
tableNameWithType, segmentName, null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to servers for segment: 
{} of table: {}", numMessagesSent,
@@ -2924,11 +2884,11 @@ public class PinotHelixResourceManager {
       }
     }
 
+    // Send segment refresh message to brokers
     if (refreshBrokerRouting) {
-      // Send segment refresh message to brokers
-      recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-      recipientCriteria.setPartition(tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, segmentRefreshMessage, 
Helix.BROKER_RESOURCE_INSTANCE,
+              tableNameWithType, null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to brokers for segment: 
{} of table: {}", numMessagesSent,
@@ -2941,18 +2901,11 @@ public class PinotHelixResourceManager {
   }
 
   private void sendTableConfigRefreshMessage(String tableNameWithType) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     TableConfigRefreshMessage tableConfigRefreshMessage = new 
TableConfigRefreshMessage(tableNameWithType);
-
-    // Send table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
tableConfigRefreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, 
tableConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE,
+            tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} table config refresh messages to brokers for table: 
{}", numMessagesSent, tableNameWithType);
@@ -2962,18 +2915,12 @@ public class PinotHelixResourceManager {
   }
 
   private void sendApplicationQpsQuotaRefreshMessage(String appName) {
-    ApplicationQpsQuotaRefreshMessage message = new 
ApplicationQpsQuotaRefreshMessage(appName);
-
-    // Send database config refresh message to brokers
-    Criteria criteria = new Criteria();
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setInstanceName("%");
-    criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    criteria.setSessionSpecific(true);
-
-    int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, 
message, null, INFINITE_TIMEOUT);
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    ApplicationQpsQuotaRefreshMessage quotaRefreshMessage = new 
ApplicationQpsQuotaRefreshMessage(appName);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, quotaRefreshMessage, 
Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
-      LOGGER.info("Sent {} applcation qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
+      LOGGER.info("Sent {} application qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
           appName);
     } else {
       LOGGER.warn("No application qps quota refresh message sent to brokers 
for application: {}", appName);
@@ -2981,17 +2928,10 @@ public class PinotHelixResourceManager {
   }
 
   private void sendDatabaseConfigRefreshMessage(String databaseName) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new 
DatabaseConfigRefreshMessage(databaseName);
-
-    // Send database config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    // Send message with no callback and infinite timeout on the recipient
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
databaseConfigRefreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, 
databaseConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} database config refresh messages to brokers for 
database: {}", numMessagesSent,
           databaseName);
@@ -3001,18 +2941,11 @@ public class PinotHelixResourceManager {
   }
 
   private void sendRoutingTableRebuildMessage(String tableNameWithType) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     RoutingTableRebuildMessage routingTableRebuildMessage = new 
RoutingTableRebuildMessage(tableNameWithType);
-
-    // Send table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
routingTableRebuildMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, 
routingTableRebuildMessage, Helix.BROKER_RESOURCE_INSTANCE,
+            tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} routing table rebuild messages to brokers for 
table: {}", numMessagesSent,
@@ -4515,27 +4448,17 @@ public class PinotHelixResourceManager {
   public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String 
tableName, String periodicTaskName,
       Map<String, String> taskProperties) {
     String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0, 8);
-
     LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all 
controllers for running task {} against {},"
             + " with properties {}.\"", periodicTaskRequestId, 
periodicTaskName,
         tableName != null ? " table '" + tableName + "'" : "all tables", 
taskProperties);
-
-    // Create and send message to send to all controllers (including this one)
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
-    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
-    recipientCriteria.setSelfExcluded(false);
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     RunPeriodicTaskMessage runPeriodicTaskMessage =
         new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, 
tableName, taskProperties);
-
-    ClusterMessagingService clusterMessagingService = 
getHelixZkManager().getMessagingService();
-    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
-
+    int numMessagesSent = 
MessagingServiceUtils.sendIncludingSelf(messagingService, 
runPeriodicTaskMessage,
+        Helix.LEAD_CONTROLLER_RESOURCE_NAME);
     LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId,
-        messageCount);
-    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
messageCount > 0);
+        numMessagesSent);
+    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
numMessagesSent > 0);
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 727bdbdfda..9c7e37e86e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
@@ -33,7 +32,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -143,24 +141,82 @@ public class SegmentAssignmentUtils {
   }
 
   /**
-   * Rebalances the table with Helix AutoRebalanceStrategy.
+   * Rebalances the table for the non-replica-group based segment assignment 
strategy by uniformly spraying
+   * segment replicas to the instances.
+   * <ul>
+   *   <li>
+   *     1. Calculate the target number of segments on each instance
+   *   </li>
+   *   <li>
+   *     2. Loop over all the segments and keep the assignment if target 
number of segments for the instance has not
+   *     been reached, and track the unassigned segments
+   *   </li>
+   *   <li>
+   *     3. Assign the left-over segments to the instances with the least 
segments, or the smallest index if there is a
+   *     tie
+   *   </li>
+   * </ul>
    */
-  public static Map<String, Map<String, String>> 
rebalanceTableWithHelixAutoRebalanceStrategy(
+  public static Map<String, Map<String, String>> 
rebalanceNonReplicaGroupBasedTable(
       Map<String, Map<String, String>> currentAssignment, List<String> 
instances, int replication) {
-    // Use Helix AutoRebalanceStrategy to rebalance the table
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
-    states.put(SegmentStateModel.ONLINE, replication);
-    AutoRebalanceStrategy autoRebalanceStrategy =
-        new AutoRebalanceStrategy(null, new 
ArrayList<>(currentAssignment.keySet()), states);
-    // Make a copy of the current assignment because this step might change 
the passed in assignment
-    Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
+    Map<String, Integer> instanceNameToIdMap = 
getInstanceNameToIdMap(instances);
+
+    // Calculate target number of segments per instance
+    // NOTE: in order to minimize the segment movements, use the ceiling of 
the quotient
+    int numInstances = instances.size();
+    int numSegments = currentAssignment.size();
+    int targetNumSegmentsPerInstance = (numSegments * replication + 
numInstances - 1) / numInstances;
+
+    // Do not move segment if target number of segments is not reached, track 
the segments that need to be moved
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    int[] numSegmentsAssignedPerInstance = new int[numInstances];
+    List<String> segmentsNotAssigned = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
       String segmentName = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
+      Set<String> currentInstances = entry.getValue().keySet();
+      int remainingReplicas = replication;
+      for (String instanceName : currentInstances) {
+        Integer instanceId = instanceNameToIdMap.get(instanceName);
+        if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < 
targetNumSegmentsPerInstance) {
+          newAssignment.computeIfAbsent(segmentName, k -> new 
TreeMap<>()).put(instanceName, SegmentStateModel.ONLINE);
+          numSegmentsAssignedPerInstance[instanceId]++;
+          remainingReplicas--;
+          if (remainingReplicas == 0) {
+            break;
+          }
+        }
+      }
+      for (int i = 0; i < remainingReplicas; i++) {
+        segmentsNotAssigned.add(segmentName);
+      }
+    }
+
+    // Assign each not assigned segment to the instance with the least 
segments, or the smallest id if there is a tie
+    PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, 
Pairs.intPairComparator());
+    for (int instanceId = 0; instanceId < numInstances; instanceId++) {
+      heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], 
instanceId));
     }
-    return autoRebalanceStrategy.computePartitionAssignment(instances, 
instances, currentAssignmentCopy, null)
-        .getMapFields();
+    List<Pairs.IntPair> skippedPairs = new ArrayList<>();
+    for (String segmentName : segmentsNotAssigned) {
+      Map<String, String> instanceStateMap = 
newAssignment.computeIfAbsent(segmentName, k -> new TreeMap<>());
+      while (true) {
+        Pairs.IntPair intPair = heap.remove();
+        int instanceId = intPair.getRight();
+        String instanceName = instances.get(instanceId);
+        // Skip the instance if it already has the segment
+        if (instanceStateMap.put(instanceName, SegmentStateModel.ONLINE) == 
null) {
+          intPair.setLeft(intPair.getLeft() + 1);
+          heap.add(intPair);
+          break;
+        } else {
+          skippedPairs.add(intPair);
+        }
+      }
+      heap.addAll(skippedPairs);
+      skippedPairs.clear();
+    }
+
+    return newAssignment;
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
index e9c540da78..2a714e61fc 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
@@ -42,17 +42,16 @@ import org.slf4j.LoggerFactory;
 public class BalancedNumSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);
 
-  private String _tableNameWithType;
   private int _replication;
 
   @Override
   public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _tableNameWithType = tableConfig.getTableName();
+    String tableNameWithType = tableConfig.getTableName();
     SegmentsValidationAndRetentionConfig validationAndRetentionConfig = 
tableConfig.getValidationConfig();
     Preconditions.checkState(validationAndRetentionConfig != null, "Validation 
Config is null");
     _replication = tableConfig.getReplication();
     LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " 
+ "{} with replication: {}",
-        _tableNameWithType, _replication);
+        tableNameWithType, _replication);
   }
 
   @Override
@@ -70,7 +69,7 @@ public class BalancedNumSegmentAssignmentStrategy implements 
SegmentAssignmentSt
     List<String> instances =
         
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
     newAssignment =
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 instances, _replication);
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
instances, _replication);
     return newAssignment;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7121dfe1dc..95cc876754 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -54,10 +54,8 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -98,6 +96,7 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1312,16 +1311,10 @@ public class PinotLLCRealtimeSegmentManager {
         newConsumingSegment, newInstances, realtimeTableName, 
instancesNoLongerServe);
 
     ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
+    IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
     List<String> instancesSent = new 
ArrayList<>(instancesNoLongerServe.size());
     for (String instance : instancesNoLongerServe) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName(instance);
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(realtimeTableName);
-      recipientCriteria.setPartition(committedSegment);
-      recipientCriteria.setSessionSpecific(true);
-      IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
-      if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+      if (MessagingServiceUtils.send(messagingService, message, 
realtimeTableName, committedSegment, instance) > 0) {
         instancesSent.add(instance);
       } else {
         LOGGER.warn("Failed to send ingestion metrics remove message for 
table: {} segment: {} to instance: {}",
@@ -2251,13 +2244,9 @@ public class PinotLLCRealtimeSegmentManager {
 
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
     if (!consumingSegments.isEmpty()) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName("%");
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
+      ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
       ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, 
consumingSegments);
-      int numMessagesSent = 
_helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} force commit messages for table: {} segments: 
{}", numMessagesSent, tableNameWithType,
             consumingSegments);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 13b4bae7f1..c6fe9bf26f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -32,8 +32,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -45,6 +43,7 @@ import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTas
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -262,7 +261,7 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
         }
       }
     }
-    if (serverToSegmentsToMigrate.size() > 0) {
+    if (!serverToSegmentsToMigrate.isEmpty()) {
       LOGGER.info("Notify servers: {} to move segments to new tiers locally", 
serverToSegmentsToMigrate.keySet());
       reloadSegmentsForLocalTierMigration(tableNameWithType, 
serverToSegmentsToMigrate, messagingService);
     } else {
@@ -275,17 +274,13 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
     for (Map.Entry<String, Set<String>> entry : 
serverToSegmentsToMigrate.entrySet()) {
       String serverName = entry.getKey();
       Set<String> segmentNames = entry.getValue();
+      LOGGER.info("Sending SegmentReloadMessage to server: {} to reload 
segments: {} of table: {}", serverName,
+          segmentNames, tableNameWithType);
       // One SegmentReloadMessage per server but takes all segment names.
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(serverName);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
       SegmentReloadMessage segmentReloadMessage =
           new SegmentReloadMessage(tableNameWithType, new 
ArrayList<>(segmentNames), false);
-      LOGGER.info("Sending SegmentReloadMessage to server: {} to reload 
segments: {} of table: {}", serverName,
-          segmentNames, tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, -1);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, segmentReloadMessage, 
tableNameWithType, null, serverName);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", 
serverName, tableNameWithType);
       } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
new file mode 100644
index 0000000000..f977240f96
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import javax.annotation.Nullable;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+
+public class MessagingServiceUtils {
+  private MessagingServiceUtils() {
+  }
+
+  /**
+   * Sends a message to the recipients specified by the criteria, returns the 
number of messages sent.
+   */
+  public static int send(ClusterMessagingService messagingService, Message 
message, Criteria criteria) {
+    try {
+      return messagingService.send(criteria, message);
+    } catch (Exception e) {
+      // NOTE:
+      // It can throw an exception when the target resource doesn't exist (e.g. 
ExternalView has not been created yet). This
+      // is a normal case, and we count it as no message being sent.
+      return 0;
+    }
+  }
+
+  private static int send(ClusterMessagingService messagingService, Message 
message, String resource,
+      @Nullable String partition, @Nullable String instanceName, boolean 
includingSelf) {
+    Criteria criteria = new Criteria();
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setSessionSpecific(true);
+    criteria.setResource(resource);
+    if (partition != null) {
+      criteria.setPartition(partition);
+    }
+    if (instanceName != null) {
+      criteria.setInstanceName(instanceName);
+    }
+    criteria.setSelfExcluded(!includingSelf);
+    return send(messagingService, message, criteria);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message 
message, String resource,
+      @Nullable String partition, @Nullable String instanceName) {
+    return send(messagingService, message, resource, partition, instanceName, 
false);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message 
message, String resource) {
+    return send(messagingService, message, resource, null, null, false);
+  }
+
+  public static int sendIncludingSelf(ClusterMessagingService 
messagingService, Message message, String resource) {
+    return send(messagingService, message, resource, null, null, true);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index 0f43b7869d..501db9e572 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -76,8 +76,7 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 instances, NUM_REPLICAS),
+    
assertEquals(SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
 instances, NUM_REPLICAS),
         currentAssignment);
 
     // Replace instance_0 with instance_10
@@ -85,8 +84,7 @@ public class SegmentAssignmentUtilsTest {
     String newInstanceName = INSTANCE_NAME_PREFIX + 10;
     newInstances.set(0, newInstanceName);
     Map<String, Map<String, String>> newAssignment =
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-            NUM_REPLICAS);
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(currentAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -116,30 +114,29 @@ public class SegmentAssignmentUtilsTest {
     // }
     int newNumInstances = numInstances - 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
     for (Map<String, String> instanceStateMap : newAssignment.values()) {
       assertEquals(instanceStateMap.size(), NUM_REPLICAS);
     }
-    // The segments are not perfectly balanced, but should be deterministic
     numSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
newInstances);
-    assertEquals(numSegmentsAssignedPerInstance[0], 56);
+    assertEquals(numSegmentsAssignedPerInstance[0], 60);
     assertEquals(numSegmentsAssignedPerInstance[1], 60);
     assertEquals(numSegmentsAssignedPerInstance[2], 60);
     assertEquals(numSegmentsAssignedPerInstance[3], 60);
-    assertEquals(numSegmentsAssignedPerInstance[4], 64);
+    assertEquals(numSegmentsAssignedPerInstance[4], 60);
     numSegmentsToMovePerInstance =
         
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
     assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
-    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), 
IntIntPair.of(26, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3), 
IntIntPair.of(30, 0));
-    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), 
IntIntPair.of(34, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), 
IntIntPair.of(30, 0));
     for (int i = 5; i < 10; i++) {
       assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 30));
     }
@@ -150,8 +147,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     newNumInstances = numInstances + 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -182,8 +179,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     String newInstanceNamePrefix = "i_";
     newInstances = 
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index 3c08b79d92..55029806a3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -43,7 +43,6 @@ import org.apache.pinot.util.TestUtils;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -93,8 +92,7 @@ public class SegmentRelocatorTest {
 
     ArgumentCaptor<Criteria> criteriaCapture = 
ArgumentCaptor.forClass(Criteria.class);
     ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture = 
ArgumentCaptor.forClass(SegmentReloadMessage.class);
-    verify(messagingService, times(2)).send(criteriaCapture.capture(), 
reloadMessageCapture.capture(), eq(null),
-        eq(-1));
+    verify(messagingService, times(2)).send(criteriaCapture.capture(), 
reloadMessageCapture.capture());
 
     List<Criteria> criteriaList = criteriaCapture.getAllValues();
     List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues();
diff --git a/pom.xml b/pom.xml
index 98eb651a00..f2445a5223 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
     <parquet.version>1.15.0</parquet.version>
     <orc.version>1.9.5</orc.version>
     <hive.version>2.8.1</hive.version>
-    <helix.version>1.3.1</helix.version>
+    <helix.version>1.4.3</helix.version>
     <zkclient.version>0.11</zkclient.version>
     <jackson.version>2.18.2</jackson.version>
     <zookeeper.version>3.9.3</zookeeper.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to