This patch replaces the big and unmaintainable unittest
for removing SSH keys by several small ones, which each
cover one of the aspects of the big test.

Note that a certain amount of code duplication is
intended as there are more refactorings coming.

Signed-off-by: Helga Velroyen <[email protected]>
---
 test/py/ganeti.backend_unittest.py | 306 +++++++++++++++++--------------------
 test/py/testutils_ssh.py           |  25 ++-
 2 files changed, 166 insertions(+), 165 deletions(-)

diff --git a/test/py/ganeti.backend_unittest.py 
b/test/py/ganeti.backend_unittest.py
index bb601f5..cd2e24c 100755
--- a/test/py/ganeti.backend_unittest.py
+++ b/test/py/ganeti.backend_unittest.py
@@ -1050,42 +1050,6 @@ class 
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
   def _TearDownTestData(self):
     os.remove(self._pub_key_file)
 
-  def _KeyOperationExecuted(self, key_data, node_name, expected_type,
-                            expected_key, action_types):
-    if not node_name in key_data:
-      return False
-    for data in key_data[node_name]:
-      if expected_type in data:
-        (action, key_dict) = data[expected_type]
-        if action in action_types:
-          for key_list in key_dict.values():
-            if expected_key in key_list:
-              return True
-    return False
-
-  def _KeyReceived(self, key_data, node_name, expected_type,
-                   expected_key):
-    return self._KeyOperationExecuted(
-      key_data, node_name, expected_type, expected_key,
-      [constants.SSHS_ADD, constants.SSHS_OVERRIDE,
-       constants.SSHS_REPLACE_OR_ADD])
-
-  def _KeyRemoved(self, key_data, node_name, expected_type,
-                  expected_key):
-    if self._KeyOperationExecuted(
-        key_data, node_name, expected_type, expected_key,
-        [constants.SSHS_REMOVE]):
-      return True
-    else:
-      if not node_name in key_data:
-        return False
-      for data in key_data[node_name]:
-        if expected_type in data:
-          (action, key_dict) = data[expected_type]
-          if action == constants.SSHS_CLEAR:
-            return True
-    return False
-
   def _GetCallsPerNode(self):
     calls_per_node = {}
     for (pos, keyword) in self._run_cmd_mock.call_args_list:
@@ -1226,7 +1190,7 @@ class 
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
 
     # Get one of the potential master candidates
     node_name, node_uuid, node_key, pot_mc, mc, master = \
-      self._ssh_file_manager.GetAllPotentialMasterCandidates()[0]
+      self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
     # Update it's role to master candidate in the test data
     self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
                                         pot_mc, True, master)
@@ -1249,133 +1213,147 @@ class 
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
 
     self._TearDownTestData()
 
