Repository: helix
Updated Branches:
  refs/heads/master 3ba447f97 -> 993beb383


Refine RoutingTable refresh() logic.

Simplify the constructors.
Refine the log messages to make them clearer.
Fix a potential bug where an empty instanceConfig is recorded in the 
RoutingTable.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/acea2f16
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/acea2f16
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/acea2f16

Branch: refs/heads/master
Commit: acea2f16bce6c439fdc8d1dfb8ad37650679f822
Parents: 3ba447f
Author: Jiajun Wang <[email protected]>
Authored: Fri Jun 29 15:07:53 2018 -0700
Committer: jiajunwang <[email protected]>
Committed: Tue Jul 17 11:52:25 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/spectator/RoutingTable.java    | 86 ++++++++++----------
 .../spectator/TestRoutingTableProvider.java     | 70 ++++++++++------
 2 files changed, 88 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index e3a3349..115131b 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -47,6 +47,7 @@ class RoutingTable {
   private final Map<String, ResourceInfo> _resourceInfoMap;
   // mapping a resource group name to a resourceGroupInfo
   private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap;
+
   private final Collection<LiveInstance> _liveInstances;
   private final Collection<InstanceConfig> _instanceConfigs;
   private final Collection<ExternalView> _externalViews;
@@ -56,32 +57,29 @@ class RoutingTable {
         Collections.<LiveInstance>emptyList());
   }
 
-  public RoutingTable(Collection<ExternalView> externalViews, 
Collection<InstanceConfig> instanceConfigs,
-      Collection<LiveInstance> liveInstances) {
-    _externalViews = externalViews;
-    _resourceInfoMap = new HashMap<>();
-    _resourceGroupInfoMap = new HashMap<>();
-    _liveInstances = new HashSet<>(liveInstances);
-    _instanceConfigs = new HashSet<>(instanceConfigs);
-    refresh(externalViews);
+  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> 
currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
+    // TODO Aggregate currentState to an ExternalView in the RoutingTable, so 
there is no need to refresh according to the currentStateMap. - jjwang
+    this(Collections.<ExternalView>emptyList(), instanceConfigs, 
liveInstances);
+    refresh(currentStateMap);
   }
 
