http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/scm-overview.html ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/scm-overview.html b/hadoop-hdds/server-scm/src/main/webapps/scm/scm-overview.html new file mode 100644 index 0000000..fca23ba --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/webapps/scm/scm-overview.html @@ -0,0 +1,60 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<h2>Node counts</h2> +<!-- One row per entry of nodemanagermetrics.NodeCount, ordered by key through the $ctrl.nodeOrder comparator. --> +<table class="table table-bordered table-striped"> + <tbody> + <tr ng-repeat="typestat in $ctrl.nodemanagermetrics.NodeCount | orderBy:'key':false:$ctrl.nodeOrder"> + <td>{{typestat.key}}</td> + <td>{{typestat.value}}</td> + </tr> + </tbody> +</table> + +<h2>Status</h2> +<table class="table table-bordered table-striped"> + <tbody> + <tr> + <td>Client Rpc port</td> + <td>{{$ctrl.overview.jmx.ClientRpcPort}}</td> + </tr> + <tr> + <td>Datanode Rpc port</td> + <td>{{$ctrl.overview.jmx.DatanodeRpcPort}}</td> + </tr> + <tr> + <td>Block Manager: Open containers</td> + <td>{{$ctrl.blockmanagermetrics.OpenContainersNo}}</td> + </tr> + <tr> + <td>Node Manager: Minimum chill mode nodes</td> + <td>{{$ctrl.nodemanagermetrics.MinimumChillModeNodes}}</td> + </tr> + <tr> + <td>Node Manager: Out-of-node chill mode</td> + <td>{{$ctrl.nodemanagermetrics.OutOfNodeChillMode}}</td> + </tr> + <tr> + <td>Node Manager: Chill mode status</td> + <td>{{$ctrl.nodemanagermetrics.ChillModeStatus}}</td> + </tr> + <tr> + <td>Node Manager: Manual chill mode</td> + <td>{{$ctrl.nodemanagermetrics.InManualChillMode}}</td> + </tr> + </tbody> +</table> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/scm.js ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/scm.js b/hadoop-hdds/server-scm/src/main/webapps/scm/scm.js new file mode 100644 index 0000000..bcfa8b7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/webapps/scm/scm.js @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +(function () { + "use strict"; + angular.module('scm', ['ozone', 'nvd3']); + /* scmOverview component: fetches the BlockManager and SCMNodeManagerInfo JMX beans and exposes the first bean of each to the scm-overview.html template. */ + angular.module('scm').component('scmOverview', { + templateUrl: 'scm-overview.html', + require: { + overview: "^overview" + }, + controller: function ($http) { + var ctrl = this; + $http.get("jmx?qry=Hadoop:service=BlockManager,name=*") + .then(function (result) { + ctrl.blockmanagermetrics = result.data.beans[0]; + }); + $http.get("jmx?qry=Hadoop:service=SCMNodeManager,name=SCMNodeManagerInfo") + .then(function (result) { + ctrl.nodemanagermetrics = result.data.beans[0]; + }); + + var statusSortOrder = { + "HEALTHY": "a", + "STALE": "b", + "DEAD": "c", + "UNKNOWN": "z", + "DECOMMISSIONING": "x", + "DECOMMISSIONED": "y" + }; + ctrl.nodeOrder = function (v1, v2) { + // a status missing from statusSortOrder is compared as the string "undefined", i.e. after "c" (DEAD) and before "x" (DECOMMISSIONING) + return ("" + statusSortOrder[v1.value]).localeCompare("" + statusSortOrder[v2.value]) + } + + } + }); + +})(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsServerUtilTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsServerUtilTest.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsServerUtilTest.java new file mode 100644 index 0000000..6e01e53 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsServerUtilTest.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; + +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; + +import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test the HDDS server side utilities. + */ +public class HddsServerUtilTest { + + @Rule + public Timeout timeout = new Timeout(300000); + + @Rule + public ExpectedException thrown= ExpectedException.none(); + + /** + * Verify DataNode endpoint lookup failure if neither the client nor + * datanode endpoint are configured. + */ + @Test + public void testMissingScmDataNodeAddress() { + final Configuration conf = new OzoneConfiguration(); + thrown.expect(IllegalArgumentException.class); + HddsServerUtil.getScmAddressForDataNodes(conf); + } + + /** + * Verify that the datanode endpoint is parsed correctly. + * This tests the logic used by the DataNodes to determine which address + * to connect to. + */ + @Test + public void testGetScmDataNodeAddress() { + final Configuration conf = new OzoneConfiguration(); + + // First try a client address with just a host name. Verify it falls + // back to the default port. 
+ conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4"); + InetSocketAddress addr = HddsServerUtil.getScmAddressForDataNodes(conf); + assertThat(addr.getHostString(), is("1.2.3.4")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + // Next try a client address with both a host name and a port. + // Verify the port is ignored and the default DataNode port is used. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + addr = HddsServerUtil.getScmAddressForDataNodes(conf); + assertThat(addr.getHostString(), is("1.2.3.4")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and + // OZONE_SCM_DATANODE_ADDRESS_KEY. + // Verify that the latter overrides and the port number is still the + // default. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8"); + addr = + HddsServerUtil.getScmAddressForDataNodes(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and + // OZONE_SCM_DATANODE_ADDRESS_KEY. + // Verify that the latter overrides and the port number from the latter is + // used. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8:200"); + addr = HddsServerUtil.getScmAddressForDataNodes(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is(200)); + } + + + /** + * Verify that the client endpoint bind address is computed correctly. + * This tests the logic used by the SCM to determine its own bind address. 
+ */ + @Test + public void testScmClientBindHostDefault() { + final Configuration conf = new OzoneConfiguration(); + + // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY + // is set differently. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4"); + InetSocketAddress addr = HddsServerUtil.getScmClientBindAddress(conf); + assertThat(addr.getHostString(), is("0.0.0.0")); + assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); + + // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY + // is set differently. The port number from OZONE_SCM_CLIENT_ADDRESS_KEY + // should be respected. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200"); + addr = HddsServerUtil.getScmClientBindAddress(conf); + assertThat(addr.getHostString(), is("0.0.0.0")); + assertThat(addr.getPort(), is(100)); + + // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected. + // Port number should be default if none is specified via + // OZONE_SCM_CLIENT_ADDRESS_KEY. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4"); + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8"); + addr = HddsServerUtil.getScmClientBindAddress(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); + + // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected. + // Port number from OZONE_SCM_CLIENT_ADDRESS_KEY should be + // respected. 
+ conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200"); + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8"); + addr = HddsServerUtil.getScmClientBindAddress(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is(100)); + } + + /** + * Verify that the DataNode endpoint bind address is computed correctly. + * This tests the logic used by the SCM to determine its own bind address. + */ + @Test + public void testScmDataNodeBindHostDefault() { + final Configuration conf = new OzoneConfiguration(); + + // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY + // is set differently. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4"); + InetSocketAddress addr = HddsServerUtil.getScmDataNodeBindAddress(conf); + assertThat(addr.getHostString(), is("0.0.0.0")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY + // is set differently. The port number from OZONE_SCM_DATANODE_ADDRESS_KEY + // should be respected. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200"); + addr = HddsServerUtil.getScmDataNodeBindAddress(conf); + assertThat(addr.getHostString(), is("0.0.0.0")); + assertThat(addr.getPort(), is(200)); + + // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected. + // Port number should be default if none is specified via + // OZONE_SCM_DATANODE_ADDRESS_KEY. 
+ conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8"); + addr = HddsServerUtil.getScmDataNodeBindAddress(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is( + ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected. + // Port number from OZONE_SCM_DATANODE_ADDRESS_KEY should be + // respected. + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200"); + conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8"); + addr = HddsServerUtil.getScmDataNodeBindAddress(conf); + assertThat(addr.getHostString(), is("5.6.7.8")); + assertThat(addr.getPort(), is(200)); + } + + + /** Verify getSCMAddresses parsing of OZONE_SCM_NAMES: valid hosts and ports, surrounding spaces, and invalid values. */ + @Test + public void testGetSCMAddresses() { + final Configuration conf = new OzoneConfiguration(); + Collection<InetSocketAddress> addresses = null; + InetSocketAddress addr = null; + Iterator<InetSocketAddress> it = null; + + // Verify valid IP address setup + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4"); + addresses = getSCMAddresses(conf); + assertThat(addresses.size(), is(1)); + addr = addresses.iterator().next(); + assertThat(addr.getHostName(), is("1.2.3.4")); + assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT)); + + // Verify valid hostname setup + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1"); + addresses = getSCMAddresses(conf); + assertThat(addresses.size(), is(1)); + addr = addresses.iterator().next(); + assertThat(addr.getHostName(), is("scm1")); + assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT)); + + // Verify valid hostname and port + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234"); + addresses = getSCMAddresses(conf); + assertThat(addresses.size(), is(1)); + addr = 
addresses.iterator().next(); + assertThat(addr.getHostName(), is("scm1")); + assertThat(addr.getPort(), is(1234)); + + final HashMap<String, Integer> hostsAndPorts = + new HashMap<String, Integer>(); + hostsAndPorts.put("scm1", 1234); + hostsAndPorts.put("scm2", 2345); + hostsAndPorts.put("scm3", 3456); + + // Verify multiple hosts and port + conf.setStrings( + ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456"); + addresses = getSCMAddresses(conf); + assertThat(addresses.size(), is(3)); + it = addresses.iterator(); + HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts); + while(it.hasNext()) { + InetSocketAddress current = it.next(); + assertTrue(expected1.remove(current.getHostName(), + current.getPort())); + } + assertTrue(expected1.isEmpty()); + + // Verify names with spaces + conf.setStrings( + ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 "); + addresses = getSCMAddresses(conf); + assertThat(addresses.size(), is(3)); + it = addresses.iterator(); + HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts); + while(it.hasNext()) { + InetSocketAddress current = it.next(); + assertTrue(expected2.remove(current.getHostName(), + current.getPort())); + } + assertTrue(expected2.isEmpty()); + + // Verify empty value + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, ""); + try { + addresses = getSCMAddresses(conf); + fail("Empty value should cause an IllegalArgumentException"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + + // Verify invalid hostname + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234"); + try { + addresses = getSCMAddresses(conf); + fail("An invalid hostname should cause an IllegalArgumentException"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + + // Verify invalid port + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz"); + try { + addresses = getSCMAddresses(conf); + fail("An invalid port should cause 
an IllegalArgumentException"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + + // Verify a mixed case (both valid and invalid values appear) + conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz"); + try { + addresses = getSCMAddresses(conf); + fail("An invalid value should cause an IllegalArgumentException"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java new file mode 100644 index 0000000..5d9139d --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.http.HttpConfig.Policy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLConnection; +import java.util.Arrays; +import java.util.Collection; + +/** + * Test the HTTP server of SCM with each HTTP policy option. + */ +@RunWith(value = Parameterized.class) +public class TestStorageContainerManagerHttpServer { + private static final String BASEDIR = GenericTestUtils + .getTempPath(TestStorageContainerManagerHttpServer.class.getSimpleName()); + private static String keystoresDir; + private static String sslConfDir; + private static Configuration conf; + private static URLConnectionFactory connectionFactory; + + @Parameters public static Collection<Object[]> policy() { + Object[][] params = new Object[][] { + {HttpConfig.Policy.HTTP_ONLY}, + {HttpConfig.Policy.HTTPS_ONLY}, + {HttpConfig.Policy.HTTP_AND_HTTPS} }; + return Arrays.asList(params); + } + + private final HttpConfig.Policy policy; + + public TestStorageContainerManagerHttpServer(Policy policy) { + super(); + this.policy = policy; + } + + @BeforeClass public static void setUp() throws Exception { + File base = new File(BASEDIR); + FileUtil.fullyDelete(base); + base.mkdirs(); + conf = new Configuration(); + keystoresDir = new File(BASEDIR).getAbsolutePath(); + sslConfDir = 
KeyStoreTestUtil.getClasspathDir( + org.apache.hadoop.hdfs.server.namenode.TestNameNodeHttpServer.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false); + connectionFactory = + URLConnectionFactory.newDefaultURLConnectionFactory(conf); + conf.set(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getClientSSLConfigFileName()); + conf.set(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, + KeyStoreTestUtil.getServerSSLConfigFileName()); + } + + @AfterClass public static void tearDown() throws Exception { + FileUtil.fullyDelete(new File(BASEDIR)); + KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); + } + + @Test public void testHttpPolicy() throws Exception { + conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, policy.name()); + conf.set(ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, "localhost:0"); + + InetSocketAddress addr = InetSocketAddress.createUnresolved("localhost", 0); /* NOTE(review): addr is not referenced anywhere else in this test */ + StorageContainerManagerHttpServer server = null; + try { + server = new StorageContainerManagerHttpServer(conf); + server.start(); + + Assert.assertTrue(implies(policy.isHttpEnabled(), + canAccess("http", server.getHttpAddress()))); + Assert.assertTrue( + implies(!policy.isHttpEnabled(), server.getHttpAddress() == null)); + + Assert.assertTrue(implies(policy.isHttpsEnabled(), + canAccess("https", server.getHttpsAddress()))); + Assert.assertTrue( + implies(!policy.isHttpsEnabled(), server.getHttpsAddress() == null)); + + } finally { + if (server != null) { + server.stop(); + } + } + } + /** Returns true when the /jmx URL at addr can be opened and read over the given scheme; false when addr is null or any exception is thrown. */ + private static boolean canAccess(String scheme, InetSocketAddress addr) { + if (addr == null) { + return false; + } + try { + URL url = + new URL(scheme + "://" + NetUtils.getHostPortString(addr) + "/jmx"); + URLConnection conn = connectionFactory.openConnection(url); + conn.connect(); + conn.getContent(); + } catch (Exception e) { + return false; + } + return true; + } + /** Logical implication: false only when a is true and b is false. */ + private static boolean implies(boolean a, boolean b) { + return !a || b; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java new file mode 100644 index 0000000..e191dd5 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** + * Stateless helpers that create DatanodeDetails and optionally register them with an SCMNodeManager. + */ +public class TestUtils { + + private TestUtils() { + } + + public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager) { + + return getDatanodeDetails(nodeManager, UUID.randomUUID().toString()); + } + + /** + * Create a new DatanodeDetails with its UUID set to the given string and register it with the node manager. 
+ * + * @param uuid - node ID, it is generally UUID. + * @return the newly created, registered DatanodeDetails. + */ + public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager, + String uuid) { + DatanodeDetails datanodeDetails = getDatanodeDetails(uuid); + nodeManager.register(datanodeDetails.getProtoBufMessage()); + return datanodeDetails; + } + + /** + * Create the specified number of DatanodeDetails and register them with the node + * manager. + * + * @param nodeManager - node manager to register the datanode ids. + * @param count - number of DatanodeDetails needed. + * @return list of the registered DatanodeDetails. + */ + public static List<DatanodeDetails> getListOfRegisteredDatanodeDetails( + SCMNodeManager nodeManager, int count) { + ArrayList<DatanodeDetails> datanodes = new ArrayList<>(); + for (int i = 0; i < count; i++) { + datanodes.add(getDatanodeDetails(nodeManager)); + } + return datanodes; + } + + /** + * Create a DatanodeDetails with a random UUID. + * + * @return DatanodeDetails + */ + public static DatanodeDetails getDatanodeDetails() { + return getDatanodeDetails(UUID.randomUUID().toString()); + } + + private static DatanodeDetails getDatanodeDetails(String uuid) { + Random random = new Random(); + String ipAddress = + random.nextInt(256) + "." + random.nextInt(256) + "." + random + .nextInt(256) + "." + random.nextInt(256); + + String hostName = uuid; /* NOTE(review): hostName is never read; the builder below is given "localhost" */ + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(uuid) + .setHostName("localhost") + .setIpAddress(ipAddress) + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0); + return builder.build(); + } + + /** + * Create a list of the specified number of DatanodeDetails. + * + * @param count - number of datanode IDs needed. 
+ * @return list of DatanodeDetails; none of them is registered with a node manager. + */ + public static List<DatanodeDetails> getListOfDatanodeDetails(int count) { + ArrayList<DatanodeDetails> datanodes = new ArrayList<>(); + for (int i = 0; i < count; i++) { + datanodes.add(getDatanodeDetails()); + } + return datanodes; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java new file mode 100644 index 0000000..0eff702 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.UUID; + +import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.apache.hadoop.ozone.OzoneConsts.MB; + + +/** + * Tests for SCM Block Manager. 
+ */ +public class TestBlockManager { + private static ContainerMapping mapping; + private static MockNodeManager nodeManager; + private static BlockManagerImpl blockManager; + private static File testDir; + private final static long DEFAULT_BLOCK_SIZE = 128 * MB; + private static HddsProtos.ReplicationFactor factor; + private static HddsProtos.ReplicationType type; + private static String containerOwner = "OZONE"; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + + String path = GenericTestUtils + .getTempPath(TestBlockManager.class.getSimpleName()); + + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path); + testDir = Paths.get(path).toFile(); + boolean folderExisted = testDir.exists() || testDir.mkdirs(); + if (!folderExisted) { + throw new IOException("Unable to create test directory path"); + } + nodeManager = new MockNodeManager(true, 10); + mapping = new ContainerMapping(conf, nodeManager, 128); + blockManager = new BlockManagerImpl(conf, nodeManager, mapping, 128); + if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){ + factor = HddsProtos.ReplicationFactor.THREE; + type = HddsProtos.ReplicationType.RATIS; + } else { + factor = HddsProtos.ReplicationFactor.ONE; + type = HddsProtos.ReplicationType.STAND_ALONE; + } + } + + @AfterClass + public static void cleanup() throws IOException { + blockManager.close(); + mapping.close(); + FileUtil.fullyDelete(testDir); + } + + @Before + public void clearChillMode() { + nodeManager.setChillmode(false); + } + + @Test + public void testAllocateBlock() throws Exception { + AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, + type, factor, containerOwner); + Assert.assertNotNull(block); + } + + @Test + public void testGetAllocatedBlock() throws IOException { + AllocatedBlock block = 
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, + type, factor, containerOwner); + Assert.assertNotNull(block); + Pipeline pipeline = blockManager.getBlock(block.getKey()); + Assert.assertEquals(pipeline.getLeader().getUuid(), + block.getPipeline().getLeader().getUuid()); + } + + @Test + public void testDeleteBlock() throws Exception { + AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, + type, factor, containerOwner); + Assert.assertNotNull(block); + blockManager.deleteBlocks(Collections.singletonList(block.getKey())); + + // Deleted block can not be retrieved + thrown.expectMessage("Specified block key does not exist."); + blockManager.getBlock(block.getKey()); + + // Tombstone of the deleted block can be retrieved if it has not been + // cleaned yet. NOTE(review): the getBlock call above is expected to throw, so the statements below presumably never run - confirm. + String deletedKeyName = blockManager.getDeletedKeyName(block.getKey()); + Pipeline pipeline = blockManager.getBlock(deletedKeyName); + Assert.assertEquals(pipeline.getLeader().getUuid(), + block.getPipeline().getLeader().getUuid()); + } + + @Test + public void testAllocateOversizedBlock() throws IOException { + long size = 6 * GB; + thrown.expectMessage("Unsupported block size"); + AllocatedBlock block = blockManager.allocateBlock(size, + type, factor, containerOwner); + } + + @Test + public void testGetNoneExistentContainer() throws IOException { + String nonExistBlockKey = UUID.randomUUID().toString(); + thrown.expectMessage("Specified block key does not exist."); + blockManager.getBlock(nonExistBlockKey); + } + + @Test + public void testChillModeAllocateBlockFails() throws IOException { + nodeManager.setChillmode(true); + thrown.expectMessage("Unable to create block while in chill mode"); + blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, + type, factor, containerOwner); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java new file mode 100644 index 0000000..e820fa4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm.block; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.MetadataKeyFilters; +import org.apache.hadoop.utils.MetadataStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_BLOCK_DELETION_MAX_RETRY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS; +import static org.mockito.Mockito.mock; + +/** + * Tests for DeletedBlockLog. 
+ */ +public class TestDeletedBlockLog { + + private static DeletedBlockLogImpl deletedBlockLog; + private OzoneConfiguration conf; + private File testDir; + + @Before + public void setup() throws Exception { + testDir = GenericTestUtils.getTestDir( + TestDeletedBlockLog.class.getSimpleName()); + conf = new OzoneConfiguration(); + conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); + conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + deletedBlockLog = new DeletedBlockLogImpl(conf); + } + + @After + public void tearDown() throws Exception { + deletedBlockLog.close(); + FileUtils.deleteDirectory(testDir); + } + + private Map<String, List<String>> generateData(int dataSize) { + Map<String, List<String>> blockMap = new HashMap<>(); + Random random = new Random(1); + for (int i = 0; i < dataSize; i++) { + String containerName = "container-" + UUID.randomUUID().toString(); + List<String> blocks = new ArrayList<>(); + int blockSize = random.nextInt(30) + 1; + for (int j = 0; j < blockSize; j++) { + blocks.add("block-" + UUID.randomUUID().toString()); + } + blockMap.put(containerName, blocks); + } + return blockMap; + } + + @Test + public void testGetTransactions() throws Exception { + List<DeletedBlocksTransaction> blocks = + deletedBlockLog.getTransactions(30); + Assert.assertEquals(0, blocks.size()); + + // Creates 40 TX in the log. + for (Map.Entry<String, List<String>> entry : generateData(40).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + + // Get first 30 TXs. + blocks = deletedBlockLog.getTransactions(30); + Assert.assertEquals(30, blocks.size()); + for (int i = 0; i < 30; i++) { + Assert.assertEquals(i + 1, blocks.get(i).getTxID()); + } + + // Get another 30 TXs. + // The log only 10 left, so this time it will only return 10 TXs. 
+ blocks = deletedBlockLog.getTransactions(30); + Assert.assertEquals(10, blocks.size()); + for (int i = 30; i < 40; i++) { + Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID()); + } + + // Get another 50 TXs. + // By now the position should have moved to the beginning, + // this call will return all 40 TXs. + blocks = deletedBlockLog.getTransactions(50); + Assert.assertEquals(40, blocks.size()); + for (int i = 0; i < 40; i++) { + Assert.assertEquals(i + 1, blocks.get(i).getTxID()); + } + List<Long> txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.commitTransactions(txIDs); + } + + @Test + public void testIncrementCount() throws Exception { + int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); + + // Create 30 TXs in the log. + for (Map.Entry<String, List<String>> entry : generateData(30).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + + // This will return all TXs, total num 30. + List<DeletedBlocksTransaction> blocks = + deletedBlockLog.getTransactions(40); + List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID) + .collect(Collectors.toList()); + + for (int i = 0; i < maxRetry; i++) { + deletedBlockLog.incrementCount(txIDs); + } + + // Increment another time so it exceed the maxRetry. + // On this call, count will be set to -1 which means TX eventually fails. + deletedBlockLog.incrementCount(txIDs); + blocks = deletedBlockLog.getTransactions(40); + for (DeletedBlocksTransaction block : blocks) { + Assert.assertEquals(-1, block.getCount()); + } + + // If all TXs are failed, getTransactions call will always return nothing. 
+ blocks = deletedBlockLog.getTransactions(40); + Assert.assertEquals(blocks.size(), 0); + } + + @Test + public void testCommitTransactions() throws Exception { + for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + List<DeletedBlocksTransaction> blocks = + deletedBlockLog.getTransactions(20); + List<Long> txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + // Add an invalid txID. + txIDs.add(70L); + deletedBlockLog.commitTransactions(txIDs); + blocks = deletedBlockLog.getTransactions(50); + Assert.assertEquals(30, blocks.size()); + } + + @Test + public void testRandomOperateTransactions() throws Exception { + Random random = new Random(); + int added = 0, committed = 0; + List<DeletedBlocksTransaction> blocks = new ArrayList<>(); + List<Long> txIDs = new ArrayList<>(); + byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#"); + MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid = + (preKey, currentKey, nextKey) -> + !Arrays.equals(latestTxid, currentKey); + MetadataStore store = deletedBlockLog.getDeletedStore(); + // Randomly add/get/commit/increase transactions. 
+ for (int i = 0; i < 100; i++) { + int state = random.nextInt(4); + if (state == 0) { + for (Map.Entry<String, List<String>> entry : + generateData(10).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + added += 10; + } else if (state == 1) { + blocks = deletedBlockLog.getTransactions(20); + txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.incrementCount(txIDs); + } else if (state == 2) { + txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + blocks = new ArrayList<>(); + committed += txIDs.size(); + deletedBlockLog.commitTransactions(txIDs); + } else { + // verify the number of added and committed. + List<Map.Entry<byte[], byte[]>> result = + store.getRangeKVs(null, added, avoidLatestTxid); + Assert.assertEquals(added, result.size() + committed); + } + } + } + + @Test + public void testPersistence() throws Exception { + for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){ + deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); + } + // close db and reopen it again to make sure + // transactions are stored persistently. 
+ deletedBlockLog.close(); + deletedBlockLog = new DeletedBlockLogImpl(conf); + List<DeletedBlocksTransaction> blocks = + deletedBlockLog.getTransactions(10); + List<Long> txIDs = new ArrayList<>(); + for (DeletedBlocksTransaction block : blocks) { + txIDs.add(block.getTxID()); + } + deletedBlockLog.commitTransactions(txIDs); + blocks = deletedBlockLog.getTransactions(10); + Assert.assertEquals(10, blocks.size()); + } + + @Test + public void testDeletedBlockTransactions() throws IOException { + int txNum = 10; + int maximumAllowedTXNum = 5; + List<DeletedBlocksTransaction> blocks = null; + List<String> containerNames = new LinkedList<>(); + + int count = 0; + String containerName = null; + DatanodeDetails dnDd1 = DatanodeDetails.newBuilder() + .setUuid("node1") + .setIpAddress("127.0.0.1") + .setHostName("localhost") + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0) + .build(); + DatanodeDetails dnId2 = DatanodeDetails.newBuilder() + .setUuid("node2") + .setIpAddress("127.0.0.1") + .setHostName("localhost") + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0) + .build(); + Mapping mappingService = mock(ContainerMapping.class); + // Creates {TXNum} TX in the log. 
+ for (Map.Entry<String, List<String>> entry : generateData(txNum) + .entrySet()) { + count++; + containerName = entry.getKey(); + containerNames.add(containerName); + deletedBlockLog.addTransaction(containerName, entry.getValue()); + + // make TX[1-6] for datanode1; TX[7-10] for datanode2 + if (count <= (maximumAllowedTXNum + 1)) { + mockContainerInfo(mappingService, containerName, dnDd1); + } else { + mockContainerInfo(mappingService, containerName, dnId2); + } + } + + DatanodeDeletedBlockTransactions transactions = + new DatanodeDeletedBlockTransactions(mappingService, + maximumAllowedTXNum, 2); + deletedBlockLog.getTransactions(transactions); + + List<Long> txIDs = new LinkedList<>(); + for (UUID id : transactions.getDatanodeIDs()) { + List<DeletedBlocksTransaction> txs = transactions + .getDatanodeTransactions(id); + for (DeletedBlocksTransaction tx : txs) { + txIDs.add(tx.getTxID()); + } + } + + // delete TX ID + deletedBlockLog.commitTransactions(txIDs); + blocks = deletedBlockLog.getTransactions(txNum); + // There should be one block remained since dnID1 reaches + // the maximum value (5). + Assert.assertEquals(1, blocks.size()); + + Assert.assertFalse(transactions.isFull()); + // The number of TX in dnID1 won't more than maximum value. + Assert.assertEquals(maximumAllowedTXNum, + transactions.getDatanodeTransactions(dnDd1.getUuid()).size()); + + int size = transactions.getDatanodeTransactions(dnId2.getUuid()).size(); + // add duplicated container in dnID2, this should be failed. + DeletedBlocksTransaction.Builder builder = + DeletedBlocksTransaction.newBuilder(); + builder.setTxID(11); + builder.setContainerName(containerName); + builder.setCount(0); + transactions.addTransaction(builder.build()); + + // The number of TX in dnID2 should not be changed. + Assert.assertEquals(size, + transactions.getDatanodeTransactions(dnId2.getUuid()).size()); + + // Add new TX in dnID2, then dnID2 will reach maximum value. 
+ containerName = "newContainer"; + builder = DeletedBlocksTransaction.newBuilder(); + builder.setTxID(12); + builder.setContainerName(containerName); + builder.setCount(0); + mockContainerInfo(mappingService, containerName, dnId2); + transactions.addTransaction(builder.build()); + // Since all node are full, then transactions is full. + Assert.assertTrue(transactions.isFull()); + } + + private void mockContainerInfo(Mapping mappingService, String containerName, + DatanodeDetails dd) throws IOException { + PipelineChannel pipelineChannel = + new PipelineChannel("fake", LifeCycleState.OPEN, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake"); + pipelineChannel.addMember(dd); + Pipeline pipeline = new Pipeline(containerName, pipelineChannel); + + ContainerInfo.Builder builder = new ContainerInfo.Builder(); + builder.setPipeline(pipeline); + + ContainerInfo conatinerInfo = builder.build(); + Mockito.doReturn(conatinerInfo).when(mappingService) + .getContainer(containerName); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java new file mode 100644 index 0000000..e3473b3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -0,0 +1,520 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.assertj.core.util.Preconditions; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; + +/** + * Test Helper for testing container Mapping. + */ +public class MockNodeManager implements NodeManager { + private final static NodeData[] NODES = { + new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB), + new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB), + new NodeData(128L * OzoneConsts.TB, 256 * OzoneConsts.GB), + new NodeData(40L * OzoneConsts.TB, OzoneConsts.TB), + new NodeData(256L * OzoneConsts.TB, 200 * OzoneConsts.TB), + new NodeData(20L * OzoneConsts.TB, 10 * OzoneConsts.GB), + new NodeData(32L * OzoneConsts.TB, 16 * OzoneConsts.TB), + new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB), + new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB, NodeData.STALE), + new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.STALE), + new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.DEAD) + }; + private final List<DatanodeDetails> healthyNodes; + private final List<DatanodeDetails> staleNodes; + private final List<DatanodeDetails> deadNodes; + private final Map<UUID, SCMNodeStat> nodeMetricMap; + private final SCMNodeStat aggregateStat; + private boolean chillmode; + private final Map<UUID, List<SCMCommand>> commandMap; + + public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { + this.healthyNodes = new LinkedList<>(); + this.staleNodes = new LinkedList<>(); + this.deadNodes = new LinkedList<>(); + this.nodeMetricMap = new HashMap<>(); + aggregateStat = new SCMNodeStat(); + if (initializeFakeNodes) { + for (int x = 0; x < nodeCount; x++) { + DatanodeDetails dd = getDatanodeDetails(); + populateNodeMetric(dd, x); + } + } + chillmode = false; + this.commandMap = new HashMap<>(); + } + + /** + * Invoked from ctor to create some node Metrics. 
+ * + * @param datanodeDetails - Datanode details + */ + private void populateNodeMetric(DatanodeDetails datanodeDetails, int x) { + SCMNodeStat newStat = new SCMNodeStat(); + long remaining = + NODES[x % NODES.length].capacity - NODES[x % NODES.length].used; + newStat.set( + (NODES[x % NODES.length].capacity), + (NODES[x % NODES.length].used), remaining); + this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat); + aggregateStat.add(newStat); + + if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) { + healthyNodes.add(datanodeDetails); + } + + if (NODES[x % NODES.length].getCurrentState() == NodeData.STALE) { + staleNodes.add(datanodeDetails); + } + + if (NODES[x % NODES.length].getCurrentState() == NodeData.DEAD) { + deadNodes.add(datanodeDetails); + } + + } + + /** + * Sets the chill mode value. + * @param chillmode boolean + */ + public void setChillmode(boolean chillmode) { + this.chillmode = chillmode; + } + + /** + * Removes a data node from the management of this Node Manager. + * + * @param node - DataNode. + * @throws UnregisteredNodeException + */ + @Override + public void removeNode(DatanodeDetails node) + throws UnregisteredNodeException { + + } + + /** + * Gets all Live Datanodes that is currently communicating with SCM. + * + * @param nodestate - State of the node + * @return List of Datanodes that are Heartbeating SCM. + */ + @Override + public List<DatanodeDetails> getNodes(HddsProtos.NodeState nodestate) { + if (nodestate == HEALTHY) { + return healthyNodes; + } + + if (nodestate == STALE) { + return staleNodes; + } + + if (nodestate == DEAD) { + return deadNodes; + } + + return null; + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. 
+ * + * @param nodestate - State of the node + * @return int -- count + */ + @Override + public int getNodeCount(HddsProtos.NodeState nodestate) { + List<DatanodeDetails> nodes = getNodes(nodestate); + if (nodes != null) { + return nodes.size(); + } + return 0; + } + + /** + * Get all datanodes known to SCM. + * + * @return List of DatanodeDetails known to SCM. + */ + @Override + public List<DatanodeDetails> getAllNodes() { + return null; + } + + /** + * Get the minimum number of nodes to get out of chill mode. + * + * @return int + */ + @Override + public int getMinimumChillModeNodes() { + return 0; + } + + /** + * Chill mode is the period when node manager waits for a minimum configured + * number of datanodes to report in. This is called chill mode to indicate the + * period before node manager gets into action. + * <p> + * Forcefully exits the chill mode, even if we have not met the minimum + * criteria of the nodes reporting in. + */ + @Override + public void forceExitChillMode() { + + } + + /** + * Puts the node manager into manual chill mode. + */ + @Override + public void enterChillMode() { + + } + + /** + * Brings node manager out of manual chill mode. + */ + @Override + public void exitChillMode() { + + } + + /** + * Returns true if node manager is out of chill mode, else false. + * @return true if out of chill mode, else false + */ + @Override + public boolean isOutOfChillMode() { + return !chillmode; + } + + /** + * Returns a chill mode status string. + * + * @return String + */ + @Override + public String getChillModeStatus() { + return null; + } + + /** + * Returns the aggregated node stats. + * @return the aggregated node stats. + */ + @Override + public SCMNodeStat getStats() { + return aggregateStat; + } + + /** + * Return a map of nodes to their stats. + * @return a list of individual node stats (live/stale but not dead). 
+ */ + @Override + public Map<UUID, SCMNodeStat> getNodeStats() { + return nodeMetricMap; + } + + /** + * Return the node stat of the specified datanode. + * @param datanodeDetails - datanode details. + * @return node stat if it is live/stale, null if it is dead or does't exist. + */ + @Override + public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid())); + } + + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + + /** + * Used for testing. + * + * @return true if the HB check is done. + */ + @Override + public boolean waitForHeartbeatProcessed() { + return false; + } + + /** + * Returns the node state of a specific node. + * + * @param dd - DatanodeDetails + * @return Healthy/Stale/Dead. + */ + @Override + public HddsProtos.NodeState getNodeState(DatanodeDetails dd) { + return null; + } + + @Override + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + if(commandMap.containsKey(dnId)) { + List<SCMCommand> commandList = commandMap.get(dnId); + Preconditions.checkNotNull(commandList); + commandList.add(command); + } else { + List<SCMCommand> commandList = new LinkedList<>(); + commandList.add(command); + commandMap.put(dnId, commandList); + } + } + + // Returns the number of commands that is queued to this node manager. + public int getCommandCount(DatanodeDetails dd) { + List<SCMCommand> list = commandMap.get(dd); + return (list == null) ? 0 : list.size(); + } + + public void clearCommandQueue(UUID dnId) { + if(commandMap.containsKey(dnId)) { + commandMap.put(dnId, new LinkedList<>()); + } + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + * <p> + * <p> As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. 
It is strongly advised to relinquish the + * underlying resources and to internally <em>mark</em> the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + + } + + /** + * When an object implementing interface <code>Runnable</code> is used to + * create a thread, starting the thread causes the object's <code>run</code> + * method to be called in that separately executing thread. + * <p> + * The general contract of the method <code>run</code> is that it may take any + * action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + + } + + /** + * Gets the version info from SCM. + * + * @param versionRequest - version Request. + * @return - returns SCM version info and other required information needed by + * datanode. + */ + @Override + public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { + return null; + } + + /** + * Register the node if the node finds that it is not registered with any + * SCM. + * + * @param datanodeDetails DatanodeDetailsProto + * @return SCMHeartbeatResponseProto + */ + @Override + public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails) { + return null; + } + + /** + * Send heartbeat to indicate the datanode is alive and doing well. + * + * @param datanodeDetails - Datanode ID. + * @param nodeReport - node report. + * @param containerReportState - container report state. 
+ * @return SCMheartbeat response list + */ + @Override + public List<SCMCommand> sendHeartbeat( + HddsProtos.DatanodeDetailsProto datanodeDetails, + SCMNodeReport nodeReport, ReportState containerReportState) { + if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport + .getStorageReportCount() > 0)) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); + + long totalCapacity = 0L; + long totalRemaining = 0L; + long totalScmUsed = 0L; + List<SCMStorageReport> storageReports = nodeReport.getStorageReportList(); + for (SCMStorageReport report : storageReports) { + totalCapacity += report.getCapacity(); + totalRemaining += report.getRemaining(); + totalScmUsed += report.getScmUsed(); + } + aggregateStat.subtract(stat); + stat.set(totalCapacity, totalScmUsed, totalRemaining); + aggregateStat.add(stat); + nodeMetricMap.put(DatanodeDetails + .getFromProtoBuf(datanodeDetails).getUuid(), stat); + + } + return null; + } + + @Override + public Map<String, Integer> getNodeCount() { + Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); + for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) { + nodeCountMap.put(state.toString(), getNodeCount(state)); + } + return nodeCountMap; + } + + /** + * Makes it easy to add a container. + * + * @param datanodeDetails datanode details + * @param size number of bytes. + */ + public void addContainer(DatanodeDetails datanodeDetails, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); + if (stat != null) { + aggregateStat.subtract(stat); + stat.getCapacity().add(size); + aggregateStat.add(stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); + } + } + + /** + * Makes it easy to simulate a delete of a container. + * + * @param datanodeDetails datanode Details + * @param size number of bytes. 
+ */ + public void delContainer(DatanodeDetails datanodeDetails, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); + if (stat != null) { + aggregateStat.subtract(stat); + stat.getCapacity().subtract(size); + aggregateStat.add(stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); + } + } + + /** + * A class to declare some values for the nodes so that our tests + * won't fail. + */ + private static class NodeData { + public static final long HEALTHY = 1; + public static final long STALE = 2; + public static final long DEAD = 3; + + private long capacity; + private long used; + + private long currentState; + + /** + * By default nodes are healthy. + * @param capacity + * @param used + */ + NodeData(long capacity, long used) { + this(capacity, used, HEALTHY); + } + + /** + * Constructs a nodeDefinition. + * + * @param capacity capacity. + * @param used used. + * @param currentState - Healthy, Stale and DEAD nodes. + */ + NodeData(long capacity, long used, long currentState) { + this.capacity = capacity; + this.used = used; + this.currentState = currentState; + } + + public long getCapacity() { + return capacity; + } + + public void setCapacity(long capacity) { + this.capacity = capacity; + } + + public long getUsed() { + return used; + } + + public void setUsed(long used) { + this.used = used; + } + + public long getCurrentState() { + return currentState; + } + + public void setCurrentState(long currentState) { + this.currentState = currentState; + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java new file mode 100644 index 
0000000..200a611 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import 
org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Tests for Container Mapping. + */ +public class TestContainerMapping { + private static ContainerMapping mapping; + private static MockNodeManager nodeManager; + private static File testDir; + private static XceiverClientManager xceiverClientManager; + private static String containerOwner = "OZONE"; + + private static final long TIMEOUT = 10000; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + + testDir = GenericTestUtils + .getTestDir(TestContainerMapping.class.getSimpleName()); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, + TIMEOUT, + TimeUnit.MILLISECONDS); + boolean folderExisted = testDir.exists() || testDir.mkdirs(); + if (!folderExisted) { + throw new IOException("Unable to create test directory path"); + } + nodeManager = new MockNodeManager(true, 10); + mapping = new ContainerMapping(conf, nodeManager, 128); + xceiverClientManager = new XceiverClientManager(conf); + } + + @AfterClass + public static void cleanup() throws IOException { + if(mapping != null) { + mapping.close(); + } + FileUtil.fullyDelete(testDir); + } + + @Before + public void clearChillMode() { + nodeManager.setChillmode(false); + } + + @Test + public void testallocateContainer() throws Exception { + ContainerInfo containerInfo = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + UUID.randomUUID().toString(), containerOwner); 
+ Assert.assertNotNull(containerInfo); + } + + @Test + public void testallocateContainerDistributesAllocation() throws Exception { + /* This is a lame test, we should really be testing something like + z-score or make sure that we don't have 3sigma kind of events. Too lazy + to write all that code. This test very lamely tests if we have more than + 5 separate nodes from the list of 10 datanodes that got allocated a + container. + */ + Set<UUID> pipelineList = new TreeSet<>(); + for (int x = 0; x < 30; x++) { + ContainerInfo containerInfo = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + UUID.randomUUID().toString(), containerOwner); + + Assert.assertNotNull(containerInfo); + Assert.assertNotNull(containerInfo.getPipeline()); + pipelineList.add(containerInfo.getPipeline().getLeader() + .getUuid()); + } + Assert.assertTrue(pipelineList.size() > 5); + } + + @Test + public void testGetContainer() throws IOException { + String containerName = UUID.randomUUID().toString(); + Pipeline pipeline = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerName, + containerOwner).getPipeline(); + Assert.assertNotNull(pipeline); + Pipeline newPipeline = mapping.getContainer(containerName).getPipeline(); + Assert.assertEquals(pipeline.getLeader().getUuid(), + newPipeline.getLeader().getUuid()); + } + + @Test + public void testDuplicateAllocateContainerFails() throws IOException { + String containerName = UUID.randomUUID().toString(); + Pipeline pipeline = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerName, + containerOwner).getPipeline(); + Assert.assertNotNull(pipeline); + thrown.expectMessage("Specified container already exists."); + mapping.allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerName, + containerOwner); + } + + @Test + public void testgetNoneExistentContainer() 
throws IOException { + String containerName = UUID.randomUUID().toString(); + thrown.expectMessage("Specified key does not exist."); + mapping.getContainer(containerName); + } + + @Test + public void testChillModeAllocateContainerFails() throws IOException { + String containerName = UUID.randomUUID().toString(); + nodeManager.setChillmode(true); + thrown.expectMessage("Unable to create container while in chill mode"); + mapping.allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerName, + containerOwner); + } + + @Test + public void testContainerCreationLeaseTimeout() throws IOException, + InterruptedException { + String containerName = UUID.randomUUID().toString(); + nodeManager.setChillmode(false); + ContainerInfo containerInfo = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + containerName, + containerOwner); + mapping.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATE); + Thread.sleep(TIMEOUT + 1000); + + NavigableSet<ContainerID> deleteContainers = mapping.getStateManager() + .getMatchingContainerIDs( + "OZONE", + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.DELETING); + Assert.assertTrue(deleteContainers.contains(containerInfo.containerID())); + + thrown.expect(IOException.class); + thrown.expectMessage("Lease Exception"); + mapping.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATED); + } + + @Test + public void testFullContainerReport() throws IOException { + String containerName = UUID.randomUUID().toString(); + ContainerInfo info = createContainer(containerName); + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + ContainerReportsRequestProto.reportType reportType = + ContainerReportsRequestProto.reportType.fullReport; + List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports = + new ArrayList<>(); + 
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = + StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setContainerName(containerName) + //setting some random hash + .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(5368709120L) + .setUsed(2000000000L) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setContainerID(info.getContainerID()); + + reports.add(ciBuilder.build()); + + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); + + ContainerInfo updatedContainer = mapping.getContainer(containerName); + Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys()); + Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes()); + } + + @Test + public void testContainerCloseWithContainerReport() throws IOException { + String containerName = UUID.randomUUID().toString(); + ContainerInfo info = createContainer(containerName); + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + ContainerReportsRequestProto.reportType reportType = + ContainerReportsRequestProto.reportType.fullReport; + List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports = + new ArrayList<>(); + + StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = + StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setContainerName(containerName) + //setting some random hash + .setFinalhash("7c45eb4d7ed5e0d2e89aaab7759de02e") + .setSize(5368709120L) + .setUsed(5368705120L) + .setKeyCount(500000000L) + .setReadCount(500000000L) + .setWriteCount(500000000L) + .setReadBytes(5368705120L) + .setWriteBytes(5368705120L) + .setContainerID(info.getContainerID()); + + 
reports.add(ciBuilder.build()); + + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); + + ContainerInfo updatedContainer = mapping.getContainer(containerName); + Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys()); + Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes()); + NavigableSet<ContainerID> pendingCloseContainers = mapping.getStateManager() + .getMatchingContainerIDs( + containerOwner, + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSING); + Assert.assertTrue( + pendingCloseContainers.contains(updatedContainer.containerID())); + } + + @Test + public void testCloseContainer() throws IOException { + String containerName = UUID.randomUUID().toString(); + ContainerInfo info = createContainer(containerName); + mapping.updateContainerState(containerName, + HddsProtos.LifeCycleEvent.FINALIZE); + NavigableSet<ContainerID> pendingCloseContainers = mapping.getStateManager() + .getMatchingContainerIDs( + containerOwner, + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSING); + Assert.assertTrue(pendingCloseContainers.contains(info.containerID())); + mapping.updateContainerState(containerName, + HddsProtos.LifeCycleEvent.CLOSE); + NavigableSet<ContainerID> closeContainers = mapping.getStateManager() + .getMatchingContainerIDs( + containerOwner, + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSED); + Assert.assertTrue(closeContainers.contains(info.containerID())); + } + + /** + * Creates a container with the given name in ContainerMapping. 
+ * @param containerName + * Name of the container + * @throws IOException + */ + private ContainerInfo createContainer(String containerName) + throws IOException { + nodeManager.setChillmode(false); + ContainerInfo containerInfo = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + containerName, + containerOwner); + mapping.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATE); + mapping.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATED); + return containerInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java new file mode 100644 index 0000000..2fec232 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.container.closer; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.TestContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_GB; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent + .CREATE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent + .CREATED; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL; + +/** + * Test class for Closing Container. 
+ */ +public class TestContainerCloser { + + private static final long GIGABYTE = 1024L * 1024L * 1024L; + private static Configuration configuration; + private static MockNodeManager nodeManager; + private static ContainerMapping mapping; + private static long size; + private static File testDir; + + @BeforeClass + public static void setUp() throws Exception { + configuration = SCMTestUtils.getConf(); + size = configuration.getLong(OZONE_SCM_CONTAINER_SIZE_GB, + OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; + configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, + 1, TimeUnit.SECONDS); + testDir = GenericTestUtils + .getTestDir(TestContainerMapping.class.getSimpleName()); + configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + nodeManager = new MockNodeManager(true, 10); + mapping = new ContainerMapping(configuration, nodeManager, 128); + } + + @AfterClass + public static void tearDown() throws Exception { + if (mapping != null) { + mapping.close(); + } + FileUtil.fullyDelete(testDir); + } + + @Test + public void testClose() throws IOException { + String containerName = "container-" + RandomStringUtils.randomNumeric(5); + + ContainerInfo info = mapping.allocateContainer( + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.ONE, containerName, "ozone"); + + //Execute these state transitions so that we can close the container. + mapping.updateContainerState(containerName, CREATE); + mapping.updateContainerState(containerName, CREATED); + long currentCount = mapping.getCloser().getCloseCount(); + long runCount = mapping.getCloser().getThreadRunCount(); + + DatanodeDetails datanode = info.getPipeline().getLeader(); + // Send a container report with used set to 1 GB. This should not close. + sendContainerReport(info, 1 * GIGABYTE); + + // with only one container the cleaner thread should not run. 
+ Assert.assertEquals(0, mapping.getCloser().getThreadRunCount()); + + // With only 1 GB, the container should not be queued for closing. + Assert.assertEquals(0, mapping.getCloser().getCloseCount()); + + // Assert that the Close command was not queued for this Datanode. + Assert.assertEquals(0, nodeManager.getCommandCount(datanode)); + + long newUsed = (long) (size * 0.91f); + sendContainerReport(info, newUsed); + + // with only one container the cleaner thread should not run. + Assert.assertEquals(runCount, mapping.getCloser().getThreadRunCount()); + + // and close count will be one. + Assert.assertEquals(1, + mapping.getCloser().getCloseCount() - currentCount); + + // Assert that the Close command was Queued for this Datanode. + Assert.assertEquals(1, nodeManager.getCommandCount(datanode)); + } + + @Test + public void testRepeatedClose() throws IOException, + InterruptedException { + // This test asserts that if we queue more than one report then the + // second report is discarded by the system if it lands in the 3 * report + // frequency window. + + configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + String containerName = "container-" + RandomStringUtils.randomNumeric(5); + + ContainerInfo info = mapping.allocateContainer( + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.ONE, containerName, "ozone"); + + //Execute these state transitions so that we can close the container. + mapping.updateContainerState(containerName, CREATE); + + long currentCount = mapping.getCloser().getCloseCount(); + long runCount = mapping.getCloser().getThreadRunCount(); + + + DatanodeDetails datanodeDetails = info.getPipeline().getLeader(); + + // Send this command twice and assert we have only one command in the queue. + sendContainerReport(info, 5 * GIGABYTE); + sendContainerReport(info, 5 * GIGABYTE); + + // Assert that the Close command was Queued for this Datanode. 
+ Assert.assertEquals(1, + nodeManager.getCommandCount(datanodeDetails)); + // And close count will be one. + Assert.assertEquals(1, + mapping.getCloser().getCloseCount() - currentCount); + Thread.sleep(TimeUnit.SECONDS.toMillis(4)); + + //send another close and the system will queue this to the command queue. + sendContainerReport(info, 5 * GIGABYTE); + Assert.assertEquals(2, + nodeManager.getCommandCount(datanodeDetails)); + // but the close count will still be one, since from the point of view of + // closer we are closing only one container even if we have send multiple + // close commands to the datanode. + Assert.assertEquals(1, mapping.getCloser().getCloseCount() + - currentCount); + } + + @Test + public void testCleanupThreadRuns() throws IOException, + InterruptedException { + // This test asserts that clean up thread runs once we have closed a + // number above cleanup water mark. + + long runCount = mapping.getCloser().getThreadRunCount(); + + for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) { + String containerName = "container-" + RandomStringUtils.randomNumeric(7); + ContainerInfo info = mapping.allocateContainer( + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.ONE, containerName, "ozone"); + mapping.updateContainerState(containerName, CREATE); + mapping.updateContainerState(containerName, CREATED); + sendContainerReport(info, 5 * GIGABYTE); + } + + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + + // Assert that cleanup thread ran at least once. 
+ Assert.assertTrue(mapping.getCloser().getThreadRunCount() - runCount > 0); + } + + private void sendContainerReport(ContainerInfo info, long used) throws + IOException { + ContainerReportsRequestProto.Builder + reports = ContainerReportsRequestProto.newBuilder(); + reports.setType(ContainerReportsRequestProto.reportType.fullReport); + + StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = + StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setContainerName(info.getContainerName()) + .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(size) + .setUsed(used) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setContainerID(1L); + reports.setDatanodeDetails( + TestUtils.getDatanodeDetails().getProtoBufMessage()); + reports.addReports(ciBuilder); + mapping.processContainerReports(reports.build()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org