-  def testRemoveNodeSshKeyValid(self):
-    node_name = "node_name"
-    node_uuid = "node_uuid"
-    node_key = "node_key"
-
-    for (from_authorized_keys, from_public_keys,
-         clear_authorized_keys, clear_public_keys) in \
-       [(True, True, False, False),
-        (True, False, False, False),
-        (False, True, False, False),
-        (False, True, True, False),
-        (False, False, True, False),
-        (True, True, True, False),
-        (True, True, True, True),
-       ]:
-
-      self._SetupTestData()
-
-      # set up public key file, ssconf store, and node lists
-      if from_public_keys or from_authorized_keys:
-        ssh.AddPublicKey(node_uuid, node_key, key_file=self._pub_key_file)
-        self._potential_master_candidates.append(node_name)
-      if from_authorized_keys:
-        ssh.AddAuthorizedKeys(self._pub_key_file, [node_key])
-
-      self._ssh_port_map[node_name] = self._SSH_PORT
-
-      if from_authorized_keys:
-        self._master_candidate_uuids.append(node_uuid)
-
-      backend.RemoveNodeSshKey(node_uuid, node_name,
-                               self._master_candidate_uuids,
-                               self._potential_master_candidates,
-                               self._ssh_port_map,
-                               from_authorized_keys=from_authorized_keys,
-                               from_public_keys=from_public_keys,
-                               clear_authorized_keys=clear_authorized_keys,
-                               clear_public_keys=clear_public_keys,
-                               pub_key_file=self._pub_key_file,
-                               ssconf_store=self._ssconf_mock,
-                               noded_cert_file=self.noded_cert_file,
-                               run_cmd_fn=self._run_cmd_mock)
-
-      calls_per_node = self._GetCallsPerNode()
-
-      # one sample node per type (master candidate, potential master candidate,
-      # normal node)
-      mc_idx = 3
-      pot_mc_idx = 7
-      normal_idx = 12
-      sample_nodes = [mc_idx, pot_mc_idx, normal_idx]
-      pot_sample_nodes = [mc_idx, pot_mc_idx]
-
-      if from_authorized_keys:
-        for node_idx in sample_nodes:
-          self.assertTrue(self._KeyRemoved(
-            calls_per_node, "node_name_%i" % node_idx,
-            constants.SSHS_SSH_AUTHORIZED_KEYS, node_key),
-            "Node %i did not get request to remove authorized key '%s'"
-            " although it should have." % (node_idx, node_key))
-      else:
-        for node_idx in sample_nodes:
-          self.assertFalse(self._KeyRemoved(
-            calls_per_node, "node_name_%i" % node_idx,
-            constants.SSHS_SSH_AUTHORIZED_KEYS, node_key),
-            "Node %i got requested to remove authorized key '%s', although it"
-            " should not have." % (node_idx, node_key))
-
-      if from_public_keys:
-        for node_idx in pot_sample_nodes:
-          self.assertTrue(self._KeyRemoved(
-            calls_per_node, "node_name_%i" % node_idx,
-            constants.SSHS_SSH_PUBLIC_KEYS, node_key),
-            "Node %i did not receive request to remove public key '%s',"
-            " although it should have." % (node_idx, node_key))
-        self.assertTrue(self._KeyRemoved(
-          calls_per_node, node_name,
-          constants.SSHS_SSH_PUBLIC_KEYS, node_key),
-          "Node %s did not receive request to remove its own public key '%s',"
-          " although it should have." % (node_name, node_key))
-        for node_idx in list(set(sample_nodes) - set(pot_sample_nodes)):
-          self.assertFalse(self._KeyRemoved(
-            calls_per_node, "node_name_%i" % node_idx,
-            constants.SSHS_SSH_PUBLIC_KEYS, node_key),
-            "Node %i received a request to remove public key '%s',"
-            " although it should not have." % (node_idx, node_key))
-      else:
-        for node_idx in sample_nodes:
-          self.assertFalse(self._KeyRemoved(
-            calls_per_node, "node_name_%i" % node_idx,
-            constants.SSHS_SSH_PUBLIC_KEYS, node_key),
-            "Node %i received a request to remove public key '%s',"
-            " although it should not have." % (node_idx, node_key))
-
-      if clear_authorized_keys:
-        for node_idx in list(set(sample_nodes) - set([mc_idx])):
-          key = "key%s" % node_idx
-          self.assertFalse(self._KeyRemoved(
-            calls_per_node, node_name,
-            constants.SSHS_SSH_AUTHORIZED_KEYS, key),
-            "Node %s did receive request to remove authorized key '%s',"
-            " although it should not have." % (node_name, key))
-        mc_key = "key%s" % mc_idx
-        self.assertTrue(self._KeyRemoved(
-          calls_per_node, node_name,
-          constants.SSHS_SSH_AUTHORIZED_KEYS, mc_key),
-          "Node %s did not receive request to remove authorized key '%s',"
-          " although it should have." % (node_name, mc_key))
-      else:
-        for node_idx in sample_nodes:
-          key = "key%s" % node_idx
-          self.assertFalse(self._KeyRemoved(
-            calls_per_node, node_name,
-            constants.SSHS_SSH_AUTHORIZED_KEYS, key),
-            "Node %s did receive request to remove authorized key '%s',"
-            " although it should not have." % (node_name, key))
-
-      if clear_public_keys:
-        # Checks if the node is cleared of all other potential master candidate
-        # nodes' public keys
-        for node_idx in pot_sample_nodes:
-          key = "key%s" % node_idx
-          self.assertTrue(self._KeyRemoved(
-            calls_per_node, node_name,
-            constants.SSHS_SSH_PUBLIC_KEYS, mc_key),
-            "Node %s did not receive request to remove public key '%s',"
-            " although it should have." % (node_name, key))
+  def testRemoveMasterCandidate(self):
+    self._SetupTestData()
+    (node_name, node_uuid, node_key, is_potential_master_candidate,
+     is_master_candidate, is_master) = \
+        self._ssh_file_manager.GetAllMasterCandidates()[0]
+
+    backend.RemoveNodeSshKey(node_uuid, node_name,
+                             self._master_candidate_uuids,
+                             self._potential_master_candidates,
+                             self._ssh_port_map,
+                             from_authorized_keys=True,
+                             from_public_keys=True,
+                             clear_authorized_keys=True,
+                             clear_public_keys=True,
+                             pub_key_file=self._pub_key_file,
+                             ssconf_store=self._ssconf_mock,
+                             noded_cert_file=self.noded_cert_file,
+                             run_cmd_fn=self._run_cmd_mock)
+
+    self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+        node_uuid, node_key))
+    self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+    self._TearDownTestData()
+
+  def testRemovePotentialMasterCandidate(self):
+    self._SetupTestData()
+    (node_name, node_uuid, node_key, is_potential_master_candidate,
+     is_master_candidate, is_master) = \
+        self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
+
+    backend.RemoveNodeSshKey(node_uuid, node_name,
+                             self._master_candidate_uuids,
+                             self._potential_master_candidates,
+                             self._ssh_port_map,
+                             from_authorized_keys=False,
+                             from_public_keys=True,
+                             clear_authorized_keys=True,
+                             clear_public_keys=True,
+                             pub_key_file=self._pub_key_file,
+                             ssconf_store=self._ssconf_mock,
+                             noded_cert_file=self.noded_cert_file,
+                             run_cmd_fn=self._run_cmd_mock)
+
+    self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+        node_uuid, node_key))
+    self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+    self._TearDownTestData()
+
+  def testRemoveNormalNode(self):
+    self._SetupTestData()
+    (node_name, node_uuid, node_key, is_potential_master_candidate,
+     is_master_candidate, is_master) = \
+        self._ssh_file_manager.GetAllNormalNodes()[0]
+
+    backend.RemoveNodeSshKey(node_uuid, node_name,
+                             self._master_candidate_uuids,
+                             self._potential_master_candidates,
+                             self._ssh_port_map,
+                             from_authorized_keys=False,
+                             from_public_keys=False,
+                             clear_authorized_keys=True,
+                             clear_public_keys=True,
+                             pub_key_file=self._pub_key_file,
+                             ssconf_store=self._ssconf_mock,
+                             noded_cert_file=self.noded_cert_file,
+                             run_cmd_fn=self._run_cmd_mock)
+
+    self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+        node_uuid, node_key))
+    self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+    self.assertEqual(0,
+        len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+    self._TearDownTestData()
+
+  def testDemoteMasterCandidateToPotentialMasterCandidate(self):
+    self._SetupTestData()
+    (node_name, node_uuid, node_key, is_potential_master_candidate,
+     is_master_candidate, is_master) = \
+        self._ssh_file_manager.GetAllMasterCandidates()[0]
+    self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
+                                        is_potential_master_candidate, False,
+                                        is_master)
+
+    backend.RemoveNodeSshKey(node_uuid, node_name,
+                             self._master_candidate_uuids,
+                             self._potential_master_candidates,
+                             self._ssh_port_map,
+                             from_authorized_keys=True,
+                             from_public_keys=False,
+                             clear_authorized_keys=False,
+                             clear_public_keys=False,
+                             pub_key_file=self._pub_key_file,
+                             ssconf_store=self._ssconf_mock,
+                             noded_cert_file=self.noded_cert_file,
+                             run_cmd_fn=self._run_cmd_mock)
+
+    
self._ssh_file_manager.PotentialMasterCandidatesOnlyHavePublicKey(node_name)
+    self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+
+    self._TearDownTestData()
+
+  def testDemotePotentialMasterCandidateToNormalNode(self):
+    self._SetupTestData()
+    (node_name, node_uuid, node_key, is_potential_master_candidate,
+     is_master_candidate, is_master) = \
+        self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
+    self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
+                                        False, is_master_candidate,
+                                        is_master)
+
+    backend.RemoveNodeSshKey(node_uuid, node_name,
+                             self._master_candidate_uuids,
+                             self._potential_master_candidates,
+                             self._ssh_port_map,
+                             from_authorized_keys=False,
+                             from_public_keys=True,
+                             clear_authorized_keys=False,
+                             clear_public_keys=False,
+                             pub_key_file=self._pub_key_file,
+                             ssconf_store=self._ssconf_mock,
+                             noded_cert_file=self.noded_cert_file,
+                             run_cmd_fn=self._run_cmd_mock)
+
+    self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+        node_uuid, node_key))
+    self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+
+    self._TearDownTestData()
 
 
 class TestVerifySshSetup(testutils.GanetiTestCase):
