This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 1b041d4 HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi. 1b041d4 is described below commit 1b041d4fd4ec0c8c4cfdcd6fa28711cf7fcd56fe Author: Márton Elek <e...@apache.org> AuthorDate: Thu May 30 16:46:06 2019 +0200 HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi. --- .../src/main/blockade/blockadeUtils/blockade.py | 30 +-- .../src/main/blockade/clusterUtils/__init__.py | 2 +- .../main/blockade/clusterUtils/cluster_utils.py | 11 +- .../blockade/{clusterUtils => ozone}/__init__.py | 2 +- .../dist/src/main/blockade/ozone/cluster.py | 295 +++++++++++++++++++++ .../blockade/test_blockade_datanode_isolation.py | 219 ++++++++------- .../dist/src/main/blockade/test_blockade_flaky.py | 48 ++-- hadoop-ozone/dist/src/main/blockade/util.py | 52 ++++ .../src/main/compose/ozoneblockade/docker-config | 6 + 9 files changed, 508 insertions(+), 157 deletions(-) diff --git a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py index f371865..7809c70 100644 --- a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py +++ b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py @@ -18,9 +18,8 @@ """This module has apis to create and remove a blockade cluster""" from subprocess import call -import subprocess import logging -import random +import util from clusterUtils.cluster_utils import ClusterUtils logger = logging.getLogger(__name__) @@ -39,23 +38,13 @@ class Blockade(object): @classmethod def blockade_status(cls): - exit_code, output = ClusterUtils.run_cmd("blockade status") + exit_code, output = util.run_cmd("blockade status") return exit_code, output @classmethod - def make_flaky(cls, flaky_node, container_list): - # make the network flaky - om, scm, _, datanodes = \ - ClusterUtils.find_om_scm_client_datanodes(container_list) - node_dict = { - "all": "--all", - "scm" : scm[0], - "om" : om[0], - "datanode": random.choice(datanodes) - }[flaky_node] - logger.info("flaky node: %s", node_dict) - - output = call(["blockade", "flaky", node_dict]) + def make_flaky(cls, flaky_node): + logger.info("flaky node: %s", flaky_node) + output = call(["blockade", "flaky", flaky_node]) assert output == 0, "flaky command failed with exit code=[%s]" % output @classmethod @@ -69,7 +58,7 @@ class Blockade(object): for node_list in args: nodes = nodes + ','.join(node_list) + " " exit_code, output = \ - ClusterUtils.run_cmd("blockade partition %s" % nodes) + util.run_cmd("blockade partition %s" % nodes) assert exit_code == 0, \ "blockade partition command failed with exit code=[%s]" % output @@ -95,4 +84,9 @@ class Blockade(object): else: output = call(["blockade", "start", node]) assert output == 0, "blockade start command failed with " \ - "exit code=[%s]" % output \ No newline at end of file + "exit code=[%s]" % output + + @classmethod + def blockade_add(cls, node): + output = call(["blockade", "add", node]) + assert output == 0, "blockade add command failed" \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py index ae1e83e..13878a1 100644 --- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py +++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py @@ -11,4 +11,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. +# limitations under the License. \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py index 3a04103..cf67380 100644 --- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py +++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py @@ -17,6 +17,7 @@ from subprocess import call + import subprocess import logging import time @@ -292,9 +293,15 @@ class ClusterUtils(object): assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output @classmethod - def find_checksum(cls, docker_compose_file, filepath): + def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"): + """ + This function finds the checksum of a file present in a docker container. + Before running any 'putKey' operation, this function is called to store + the original checksum of the file. The file is then uploaded as a key. + """ command = "docker-compose -f %s " \ - "exec ozone_client md5sum %s" % (docker_compose_file, filepath) + "exec %s md5sum %s" % \ + (docker_compose_file, client, filepath) exit_code, output = cls.run_cmd(command) assert exit_code == 0, "Cant find checksum" myoutput = output.split("\n") diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py similarity index 95% copy from hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py copy to hadoop-ozone/dist/src/main/blockade/ozone/__init__.py index ae1e83e..13878a1 100644 --- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py +++ b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py @@ -11,4 +11,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. +# limitations under the License. \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py new file mode 100644 index 0000000..4347f86 --- /dev/null +++ b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py @@ -0,0 +1,295 @@ +#!/usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import re +import subprocess +import yaml +import util +from subprocess import call +from blockadeUtils.blockade import Blockade + + +class Command(object): + docker = "docker" + blockade = "blockade" + docker_compose = "docker-compose" + ozone = "/opt/hadoop/bin/ozone" + freon = "/opt/hadoop/bin/ozone freon" + + +class Configuration: + """ + Configurations to be used while starting Ozone Cluster. + Here @property decorators is used to achieve getters, setters and delete + behaviour for 'datanode_count' attribute. + @datanode_count.setter will set the value for 'datanode_count' attribute. + @datanode_count.deleter will delete the current value of 'datanode_count' + attribute. + """ + + def __init__(self): + __parent_dir__ = os.path.dirname(os.path.dirname( + os.path.dirname(os.path.realpath(__file__)))) + self.docker_compose_file = os.path.join(__parent_dir__, + "compose", "ozoneblockade", + "docker-compose.yaml") + self._datanode_count = 3 + os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file + + @property + def datanode_count(self): + return self._datanode_count + + @datanode_count.setter + def datanode_count(self, datanode_count): + self._datanode_count = datanode_count + + @datanode_count.deleter + def datanode_count(self): + del self._datanode_count + + +class Cluster(object): + """ + This represents Ozone Cluster. + Here @property decorators is used to achieve getters, setters and delete + behaviour for 'om', 'scm', 'datanodes' and 'clients' attributes. + """ + + __logger__ = logging.getLogger(__name__) + + def __init__(self, conf): + self.conf = conf + self.docker_compose_file = conf.docker_compose_file + self._om = None + self._scm = None + self._datanodes = None + self._clients = None + self.scm_uuid = None + self.datanode_dir = None + + @property + def om(self): + return self._om + + @om.setter + def om(self, om): + self._om = om + + @om.deleter + def om(self): + del self._om + + @property + def scm(self): + return self._scm + + @scm.setter + def scm(self, scm): + self._scm = scm + + @scm.deleter + def scm(self): + del self._scm + + @property + def datanodes(self): + return self._datanodes + + @datanodes.setter + def datanodes(self, datanodes): + self._datanodes = datanodes + + @datanodes.deleter + def datanodes(self): + del self._datanodes + + @property + def clients(self): + return self._clients + + @clients.setter + def clients(self, clients): + self._clients = clients + + @clients.deleter + def clients(self): + del self._clients + + @classmethod + def create(cls, config=Configuration()): + return Cluster(config) + + def start(self): + """ + Start Ozone Cluster in docker containers. + """ + Cluster.__logger__.info("Starting Ozone Cluster") + Blockade.blockade_destroy() + call([Command.docker_compose, "-f", self.docker_compose_file, + "up", "-d", "--scale", + "datanode=" + str(self.conf.datanode_count)]) + Cluster.__logger__.info("Waiting 10s for cluster start up...") + # Remove the sleep and wait only till the cluster is out of safemode + # time.sleep(10) + output = subprocess.check_output([Command.docker_compose, "-f", + self.docker_compose_file, "ps"]) + node_list = [] + for out in output.split("\n")[2:-1]: + node = out.split(" ")[0] + node_list.append(node) + Blockade.blockade_add(node) + + Blockade.blockade_status() + self.om = filter(lambda x: 'om' in x, node_list)[0] + self.scm = filter(lambda x: 'scm' in x, node_list)[0] + self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list))) + self.clients = filter(lambda x: 'ozone_client' in x, node_list) + self.scm_uuid = self.__get_scm_uuid__() + self.datanode_dir = self.get_conf_value("hdds.datanode.dir") + + assert node_list, "no node found in the cluster!" + Cluster.__logger__.info("blockade created with nodes %s", + ' '.join(node_list)) + + def get_conf_value(self, key): + """ + Returns the value of given configuration key. + """ + command = [Command.ozone, "getconf -confKey " + key] + exit_code, output = self.__run_docker_command__(command, self.om) + return str(output).strip() + + def scale_datanode(self, datanode_count): + """ + Commission new datanodes to the running cluster. + """ + call([Command.docker_compose, "-f", self.docker_compose_file, + "up", "-d", "--scale", "datanode=" + datanode_count]) + + def partition_network(self, *args): + """ + Partition the network which is used by the cluster. + """ + Blockade.blockade_create_partition(*args) + + + def restore_network(self): + """ + Restores the network partition. + """ + Blockade.blockade_join() + + + def __get_scm_uuid__(self): + """ + Returns SCM's UUID. + """ + ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs") + command = "cat %s/scm/current/VERSION" % ozone_metadata_dir + exit_code, output = self.__run_docker_command__(command, self.scm) + output_list = output.split("\n") + key_value = [x for x in output_list if re.search(r"\w+=\w+", x)] + uuid = [token for token in key_value if 'scmUuid' in token] + return uuid.pop().split("=")[1].strip() + + def get_container_states(self, datanode): + """ + Returns the state of all the containers in the given datanode. + """ + container_parent_path = "%s/hdds/%s/current/containerDir0" % \ + (self.datanode_dir, self.scm_uuid) + command = "find %s -type f -name '*.container'" % container_parent_path + exit_code, output = self.__run_docker_command__(command, datanode) + container_state = {} + + container_list = map(str.strip, output.split("\n")) + for container_path in container_list: + # Reading the container file. + exit_code, output = self.__run_docker_command__( + "cat " + container_path, datanode) + if exit_code is not 0: + continue + data = output.split("\n") + # Reading key value pairs from container file. + key_value = [x for x in data if re.search(r"\w+:\s\w+", x)] + content = "\n".join(key_value) + content_yaml = yaml.load(content) + if content_yaml is None: + continue + for key, value in content_yaml.items(): + content_yaml[key] = str(value).lstrip() + # Stores the container state in a dictionary. + container_state[content_yaml['containerID']] = content_yaml['state'] + return container_state + + def run_freon(self, num_volumes, num_buckets, num_keys, key_size, + replication_type="RATIS", replication_factor="THREE", + run_on=None): + """ + Runs freon on the cluster. + """ + if run_on is None: + run_on = self.om + command = [Command.freon, + " rk", + " --numOfVolumes " + str(num_volumes), + " --numOfBuckets " + str(num_buckets), + " --numOfKeys " + str(num_keys), + " --keySize " + str(key_size), + " --replicationType " + replication_type, + " --factor " + replication_factor] + return self.__run_docker_command__(command, run_on) + + def __run_docker_command__(self, command, run_on): + if isinstance(command, list): + command = ' '.join(command) + command = [Command.docker, + "exec " + run_on, + command] + return util.run_cmd(command) + + def stop(self): + """ + Stops the Ozone Cluster. + """ + Cluster.__logger__.info("Stopping Ozone Cluster") + call([Command.docker_compose, "-f", self.docker_compose_file, "down"]) + Blockade.blockade_destroy() + + def container_state_predicate_all_closed(self, datanodes): + for datanode in datanodes: + container_states_dn = self.get_container_states(datanode) + if not container_states_dn \ + or container_states_dn.popitem()[1] != 'CLOSED': + return False + return True + + def container_state_predicate_one_closed(self, datanodes): + for datanode in datanodes: + container_states_dn = self.get_container_states(datanode) + if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED': + return True + return False + + def container_state_predicate(self, datanode, state): + container_states_dn = self.get_container_states(datanode) + if container_states_dn and container_states_dn.popitem()[1] == state: + return True + return False \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py index 1e53a32..dfa1b70 100644 --- a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py +++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py @@ -16,132 +16,123 @@ # limitations under the License. import os -import time -import re import logging -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils +import util +from ozone.cluster import Cluster logger = logging.getLogger(__name__) -parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) -FILE = os.path.join(parent_dir, "compose", "ozoneblockade", - "docker-compose.yaml") -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -INCREASED_SCALE = 5 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, _, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) +def setup_function(function): + global cluster + cluster = Cluster.create() + cluster.start() - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output +def teardown_function(function): + cluster.stop() -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() +def test_isolate_single_datanode(): + """ + In this test case we will create a network partition in such a way that + one of the datanode will not be able to communicate with other datanodes + but it will be able to communicate with SCM. -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) + Once the network partition happens, SCM detects it and closes the pipeline, + which in-turn closes the containers. + The container on the first two datanode will get CLOSED as they have quorum. + The container replica on the third node will be QUASI_CLOSED as it is not + able to connect with the other datanodes and it doesn't have latest BCSID. + + Once we restore the network, the stale replica on the third datanode will be + deleted and a latest replica will be copied from any one of the other + datanodes. -def test_isolatedatanode_singlenode(run_second_phase): - """ - In this test, one of the datanodes (first datanode) cannot communicate - with other two datanodes. - All datanodes can communicate with SCM. - Expectation : - The container replica state in first datanode should be quasi-closed. - The container replica state in other datanodes should be closed. """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert first_datanode_status == 'QUASI_CLOSED' - assert len(closed_container_datanodes) == 2, \ - "The container should have two closed replicas." - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert len(closed_container_datanodes) >= 3, \ - "The container should have at least three closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None - - -def test_datanode_isolation_all(run_second_phase): + cluster.run_freon(1, 1, 1, 10240) + first_set = [cluster.om, cluster.scm, + cluster.datanodes[0], cluster.datanodes[1]] + second_set = [cluster.om, cluster.scm, cluster.datanodes[2]] + logger.info("Partitioning the network") + cluster.partition_network(first_set, second_set) + cluster.run_freon(1, 1, 1, 10240) + logger.info("Waiting for container to be QUASI_CLOSED") + + util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2]) + .popitem()[1] == 'QUASI_CLOSED', + int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0]) + container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1]) + container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2]) + assert len(container_states_dn_0) != 0 + assert len(container_states_dn_1) != 0 + assert len(container_states_dn_2) != 0 + for key in container_states_dn_0: + assert container_states_dn_0.get(key) == 'CLOSED' + for key in container_states_dn_1: + assert container_states_dn_1.get(key) == 'CLOSED' + for key in container_states_dn_2: + assert container_states_dn_2.get(key) == 'QUASI_CLOSED' + + # Since the replica in datanode[2] doesn't have the latest BCSID, + # ReplicationManager will delete it and copy a closed replica. + # We will now restore the network and datanode[2] should get a + # closed replica of the container + logger.info("Restoring the network") + cluster.restore_network() + + logger.info("Waiting for the replica to be CLOSED") + util.wait_until( + lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'), + int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2]) + assert len(container_states_dn_2) != 0 + for key in container_states_dn_2: + assert container_states_dn_2.get(key) == 'CLOSED' + + +def test_datanode_isolation_all(): """ - In this test, none of the datanodes can communicate with other two - datanodes. - All datanodes can communicate with SCM. - Expectation : The container should eventually have at least two closed - replicas. + In this test case we will create a network partition in such a way that + all datanodes cannot communicate with each other. + All datanodes will be able to communicate with SCM. + + Once the network partition happens, SCM detects it and closes the pipeline, + which in-turn tries to close the containers. + At least one of the replica should be in closed state + + Once we restore the network, there will be three closed replicas. + """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], SCM[0], DATANODES[1]] - third_set = [OM[0], SCM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert len(closed_container_datanodes) >= 2, \ - "The container should have at least two closed replicas." - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert len(closed_container_datanodes) >= 3, \ - "The container should have at least three closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None \ No newline at end of file + cluster.run_freon(1, 1, 1, 10240) + + assert len(cluster.get_container_states(cluster.datanodes[0])) != 0 + assert len(cluster.get_container_states(cluster.datanodes[1])) != 0 + assert len(cluster.get_container_states(cluster.datanodes[2])) != 0 + + logger.info("Partitioning the network") + first_set = [cluster.om, cluster.scm, cluster.datanodes[0]] + second_set = [cluster.om, cluster.scm, cluster.datanodes[1]] + third_set = [cluster.om, cluster.scm, cluster.datanodes[2]] + cluster.partition_network(first_set, second_set, third_set) + + logger.info("Waiting for the replica to be CLOSED") + util.wait_until( + lambda: cluster.container_state_predicate_one_closed(cluster.datanodes), + int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + + # At least one of the replica should be in closed state + assert cluster.container_state_predicate_one_closed(cluster.datanodes) + + # After restoring the network all the replicas should be in + # CLOSED state + logger.info("Restoring the network") + cluster.restore_network() + + logger.info("Waiting for the container to be replicated") + util.wait_until( + lambda: cluster.container_state_predicate_all_closed(cluster.datanodes), + int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + assert cluster.container_state_predicate_all_closed(cluster.datanodes) \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py index 3129600..a79bd4f 100644 --- a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py +++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py @@ -16,11 +16,11 @@ # limitations under the License. import os -import time import logging +import random import pytest from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils +from ozone.cluster import Cluster logger = logging.getLogger(__name__) @@ -32,30 +32,36 @@ SCALE = 6 CONTAINER_LIST = [] -def setup_module(): - global CONTAINER_LIST - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output +def setup_function(function): + global cluster + cluster = Cluster.create() + cluster.start() -def teardown_module(): - Blockade.blockade_destroy() - ClusterUtils.cluster_destroy(FILE) +def teardown_function(function): + cluster.stop() -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_fast_all() - time.sleep(5) +@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om", "all"]) +def test_flaky(flaky_node): + """ + In these tests, we make the network of the nodes as flaky using blockade. + There are 4 tests : + 1) one of the datanodes selected randomly and network of the datanode is + made flaky. + 2) scm network is made flaky. + 3) om network is made flaky. + 4) Network of all the nodes are made flaky. + """ + flaky_container_name = { + "scm": cluster.scm, + "om": cluster.om, + "datanode": random.choice(cluster.datanodes), + "all": "--all" + }[flaky_node] -@pytest.mark.parametrize("flaky_nodes", ["datanode", "scm", "om", "all"]) -def test_flaky(flaky_nodes): - Blockade.make_flaky(flaky_nodes, CONTAINER_LIST) + Blockade.make_flaky(flaky_container_name) Blockade.blockade_status() - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") + exit_code, output = cluster.run_freon(1, 1, 1, 10240) assert exit_code == 0, "freon run failed with output=[%s]" % output \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/util.py b/hadoop-ozone/dist/src/main/blockade/util.py new file mode 100644 index 0000000..84f7fda --- /dev/null +++ b/hadoop-ozone/dist/src/main/blockade/util.py @@ -0,0 +1,52 @@ +#!/usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import re +import logging +import subprocess + +logger = logging.getLogger(__name__) + +def wait_until(predicate, timeout, check_frequency=1): + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return + time.sleep(check_frequency) + + +def run_cmd(cmd): + command = cmd + if isinstance(cmd, list): + command = ' '.join(cmd) + logger.info(" RUNNING: %s", command) + all_output = "" + my_process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, shell=True) + while my_process.poll() is None: + op = my_process.stdout.readline() + if op: + all_output += op + logger.info(op) + other_output = my_process.communicate() + other_output = other_output[0].strip() + if other_output != "": + all_output += other_output + reg = re.compile(r"(\r\n|\n)$") + all_output = reg.sub("", all_output, 1) + return my_process.returncode, all_output diff --git a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config index dae9ddb..f5e6a92 100644 --- a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config @@ -26,6 +26,12 @@ OZONE-SITE.XML_ozone.scm.client.address=scm OZONE-SITE.XML_ozone.scm.dead.node.interval=5m OZONE-SITE.XML_ozone.replication=1 OZONE-SITE.XML_hdds.datanode.dir=/data/hdds +OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1 +OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s +OZONE-SITE.XML_hdds.heartbeat.interval=2s +OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s +OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s +OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org