This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6ac348fe5aa3fe1e12358062d203b31bb885d1b6
Author: Zachary Pinto <[email protected]>
AuthorDate: Thu Dec 7 17:46:59 2023 -0800

    Prevent the spectator routing table from containing SWAP_IN 
instances. (#2710)
    
    Prevent the spectator routing table from containing SWAP_IN instances.
---
 .../waged/model/ClusterModelProvider.java          |   6 +-
 .../apache/helix/spectator/RoutingDataCache.java   |  70 ++++++++++++-
 .../helix/spectator/RoutingTableProvider.java      |  14 ++-
 .../rebalancer/TestInstanceOperation.java          | 109 +++++++++++++++++++++
 .../TestRoutingTableProviderFromCurrentStates.java |   8 +-
 5 files changed, 195 insertions(+), 12 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index a869a904e..69fec9b2c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -209,13 +209,15 @@ public class ClusterModelProvider {
 
     // Get the set of active logical ids.
     Set<String> activeLogicalIds = activeInstances.stream().map(
-        instanceName -> assignableInstanceConfigMap.get(instanceName)
+        instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
+                new InstanceConfig(instanceName))
             
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
 
     Set<String> assignableLiveInstanceNames = 
dataProvider.getAssignableLiveInstances().keySet();
     Set<String> assignableLiveInstanceLogicalIds =
         assignableLiveInstanceNames.stream().map(
-            instanceName -> assignableInstanceConfigMap.get(instanceName)
+            instanceName -> 
assignableInstanceConfigMap.getOrDefault(instanceName,
+                    new InstanceConfig(instanceName))
                 
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
 
     // Generate replica objects for all the resource partitions.
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 727bd8df9..8872e9eda 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -23,8 +23,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -34,9 +36,11 @@ import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.CurrentStateSnapshot;
 import org.apache.helix.common.caches.CustomizedViewCache;
 import org.apache.helix.common.caches.TargetExternalViewCache;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +51,10 @@ import org.slf4j.LoggerFactory;
 class RoutingDataCache extends BasicClusterDataCache {
   private static Logger LOG = 
LoggerFactory.getLogger(RoutingDataCache.class.getName());
 
+  // When an instance has any of these instance operations, it should not be 
routable.
+  private static final ImmutableSet<String> NON_ROUTABLE_INSTANCE_OPERATIONS =
+      ImmutableSet.of(InstanceConstants.InstanceOperation.SWAP_IN.name());
+
   private final Map<PropertyType, List<String>> _sourceDataTypeMap;
 
   private CurrentStateCache _currentStateCache;
@@ -54,6 +62,8 @@ class RoutingDataCache extends BasicClusterDataCache {
   // propertyCache, this hardcoded list of fields won't be necessary.
   private Map<String, CustomizedViewCache> _customizedViewCaches;
   private TargetExternalViewCache _targetExternalViewCache;
+  private Map<String, LiveInstance> _routableLiveInstanceMap;
+  private Map<String, InstanceConfig> _routableInstanceConfigMap;
 
   public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
     this (clusterName, ImmutableMap.of(sourceDataType, 
Collections.emptyList()));
@@ -73,6 +83,8 @@ class RoutingDataCache extends BasicClusterDataCache {
         .forEach(customizedStateType -> 
_customizedViewCaches.put(customizedStateType,
             new CustomizedViewCache(clusterName, customizedStateType)));
     _targetExternalViewCache = new TargetExternalViewCache(clusterName);
+    _routableInstanceConfigMap = new HashMap<>();
+    _routableLiveInstanceMap = new HashMap<>();
     requireFullRefresh();
   }
 
@@ -88,7 +100,26 @@ class RoutingDataCache extends BasicClusterDataCache {
     LOG.info("START: RoutingDataCache.refresh() for cluster " + _clusterName);
     long startTime = System.currentTimeMillis();
 
+    // Store whether a refresh for routable instances is necessary, as the 
super.refresh() call will
+    // set the _propertyDataChangedMap values for the instance config and live 
instance change types to false.
+    boolean refreshRoutableInstanceConfigs =
+        
_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, 
false);
+    // If there is an InstanceConfig change, update the routable instance 
configs and live instances.
+    // Must also refresh live instances because whether an instance is routable is 
based on the instance config.
+    boolean refreshRoutableLiveInstances =
+        
_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, 
false)
+            || refreshRoutableInstanceConfigs;
+
     super.refresh(accessor);
+
+    if (refreshRoutableInstanceConfigs) {
+      
updateRoutableInstanceConfigMap(_instanceConfigPropertyCache.getPropertyMap());
+    }
+    if (refreshRoutableLiveInstances) {
+      updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
+          _liveInstancePropertyCache.getPropertyMap());
+    }
+
     for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
       long start = System.currentTimeMillis();
       switch (propertyType) {
@@ -114,7 +145,9 @@ class RoutingDataCache extends BasicClusterDataCache {
            * TODO: logic.
            **/
           _liveInstancePropertyCache.refresh(accessor);
-          Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
+          updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
+              _liveInstancePropertyCache.getPropertyMap());
+          Map<String, LiveInstance> liveInstanceMap = 
getRoutableLiveInstances();
           _currentStateCache.refresh(accessor, liveInstanceMap);
           LOG.info("Reload CurrentStates. Takes " + 
(System.currentTimeMillis() - start) + " ms");
         }
@@ -150,6 +183,41 @@ class RoutingDataCache extends BasicClusterDataCache {
     }
   }
 
+  private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> 
instanceConfigMap) {
+    _routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
+            (instanceConfigEntry) -> 
!NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+                instanceConfigEntry.getValue().getInstanceOperation()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  private void updateRoutableLiveInstanceMap(Map<String, InstanceConfig> 
instanceConfigMap,
+      Map<String, LiveInstance> liveInstanceMap) {
+    _routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
+            (liveInstanceEntry) -> 
instanceConfigMap.containsKey(liveInstanceEntry.getKey())
+                && !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+                
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /**
+   * Returns the LiveInstances for each of the routable instances that are 
currently up and
+   * running.
+   *
+   * @return a map of LiveInstances
+   */
+  public Map<String, LiveInstance> getRoutableLiveInstances() {
+    return Collections.unmodifiableMap(_routableLiveInstanceMap);
+  }
+
+  /**
+   * Returns the instance config map for all the routable instances that are 
in the cluster.
+   *
+   * @return a map of InstanceConfigs
+   */
+  public Map<String, InstanceConfig> getRoutableInstanceConfigMap() {
+    return Collections.unmodifiableMap(_routableInstanceConfigMap);
+  }
+
   /**
    * Retrieves the TargetExternalView for all resources
    *
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 0d97c9fec..c27f08462 100644
--- 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -923,14 +923,16 @@ public class RoutingTableProvider
           case EXTERNALVIEW: {
             String keyReference = generateReferenceKey(propertyType.name(), 
DEFAULT_STATE_TYPE);
             refreshExternalView(_dataCache.getExternalViews().values(),
-                _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values(),
+                _dataCache.getRoutableInstanceConfigMap().values(),
+                _dataCache.getRoutableLiveInstances().values(),
                 keyReference);
           }
             break;
           case TARGETEXTERNALVIEW: {
             String keyReference = generateReferenceKey(propertyType.name(), 
DEFAULT_STATE_TYPE);
             refreshExternalView(_dataCache.getTargetExternalViews().values(),
-                _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values(),
+                _dataCache.getRoutableInstanceConfigMap().values(),
+                _dataCache.getRoutableLiveInstances().values(),
                 keyReference);
           }
               break;
@@ -938,13 +940,15 @@ public class RoutingTableProvider
               for (String customizedStateType : 
_sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, 
Collections.emptyList())) {
                 String keyReference = 
generateReferenceKey(propertyType.name(),  customizedStateType);
                 
refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(),
-                    _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values(), keyReference);
+                    _dataCache.getRoutableInstanceConfigMap().values(),
+                    _dataCache.getRoutableLiveInstances().values(), 
keyReference);
               }
               break;
             case CURRENTSTATES: {
               String keyReference = generateReferenceKey(propertyType.name(),  
DEFAULT_STATE_TYPE);;
-              refreshCurrentState(_dataCache.getCurrentStatesMap(), 
_dataCache.getInstanceConfigMap().values(),
-                  _dataCache.getLiveInstances().values(), keyReference);
+              refreshCurrentState(_dataCache.getCurrentStatesMap(),
+                  _dataCache.getRoutableInstanceConfigMap().values(),
+                  _dataCache.getRoutableLiveInstances().values(), 
keyReference);
               recordPropagationLatency(System.currentTimeMillis(), 
_dataCache.getCurrentStateSnapshot());
             }
               break;
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index b7c90d841..3f0aa5d9e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -1,6 +1,8 @@
 package org.apache.helix.integration.rebalancer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -16,8 +18,12 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixRollbackException;
+import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
@@ -41,10 +47,12 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.spectator.RoutingTableProvider;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -70,6 +78,10 @@ public class TestInstanceOperation extends ZkTestBase {
       ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY");
   private int REPLICA = 3;
   protected ClusterControllerManager _controller;
+  private HelixManager _spectator;
+  private RoutingTableProvider _routingTableProviderDefault;
+  private RoutingTableProvider _routingTableProviderEV;
+  private RoutingTableProvider _routingTableProviderCS;
   List<MockParticipantManager> _participants = new ArrayList<>();
   private List<String> _originalParticipantNames = new ArrayList<>();
   List<String> _participantNames = new ArrayList<>();
@@ -113,6 +125,15 @@ public class TestInstanceOperation extends ZkTestBase {
     _configAccessor = new ConfigAccessor(_gZkClient);
     _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
 
+    // start spectator
+    _spectator =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", 
InstanceType.SPECTATOR,
+            ZK_ADDR);
+    _spectator.connect();
+    _routingTableProviderDefault = new RoutingTableProvider(_spectator);
+    _routingTableProviderEV = new RoutingTableProvider(_spectator, 
PropertyType.EXTERNALVIEW);
+    _routingTableProviderCS = new RoutingTableProvider(_spectator, 
PropertyType.CURRENTSTATES);
+
     setupClusterConfig();
 
     createTestDBs(DEFAULT_RESOURCE_DELAY_TIME);
@@ -122,6 +143,18 @@ public class TestInstanceOperation extends ZkTestBase {
     _admin = new ZKHelixAdmin(_gZkClient);
   }
 
+  @AfterClass
+  public void afterClass() {
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+    _controller.syncStop();
+    _routingTableProviderDefault.shutdown();
+    _routingTableProviderEV.shutdown();
+    _routingTableProviderCS.shutdown();
+    _spectator.disconnect();
+  }
+
   private void setupClusterConfig() {
     _stateModelDelay = 3L;
     ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
@@ -696,12 +729,21 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert canSwapBeCompleted is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Assert completeSwapIfPossible is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
+    // Validate that the SWAP_IN instance is now in the routing tables.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
+
+
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
@@ -760,6 +802,10 @@ public class TestInstanceOperation extends ZkTestBase {
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Assert canSwapBeCompleted is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
@@ -821,6 +867,10 @@ public class TestInstanceOperation extends ZkTestBase {
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Assert canSwapBeCompleted is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
@@ -832,6 +882,10 @@ public class TestInstanceOperation extends ZkTestBase {
     // Wait for cluster to converge.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Validate there are no partitions on the SWAP_IN instance.
     Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapInName).size(), 0);
 
@@ -905,6 +959,10 @@ public class TestInstanceOperation extends ZkTestBase {
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Assert canSwapBeCompleted is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
@@ -914,6 +972,9 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
+    // Validate that the SWAP_IN instance is now in the routing tables.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
+
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
@@ -1116,6 +1177,10 @@ public class TestInstanceOperation extends ZkTestBase {
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
     // Assert canSwapBeCompleted is true
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
@@ -1125,6 +1190,9 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
+    // Validate that the SWAP_IN instance is now in the routing tables.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
+
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
@@ -1246,6 +1314,47 @@ public class TestInstanceOperation extends ZkTestBase {
     return instancePartitions;
   }
 
+  private Map<String, Map<String, String>> getResourcePartitionStateOnInstance(
+      Map<String, ExternalView> evs, String instanceName) {
+    Map<String, Map<String, String>> stateByPartitionByResource = new 
HashMap<>();
+    for (String resourceEV : evs.keySet()) {
+      for (String partition : evs.get(resourceEV).getPartitionSet()) {
+        if 
(evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) {
+          if (!stateByPartitionByResource.containsKey(resourceEV)) {
+            stateByPartitionByResource.put(resourceEV, new HashMap<>());
+          }
+          stateByPartitionByResource.get(resourceEV)
+              .put(partition, 
evs.get(resourceEV).getStateMap(partition).get(instanceName));
+        }
+      }
+    }
+
+    return stateByPartitionByResource;
+  }
+
+  private Set<String> getInstanceNames(Collection<InstanceConfig> 
instanceConfigs) {
+    return instanceConfigs.stream().map(InstanceConfig::getInstanceName)
+        .collect(Collectors.toSet());
+  }
+
+  private void validateRoutingTablesInstance(Map<String, ExternalView> evs, 
String instanceName,
+      boolean shouldContain) {
+    RoutingTableProvider[] routingTableProviders =
+        new RoutingTableProvider[]{_routingTableProviderDefault, 
_routingTableProviderEV, _routingTableProviderCS};
+    getResourcePartitionStateOnInstance(evs, instanceName).forEach((resource, 
partitions) -> {
+      partitions.forEach((partition, state) -> {
+        Arrays.stream(routingTableProviders).forEach(rtp -> 
Assert.assertEquals(
+            getInstanceNames(rtp.getInstancesForResource(resource, partition, 
state)).contains(
+                instanceName), shouldContain));
+      });
+    });
+
+    Arrays.stream(routingTableProviders).forEach(rtp -> {
+      
Assert.assertEquals(getInstanceNames(rtp.getInstanceConfigs()).contains(instanceName),
+          shouldContain);
+    });
+  }
+
   private void validateEVCorrect(ExternalView actual, ExternalView original,
       Map<String, String> swapOutInstancesToSwapInInstances, Set<String> 
inFlightSwapInInstances,
       Set<String> completedSwapInInstanceNames) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index e8f4f82b2..cbf299860 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -263,7 +263,7 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = "testRoutingTableWithCurrentStates")
-  public void TestInconsistentStateEventProcessing() throws Exception {
+  public void testInconsistentStateEventProcessing() throws Exception {
     // This test requires an additional HelixManager since one of the provider 
event processing will
     // be blocked.
     HelixManager helixManager = HelixManagerFactory
@@ -305,10 +305,10 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
       IdealState idealState =
           
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       String targetPartitionName = 
idealState.getPartitionSet().iterator().next();
-      // Wait until the routingtable is updated.
+      // Wait until the routing table is updated.
       BlockingCurrentStateRoutingTableProvider finalRoutingTableCS = 
routingTableCS;
       Assert.assertTrue(TestHelper.verify(
-          () -> finalRoutingTableCS.getInstances(db, targetPartitionName, 
"MASTER").size() > 0,
+          () -> !finalRoutingTableCS.getInstances(db, targetPartitionName, 
"MASTER").isEmpty(),
           2000));
       String targetNodeName =
           routingTableCS.getInstances(db, targetPartitionName, 
"MASTER").get(0).getInstanceName();
@@ -352,7 +352,7 @@ public class TestRoutingTableProviderFromCurrentStates 
extends ZkTestBase {
     }
   }
 
-  @Test(dependsOnMethods = { "TestInconsistentStateEventProcessing" })
+  @Test(dependsOnMethods = {"testInconsistentStateEventProcessing"})
   public void testWithSupportSourceDataType() {
     new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown();
     new RoutingTableProvider(_manager, 
PropertyType.TARGETEXTERNALVIEW).shutdown();

Reply via email to