-  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> 
currentStateMap,
+  public RoutingTable(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
-    _externalViews = Collections.emptyList();
     _resourceInfoMap = new HashMap<>();
     _resourceGroupInfoMap = new HashMap<>();
-    _liveInstances = liveInstances;
-    _instanceConfigs = instanceConfigs;
-    refresh(currentStateMap);
+    _liveInstances = new HashSet<>(liveInstances);
+    _instanceConfigs = new HashSet<>(instanceConfigs);
+    _externalViews = new HashSet<>(externalViews);
+    refresh(externalViews);
   }
 
   private void refresh(Collection<ExternalView> externalViewList) {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : _instanceConfigs) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-    if (externalViewList != null) {
+    if (externalViewList != null && !externalViewList.isEmpty()) {
+      for (InstanceConfig config : _instanceConfigs) {
+        instanceConfigMap.put(config.getId(), config);
+      }
       for (ExternalView extView : externalViewList) {
         String resourceName = extView.getId();
         for (String partitionName : extView.getPartitionSet()) {
@@ -97,8 +95,10 @@ class RoutingTable {
                 addEntry(resourceName, partitionName, currentState, 
instanceConfig);
               }
             } else {
-              logger.error("Invalid instance name. " + instanceName
-                  + " .Not found in /cluster/configs/. instanceName: ");
+              logger.warn(
+                  "Participant {} is not found with proper configuration 
information. It might already be removed from the cluster. "
+                      + "Skip recording partition assignment entry: Partition 
{}, Participant {}, State {}.",
+                  instanceName, partitionName, instanceName, 
stateMap.get(instanceName));
             }
           }
         }
@@ -108,32 +108,36 @@ class RoutingTable {
 
   private void refresh(Map<String, Map<String, Map<String, CurrentState>>> 
currentStateMap) {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : _instanceConfigs) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-
-    for (LiveInstance liveInstance : _liveInstances) {
-      String instanceName = liveInstance.getInstanceName();
-      String sessionId = liveInstance.getSessionId();
-      InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
-      if (instanceConfig == null) {
-        logger.error("Invalid instance name. " + instanceName
-            + " .Not found in /cluster/configs/. instanceName: ");
+    if (currentStateMap != null && !currentStateMap.isEmpty()) {
+      for (InstanceConfig config : _instanceConfigs) {
+        instanceConfigMap.put(config.getId(), config);
       }
+      for (LiveInstance liveInstance : _liveInstances) {
+        String instanceName = liveInstance.getInstanceName();
+        String sessionId = liveInstance.getSessionId();
+        InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+        if (instanceConfig == null) {
+          logger.warn(
+              "Participant {} is not found with proper configuration 
information. It might already be removed from the cluster. "
+                  + "Skip recording partition assignments that are related to 
this instance.",
+              instanceName);
+          continue;
+        }
 
-      Map<String, CurrentState> currentStates = Collections.emptyMap();
-      if (currentStateMap.containsKey(instanceName) && 
currentStateMap.get(instanceName)
-          .containsKey(sessionId)) {
-        currentStates = currentStateMap.get(instanceName).get(sessionId);
-      }
+        Map<String, CurrentState> currentStates = Collections.emptyMap();
+        if (currentStateMap.containsKey(instanceName) && 
currentStateMap.get(instanceName)
+            .containsKey(sessionId)) {
+          currentStates = currentStateMap.get(instanceName).get(sessionId);
+        }
 
-      for (CurrentState currentState : currentStates.values()) {
-        String resourceName = currentState.getResourceName();
-        Map<String, String> stateMap = currentState.getPartitionStateMap();
+        for (CurrentState currentState : currentStates.values()) {
+          String resourceName = currentState.getResourceName();
+          Map<String, String> stateMap = currentState.getPartitionStateMap();
 
-        for (String partitionName : stateMap.keySet()) {
-          String state = stateMap.get(partitionName);
-          addEntry(resourceName, partitionName, state, instanceConfig);
+          for (String partitionName : stateMap.keySet()) {
+            String state = stateMap.get(partitionName);
+            addEntry(resourceName, partitionName, state, instanceConfig);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index dbe621c..01fa6df 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -11,6 +11,7 @@ import java.util.Set;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -41,12 +42,13 @@ public class TestRoutingTableProvider extends ZkTestBase {
   static final int REPLICA_NUMBER = 3;
 
   private HelixManager _spectator;
-  private List<MockParticipantManager> _participants = new 
ArrayList<MockParticipantManager>();
+  private List<MockParticipantManager> _participants = new ArrayList<>();
   private List<String> _instances = new ArrayList<>();
   private ClusterControllerManager _controller;
   private ZkHelixClusterVerifier _clusterVerifier;
-  private RoutingTableProvider _routingTableProvider;
-  private RoutingTableProvider _routingTableProvider2;
+  private RoutingTableProvider _routingTableProvider_default;
+  private RoutingTableProvider _routingTableProvider_ev;
+  private RoutingTableProvider _routingTableProvider_cs;
   private boolean _listenerTestResult = true;
 
   class MockRoutingTableChangeListener implements RoutingTableChangeListener {
@@ -101,15 +103,16 @@ public class TestRoutingTableProvider extends ZkTestBase {
     _controller.syncStart();
 
     // start speculator
-    _routingTableProvider = new RoutingTableProvider();
+    _routingTableProvider_default = new RoutingTableProvider();
     _spectator = HelixManagerFactory
         .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, 
ZK_ADDR);
     _spectator.connect();
-    _spectator.addExternalViewChangeListener(_routingTableProvider);
-    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
-    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+    _spectator.addExternalViewChangeListener(_routingTableProvider_default);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider_default);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider_default);
 
-    _routingTableProvider2 = new RoutingTableProvider(_spectator);
+    _routingTableProvider_ev = new RoutingTableProvider(_spectator);
+    _routingTableProvider_cs = new RoutingTableProvider(_spectator, 
PropertyType.CURRENTSTATES);
 
     _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -122,26 +125,31 @@ public class TestRoutingTableProvider extends ZkTestBase {
       p.syncStop();
     }
     _controller.syncStop();
-    _routingTableProvider.shutdown();
-    _routingTableProvider2.shutdown();
+    _routingTableProvider_default.shutdown();
+    _routingTableProvider_ev.shutdown();
     _spectator.disconnect();
     deleteCluster(CLUSTER_NAME);
   }
 
   @Test
   public void testRoutingTable() {
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), 
_instances.size());
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), 
_instances.size());
+    
Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), 
_instances.size());
+    
Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), 
_instances.size());
 
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), 
_instances.size());
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), 
_instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), 
_instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), 
_instances.size());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+    Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), 
_instances.size());
+    Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), 
_instances.size());
+
+    validateRoutingTable(_routingTableProvider_default, 
Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, 
Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(1), _instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, 
Sets.newSet(_instances.get(0)),
+    validateRoutingTable(_routingTableProvider_cs, 
Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(1), _instances.get(2)));
 
-    Collection<String> databases = _routingTableProvider.getResources();
+    Collection<String> databases = 
_routingTableProvider_default.getResources();
     Assert.assertEquals(databases.size(), 1);
   }
 
@@ -152,9 +160,11 @@ public class TestRoutingTableProvider extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, false);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
+    validateRoutingTable(_routingTableProvider_default, 
Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, 
Sets.newSet(_instances.get(1)),
         Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, 
Sets.newSet(_instances.get(1)),
+    validateRoutingTable(_routingTableProvider_cs, 
Sets.newSet(_instances.get(1)),
         Sets.newSet(_instances.get(2)));
   }
 
@@ -165,8 +175,9 @@ public class TestRoutingTableProvider extends ZkTestBase {
     Map<String, Set<String>> context = new HashMap<>();
     context.put("MASTER", Sets.newSet(_instances.get(0)));
     context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
-    
_routingTableProvider.addRoutingTableChangeListener(routingTableChangeListener, 
context);
-    _routingTableProvider.addRoutingTableChangeListener(new 
MockRoutingTableChangeListener(), null);
+    
_routingTableProvider_default.addRoutingTableChangeListener(routingTableChangeListener,
 context);
+    _routingTableProvider_default
+        .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), 
null);
     // reenable the master instance to cause change
     String prevMasterInstance = _instances.get(0);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
@@ -182,15 +193,20 @@ public class TestRoutingTableProvider extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), 
_instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), 
_instances.size());
+    
Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), 
_instances.size() - 1);
+    
Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), 
_instances.size());
 
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), 
_instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), 
_instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), 
_instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), 
_instances.size());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+    Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), 
_instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), 
_instances.size());
+
+    validateRoutingTable(_routingTableProvider_default, 
Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, 
Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, 
Sets.newSet(_instances.get(0)),
+    validateRoutingTable(_routingTableProvider_cs, 
Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(2)));
   }
 

Reply via email to