http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java new file mode 100644 index 0000000..63cc9bf --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerAttribute.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.container.states; + +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Arrays; +import java.util.List; + +/** + * Test ContainerAttribute management. 
+ */ +public class TestContainerAttribute { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInsert() throws SCMException { + ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>(); + ContainerID id = new ContainerID(42); + containerAttribute.insert(1, id); + Assert.assertEquals(1, + containerAttribute.getCollection(1).size()); + Assert.assertTrue(containerAttribute.getCollection(1).contains(id)); + + // Insert again and verify that it overwrites an existing value. + ContainerID newId = + new ContainerID(42); + containerAttribute.insert(1, newId); + Assert.assertEquals(1, + containerAttribute.getCollection(1).size()); + Assert.assertTrue(containerAttribute.getCollection(1).contains(newId)); + } + + @Test + public void testHasKey() throws SCMException { + ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>(); + + for (int x = 1; x < 42; x++) { + containerAttribute.insert(1, new ContainerID(x)); + } + Assert.assertTrue(containerAttribute.hasKey(1)); + for (int x = 1; x < 42; x++) { + Assert.assertTrue(containerAttribute.hasContainerID(1, x)); + } + + Assert.assertFalse(containerAttribute.hasContainerID(1, + new ContainerID(42))); + } + + @Test + public void testClearSet() throws SCMException { + List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3"); + ContainerAttribute<String> containerAttribute = new ContainerAttribute<>(); + for (String k : keyslist) { + for (int x = 1; x < 101; x++) { + containerAttribute.insert(k, new ContainerID(x)); + } + } + for (String k : keyslist) { + Assert.assertEquals(100, + containerAttribute.getCollection(k).size()); + } + containerAttribute.clearSet("Key1"); + Assert.assertEquals(0, + containerAttribute.getCollection("Key1").size()); + } + + @Test + public void testRemove() throws SCMException { + + List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3"); + ContainerAttribute<String> containerAttribute = new 
ContainerAttribute<>(); + + for (String k : keyslist) { + for (int x = 1; x < 101; x++) { + containerAttribute.insert(k, new ContainerID(x)); + } + } + for (int x = 1; x < 101; x += 2) { + containerAttribute.remove("Key1", new ContainerID(x)); + } + + for (int x = 1; x < 101; x += 2) { + Assert.assertFalse(containerAttribute.hasContainerID("Key1", + new ContainerID(x))); + } + + Assert.assertEquals(100, + containerAttribute.getCollection("Key2").size()); + + Assert.assertEquals(100, + containerAttribute.getCollection("Key3").size()); + + Assert.assertEquals(50, + containerAttribute.getCollection("Key1").size()); + } + + @Test + public void tesUpdate() throws SCMException { + String key1 = "Key1"; + String key2 = "Key2"; + String key3 = "Key3"; + + ContainerAttribute<String> containerAttribute = new ContainerAttribute<>(); + ContainerID id = new ContainerID(42); + + containerAttribute.insert(key1, id); + Assert.assertTrue(containerAttribute.hasContainerID(key1, id)); + Assert.assertFalse(containerAttribute.hasContainerID(key2, id)); + + // This should move the id from key1 bucket to key2 bucket. + containerAttribute.update(key1, key2, id); + Assert.assertFalse(containerAttribute.hasContainerID(key1, id)); + Assert.assertTrue(containerAttribute.hasContainerID(key2, id)); + + // This should fail since we cannot find this id in the key3 bucket. + thrown.expect(SCMException.class); + containerAttribute.update(key3, key1, id); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java new file mode 100644 index 0000000..ad50d97 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test for different container placement policy. 
+ */ +public class TestContainerPlacement { + @Rule + public ExpectedException thrown = ExpectedException.none(); + private static XceiverClientManager xceiverClientManager = + new XceiverClientManager(new OzoneConfiguration()); + + private ReportState reportState = ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + + /** + * Returns a new copy of Configuration. + * + * @return Config + */ + OzoneConfiguration getConf() { + return new OzoneConfiguration(); + } + + /** + * Creates a NodeManager. + * + * @param config - Config for the node manager. + * @return SCNNodeManager + * @throws IOException + */ + + SCMNodeManager createNodeManager(OzoneConfiguration config) + throws IOException { + SCMNodeManager nodeManager = new SCMNodeManager(config, + UUID.randomUUID().toString(), null); + assertFalse("Node manager should be in chill mode", + nodeManager.isOutOfChillMode()); + return nodeManager; + } + + ContainerMapping createContainerManager(Configuration config, + NodeManager scmNodeManager) throws IOException { + final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + return new ContainerMapping(config, scmNodeManager, cacheSize); + + } + + /** + * Test capacity based container placement policy with node reports. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testContainerPlacementCapacity() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int nodeCount = 4; + final long capacity = 10L * OzoneConsts.GB; + final long used = 2L * OzoneConsts.GB; + final long remaining = capacity - used; + + final File testDir = PathUtils.getTestDir( + TestContainerPlacement.class); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + + SCMNodeManager nodeManager = createNodeManager(conf); + ContainerMapping containerManager = + createContainerManager(conf, nodeManager); + List<DatanodeDetails> datanodes = + TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount); + try { + for (DatanodeDetails datanodeDetails : datanodes) { + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + srb.setStorageUuid(UUID.randomUUID().toString()); + srb.setCapacity(capacity).setScmUsed(used). 
+ setRemaining(remaining).build(); + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nrb.addStorageReport(srb).build(), reportState); + } + + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(capacity * nodeCount, + (long) nodeManager.getStats().getCapacity().get()); + assertEquals(used * nodeCount, + (long) nodeManager.getStats().getScmUsed().get()); + assertEquals(remaining * nodeCount, + (long) nodeManager.getStats().getRemaining().get()); + + assertTrue(nodeManager.isOutOfChillMode()); + + String container1 = UUID.randomUUID().toString(); + Pipeline pipeline1 = containerManager.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), container1, "OZONE") + .getPipeline(); + assertEquals(xceiverClientManager.getFactor().getNumber(), + pipeline1.getMachines().size()); + } finally { + IOUtils.closeQuietly(containerManager); + IOUtils.closeQuietly(nodeManager); + FileUtil.fullyDelete(testDir); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java new file mode 100644 index 0000000..de6e30c --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -0,0 +1,1179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.base.Supplier; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_HB_COUNT_TO_PROCESS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test the Node Manager class. + */ +public class TestNodeManager { + + private File testDir; + + private ReportState reportState = ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void init() throws IOException { + } + + @Before + public void setup() { + testDir = PathUtils.getTestDir( + TestNodeManager.class); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(testDir); + } + + /** + * Returns a new copy of Configuration. 
+ * + * @return Config + */ + OzoneConfiguration getConf() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + TimeUnit.MILLISECONDS); + return conf; + } + + /** + * Creates a NodeManager. + * + * @param config - Config for the node manager. + * @return SCNNodeManager + * @throws IOException + */ + + SCMNodeManager createNodeManager(OzoneConfiguration config) + throws IOException { + SCMNodeManager nodeManager = new SCMNodeManager(config, + UUID.randomUUID().toString(), null); + assertFalse("Node manager should be in chill mode", + nodeManager.isOutOfChillMode()); + return nodeManager; + } + + /** + * Tests that Node manager handles heartbeats correctly, and comes out of + * chill Mode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHeartbeat() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + // Send some heartbeats from different nodes. + for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( + nodeManager); + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + null, reportState); + } + + // Wait for 4 seconds max. + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + + assertTrue("Heartbeat thread should have picked up the" + + "scheduled heartbeats and transitioned out of chill mode.", + nodeManager.isOutOfChillMode()); + } + } + + /** + * asserts that if we send no heartbeats node manager stays in chillmode. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNoHeartbeats() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertFalse("No heartbeats, Node manager should have been in" + + " chill mode.", nodeManager.isOutOfChillMode()); + } + } + + /** + * Asserts that if we don't get enough unique nodes we stay in chillmode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNotEnoughHeartbeats() throws IOException, + InterruptedException, TimeoutException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + + // Need 100 nodes to come out of chill mode, only one node is sending HB. + nodeManager.setMinimumChillModeNodes(100); + nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager) + .getProtoBufMessage(), + null, reportState); + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertFalse("Not enough heartbeat, Node manager should have" + + "been in chillmode.", nodeManager.isOutOfChillMode()); + } + } + + /** + * Asserts that many heartbeat from the same node is counted as a single + * node. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmSameNodeHeartbeats() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + nodeManager.setMinimumChillModeNodes(3); + DatanodeDetails datanodeDetails = TestUtils + .getDatanodeDetails(nodeManager); + + // Send 10 heartbeat from same node, and assert we never leave chill mode. 
+ for (int x = 0; x < 10; x++) { + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + null, reportState); + } + + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertFalse("Not enough nodes have send heartbeat to node" + + "manager.", nodeManager.isOutOfChillMode()); + } + } + + /** + * Asserts that adding heartbeats after shutdown does not work. This implies + * that heartbeat thread has been shutdown safely by closing the node + * manager. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmShutdown() throws IOException, InterruptedException, + TimeoutException { + OzoneConfiguration conf = getConf(); + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + SCMNodeManager nodeManager = createNodeManager(conf); + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(nodeManager); + nodeManager.close(); + + // These should never be processed. + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + null, reportState); + + // Let us just wait for 2 seconds to prove that HBs are not processed. + Thread.sleep(2 * 1000); + + assertEquals("Assert new HBs were never processed", 0, + nodeManager.getLastHBProcessedCount()); + } + + /** + * Asserts scm informs datanodes to re-register with the nodemanager + * on a restart. 
+ * + * @throws Exception + */ + @Test + public void testScmHeartbeatAfterRestart() throws Exception { + OzoneConfiguration conf = getConf(); + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + nodemanager.register(datanodeDetails.getProtoBufMessage()); + List<SCMCommand> command = nodemanager.sendHeartbeat( + datanodeDetails.getProtoBufMessage(), + null, reportState); + Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); + Assert.assertTrue("On regular HB calls, SCM responses a " + + "datanode with an empty command list", command.isEmpty()); + } + + // Sends heartbeat without registering to SCM. + // This happens when SCM restarts. + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + Assert.assertFalse(nodemanager + .getAllNodes().contains(datanodeDetails)); + try { + // SCM handles heartbeat asynchronously. + // It may need more than one heartbeat processing to + // send the notification. + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override public Boolean get() { + List<SCMCommand> command = + nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + null, reportState); + return command.size() == 1 && command.get(0).getType() + .equals(SCMCmdType.reregisterCommand); + } + }, 100, 3 * 1000); + } catch (TimeoutException e) { + Assert.fail("Times out to verify that scm informs " + + "datanode to re-register itself."); + } + } + } + + /** + * Asserts that we detect as many healthy nodes as we have generated heartbeat + * for. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHealthyNodeCount() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int count = 10; + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + + for (int x = 0; x < count; x++) { + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( + nodeManager); + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + null, reportState); + } + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertEquals(count, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Asserts that if user provides a value less than 5 times the heartbeat + * interval as the StaleNode Value, we throw since that is a QoS that we + * cannot maintain. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + + @Test + public void testScmSanityOfUserConfig1() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int interval = 100; + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, interval, MILLISECONDS); + + thrown.expect(IllegalArgumentException.class); + + // This string is a multiple of the interval value + thrown.expectMessage( + startsWith("100 is not within min = 500 or max = 100000")); + createNodeManager(conf); + } + + /** + * Asserts that if Stale Interval value is more than 5 times the value of HB + * processing thread it is a sane value. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmSanityOfUserConfig2() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int interval = 100; + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS); + createNodeManager(conf).close(); + } + + /** + * Asserts that a single node moves from Healthy to stale node, then from + * stale node to dead node if it misses enough heartbeats. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmDetectStaleAndDeadNode() throws IOException, + InterruptedException, TimeoutException { + final int interval = 100; + final int nodeCount = 10; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount, + "Node"); + + DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager); + + // Heartbeat once + nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(), + null, reportState); + + // Heartbeat all other nodes. + for (DatanodeDetails dn : nodeList) { + nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); + } + + // Wait for 2 seconds .. and heartbeat good nodes again. 
+ Thread.sleep(2 * 1000); + + for (DatanodeDetails dn : nodeList) { + nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); + } + + // Wait for 2 seconds, wait a total of 4 seconds to make sure that the + // node moves into stale state. + Thread.sleep(2 * 1000); + List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE); + assertEquals("Expected to find 1 stale node", + 1, nodeManager.getNodeCount(STALE)); + assertEquals("Expected to find 1 stale node", + 1, staleNodeList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getUuid(), staleNodeList.get(0).getUuid()); + Thread.sleep(1000); + + // heartbeat good nodes again. + for (DatanodeDetails dn : nodeList) { + nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); + } + + // 6 seconds is the dead window for this test , so we wait a total of + // 7 seconds to make sure that the node moves into dead state. + Thread.sleep(2 * 1000); + + // the stale node has been removed + staleNodeList = nodeManager.getNodes(STALE); + assertEquals("Expected to find 1 stale node", + 0, nodeManager.getNodeCount(STALE)); + assertEquals("Expected to find 1 stale node", + 0, staleNodeList.size()); + + // Check for the dead node now. + List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD); + assertEquals("Expected to find 1 dead node", 1, + nodeManager.getNodeCount(DEAD)); + assertEquals("Expected to find 1 dead node", + 1, deadNodeList.size()); + assertEquals("Dead node is not the expected ID", staleNode + .getUuid(), deadNodeList.get(0).getUuid()); + } + } + + /** + * Asserts that we log an error for null in datanode ID. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmLogErrorOnNullDatanode() throws IOException, + InterruptedException, TimeoutException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); + nodeManager.sendHeartbeat(null, null, reportState); + logCapturer.stopCapturing(); + assertThat(logCapturer.getOutput(), + containsString("Datanode ID in heartbeat is null")); + } + } + + /** + * Asserts that a dead node, stale node and healthy nodes co-exist. The counts + * , lists and node ID match the expected node state. + * <p/> + * This test is pretty complicated because it explores all states of Node + * manager in a single test. Please read thru the comments to get an idea of + * the current state of the node Manager. + * <p/> + * This test is written like a state machine to avoid threads and concurrency + * issues. This test is replicated below with the use of threads. Avoiding + * threads make it easy to debug the state machine. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmClusterIsInExpectedState1() throws IOException, + InterruptedException, TimeoutException { + /** + * These values are very important. Here is what it means so you don't + * have to look it up while reading this code. + * + * OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This the frequency of the + * HB processing thread that is running in the SCM. This thread must run + * for the SCM to process the Heartbeats. + * + * OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which + * datanodes will send heartbeats to SCM. Please note: This is the only + * config value for node manager that is specified in seconds. We don't + * want SCM heartbeat resolution to be more than in seconds. 
+ * In this test it is not used, but we are forced to set it because we + * have validation code that checks Stale Node interval and Dead Node + * interval is larger than the value of + * OZONE_SCM_HEARTBEAT_INTERVAL. + * + * OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse + * from the last heartbeat for us to mark a node as stale. In this test + * we set that to 3. That is if a node has not heartbeat SCM for last 3 + * seconds we will mark it as stale. + * + * OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse + * from the last heartbeat for a node to be marked dead. We have an + * additional constraint that this must be at least 2 times bigger than + * Stale node Interval. + * + * With these we are trying to explore the state of this cluster with + * various timeouts. Each section is commented so that you can keep + * track of the state of the cluster nodes. + * + */ + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + + /** + * Cluster state: Healthy: All nodes are heartbeat-ing like normal. + */ + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails healthyNode = + TestUtils.getDatanodeDetails(nodeManager, "HealthyNode"); + DatanodeDetails staleNode = + TestUtils.getDatanodeDetails(nodeManager, "StaleNode"); + DatanodeDetails deadNode = + TestUtils.getDatanodeDetails(nodeManager, "DeadNode"); + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + staleNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + deadNode.getProtoBufMessage(), null, reportState); + + // Sleep so that heartbeat processing thread gets to run. 
+ Thread.sleep(500); + + //Assert all nodes are healthy. + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + + /** + * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which + * means that no node is heartbeating. All nodes should move to Stale. + */ + Thread.sleep(3 * 1000); + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(STALE)); + + + /** + * Cluster State : Move healthy node back to healthy state, move other 2 + * nodes to Stale State. + * + * We heartbeat healthy node after 1 second and let other 2 nodes elapse + * the 3 second windows. + */ + + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + staleNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + deadNode.getProtoBufMessage(), null, reportState); + + Thread.sleep(1500); + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + Thread.sleep(2 * 1000); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + + + // 3.5 seconds from last heartbeat for the stale and deadNode. So those + // 2 nodes must move to Stale state and the healthy node must + // remain in the healthy State. + List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY); + assertEquals("Expected one healthy node", 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getUuid(), healthyList.get(0).getUuid()); + + assertEquals(2, nodeManager.getNodeCount(STALE)); + + /** + * Cluster State: Allow healthyNode to remain in healthy state and + * staleNode to move to stale state and deadNode to move to dead state. 
+ */ + + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + staleNode.getProtoBufMessage(), null, reportState); + Thread.sleep(1500); + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + Thread.sleep(2 * 1000); + + // 3.5 seconds have elapsed for stale node, so it moves into Stale. + // 7 seconds have elapsed for dead node, so it moves into dead. + // 2 Seconds have elapsed for healthy node, so it stays in healthy state. + healthyList = nodeManager.getNodes(HEALTHY); + List<DatanodeDetails> staleList = nodeManager.getNodes(STALE); + List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + assertEquals(1, nodeManager.getNodeCount(STALE)); + assertEquals(1, nodeManager.getNodeCount(DEAD)); + + assertEquals("Expected one healthy node", + 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getUuid(), healthyList.get(0).getUuid()); + + assertEquals("Expected one stale node", + 1, staleList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getUuid(), staleList.get(0).getUuid()); + + assertEquals("Expected one dead node", + 1, deadList.size()); + assertEquals("Dead node is not the expected ID", deadNode + .getUuid(), deadList.get(0).getUuid()); + /** + * Cluster State : let us heartbeat all the nodes and verify that we get + * back all the nodes in healthy state. + */ + nodeManager.sendHeartbeat( + healthyNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + staleNode.getProtoBufMessage(), null, reportState); + nodeManager.sendHeartbeat( + deadNode.getProtoBufMessage(), null, reportState); + Thread.sleep(500); + //Assert all nodes are healthy.
+ assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Heartbeat a given set of nodes at a specified frequency. + * + * @param manager - Node Manager + * @param list - List of datanodes to heartbeat. + * @param sleepDuration - Duration to sleep between heartbeats, in + * milliseconds. + * @throws InterruptedException + */ + private void heartbeatNodeSet(SCMNodeManager manager, + List<DatanodeDetails> list, + int sleepDuration) throws InterruptedException { + while (!Thread.currentThread().isInterrupted()) { + for (DatanodeDetails dn : list) { + manager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); + } + Thread.sleep(sleepDuration); + } + } + + /** + * Create a set of Nodes with a given prefix. + * + * @param nodeManager - node manager passed to TestUtils when creating each + * datanode. + * @param count - number of nodes. + * @param prefix - A prefix string that can be used in verification. + * @return List of Nodes. + */ + private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int + count, String + prefix) { + List<DatanodeDetails> list = new LinkedList<>(); + for (int x = 0; x < count; x++) { + list.add(TestUtils.getDatanodeDetails(nodeManager, prefix + x)); + } + return list; + } + + /** + * Function that tells us if we found the right number of nodes in the + * given state. + * + * @param nodeManager - node manager + * @param count - number of nodes expected in that state. + * @param state - node state whose node count is checked. + * @return true if we found the expected number. + */ + private boolean findNodes(NodeManager nodeManager, int count, + HddsProtos.NodeState state) { + return count == nodeManager.getNodeCount(state); + } + + /** + * Asserts that we can create a set of nodes that send their heartbeats from + * different threads and NodeManager behaves as expected.
+ * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testScmClusterIsInExpectedState2() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 5000; + final int staleCount = 100; + final int deadCount = 10; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager, + healthyCount, "Healthy"); + List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager, + staleCount, "Stale"); + List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, deadCount, + "Dead"); + + Runnable healthyNodeTask = () -> { + try { + // 2 second heartbeat makes these nodes stay healthy. + heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Runnable staleNodeTask = () -> { + try { + // 4 second heartbeat makes these nodes go to stale and back to + // healthy again. + heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + + // No Thread just one time HBs the node manager, so that these will be + // marked as dead nodes eventually. 
+ for (DatanodeDetails dn : deadNodeList) { + nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState); + } + + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + + Thread.sleep(10 * 1000); + + // Assert all healthy nodes are healthy now, this has to be a greater + // than check since Stale nodes can be healthy when we check the state. + + assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount); + + assertEquals(deadCount, nodeManager.getNodeCount(DEAD)); + + List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + + for (DatanodeDetails node : deadList) { + assertThat(node.getHostName(), CoreMatchers.startsWith("Dead")); + } + + // Checking stale nodes is tricky since they have to move between + // healthy and stale to avoid becoming dead nodes. So we search for + // that state for a while, if we don't find that state waitfor will + // throw. + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 4 * 1000); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Asserts that we can handle 6000+ nodes heartbeating SCM. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmCanHandleScale() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 3000; + final int staleCount = 3000; + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, + SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6 * 1000, + MILLISECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> healthyList = createNodeSet(nodeManager, + healthyCount, "h"); + List<DatanodeDetails> staleList = createNodeSet(nodeManager, + staleCount, "s"); + + Runnable healthyNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); + } catch (InterruptedException ignored) { + + } + }; + + Runnable staleNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, staleList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + Thread.sleep(3 * 1000); + + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 20 * 1000); + assertEquals("Node count mismatch", + healthyCount + staleCount, nodeManager.getAllNodes().size()); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Asserts that SCM backs off from HB processing instead of going into an + * infinite loop if SCM is flooded with too many heartbeats. This may not be + * the best thing to do, but SCM tries to protect itself and logs an error + * saying that it is getting flooded with heartbeats.
In real world this can + * lead to many nodes becoming stale or dead due to the fact that SCM is not + * able to keep up with heartbeat processing. This test just verifies that SCM + * will log that information. + * @throws TimeoutException + */ + @Test + public void testScmLogsHeartbeatFlooding() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 3000; + + // Make the HB process thread run slower. + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 500, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> healthyList = createNodeSet(nodeManager, + healthyCount, "h"); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); + Runnable healthyNodeTask = () -> { + try { + // No wait in the HB sending loop. + heartbeatNodeSet(nodeManager, healthyList, 0); + } catch (InterruptedException ignored) { + } + }; + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + GenericTestUtils.waitFor(() -> logCapturer.getOutput() + .contains("SCM is being " + + "flooded by heartbeats. 
Not able to keep up" + + " with the heartbeat counts."), + 500, 20 * 1000); + + thread1.interrupt(); + logCapturer.stopCapturing(); + } + } + + @Test + public void testScmEnterAndExitChillMode() throws IOException, + InterruptedException { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + nodeManager.setMinimumChillModeNodes(10); + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( + nodeManager); + nodeManager.sendHeartbeat( + datanodeDetails.getProtoBufMessage(), null, reportState); + String status = nodeManager.getChillModeStatus(); + Assert.assertThat(status, containsString("Still in chill " + + "mode, waiting on nodes to report in.")); + + // Should not exit chill mode since 10 nodes have not sent a heartbeat yet. + assertFalse(nodeManager.isOutOfChillMode()); + + // Force exit chill mode. + nodeManager.forceExitChillMode(); + assertTrue(nodeManager.isOutOfChillMode()); + status = nodeManager.getChillModeStatus(); + Assert.assertThat(status, + containsString("Out of chill mode.")); + + + // Enter back into chill mode. + nodeManager.enterChillMode(); + assertFalse(nodeManager.isOutOfChillMode()); + status = nodeManager.getChillModeStatus(); + Assert.assertThat(status, + containsString("Out of startup chill mode," + + " but in manual chill mode.")); + + // Assert that node manager force enter cannot be overridden by nodes HBs. + for (int x = 0; x < 20; x++) { + DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager); + nodeManager.sendHeartbeat(datanode.getProtoBufMessage(), + null, reportState); + } + + Thread.sleep(500); + assertFalse(nodeManager.isOutOfChillMode()); + + // Make sure that once we exit out of manual chill mode, we fall back + // to the number of nodes to get out of chill mode.
+ nodeManager.exitChillMode(); + assertTrue(nodeManager.isOutOfChillMode()); + status = nodeManager.getChillModeStatus(); + Assert.assertThat(status, + containsString("Out of chill mode.")); + } + } + + /** + * Test multiple nodes sending initial heartbeat with their node report. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmStatsFromNodeReport() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, + MILLISECONDS); + final int nodeCount = 10; + final long capacity = 2000; + final long used = 100; + final long remaining = capacity - used; + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + for (int x = 0; x < nodeCount; x++) { + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( + nodeManager); + + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + srb.setStorageUuid(UUID.randomUUID().toString()); + srb.setCapacity(capacity).setScmUsed(used). + setRemaining(capacity - used).build(); + nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(), + nrb.addStorageReport(srb).build(), reportState); + } + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), + 100, 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(capacity * nodeCount, (long) nodeManager.getStats() + .getCapacity().get()); + assertEquals(used * nodeCount, (long) nodeManager.getStats() + .getScmUsed().get()); + assertEquals(remaining * nodeCount, (long) nodeManager.getStats() + .getRemaining().get()); + } + } + + /** + * Test single node stat update based on nodereport from different heartbeat + * status (healthy, stale and dead). 
+ * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNodeReportUpdate() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int heartbeatCount = 5; + final int nodeCount = 1; + final int interval = 100; + + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails( + nodeManager); + final long capacity = 2000; + final long usedPerHeartbeat = 100; + + for (int x = 0; x < heartbeatCount; x++) { + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + srb.setStorageUuid(UUID.randomUUID().toString()); + srb.setCapacity(capacity).setScmUsed(x * usedPerHeartbeat) + .setRemaining(capacity - x * usedPerHeartbeat).build(); + nrb.addStorageReport(srb); + + nodeManager.sendHeartbeat( + datanodeDetails.getProtoBufMessage(), nrb.build(), reportState); + Thread.sleep(100); + } + + final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1); + final long expectedRemaining = capacity - expectedScmUsed; + + GenericTestUtils.waitFor( + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); + + long foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(capacity, foundCapacity); + + long foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + long foundRemaining = nodeManager.getStats().getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Test NodeManager#getNodeStats + assertEquals(nodeCount, 
nodeManager.getNodeStats().size()); + long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, nodeCapacity); + + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Compare the result from + // NodeManager#getNodeStats and NodeManager#getNodeStat + SCMNodeStat stat1 = nodeManager.getNodeStats(). + get(datanodeDetails); + SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get(); + assertEquals(stat1, stat2); + + // Wait up to 4s so that the node becomes stale + // Verify the usage info should be unchanged. + GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(STALE) == 1, 100, + 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeStats().size()); + + foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get() + .getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get(). + getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Wait up to 4 more seconds so the node becomes dead + // Verify usage info should be updated. 
+ GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(DEAD) == 1, 100, + 4 * 1000); + + assertEquals(0, nodeManager.getNodeStats().size()); + foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(0, foundCapacity); + + foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(0, foundScmUsed); + + foundRemaining = nodeManager.getStats().getRemaining().get(); + assertEquals(0, foundRemaining); + + // Send a new report to bring the dead node back to healthy + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + srb.setStorageUuid(UUID.randomUUID().toString()); + srb.setCapacity(capacity).setScmUsed(expectedScmUsed) + .setRemaining(expectedRemaining).build(); + nrb.addStorageReport(srb); + nodeManager.sendHeartbeat( + datanodeDetails.getProtoBufMessage(), nrb.build(), reportState); + + // Wait up to 5 seconds so that the dead node becomes healthy + // Verify usage info should be updated. 
+ GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(HEALTHY) == 1, + 100, 5 * 1000); + GenericTestUtils.waitFor( + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeStats().size()); + foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java new file mode 100644 index 0000000..8f412de --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.test.PathUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for SCM node pool manager. 
+ */ +public class TestSCMNodePoolManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestSCMNodePoolManager.class); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private final File testDir = PathUtils.getTestDir( + TestSCMNodePoolManager.class); + + SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf) + throws IOException { + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + return new SCMNodePoolManager(conf); + } + + /** + * Test default node pool. + * + * @throws IOException + */ + @Test + public void testDefaultNodePool() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + try { + final String defaultPool = "DefaultPool"; + NodePoolManager npMgr = createNodePoolManager(conf); + + final int nodeCount = 4; + final List<DatanodeDetails> nodes = TestUtils + .getListOfDatanodeDetails(nodeCount); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeDetails node: nodes) { + npMgr.addNode(defaultPool, node); + } + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + + DatanodeDetails nodeRemoved = nodes.remove(2); + npMgr.removeNode(defaultPool, nodeRemoved); + List<DatanodeDetails> nodesAfterRemove = npMgr.getNodes(defaultPool); + assertTwoDatanodeListsEqual(nodes, nodesAfterRemove); + + List<DatanodeDetails> nonExistSet = npMgr.getNodes("NonExistSet"); + assertEquals(0, nonExistSet.size()); + } finally { + FileUtil.fullyDelete(testDir); + } + } + + + /** + * Test default node pool reload. 
+ * + * @throws IOException + */ + @Test + public void testDefaultNodePoolReload() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + final String defaultPool = "DefaultPool"; + final int nodeCount = 4; + final List<DatanodeDetails> nodes = TestUtils + .getListOfDatanodeDetails(nodeCount); + + try { + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeDetails node : nodes) { + npMgr.addNode(defaultPool, node); + } + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + npMgr.close(); + } finally { + LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" + + " and close."); + } + + // try reload with a new NodePoolManager instance + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + } finally { + LOG.info("testDefaultNodePoolReload: Finish reloading node pool."); + } + } finally { + FileUtil.fullyDelete(testDir); + } + } + + /** + * Compare and verify that two datanode lists are equal. + * @param list1 - datanode list 1. + * @param list2 - datanode list 2. 
+ */ + private void assertTwoDatanodeListsEqual(List<DatanodeDetails> list1, + List<DatanodeDetails> list2) { + assertEquals(list1.size(), list2.size()); + Collections.sort(list1); + Collections.sort(list2); + assertTrue(ListUtils.isEqualList(list1, list2)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java new file mode 100644 index 0000000..da05c59 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm; +/** + * SCM tests + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java new file mode 100644 index 0000000..433beb8 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.VersionInfo; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.endpoint + .HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + 
.VersionEndpointTask; +import org.apache.hadoop.test.PathUtils; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState.states + .noContainerReports; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS; +import static org.apache.hadoop.ozone.container.common.ContainerTestUtils + .createEndpoint; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + * Tests the endpoints. + */ +public class TestEndPoint { + private static InetSocketAddress serverAddress; + private static RPC.Server scmServer; + private static ScmTestMock scmServerImpl; + private static File testDir; + private static StorageContainerDatanodeProtocolProtos.ReportState + defaultReportState; + + @AfterClass + public static void tearDown() throws Exception { + if (scmServer != null) { + scmServer.stop(); + } + FileUtil.fullyDelete(testDir); + } + + @BeforeClass + public static void setUp() throws Exception { + serverAddress = SCMTestUtils.getReuseableAddress(); + scmServerImpl = new ScmTestMock(); + scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(), + scmServerImpl, serverAddress, 10); + testDir = PathUtils.getTestDir(TestEndPoint.class); + defaultReportState = StorageContainerDatanodeProtocolProtos. + ReportState.newBuilder().setState(noContainerReports). + setCount(0).build(); + } + + @Test + /** + * This test asserts that we are able to make a version call to SCM server + * and gets back the expected values. 
+ */ + public void testGetVersion() throws Exception { + try (EndpointStateMachine rpcEndPoint = + createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint() + .getVersion(null); + Assert.assertNotNull(responseProto); + Assert.assertEquals(VersionInfo.DESCRIPTION_KEY, + responseProto.getKeys(0).getKey()); + Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(), + responseProto.getKeys(0).getValue()); + } + } + + @Test + /** + * We make getVersion RPC call, but via the VersionEndpointTask which is + * how the state machine would make the call. + */ + public void testGetVersionTask() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, + serverAddress, 1000)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + + // if version call worked the endpoint should automatically move to the + // next state. + Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + newState); + + // Now rpcEndpoint should remember the version it got from SCM + Assert.assertNotNull(rpcEndPoint.getVersion()); + } + } + + @Test + /** + * This test makes a call to end point where there is no SCM server. We + * expect that versionTask should be able to handle it. 
+ */ + public void testGetVersionToInvalidEndpoint() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + InetSocketAddress nonExistentServerAddress = SCMTestUtils + .getReuseableAddress(); + try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, + nonExistentServerAddress, 1000)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + + // This version call did NOT work, so endpoint should remain in the same + // state. + Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, + newState); + } + } + + @Test + /** + * This test makes a getVersionRPC call, but the DummyStorageServer is + * going to respond little slowly. We will assert that we are still in the + * GETVERSION state after the timeout. + */ + public void testGetVersionAssertRpcTimeOut() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 100; + Configuration conf = SCMTestUtils.getConf(); + + try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, + serverAddress, (int) rpcTimeout)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + + scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, lessThanOrEqualTo(rpcTimeout + tolerance)); + Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, + newState); + } + } + + @Test + public void testRegister() throws Exception { + String[] scmAddressArray = new String[1]; + scmAddressArray[0] = serverAddress.toString(); + DatanodeDetails nodeToRegister = getDatanodeDetails(); + try (EndpointStateMachine rpcEndPoint = + 
createEndpoint( + SCMTestUtils.getConf(), serverAddress, 1000)) { + SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint() + .register(nodeToRegister.getProtoBufMessage(), scmAddressArray); + Assert.assertNotNull(responseProto); + Assert.assertEquals(nodeToRegister.getUuidString(), + responseProto.getDatanodeUUID()); + Assert.assertNotNull(responseProto.getClusterID()); + } + } + + private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, + int rpcTimeout, boolean clearDatanodeDetails) throws Exception { + Configuration conf = SCMTestUtils.getConf(); + EndpointStateMachine rpcEndPoint = + createEndpoint(conf, + scmAddress, rpcTimeout); + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER); + RegisterEndpointTask endpointTask = + new RegisterEndpointTask(rpcEndPoint, conf); + if (!clearDatanodeDetails) { + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + HddsProtos.DatanodeDetailsProto datanodeDetailsProto = + datanodeDetails.getProtoBufMessage(); + endpointTask.setDatanodeDetailsProto(datanodeDetailsProto); + } + endpointTask.call(); + return rpcEndPoint; + } + + @Test + public void testRegisterTask() throws Exception { + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(serverAddress, 1000, false)) { + // Successful register should move us to Heartbeat state. 
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterToInvalidEndpoint() throws Exception { + InetSocketAddress address = SCMTestUtils.getReuseableAddress(); + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(address, 1000, false)) { + Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterNoContainerID() throws Exception { + InetSocketAddress address = SCMTestUtils.getReuseableAddress(); + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(address, 1000, true)) { + // No Container ID, therefore we tell the datanode that we would like to + // shutdown. + Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterRpcTimeout() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 200; + scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + registerTaskHelper(serverAddress, 1000, false).close(); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, lessThanOrEqualTo(rpcTimeout + tolerance)); + } + + @Test + public void testHeartbeat() throws Exception { + DatanodeDetails dataNode = getDatanodeDetails(); + try (EndpointStateMachine rpcEndPoint = + createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + srb.setStorageUuid(UUID.randomUUID().toString()); + srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build(); + nrb.addStorageReport(srb); + SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() + .sendHeartbeat( + dataNode.getProtoBufMessage(), nrb.build(), defaultReportState); + Assert.assertNotNull(responseProto); + Assert.assertEquals(0, 
responseProto.getCommandsCount()); + } + } + + private void heartbeatTaskHelper(InetSocketAddress scmAddress, + int rpcTimeout) throws Exception { + Configuration conf = SCMTestUtils.getConf(); + conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath()); + conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + // Mini Ozone cluster will not come up if the port is not true, since + // Ratis will exit if the server port cannot be bound. We can remove this + // hard coding once we fix the Ratis default behaviour. + conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true); + + + // Create a datanode state machine for stateConext used by endpoint task + try (DatanodeStateMachine stateMachine = new DatanodeStateMachine( + TestUtils.getDatanodeDetails(), conf); + EndpointStateMachine rpcEndPoint = + createEndpoint(conf, scmAddress, rpcTimeout)) { + HddsProtos.DatanodeDetailsProto datanodeDetailsProto = + getDatanodeDetails().getProtoBufMessage(); + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT); + + final StateContext stateContext = + new StateContext(conf, DatanodeStateMachine.DatanodeStates.RUNNING, + stateMachine); + + HeartbeatEndpointTask endpointTask = + new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext); + endpointTask.setDatanodeDetailsProto(datanodeDetailsProto); + endpointTask.call(); + Assert.assertNotNull(endpointTask.getDatanodeDetailsProto()); + + Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + rpcEndPoint.getState()); + } + } + + @Test + public void testHeartbeatTask() throws Exception { + heartbeatTaskHelper(serverAddress, 1000); + } + + @Test + public void testHeartbeatTaskToInvalidNode() throws Exception { + InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress(); + heartbeatTaskHelper(invalidAddress, 1000); + } + + @Test + public void testHeartbeatTaskRpcTimeOut() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 200; + 
scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress(); + heartbeatTaskHelper(invalidAddress, 1000); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, + lessThanOrEqualTo(rpcTimeout + tolerance)); + } + + /** + * Returns a new container report. + * @return + */ + ContainerReport getRandomContainerReport() { + return new ContainerReport(UUID.randomUUID().toString(), + DigestUtils.sha256Hex("Random")); + } + + /** + * Creates dummy container reports. + * @param count - The number of closed containers to create. + * @return ContainerReportsProto + */ + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto + createDummyContainerReports(int count) { + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + reportsBuilder = StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.newBuilder(); + for (int x = 0; x < count; x++) { + reportsBuilder.addReports(getRandomContainerReport() + .getProtoBufMessage()); + } + reportsBuilder.setDatanodeDetails(getDatanodeDetails() + .getProtoBufMessage()); + reportsBuilder.setType(StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.reportType.fullReport); + return reportsBuilder.build(); + } + + /** + * Tests that rpcEndpoint sendContainerReport works as expected. 
+ * @throws Exception + */ + @Test + public void testContainerReportSend() throws Exception { + final int count = 1000; + scmServerImpl.reset(); + try (EndpointStateMachine rpcEndPoint = + createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + ContainerReportsResponseProto responseProto = rpcEndPoint + .getEndPoint().sendContainerReport(createDummyContainerReports( + count)); + Assert.assertNotNull(responseProto); + } + Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); + Assert.assertEquals(count, scmServerImpl.getContainerCount()); + } + + + /** + * Tests that rpcEndpoint sendContainerReport works as expected. + * @throws Exception + */ + @Test + public void testContainerReport() throws Exception { + final int count = 1000; + scmServerImpl.reset(); + try (EndpointStateMachine rpcEndPoint = + createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + ContainerReportsResponseProto responseProto = rpcEndPoint + .getEndPoint().sendContainerReport(createContainerReport(count)); + Assert.assertNotNull(responseProto); + } + Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); + Assert.assertEquals(count, scmServerImpl.getContainerCount()); + final long expectedKeyCount = count * 1000; + Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount()); + final long expectedBytesUsed = count * OzoneConsts.GB * 2; + Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed()); + } + + private ContainerReportsRequestProto createContainerReport(int count) { + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + reportsBuilder = StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.newBuilder(); + for (int x = 0; x < count; x++) { + ContainerReport report = new ContainerReport(UUID.randomUUID().toString(), + DigestUtils.sha256Hex("Simulated")); + report.setKeyCount(1000); + report.setSize(OzoneConsts.GB * 5); + report.setBytesUsed(OzoneConsts.GB * 2); + 
report.setReadCount(100); + report.setReadBytes(OzoneConsts.GB * 1); + report.setWriteCount(50); + report.setWriteBytes(OzoneConsts.GB * 2); + report.setContainerID(1); + + reportsBuilder.addReports(report.getProtoBufMessage()); + } + reportsBuilder.setDatanodeDetails(getDatanodeDetails() + .getProtoBufMessage()); + reportsBuilder.setType(StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.reportType.fullReport); + return reportsBuilder.build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org