http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
new file mode 100644
index 0000000..1a21ef9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -0,0 +1,202 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Stage-level test that drives a controller pipeline for a single MasterSlave partition
+ * and checks the message selected after the current master instance is disabled, both
+ * with and without P2P (relay) messages enabled in the cluster config.
+ */
+public class TestP2PStateTransitionMessages extends BaseStageTest {
+  String db = "testDB";
+  int numPartition = 1;
+  int numReplica = 3;
+
+  /**
+   * Prepares the fixtures shared by both tests: the SEMI_AUTO MasterSlave ideal state for
+   * {@link #db}, the state model, and three instances / live instances.
+   */
+  private void preSetup() {
+    setupIdealState(3, new String[]{db}, numPartition, numReplica, IdealState.RebalanceMode.SEMI_AUTO,
+        BuiltInStateModelDefinitions.MasterSlave.name());
+    setupStateModel();
+    setupInstances(3);
+    setupLiveInstances(3);
+  }
+
+  /** With P2P enabled in the cluster config, the selected message must carry one relay message. */
+  @Test
+  public void testP2PMessageEnabled() throws Exception {
+    preSetup();
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    testP2PMessage(clusterConfig, true);
+  }
+
+  /** With no cluster config passed in, the selected message must carry no relay message. */
+  @Test
+  public void testP2PMessageDisabled() throws Exception {
+    preSetup();
+    testP2PMessage(null, false);
+  }
+
+  /**
+   * Runs the pipeline once to obtain a best-possible assignment, copies that assignment into the
+   * current state, disables the master instance, reruns the pipeline and verifies the message.
+   *
+   * @param clusterConfig cluster config handed to {@code getResourceMap}; may be {@code null}
+   * @param p2pMessageEnabled whether a relay message is expected on the selected message
+   */
+  private void testP2PMessage(ClusterConfig clusterConfig, boolean p2pMessageEnabled) throws Exception {
+    Map<String, Resource> resourceMap =
+        getResourceMap(new String[]{db}, numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
+            null);
+
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    Pipeline pipeline = createPipeline();
+    pipeline.handle(event);
+
+    BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    // Feed the computed assignment back in as the current state for the second run.
+    CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    Partition p = new Partition(db + "_0");
+
+    String masterInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(masterInstance);
+
+    // Disable the master and flag the change (presumably so the rerun reloads instance configs).
+    admin.enableInstance(_clusterName, masterInstance, false);
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    pipeline.handle(event);
+
+    bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    MessageSelectionStageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    List<Message> messages = messageOutput.getMessages(db, p);
+
+    // Exactly one message is expected: MASTER->SLAVE for the instance that was just disabled.
+    Assert.assertEquals(messages.size(), 1);
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getTgtName(), masterInstance);
+    Assert.assertEquals(message.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(message.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    if (p2pMessageEnabled) {
+      Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
+      String newMasterInstance =
+          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p), MasterSlaveSMD.States.MASTER.name());
+      Assert.assertNotNull(newMasterInstance);
+
+      Message relayMessage = message.getRelayMessage(newMasterInstance);
+      Assert.assertNotNull(relayMessage);
+      Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+      Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+      Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+      Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+      Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+    } else {
+      Assert.assertTrue(message.getRelayMessages().entrySet().isEmpty());
+    }
+  }
+
+  /**
+   * Returns the instance mapped to {@code topState} in the given map, or {@code null} if none is.
+   * If several instances hold that state, the last one in the map's iteration order is returned.
+   */
+  private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) {
+    String topStateInstance = null;
+    for (Map.Entry<String, String> e : instanceStateMap.entrySet()) {
+      if (topState.equals(e.getValue())) {
+        topStateInstance = e.getKey();
+      }
+    }
+
+    return topStateInstance;
+  }
+
+  /** Builds a current-state output that mirrors every assignment in the best-possible output. */
+  private CurrentStateOutput populateCurrentStateFromBestPossible(BestPossibleStateOutput bestPossibleStateOutput) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+      PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
+      for (Partition p : partitionStateMap.partitionSet()) {
+        Map<String, String> stateMap = partitionStateMap.getPartitionMap(p);
+
+        for (Map.Entry<String, String> e : stateMap.entrySet()) {
+          currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue());
+        }
+      }
+    }
+    return currentStateOutput;
+  }
+
+  /** Assembles the pipeline stages exercised by this test, in the order they are added. */
+  private Pipeline createPipeline() {
+    Pipeline pipeline = new Pipeline("test");
+    pipeline.addStage(new ReadClusterDataStage());
+    pipeline.addStage(new BestPossibleStateCalcStage());
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    return pipeline;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java index f84565b..77da401 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java @@ -150,7 +150,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { @Override public List<ZNRecord> get(List<String> paths, List<Stat> stats, int options) { - List<ZNRecord> records = new ArrayList<ZNRecord>(); + List<ZNRecord> records = new ArrayList<>(); for (int i = 0; i < paths.size(); i++) { ZNRecord record = get(paths.get(i), stats.get(i), options); records.add(record); @@ -160,7 +160,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { @Override public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options) { - List<ZNRecord> children = new ArrayList<ZNRecord>(); + List<ZNRecord> children = new ArrayList<>(); for (String key : _recordMap.keySet()) { if (key.startsWith(parentPath)) { String[] keySplit = key.split("\\/"); @@ -182,7 +182,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { @Override public List<String> getChildNames(String parentPath, int options) { - List<String> child = new ArrayList<String>(); + List<String> child = new ArrayList<>(); for (String key : _recordMap.keySet()) { if (key.startsWith(parentPath)) { String[] keySplit = key.split("\\/"); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 8679007..037d92b 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; @@ -215,8 +216,19 @@ public class MockHelixAdmin implements HelixAdmin { } - @Override public void enableInstance(String clusterName, String instanceName, boolean enabled) { + @Override + public void enableInstance(String clusterName, String instanceName, boolean enabled) { + String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName); + if (!_baseDataAccessor.exists(instanceConfigsPath, 0)) { + _baseDataAccessor.create(instanceConfigsPath, new ZNRecord(instanceName), 0); + } + + String instanceConfigPath = instanceConfigsPath + "/" + instanceName; + ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0); + InstanceConfig instanceConfig = new InstanceConfig(record); + instanceConfig.setInstanceEnabled(enabled); + _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0); } @Override public void enableInstance(String clusterName, List<String> instances, http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java index 8c05626..143f3c0 100644 --- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java @@ -28,6 +28,8 @@ import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.testng.Assert; import org.testng.annotations.Test; @@ -42,7 +44,8 @@ import java.util.Map; public class TestDisableResourceMbean extends ZkUnitTestBase { private MBeanServerConnection _mbeanServer = ManagementFactory.getPlatformMBeanServer(); - @Test public void testDisableResourceMonitoring() throws Exception { + @Test + public void testDisableResourceMonitoring() throws Exception { final int NUM_PARTICIPANTS = 2; String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName(); System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); @@ -83,7 +86,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase { new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); - Thread.sleep(300); + HelixClusterVerifier clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient).build(); + Assert.assertTrue(clusterVerifier.verify()); // Verify the bean was created for TestDB0, but not for TestDB1. 
Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName))); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java index 6f1b083..51b048d 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java @@ -47,15 +47,13 @@ public class TestTopStateHandoffMetrics extends BaseStageTest { public final static String TEST_RESOURCE = "TestResource"; public final static String PARTITION = "PARTITION"; - public void preSetup() { setupLiveInstances(3); setupStateModel(); Resource resource = new Resource(TEST_RESOURCE); resource.setStateModelDefRef("MasterSlave"); resource.addPartition(PARTITION); - event.addAttribute(AttributeName.RESOURCES.name(), - Collections.singletonMap(TEST_RESOURCE, resource)); + event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource)); event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor("TestCluster")); }
