This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b041d4  HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi.
1b041d4 is described below

commit 1b041d4fd4ec0c8c4cfdcd6fa28711cf7fcd56fe
Author: Márton Elek <e...@apache.org>
AuthorDate: Thu May 30 16:46:06 2019 +0200

    HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi.
---
 .../src/main/blockade/blockadeUtils/blockade.py    |  30 +--
 .../src/main/blockade/clusterUtils/__init__.py     |   2 +-
 .../main/blockade/clusterUtils/cluster_utils.py    |  11 +-
 .../blockade/{clusterUtils => ozone}/__init__.py   |   2 +-
 .../dist/src/main/blockade/ozone/cluster.py        | 295 +++++++++++++++++++++
 .../blockade/test_blockade_datanode_isolation.py   | 219 ++++++++-------
 .../dist/src/main/blockade/test_blockade_flaky.py  |  48 ++--
 hadoop-ozone/dist/src/main/blockade/util.py        |  52 ++++
 .../src/main/compose/ozoneblockade/docker-config   |   6 +
 9 files changed, 508 insertions(+), 157 deletions(-)

diff --git a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
index f371865..7809c70 100644
--- a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
+++ b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
@@ -18,9 +18,8 @@
 """This module has apis to create and remove a blockade cluster"""
 
 from subprocess import call
-import subprocess
 import logging
-import random
+import util
 from clusterUtils.cluster_utils import ClusterUtils
 
 logger = logging.getLogger(__name__)
@@ -39,23 +38,13 @@ class Blockade(object):
 
     @classmethod
     def blockade_status(cls):
-        exit_code, output = ClusterUtils.run_cmd("blockade status")
+        exit_code, output = util.run_cmd("blockade status")
         return exit_code, output
 
     @classmethod
-    def make_flaky(cls, flaky_node, container_list):
-        # make the network flaky
-        om, scm, _, datanodes = \
-            ClusterUtils.find_om_scm_client_datanodes(container_list)
-        node_dict = {
-                "all": "--all",
-                "scm" : scm[0],
-                "om" : om[0],
-                "datanode": random.choice(datanodes)
-                }[flaky_node]
-        logger.info("flaky node: %s", node_dict)
-
-        output = call(["blockade", "flaky", node_dict])
+    def make_flaky(cls, flaky_node):
+        logger.info("flaky node: %s", flaky_node)
+        output = call(["blockade", "flaky", flaky_node])
         assert output == 0, "flaky command failed with exit code=[%s]" % output
 
     @classmethod
@@ -69,7 +58,7 @@ class Blockade(object):
         for node_list in args:
             nodes = nodes + ','.join(node_list) + " "
         exit_code, output = \
-            ClusterUtils.run_cmd("blockade partition %s" % nodes)
+            util.run_cmd("blockade partition %s" % nodes)
         assert exit_code == 0, \
             "blockade partition command failed with exit code=[%s]" % output
 
@@ -95,4 +84,9 @@ class Blockade(object):
         else:
             output = call(["blockade", "start", node])
         assert output == 0, "blockade start command failed with " \
-                            "exit code=[%s]" % output
\ No newline at end of file
+                            "exit code=[%s]" % output
+
+    @classmethod
+    def blockade_add(cls, node):
+        output = call(["blockade", "add", node])
+        assert output == 0, "blockade add command failed"
\ No newline at end of file
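
With this refactor, make_flaky no longer selects the target container itself; the
caller resolves a concrete container name (or "--all") and passes it in. A minimal
sketch of the new call contract, assuming a Cluster object as introduced later in
this patch:

    import random
    from blockadeUtils.blockade import Blockade
    from ozone.cluster import Cluster

    cluster = Cluster.create()
    cluster.start()
    # The test, not Blockade, now decides which container becomes flaky.
    target = random.choice(cluster.datanodes)  # or cluster.scm, cluster.om, "--all"
    Blockade.make_flaky(target)
    Blockade.blockade_status()
    cluster.stop()
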
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
index ae1e83e..13878a1 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
+++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
@@ -11,4 +11,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
index 3a04103..cf67380 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
+++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
@@ -17,6 +17,7 @@
 
 
 from subprocess import call
+
 import subprocess
 import logging
 import time
@@ -292,9 +293,15 @@ class ClusterUtils(object):
     assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output
 
   @classmethod
-  def find_checksum(cls, docker_compose_file, filepath):
+  def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"):
+    """
+    This function finds the checksum of a file present in a docker container.
+    Before running any 'putKey' operation, this function is called to store
+    the original checksum of the file. The file is then uploaded as a key.
+    """
     command = "docker-compose -f %s " \
-              "exec ozone_client md5sum  %s" % (docker_compose_file, filepath)
+              "exec %s md5sum  %s" % \
+              (docker_compose_file, client, filepath)
     exit_code, output = cls.run_cmd(command)
     assert exit_code == 0, "Cant find checksum"
     myoutput = output.split("\n")
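
As the new docstring describes, find_checksum is used for end-to-end data
validation: record the file's checksum before it is written as a key, then compare
it with the checksum of the copy read back afterwards. A rough sketch of that flow;
put_key/get_key stand in for the existing ClusterUtils helpers whose signatures are
not shown in this diff:

    import os
    from clusterUtils.cluster_utils import ClusterUtils

    compose_file = os.environ["DOCKER_COMPOSE_FILE"]
    # Checksum of the source file inside the client container.
    original = ClusterUtils.find_checksum(compose_file, "/tmp/testfile")
    # ... write /tmp/testfile as a key and read it back to /tmp/copy
    # using the existing put/get helpers (placeholders here) ...
    copied = ClusterUtils.find_checksum(compose_file, "/tmp/copy")
    assert original == copied, "checksums of original and copied file differ"
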
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
similarity index 95%
copy from hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
copy to hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
index ae1e83e..13878a1 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
+++ b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
@@ -11,4 +11,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
new file mode 100644
index 0000000..4347f86
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
@@ -0,0 +1,295 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import re
+import subprocess
+import yaml
+import util
+from subprocess import call
+from blockadeUtils.blockade import Blockade
+
+
+class Command(object):
+  docker = "docker"
+  blockade = "blockade"
+  docker_compose = "docker-compose"
+  ozone = "/opt/hadoop/bin/ozone"
+  freon = "/opt/hadoop/bin/ozone freon"
+
+
+class Configuration:
+  """
+  Configurations to be used while starting Ozone Cluster.
+  Here the @property decorator is used to provide getter, setter and deleter
+  behaviour for the 'datanode_count' attribute.
+  @datanode_count.setter sets the value of the 'datanode_count' attribute.
+  @datanode_count.deleter deletes the current value of the 'datanode_count'
+  attribute.
+  """
+
+  def __init__(self):
+    __parent_dir__ = os.path.dirname(os.path.dirname(
+      os.path.dirname(os.path.realpath(__file__))))
+    self.docker_compose_file = os.path.join(__parent_dir__,
+                                            "compose", "ozoneblockade",
+                                            "docker-compose.yaml")
+    self._datanode_count = 3
+    os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
+
+  @property
+  def datanode_count(self):
+    return self._datanode_count
+
+  @datanode_count.setter
+  def datanode_count(self, datanode_count):
+    self._datanode_count = datanode_count
+
+  @datanode_count.deleter
+  def datanode_count(self):
+    del self._datanode_count
+
+
+class Cluster(object):
+  """
+  This represents an Ozone Cluster.
+  Here the @property decorator is used to provide getter, setter and deleter
+  behaviour for the 'om', 'scm', 'datanodes' and 'clients' attributes.
+  """
+
+  __logger__ = logging.getLogger(__name__)
+
+  def __init__(self, conf):
+    self.conf = conf
+    self.docker_compose_file = conf.docker_compose_file
+    self._om = None
+    self._scm = None
+    self._datanodes = None
+    self._clients = None
+    self.scm_uuid = None
+    self.datanode_dir = None
+
+  @property
+  def om(self):
+    return self._om
+
+  @om.setter
+  def om(self, om):
+    self._om = om
+
+  @om.deleter
+  def om(self):
+    del self._om
+
+  @property
+  def scm(self):
+    return self._scm
+
+  @scm.setter
+  def scm(self, scm):
+    self._scm = scm
+
+  @scm.deleter
+  def scm(self):
+    del self._scm
+
+  @property
+  def datanodes(self):
+    return self._datanodes
+
+  @datanodes.setter
+  def datanodes(self, datanodes):
+    self._datanodes = datanodes
+
+  @datanodes.deleter
+  def datanodes(self):
+    del self._datanodes
+
+  @property
+  def clients(self):
+    return self._clients
+
+  @clients.setter
+  def clients(self, clients):
+    self._clients = clients
+
+  @clients.deleter
+  def clients(self):
+    del self._clients
+
+  @classmethod
+  def create(cls, config=Configuration()):
+    return Cluster(config)
+
+  def start(self):
+    """
+    Start Ozone Cluster in docker containers.
+    """
+    Cluster.__logger__.info("Starting Ozone Cluster")
+    Blockade.blockade_destroy()
+    call([Command.docker_compose, "-f", self.docker_compose_file,
+          "up", "-d", "--scale",
+          "datanode=" + str(self.conf.datanode_count)])
+    Cluster.__logger__.info("Waiting 10s for cluster start up...")
+    # Remove the sleep and wait only till the cluster is out of safemode
+    # time.sleep(10)
+    output = subprocess.check_output([Command.docker_compose, "-f",
+                                      self.docker_compose_file, "ps"])
+    node_list = []
+    for out in output.split("\n")[2:-1]:
+      node = out.split(" ")[0]
+      node_list.append(node)
+      Blockade.blockade_add(node)
+
+    Blockade.blockade_status()
+    self.om = filter(lambda x: 'om' in x, node_list)[0]
+    self.scm = filter(lambda x: 'scm' in x, node_list)[0]
+    self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
+    self.clients = filter(lambda x: 'ozone_client' in x, node_list)
+    self.scm_uuid = self.__get_scm_uuid__()
+    self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
+
+    assert node_list, "no node found in the cluster!"
+    Cluster.__logger__.info("blockade created with nodes %s",
+                            ' '.join(node_list))
+
+  def get_conf_value(self, key):
+    """
+    Returns the value of given configuration key.
+    """
+    command = [Command.ozone, "getconf -confKey " + key]
+    exit_code, output = self.__run_docker_command__(command, self.om)
+    return str(output).strip()
+
+  def scale_datanode(self, datanode_count):
+    """
+    Commission new datanodes to the running cluster.
+    """
+    call([Command.docker_compose, "-f", self.docker_compose_file,
+          "up", "-d", "--scale", "datanode=" + datanode_count])
+
+  def partition_network(self, *args):
+    """
+    Partition the network which is used by the cluster.
+    """
+    Blockade.blockade_create_partition(*args)
+
+
+  def restore_network(self):
+    """
+    Restores the network partition.
+    """
+    Blockade.blockade_join()
+
+
+  def __get_scm_uuid__(self):
+    """
+    Returns SCM's UUID.
+    """
+    ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
+    command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
+    exit_code, output = self.__run_docker_command__(command, self.scm)
+    output_list = output.split("\n")
+    key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
+    uuid = [token for token in key_value if 'scmUuid' in token]
+    return uuid.pop().split("=")[1].strip()
+
+  def get_container_states(self, datanode):
+    """
+    Returns the state of all the containers in the given datanode.
+    """
+    container_parent_path = "%s/hdds/%s/current/containerDir0" % \
+                            (self.datanode_dir, self.scm_uuid)
+    command = "find %s -type f -name '*.container'" % container_parent_path
+    exit_code, output = self.__run_docker_command__(command, datanode)
+    container_state = {}
+
+    container_list = map(str.strip, output.split("\n"))
+    for container_path in container_list:
+      # Reading the container file.
+      exit_code, output = self.__run_docker_command__(
+        "cat " + container_path, datanode)
+      if exit_code != 0:
+        continue
+      data = output.split("\n")
+      # Reading key value pairs from container file.
+      key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
+      content = "\n".join(key_value)
+      content_yaml = yaml.load(content)
+      if content_yaml is None:
+        continue
+      for key, value in content_yaml.items():
+        content_yaml[key] = str(value).lstrip()
+      # Stores the container state in a dictionary.
+      container_state[content_yaml['containerID']] = content_yaml['state']
+    return container_state
+
+  def run_freon(self, num_volumes, num_buckets, num_keys, key_size,
+                replication_type="RATIS", replication_factor="THREE",
+                run_on=None):
+    """
+    Runs freon on the cluster.
+    """
+    if run_on is None:
+      run_on = self.om
+    command = [Command.freon,
+               " rk",
+               " --numOfVolumes " + str(num_volumes),
+               " --numOfBuckets " + str(num_buckets),
+               " --numOfKeys " + str(num_keys),
+               " --keySize " + str(key_size),
+               " --replicationType " + replication_type,
+               " --factor " + replication_factor]
+    return self.__run_docker_command__(command, run_on)
+
+  def __run_docker_command__(self, command, run_on):
+    if isinstance(command, list):
+      command = ' '.join(command)
+    command = [Command.docker,
+               "exec " + run_on,
+               command]
+    return util.run_cmd(command)
+
+  def stop(self):
+    """
+    Stops the Ozone Cluster.
+    """
+    Cluster.__logger__.info("Stopping Ozone Cluster")
+    call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
+    Blockade.blockade_destroy()
+
+  def container_state_predicate_all_closed(self, datanodes):
+    for datanode in datanodes:
+      container_states_dn = self.get_container_states(datanode)
+      if not container_states_dn \
+              or container_states_dn.popitem()[1] != 'CLOSED':
+        return False
+    return True
+
+  def container_state_predicate_one_closed(self, datanodes):
+    for datanode in datanodes:
+      container_states_dn = self.get_container_states(datanode)
+      if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED':
+        return True
+    return False
+
+  def container_state_predicate(self, datanode, state):
+    container_states_dn = self.get_container_states(datanode)
+    if container_states_dn and container_states_dn.popitem()[1] == state:
+      return True
+    return False
\ No newline at end of file
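
The tests below drive this class through a small facade: create a cluster, start
it, run freon, inspect container states, and stop it. A condensed usage sketch,
assuming the CONTAINER_STATUS_SLEEP variable is exported as in the existing
blockade environment:

    import os
    import util
    from ozone.cluster import Cluster

    cluster = Cluster.create()  # default Configuration: 3 datanodes
    cluster.start()             # docker-compose up + blockade add for every node
    try:
        exit_code, output = cluster.run_freon(1, 1, 1, 10240)
        assert exit_code == 0, "freon run failed with output=[%s]" % output
        # Poll until every datanode reports its container replica as CLOSED.
        util.wait_until(
            lambda: cluster.container_state_predicate_all_closed(cluster.datanodes),
            int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
    finally:
        cluster.stop()
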
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
index 1e53a32..dfa1b70 100644
--- a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
@@ -16,132 +16,123 @@
 # limitations under the License.
 
 import os
-import time
-import re
 import logging
-from blockadeUtils.blockade import Blockade
-from clusterUtils.cluster_utils import ClusterUtils
+import util
+from ozone.cluster import Cluster
 
 logger = logging.getLogger(__name__)
-parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
-                    "docker-compose.yaml")
-os.environ["DOCKER_COMPOSE_FILE"] = FILE
-SCALE = 3
-INCREASED_SCALE = 5
-CONTAINER_LIST = []
-OM = []
-SCM = []
-DATANODES = []
 
 
-def setup():
-  global CONTAINER_LIST, OM, SCM, DATANODES
-  Blockade.blockade_destroy()
-  CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
-  exit_code, output = Blockade.blockade_status()
-  assert exit_code == 0, "blockade status command failed with output=[%s]" % \
-                         output
-  OM, SCM, _, DATANODES = \
-    ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
+def setup_function(function):
+  global cluster
+  cluster = Cluster.create()
+  cluster.start()
 
-  exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
-                                             "THREE")
-  assert exit_code == 0, "freon run failed with output=[%s]" % output
 
+def teardown_function(function):
+  cluster.stop()
 
-def teardown():
-  logger.info("Inside teardown")
-  Blockade.blockade_destroy()
 
+def test_isolate_single_datanode():
+  """
+  In this test case we will create a network partition in such a way that
+  one of the datanodes will not be able to communicate with the other
+  datanodes but will still be able to communicate with SCM.
 
-def teardown_module():
-  ClusterUtils.cluster_destroy(FILE)
+  Once the network partition happens, SCM detects it and closes the pipeline,
+  which in turn closes the containers.
 
+  The container replicas on the first two datanodes will get CLOSED as they
+  have a quorum. The replica on the third datanode will be QUASI_CLOSED as it
+  cannot connect with the other datanodes and lacks the latest BCSID.
+
+  Once we restore the network, the stale replica on the third datanode will
+  be deleted and an up-to-date replica will be copied from one of the other
+  datanodes.
 
-def test_isolatedatanode_singlenode(run_second_phase):
-  """
-  In this test, one of the datanodes (first datanode) cannot communicate
-  with other two datanodes.
-  All datanodes can communicate with SCM.
-  Expectation :
-  The container replica state in first datanode should be quasi-closed.
-  The container replica state in other datanodes should be closed.
   """
-  first_set = [OM[0], SCM[0], DATANODES[0]]
-  second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
-  Blockade.blockade_create_partition(first_set, second_set)
-  Blockade.blockade_status()
-  ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
-  logger.info("Waiting for %s seconds before checking container status",
-              os.environ["CONTAINER_STATUS_SLEEP"])
-  time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
-  all_datanodes_container_status = \
-    ClusterUtils.findall_container_status(FILE, SCALE)
-  first_datanode_status = all_datanodes_container_status[0]
-  closed_container_datanodes = [x for x in all_datanodes_container_status
-                                if x == 'CLOSED']
-  assert first_datanode_status == 'QUASI_CLOSED'
-  assert len(closed_container_datanodes) == 2, \
-    "The container should have two closed replicas."
-
-  if str(run_second_phase).lower() == "true":
-    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
-    Blockade.blockade_status()
-    logger.info("Waiting for %s seconds before checking container status",
-                os.environ["CONTAINER_STATUS_SLEEP"])
-    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
-    all_datanodes_container_status = \
-      ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
-    closed_container_datanodes = [x for x in all_datanodes_container_status
-                                  if x == 'CLOSED']
-    assert len(closed_container_datanodes) >= 3, \
-      "The container should have at least three closed replicas."
-    Blockade.blockade_join()
-    Blockade.blockade_status()
-    _, output = \
-      ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
-    assert re.search("Status: Success", output) is not None
-
-
-def test_datanode_isolation_all(run_second_phase):
+  cluster.run_freon(1, 1, 1, 10240)
+  first_set = [cluster.om, cluster.scm,
+               cluster.datanodes[0], cluster.datanodes[1]]
+  second_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
+  logger.info("Partitioning the network")
+  cluster.partition_network(first_set, second_set)
+  cluster.run_freon(1, 1, 1, 10240)
+  logger.info("Waiting for container to be QUASI_CLOSED")
+
+  util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2])
+                  .popitem()[1] == 'QUASI_CLOSED',
+                  int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+  container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0])
+  container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1])
+  container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
+  assert len(container_states_dn_0) != 0
+  assert len(container_states_dn_1) != 0
+  assert len(container_states_dn_2) != 0
+  for key in container_states_dn_0:
+    assert container_states_dn_0.get(key) == 'CLOSED'
+  for key in container_states_dn_1:
+    assert container_states_dn_1.get(key) == 'CLOSED'
+  for key in container_states_dn_2:
+    assert container_states_dn_2.get(key) == 'QUASI_CLOSED'
+
+  # Since the replica in datanode[2] doesn't have the latest BCSID,
+  # ReplicationManager will delete it and copy a closed replica.
+  # We will now restore the network and datanode[2] should get a
+  # closed replica of the container
+  logger.info("Restoring the network")
+  cluster.restore_network()
+
+  logger.info("Waiting for the replica to be CLOSED")
+  util.wait_until(
+    lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'),
+    int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+  container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
+  assert len(container_states_dn_2) != 0
+  for key in container_states_dn_2:
+    assert container_states_dn_2.get(key) == 'CLOSED'
+
+
+def test_datanode_isolation_all():
   """
-  In this test, none of the datanodes can communicate with other two
-  datanodes.
-  All datanodes can communicate with SCM.
-  Expectation : The container should eventually have at least two closed
-  replicas.
+  In this test case we will create a network partition in such a way that
+  none of the datanodes can communicate with each other.
+  All datanodes will be able to communicate with SCM.
+
+  Once the network partition happens, SCM detects it and closes the pipeline,
+  which in turn tries to close the containers.
+  At least one of the replicas should be in CLOSED state.
+
+  Once we restore the network, there will be three closed replicas.
+
   """
-  first_set = [OM[0], SCM[0], DATANODES[0]]
-  second_set = [OM[0], SCM[0], DATANODES[1]]
-  third_set = [OM[0], SCM[0], DATANODES[2]]
-  Blockade.blockade_create_partition(first_set, second_set, third_set)
-  Blockade.blockade_status()
-  ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
-  logger.info("Waiting for %s seconds before checking container status",
-              os.environ["CONTAINER_STATUS_SLEEP"])
-  time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
-  all_datanodes_container_status = \
-    ClusterUtils.findall_container_status(FILE, SCALE)
-  closed_container_datanodes = [x for x in all_datanodes_container_status
-                                if x == 'CLOSED']
-  assert len(closed_container_datanodes) >= 2, \
-    "The container should have at least two closed replicas."
-
-  if str(run_second_phase).lower() == "true":
-    ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
-    Blockade.blockade_status()
-    logger.info("Waiting for %s seconds before checking container status",
-                os.environ["CONTAINER_STATUS_SLEEP"])
-    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
-    all_datanodes_container_status = \
-      ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
-    closed_container_datanodes = [x for x in all_datanodes_container_status
-                                  if x == 'CLOSED']
-    assert len(closed_container_datanodes) >= 3, \
-      "The container should have at least three closed replicas."
-    Blockade.blockade_join()
-    Blockade.blockade_status()
-    _, output = \
-      ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
-    assert re.search("Status: Success", output) is not None
\ No newline at end of file
+  cluster.run_freon(1, 1, 1, 10240)
+
+  assert len(cluster.get_container_states(cluster.datanodes[0])) != 0
+  assert len(cluster.get_container_states(cluster.datanodes[1])) != 0
+  assert len(cluster.get_container_states(cluster.datanodes[2])) != 0
+
+  logger.info("Partitioning the network")
+  first_set = [cluster.om, cluster.scm, cluster.datanodes[0]]
+  second_set = [cluster.om, cluster.scm, cluster.datanodes[1]]
+  third_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
+  cluster.partition_network(first_set, second_set, third_set)
+
+  logger.info("Waiting for the replica to be CLOSED")
+  util.wait_until(
+    lambda: cluster.container_state_predicate_one_closed(cluster.datanodes),
+    int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+
+  # At least one of the replica should be in closed state
+  assert cluster.container_state_predicate_one_closed(cluster.datanodes)
+
+  # After restoring the network all the replicas should be in
+  # CLOSED state
+  logger.info("Restoring the network")
+  cluster.restore_network()
+
+  logger.info("Waiting for the container to be replicated")
+  util.wait_until(
+    lambda: cluster.container_state_predicate_all_closed(cluster.datanodes),
+    int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+  assert cluster.container_state_predicate_all_closed(cluster.datanodes)
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
index 3129600..a79bd4f 100644
--- a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
@@ -16,11 +16,11 @@
 # limitations under the License.
 
 import os
-import time
 import logging
+import random
 import pytest
 from blockadeUtils.blockade import Blockade
-from clusterUtils.cluster_utils import ClusterUtils
+from ozone.cluster import Cluster
 
 
 logger = logging.getLogger(__name__)
@@ -32,30 +32,36 @@ SCALE = 6
 CONTAINER_LIST = []
 
 
-def setup_module():
-    global CONTAINER_LIST
-    Blockade.blockade_destroy()
-    CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
-    exit_code, output = Blockade.blockade_status()
-    assert exit_code == 0, "blockade status command failed with output=[%s]" % 
\
-                           output
+def setup_function(function):
+  global cluster
+  cluster = Cluster.create()
+  cluster.start()
 
 
-def teardown_module():
-    Blockade.blockade_destroy()
-    ClusterUtils.cluster_destroy(FILE)
+def teardown_function(function):
+  cluster.stop()
 
 
-def teardown():
-    logger.info("Inside teardown")
-    Blockade.blockade_fast_all()
-    time.sleep(5)
+@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om", "all"])
+def test_flaky(flaky_node):
+    """
+    In these tests, we make the network of the nodes flaky using blockade.
+    There are 4 cases:
+    1) The network of one randomly selected datanode is made flaky.
+    2) The SCM network is made flaky.
+    3) The OM network is made flaky.
+    4) The network of all the nodes is made flaky.
 
+    """
+    flaky_container_name = {
+        "scm": cluster.scm,
+        "om": cluster.om,
+        "datanode": random.choice(cluster.datanodes),
+        "all": "--all"
+    }[flaky_node]
 
-@pytest.mark.parametrize("flaky_nodes", ["datanode", "scm", "om", "all"])
-def test_flaky(flaky_nodes):
-    Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
+    Blockade.make_flaky(flaky_container_name)
     Blockade.blockade_status()
-    exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
-                                               "THREE")
+    exit_code, output = cluster.run_freon(1, 1, 1, 10240)
     assert exit_code == 0, "freon run failed with output=[%s]" % output
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/util.py b/hadoop-ozone/dist/src/main/blockade/util.py
new file mode 100644
index 0000000..84f7fda
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/blockade/util.py
@@ -0,0 +1,52 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import re
+import logging
+import subprocess
+
+logger = logging.getLogger(__name__)
+
+def wait_until(predicate, timeout, check_frequency=1):
+  deadline = time.time() + timeout
+  while time.time() < deadline:
+    if predicate():
+      return
+    time.sleep(check_frequency)
+
+
+def run_cmd(cmd):
+    command = cmd
+    if isinstance(cmd, list):
+      command = ' '.join(cmd)
+    logger.info(" RUNNING: %s", command)
+    all_output = ""
+    my_process = subprocess.Popen(command,  stdout=subprocess.PIPE,
+                                  stderr=subprocess.STDOUT, shell=True)
+    while my_process.poll() is None:
+      op = my_process.stdout.readline()
+      if op:
+        all_output += op
+        logger.info(op)
+    other_output = my_process.communicate()
+    other_output = other_output[0].strip()
+    if other_output != "":
+      all_output += other_output
+    reg = re.compile(r"(\r\n|\n)$")
+    all_output = reg.sub("", all_output, 1)
+    return my_process.returncode, all_output
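
Note that wait_until returns silently when the deadline passes without the
predicate becoming true, so callers re-check the condition with an explicit assert
after the wait, as the isolation tests above do. A minimal illustration of that
pattern with a hypothetical predicate (waiting for a marker file):

    import os
    import util

    marker = "/tmp/ready.marker"  # hypothetical marker file
    util.wait_until(lambda: os.path.exists(marker), 30, 5)
    assert os.path.exists(marker), "marker file did not appear within 30 seconds"
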
diff --git a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
index dae9ddb..f5e6a92 100644
--- a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
@@ -26,6 +26,12 @@ OZONE-SITE.XML_ozone.scm.client.address=scm
 OZONE-SITE.XML_ozone.scm.dead.node.interval=5m
 OZONE-SITE.XML_ozone.replication=1
 OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
+OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
+OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s
+OZONE-SITE.XML_hdds.heartbeat.interval=2s
+OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
+OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s
+OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
