Repository: helix Updated Branches: refs/heads/master 3ba447f97 -> 993beb383
Refine RoutingTable refresh() logic. Simplify the constructors. Refine Log string to a clearer statement. Fix a potential bug where an empty instanceConfig is recorded in the RoutingTable. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/acea2f16 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/acea2f16 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/acea2f16 Branch: refs/heads/master Commit: acea2f16bce6c439fdc8d1dfb8ad37650679f822 Parents: 3ba447f Author: Jiajun Wang <[email protected]> Authored: Fri Jun 29 15:07:53 2018 -0700 Committer: jiajunwang <[email protected]> Committed: Tue Jul 17 11:52:25 2018 -0700 ---------------------------------------------------------------------- .../apache/helix/spectator/RoutingTable.java | 86 ++++++++++---------- .../spectator/TestRoutingTableProvider.java | 70 ++++++++++------ 2 files changed, 88 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java index e3a3349..115131b 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java @@ -47,6 +47,7 @@ class RoutingTable { private final Map<String, ResourceInfo> _resourceInfoMap; // mapping a resource group name to a resourceGroupInfo private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap; + private final Collection<LiveInstance> _liveInstances; private final Collection<InstanceConfig> _instanceConfigs; private final Collection<ExternalView> _externalViews; @@ -56,32 +57,29 @@ class RoutingTable 
{ Collections.<LiveInstance>emptyList()); } - public RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs, - Collection<LiveInstance> liveInstances) { - _externalViews = externalViews; - _resourceInfoMap = new HashMap<>(); - _resourceGroupInfoMap = new HashMap<>(); - _liveInstances = new HashSet<>(liveInstances); - _instanceConfigs = new HashSet<>(instanceConfigs); - refresh(externalViews); + public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap, + Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { + // TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to refresh according to the currentStateMap. - jjwang + this(Collections.<ExternalView>emptyList(), instanceConfigs, liveInstances); + refresh(currentStateMap); } - public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap, + public RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { - _externalViews = Collections.emptyList(); _resourceInfoMap = new HashMap<>(); _resourceGroupInfoMap = new HashMap<>(); - _liveInstances = liveInstances; - _instanceConfigs = instanceConfigs; - refresh(currentStateMap); + _liveInstances = new HashSet<>(liveInstances); + _instanceConfigs = new HashSet<>(instanceConfigs); + _externalViews = new HashSet<>(externalViews); + refresh(externalViews); } private void refresh(Collection<ExternalView> externalViewList) { Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); - for (InstanceConfig config : _instanceConfigs) { - instanceConfigMap.put(config.getId(), config); - } - if (externalViewList != null) { + if (externalViewList != null && !externalViewList.isEmpty()) { + for (InstanceConfig config : _instanceConfigs) { + instanceConfigMap.put(config.getId(), config); + } for (ExternalView extView : externalViewList) { 
String resourceName = extView.getId(); for (String partitionName : extView.getPartitionSet()) { @@ -97,8 +95,10 @@ class RoutingTable { addEntry(resourceName, partitionName, currentState, instanceConfig); } } else { - logger.error("Invalid instance name. " + instanceName - + " .Not found in /cluster/configs/. instanceName: "); + logger.warn( + "Participant {} is not found with proper configuration information. It might already be removed from the cluster. " + + "Skip recording partition assignment entry: Partition {}, Participant {}, State {}.", + instanceName, partitionName, instanceName, stateMap.get(instanceName)); } } } @@ -108,32 +108,36 @@ class RoutingTable { private void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); - for (InstanceConfig config : _instanceConfigs) { - instanceConfigMap.put(config.getId(), config); - } - - for (LiveInstance liveInstance : _liveInstances) { - String instanceName = liveInstance.getInstanceName(); - String sessionId = liveInstance.getSessionId(); - InstanceConfig instanceConfig = instanceConfigMap.get(instanceName); - if (instanceConfig == null) { - logger.error("Invalid instance name. " + instanceName - + " .Not found in /cluster/configs/. instanceName: "); + if (currentStateMap != null && !currentStateMap.isEmpty()) { + for (InstanceConfig config : _instanceConfigs) { + instanceConfigMap.put(config.getId(), config); } + for (LiveInstance liveInstance : _liveInstances) { + String instanceName = liveInstance.getInstanceName(); + String sessionId = liveInstance.getSessionId(); + InstanceConfig instanceConfig = instanceConfigMap.get(instanceName); + if (instanceConfig == null) { + logger.warn( + "Participant {} is not found with proper configuration information. It might already be removed from the cluster. 
" + + "Skip recording partition assignments that are related to this instance.", + instanceName); + continue; + } - Map<String, CurrentState> currentStates = Collections.emptyMap(); - if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName) - .containsKey(sessionId)) { - currentStates = currentStateMap.get(instanceName).get(sessionId); - } + Map<String, CurrentState> currentStates = Collections.emptyMap(); + if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName) + .containsKey(sessionId)) { + currentStates = currentStateMap.get(instanceName).get(sessionId); + } - for (CurrentState currentState : currentStates.values()) { - String resourceName = currentState.getResourceName(); - Map<String, String> stateMap = currentState.getPartitionStateMap(); + for (CurrentState currentState : currentStates.values()) { + String resourceName = currentState.getResourceName(); + Map<String, String> stateMap = currentState.getPartitionStateMap(); - for (String partitionName : stateMap.keySet()) { - String state = stateMap.get(partitionName); - addEntry(resourceName, partitionName, state, instanceConfig); + for (String partitionName : stateMap.keySet()) { + String state = stateMap.get(partitionName); + addEntry(resourceName, partitionName, state, instanceConfig); + } } } } http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java index dbe621c..01fa6df 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java @@ -11,6 +11,7 
@@ import java.util.Set; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.PropertyType; import org.apache.helix.api.listeners.RoutingTableChangeListener; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; @@ -41,12 +42,13 @@ public class TestRoutingTableProvider extends ZkTestBase { static final int REPLICA_NUMBER = 3; private HelixManager _spectator; - private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + private List<MockParticipantManager> _participants = new ArrayList<>(); private List<String> _instances = new ArrayList<>(); private ClusterControllerManager _controller; private ZkHelixClusterVerifier _clusterVerifier; - private RoutingTableProvider _routingTableProvider; - private RoutingTableProvider _routingTableProvider2; + private RoutingTableProvider _routingTableProvider_default; + private RoutingTableProvider _routingTableProvider_ev; + private RoutingTableProvider _routingTableProvider_cs; private boolean _listenerTestResult = true; class MockRoutingTableChangeListener implements RoutingTableChangeListener { @@ -101,15 +103,16 @@ public class TestRoutingTableProvider extends ZkTestBase { _controller.syncStart(); // start speculator - _routingTableProvider = new RoutingTableProvider(); + _routingTableProvider_default = new RoutingTableProvider(); _spectator = HelixManagerFactory .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR); _spectator.connect(); - _spectator.addExternalViewChangeListener(_routingTableProvider); - _spectator.addLiveInstanceChangeListener(_routingTableProvider); - _spectator.addInstanceConfigChangeListener(_routingTableProvider); + _spectator.addExternalViewChangeListener(_routingTableProvider_default); + _spectator.addLiveInstanceChangeListener(_routingTableProvider_default); + 
_spectator.addInstanceConfigChangeListener(_routingTableProvider_default); - _routingTableProvider2 = new RoutingTableProvider(_spectator); + _routingTableProvider_ev = new RoutingTableProvider(_spectator); + _routingTableProvider_cs = new RoutingTableProvider(_spectator, PropertyType.CURRENTSTATES); _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); Assert.assertTrue(_clusterVerifier.verifyByPolling()); @@ -122,26 +125,31 @@ public class TestRoutingTableProvider extends ZkTestBase { p.syncStop(); } _controller.syncStop(); - _routingTableProvider.shutdown(); - _routingTableProvider2.shutdown(); + _routingTableProvider_default.shutdown(); + _routingTableProvider_ev.shutdown(); _spectator.disconnect(); deleteCluster(CLUSTER_NAME); } @Test public void testRoutingTable() { - Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size()); - Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size()); - Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size()); - Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size()); - validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)), + Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size()); + + validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(1), 
_instances.get(2))); + validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)), Sets.newSet(_instances.get(1), _instances.get(2))); - validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)), + validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)), Sets.newSet(_instances.get(1), _instances.get(2))); - Collection<String> databases = _routingTableProvider.getResources(); + Collection<String> databases = _routingTableProvider_default.getResources(); Assert.assertEquals(databases.size(), 1); } @@ -152,9 +160,11 @@ public class TestRoutingTableProvider extends ZkTestBase { _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)), + validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(1)), + Sets.newSet(_instances.get(2))); + validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(1)), Sets.newSet(_instances.get(2))); - validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)), + validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(1)), Sets.newSet(_instances.get(2))); } @@ -165,8 +175,9 @@ public class TestRoutingTableProvider extends ZkTestBase { Map<String, Set<String>> context = new HashMap<>(); context.put("MASTER", Sets.newSet(_instances.get(0))); context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2))); - _routingTableProvider.addRoutingTableChangeListener(routingTableChangeListener, context); - _routingTableProvider.addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null); + _routingTableProvider_default.addRoutingTableChangeListener(routingTableChangeListener, context); + _routingTableProvider_default + .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null); // reenable the master instance to cause 
change String prevMasterInstance = _instances.get(0); _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); @@ -182,15 +193,20 @@ public class TestRoutingTableProvider extends ZkTestBase { Assert.assertTrue(_clusterVerifier.verifyByPolling()); - Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1); - Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size() - 1); + Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size()); - Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1); - Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size() - 1); + Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size()); - validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)), + Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size() - 1); + Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size()); + + validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(2))); + validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)), Sets.newSet(_instances.get(2))); - validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)), + validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)), Sets.newSet(_instances.get(2))); }