diff --git a/test/py/testutils_ssh.py b/test/py/testutils_ssh.py
index 1e7edf8..0220c50 100644
--- a/test/py/testutils_ssh.py
+++ b/test/py/testutils_ssh.py
@@ -166,6 +166,28 @@ class FakeSshFileManager(object):
             for name, (uuid, key, pot_mc, mc, master)
             in self._all_node_data.items() if pot_mc]
 
+  def GetAllPurePotentialMasterCandidates(self):
+    """Get the potential master candidates which are not master candidates."""
+    return [(name, uuid, key, pot_mc, mc, master)
+            for name, (uuid, key, pot_mc, mc, master)
+            in self._all_node_data.items() if pot_mc and not mc]
+
+  def GetAllMasterCandidates(self):
+    return [(name, uuid, key, pot_mc, mc, master)
+            for name, (uuid, key, pot_mc, mc, master)
+            in self._all_node_data.items() if mc]
+
+  def GetAllNormalNodes(self):
+    return [(name, uuid, key, pot_mc, mc, master)
+            for name, (uuid, key, pot_mc, mc, master)
+            in self._all_node_data.items() if not mc and not pot_mc]
+
+  def GetPublicKeysOfNode(self, node):
+    return self._public_keys[node]
+
+  def GetAuthorizedKeysOfNode(self, node):
+    return self._authorized_keys[node]
+
   def SetOrAddNode(self, name, uuid, key, pot_mc, mc, master):
     """Adds a new node to the state of the file manager.
 
@@ -255,7 +277,8 @@ class FakeSshFileManager(object):
     for name in self._all_node_data.keys():
       node_auth_keys = self._authorized_keys[name]
       if key in node_auth_keys:
-        return False
+        raise Exception("Node '%s' does have the key '%s' in its"
+                        " 'authorized_keys' file." % (name, key))
     return True
 
   def NoNodeHasPublicKey(self, uuid, key):
-- 
2.2.0.rc0.207.ga3a616c

Reply via email to