This patch replaces the big and unmaintainable unittest
for removing SSH keys by several small ones, which each
cover one of the aspects of the big test.
Note that a certain amount of code duplication is
intended as there are more refactorings coming.
Signed-off-by: Helga Velroyen <[email protected]>
---
test/py/ganeti.backend_unittest.py | 306 +++++++++++++++++--------------------
test/py/testutils_ssh.py | 25 ++-
2 files changed, 166 insertions(+), 165 deletions(-)
diff --git a/test/py/ganeti.backend_unittest.py
b/test/py/ganeti.backend_unittest.py
index bb601f5..cd2e24c 100755
--- a/test/py/ganeti.backend_unittest.py
+++ b/test/py/ganeti.backend_unittest.py
@@ -1050,42 +1050,6 @@ class
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
def _TearDownTestData(self):
os.remove(self._pub_key_file)
- def _KeyOperationExecuted(self, key_data, node_name, expected_type,
- expected_key, action_types):
- if not node_name in key_data:
- return False
- for data in key_data[node_name]:
- if expected_type in data:
- (action, key_dict) = data[expected_type]
- if action in action_types:
- for key_list in key_dict.values():
- if expected_key in key_list:
- return True
- return False
-
- def _KeyReceived(self, key_data, node_name, expected_type,
- expected_key):
- return self._KeyOperationExecuted(
- key_data, node_name, expected_type, expected_key,
- [constants.SSHS_ADD, constants.SSHS_OVERRIDE,
- constants.SSHS_REPLACE_OR_ADD])
-
- def _KeyRemoved(self, key_data, node_name, expected_type,
- expected_key):
- if self._KeyOperationExecuted(
- key_data, node_name, expected_type, expected_key,
- [constants.SSHS_REMOVE]):
- return True
- else:
- if not node_name in key_data:
- return False
- for data in key_data[node_name]:
- if expected_type in data:
- (action, key_dict) = data[expected_type]
- if action == constants.SSHS_CLEAR:
- return True
- return False
-
def _GetCallsPerNode(self):
calls_per_node = {}
for (pos, keyword) in self._run_cmd_mock.call_args_list:
@@ -1226,7 +1190,7 @@ class
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
# Get one of the potential master candidates
node_name, node_uuid, node_key, pot_mc, mc, master = \
- self._ssh_file_manager.GetAllPotentialMasterCandidates()[0]
+ self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
# Update it's role to master candidate in the test data
self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
pot_mc, True, master)
@@ -1249,133 +1213,147 @@ class
TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
self._TearDownTestData()
- def testRemoveNodeSshKeyValid(self):
- node_name = "node_name"
- node_uuid = "node_uuid"
- node_key = "node_key"
-
- for (from_authorized_keys, from_public_keys,
- clear_authorized_keys, clear_public_keys) in \
- [(True, True, False, False),
- (True, False, False, False),
- (False, True, False, False),
- (False, True, True, False),
- (False, False, True, False),
- (True, True, True, False),
- (True, True, True, True),
- ]:
-
- self._SetupTestData()
-
- # set up public key file, ssconf store, and node lists
- if from_public_keys or from_authorized_keys:
- ssh.AddPublicKey(node_uuid, node_key, key_file=self._pub_key_file)
- self._potential_master_candidates.append(node_name)
- if from_authorized_keys:
- ssh.AddAuthorizedKeys(self._pub_key_file, [node_key])
-
- self._ssh_port_map[node_name] = self._SSH_PORT
-
- if from_authorized_keys:
- self._master_candidate_uuids.append(node_uuid)
-
- backend.RemoveNodeSshKey(node_uuid, node_name,
- self._master_candidate_uuids,
- self._potential_master_candidates,
- self._ssh_port_map,
- from_authorized_keys=from_authorized_keys,
- from_public_keys=from_public_keys,
- clear_authorized_keys=clear_authorized_keys,
- clear_public_keys=clear_public_keys,
- pub_key_file=self._pub_key_file,
- ssconf_store=self._ssconf_mock,
- noded_cert_file=self.noded_cert_file,
- run_cmd_fn=self._run_cmd_mock)
-
- calls_per_node = self._GetCallsPerNode()
-
- # one sample node per type (master candidate, potential master candidate,
- # normal node)
- mc_idx = 3
- pot_mc_idx = 7
- normal_idx = 12
- sample_nodes = [mc_idx, pot_mc_idx, normal_idx]
- pot_sample_nodes = [mc_idx, pot_mc_idx]
-
- if from_authorized_keys:
- for node_idx in sample_nodes:
- self.assertTrue(self._KeyRemoved(
- calls_per_node, "node_name_%i" % node_idx,
- constants.SSHS_SSH_AUTHORIZED_KEYS, node_key),
- "Node %i did not get request to remove authorized key '%s'"
- " although it should have." % (node_idx, node_key))
- else:
- for node_idx in sample_nodes:
- self.assertFalse(self._KeyRemoved(
- calls_per_node, "node_name_%i" % node_idx,
- constants.SSHS_SSH_AUTHORIZED_KEYS, node_key),
- "Node %i got requested to remove authorized key '%s', although it"
- " should not have." % (node_idx, node_key))
-
- if from_public_keys:
- for node_idx in pot_sample_nodes:
- self.assertTrue(self._KeyRemoved(
- calls_per_node, "node_name_%i" % node_idx,
- constants.SSHS_SSH_PUBLIC_KEYS, node_key),
- "Node %i did not receive request to remove public key '%s',"
- " although it should have." % (node_idx, node_key))
- self.assertTrue(self._KeyRemoved(
- calls_per_node, node_name,
- constants.SSHS_SSH_PUBLIC_KEYS, node_key),
- "Node %s did not receive request to remove its own public key '%s',"
- " although it should have." % (node_name, node_key))
- for node_idx in list(set(sample_nodes) - set(pot_sample_nodes)):
- self.assertFalse(self._KeyRemoved(
- calls_per_node, "node_name_%i" % node_idx,
- constants.SSHS_SSH_PUBLIC_KEYS, node_key),
- "Node %i received a request to remove public key '%s',"
- " although it should not have." % (node_idx, node_key))
- else:
- for node_idx in sample_nodes:
- self.assertFalse(self._KeyRemoved(
- calls_per_node, "node_name_%i" % node_idx,
- constants.SSHS_SSH_PUBLIC_KEYS, node_key),
- "Node %i received a request to remove public key '%s',"
- " although it should not have." % (node_idx, node_key))
-
- if clear_authorized_keys:
- for node_idx in list(set(sample_nodes) - set([mc_idx])):
- key = "key%s" % node_idx
- self.assertFalse(self._KeyRemoved(
- calls_per_node, node_name,
- constants.SSHS_SSH_AUTHORIZED_KEYS, key),
- "Node %s did receive request to remove authorized key '%s',"
- " although it should not have." % (node_name, key))
- mc_key = "key%s" % mc_idx
- self.assertTrue(self._KeyRemoved(
- calls_per_node, node_name,
- constants.SSHS_SSH_AUTHORIZED_KEYS, mc_key),
- "Node %s did not receive request to remove authorized key '%s',"
- " although it should have." % (node_name, mc_key))
- else:
- for node_idx in sample_nodes:
- key = "key%s" % node_idx
- self.assertFalse(self._KeyRemoved(
- calls_per_node, node_name,
- constants.SSHS_SSH_AUTHORIZED_KEYS, key),
- "Node %s did receive request to remove authorized key '%s',"
- " although it should not have." % (node_name, key))
-
- if clear_public_keys:
- # Checks if the node is cleared of all other potential master candidate
- # nodes' public keys
- for node_idx in pot_sample_nodes:
- key = "key%s" % node_idx
- self.assertTrue(self._KeyRemoved(
- calls_per_node, node_name,
- constants.SSHS_SSH_PUBLIC_KEYS, mc_key),
- "Node %s did not receive request to remove public key '%s',"
- " although it should have." % (node_name, key))
+ def testRemoveMasterCandidate(self):
+ self._SetupTestData()
+ (node_name, node_uuid, node_key, is_potential_master_candidate,
+ is_master_candidate, is_master) = \
+ self._ssh_file_manager.GetAllMasterCandidates()[0]
+
+ backend.RemoveNodeSshKey(node_uuid, node_name,
+ self._master_candidate_uuids,
+ self._potential_master_candidates,
+ self._ssh_port_map,
+ from_authorized_keys=True,
+ from_public_keys=True,
+ clear_authorized_keys=True,
+ clear_public_keys=True,
+ pub_key_file=self._pub_key_file,
+ ssconf_store=self._ssconf_mock,
+ noded_cert_file=self.noded_cert_file,
+ run_cmd_fn=self._run_cmd_mock)
+
+ self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+ node_uuid, node_key))
+ self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+ self._TearDownTestData()
+
+ def testRemovePotentialMasterCandidate(self):
+ self._SetupTestData()
+ (node_name, node_uuid, node_key, is_potential_master_candidate,
+ is_master_candidate, is_master) = \
+ self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
+
+ backend.RemoveNodeSshKey(node_uuid, node_name,
+ self._master_candidate_uuids,
+ self._potential_master_candidates,
+ self._ssh_port_map,
+ from_authorized_keys=False,
+ from_public_keys=True,
+ clear_authorized_keys=True,
+ clear_public_keys=True,
+ pub_key_file=self._pub_key_file,
+ ssconf_store=self._ssconf_mock,
+ noded_cert_file=self.noded_cert_file,
+ run_cmd_fn=self._run_cmd_mock)
+
+ self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+ node_uuid, node_key))
+ self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+ self._TearDownTestData()
+
+ def testRemoveNormalNode(self):
+ self._SetupTestData()
+ (node_name, node_uuid, node_key, is_potential_master_candidate,
+ is_master_candidate, is_master) = \
+ self._ssh_file_manager.GetAllNormalNodes()[0]
+
+ backend.RemoveNodeSshKey(node_uuid, node_name,
+ self._master_candidate_uuids,
+ self._potential_master_candidates,
+ self._ssh_port_map,
+ from_authorized_keys=False,
+ from_public_keys=False,
+ clear_authorized_keys=True,
+ clear_public_keys=True,
+ pub_key_file=self._pub_key_file,
+ ssconf_store=self._ssconf_mock,
+ noded_cert_file=self.noded_cert_file,
+ run_cmd_fn=self._run_cmd_mock)
+
+ self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+ node_uuid, node_key))
+ self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetPublicKeysOfNode(node_name)))
+ self.assertEqual(0,
+ len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name)))
+
+ self._TearDownTestData()
+
+ def testDemoteMasterCandidateToPotentialMasterCandidate(self):
+ self._SetupTestData()
+ (node_name, node_uuid, node_key, is_potential_master_candidate,
+ is_master_candidate, is_master) = \
+ self._ssh_file_manager.GetAllMasterCandidates()[0]
+ self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
+ is_potential_master_candidate, False,
+ is_master)
+
+ backend.RemoveNodeSshKey(node_uuid, node_name,
+ self._master_candidate_uuids,
+ self._potential_master_candidates,
+ self._ssh_port_map,
+ from_authorized_keys=True,
+ from_public_keys=False,
+ clear_authorized_keys=False,
+ clear_public_keys=False,
+ pub_key_file=self._pub_key_file,
+ ssconf_store=self._ssconf_mock,
+ noded_cert_file=self.noded_cert_file,
+ run_cmd_fn=self._run_cmd_mock)
+
+
self._ssh_file_manager.PotentialMasterCandidatesOnlyHavePublicKey(node_name)
+ self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+
+ self._TearDownTestData()
+
+ def testDemotePotentialMasterCandidateToNormalNode(self):
+ self._SetupTestData()
+ (node_name, node_uuid, node_key, is_potential_master_candidate,
+ is_master_candidate, is_master) = \
+ self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0]
+ self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key,
+ False, is_master_candidate,
+ is_master)
+
+ backend.RemoveNodeSshKey(node_uuid, node_name,
+ self._master_candidate_uuids,
+ self._potential_master_candidates,
+ self._ssh_port_map,
+ from_authorized_keys=False,
+ from_public_keys=True,
+ clear_authorized_keys=False,
+ clear_public_keys=False,
+ pub_key_file=self._pub_key_file,
+ ssconf_store=self._ssconf_mock,
+ noded_cert_file=self.noded_cert_file,
+ run_cmd_fn=self._run_cmd_mock)
+
+ self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey(
+ node_uuid, node_key))
+ self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key))
+
+ self._TearDownTestData()
class TestVerifySshSetup(testutils.GanetiTestCase):
diff --git a/test/py/testutils_ssh.py b/test/py/testutils_ssh.py
index 1e7edf8..0220c50 100644
--- a/test/py/testutils_ssh.py
+++ b/test/py/testutils_ssh.py
@@ -166,6 +166,28 @@ class FakeSshFileManager(object):
for name, (uuid, key, pot_mc, mc, master)
in self._all_node_data.items() if pot_mc]
+ def GetAllPurePotentialMasterCandidates(self):
+ """Get the potential master candidates which are not master candidates."""
+ return [(name, uuid, key, pot_mc, mc, master)
+ for name, (uuid, key, pot_mc, mc, master)
+ in self._all_node_data.items() if pot_mc and not mc]
+
+ def GetAllMasterCandidates(self):
+ return [(name, uuid, key, pot_mc, mc, master)
+ for name, (uuid, key, pot_mc, mc, master)
+ in self._all_node_data.items() if mc]
+
+ def GetAllNormalNodes(self):
+ return [(name, uuid, key, pot_mc, mc, master)
+ for name, (uuid, key, pot_mc, mc, master)
+ in self._all_node_data.items() if not mc and not pot_mc]
+
+ def GetPublicKeysOfNode(self, node):
+ return self._public_keys[node]
+
+ def GetAuthorizedKeysOfNode(self, node):
+ return self._authorized_keys[node]
+
def SetOrAddNode(self, name, uuid, key, pot_mc, mc, master):
"""Adds a new node to the state of the file manager.
@@ -255,7 +277,8 @@ class FakeSshFileManager(object):
for name in self._all_node_data.keys():
node_auth_keys = self._authorized_keys[name]
if key in node_auth_keys:
- return False
+ raise Exception("Node '%s' does have the key '%s' in its"
+ " 'authorized_keys' file." % (name, key))
return True
def NoNodeHasPublicKey(self, uuid, key):
--
2.2.0.rc0.207.ga3a616c