This patch replaces the big, unmaintainable unit test for removing SSH keys with several small ones, each of which covers one aspect of the big test.
Note that a certain amount of code duplication is intended as there are more refactorings coming. Signed-off-by: Helga Velroyen <[email protected]> --- test/py/ganeti.backend_unittest.py | 306 +++++++++++++++++-------------------- test/py/testutils_ssh.py | 25 ++- 2 files changed, 166 insertions(+), 165 deletions(-) diff --git a/test/py/ganeti.backend_unittest.py b/test/py/ganeti.backend_unittest.py index bb601f5..cd2e24c 100755 --- a/test/py/ganeti.backend_unittest.py +++ b/test/py/ganeti.backend_unittest.py @@ -1050,42 +1050,6 @@ class TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase): def _TearDownTestData(self): os.remove(self._pub_key_file) - def _KeyOperationExecuted(self, key_data, node_name, expected_type, - expected_key, action_types): - if not node_name in key_data: - return False - for data in key_data[node_name]: - if expected_type in data: - (action, key_dict) = data[expected_type] - if action in action_types: - for key_list in key_dict.values(): - if expected_key in key_list: - return True - return False - - def _KeyReceived(self, key_data, node_name, expected_type, - expected_key): - return self._KeyOperationExecuted( - key_data, node_name, expected_type, expected_key, - [constants.SSHS_ADD, constants.SSHS_OVERRIDE, - constants.SSHS_REPLACE_OR_ADD]) - - def _KeyRemoved(self, key_data, node_name, expected_type, - expected_key): - if self._KeyOperationExecuted( - key_data, node_name, expected_type, expected_key, - [constants.SSHS_REMOVE]): - return True - else: - if not node_name in key_data: - return False - for data in key_data[node_name]: - if expected_type in data: - (action, key_dict) = data[expected_type] - if action == constants.SSHS_CLEAR: - return True - return False - def _GetCallsPerNode(self): calls_per_node = {} for (pos, keyword) in self._run_cmd_mock.call_args_list: @@ -1226,7 +1190,7 @@ class TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase): # Get one of the potential master candidates node_name, node_uuid, node_key, 
pot_mc, mc, master = \ - self._ssh_file_manager.GetAllPotentialMasterCandidates()[0] + self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0] # Update it's role to master candidate in the test data self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key, pot_mc, True, master) @@ -1249,133 +1213,147 @@ class TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase): self._TearDownTestData() - def testRemoveNodeSshKeyValid(self): - node_name = "node_name" - node_uuid = "node_uuid" - node_key = "node_key" - - for (from_authorized_keys, from_public_keys, - clear_authorized_keys, clear_public_keys) in \ - [(True, True, False, False), - (True, False, False, False), - (False, True, False, False), - (False, True, True, False), - (False, False, True, False), - (True, True, True, False), - (True, True, True, True), - ]: - - self._SetupTestData() - - # set up public key file, ssconf store, and node lists - if from_public_keys or from_authorized_keys: - ssh.AddPublicKey(node_uuid, node_key, key_file=self._pub_key_file) - self._potential_master_candidates.append(node_name) - if from_authorized_keys: - ssh.AddAuthorizedKeys(self._pub_key_file, [node_key]) - - self._ssh_port_map[node_name] = self._SSH_PORT - - if from_authorized_keys: - self._master_candidate_uuids.append(node_uuid) - - backend.RemoveNodeSshKey(node_uuid, node_name, - self._master_candidate_uuids, - self._potential_master_candidates, - self._ssh_port_map, - from_authorized_keys=from_authorized_keys, - from_public_keys=from_public_keys, - clear_authorized_keys=clear_authorized_keys, - clear_public_keys=clear_public_keys, - pub_key_file=self._pub_key_file, - ssconf_store=self._ssconf_mock, - noded_cert_file=self.noded_cert_file, - run_cmd_fn=self._run_cmd_mock) - - calls_per_node = self._GetCallsPerNode() - - # one sample node per type (master candidate, potential master candidate, - # normal node) - mc_idx = 3 - pot_mc_idx = 7 - normal_idx = 12 - sample_nodes = [mc_idx, pot_mc_idx, normal_idx] 
- pot_sample_nodes = [mc_idx, pot_mc_idx] - - if from_authorized_keys: - for node_idx in sample_nodes: - self.assertTrue(self._KeyRemoved( - calls_per_node, "node_name_%i" % node_idx, - constants.SSHS_SSH_AUTHORIZED_KEYS, node_key), - "Node %i did not get request to remove authorized key '%s'" - " although it should have." % (node_idx, node_key)) - else: - for node_idx in sample_nodes: - self.assertFalse(self._KeyRemoved( - calls_per_node, "node_name_%i" % node_idx, - constants.SSHS_SSH_AUTHORIZED_KEYS, node_key), - "Node %i got requested to remove authorized key '%s', although it" - " should not have." % (node_idx, node_key)) - - if from_public_keys: - for node_idx in pot_sample_nodes: - self.assertTrue(self._KeyRemoved( - calls_per_node, "node_name_%i" % node_idx, - constants.SSHS_SSH_PUBLIC_KEYS, node_key), - "Node %i did not receive request to remove public key '%s'," - " although it should have." % (node_idx, node_key)) - self.assertTrue(self._KeyRemoved( - calls_per_node, node_name, - constants.SSHS_SSH_PUBLIC_KEYS, node_key), - "Node %s did not receive request to remove its own public key '%s'," - " although it should have." % (node_name, node_key)) - for node_idx in list(set(sample_nodes) - set(pot_sample_nodes)): - self.assertFalse(self._KeyRemoved( - calls_per_node, "node_name_%i" % node_idx, - constants.SSHS_SSH_PUBLIC_KEYS, node_key), - "Node %i received a request to remove public key '%s'," - " although it should not have." % (node_idx, node_key)) - else: - for node_idx in sample_nodes: - self.assertFalse(self._KeyRemoved( - calls_per_node, "node_name_%i" % node_idx, - constants.SSHS_SSH_PUBLIC_KEYS, node_key), - "Node %i received a request to remove public key '%s'," - " although it should not have." 
% (node_idx, node_key)) - - if clear_authorized_keys: - for node_idx in list(set(sample_nodes) - set([mc_idx])): - key = "key%s" % node_idx - self.assertFalse(self._KeyRemoved( - calls_per_node, node_name, - constants.SSHS_SSH_AUTHORIZED_KEYS, key), - "Node %s did receive request to remove authorized key '%s'," - " although it should not have." % (node_name, key)) - mc_key = "key%s" % mc_idx - self.assertTrue(self._KeyRemoved( - calls_per_node, node_name, - constants.SSHS_SSH_AUTHORIZED_KEYS, mc_key), - "Node %s did not receive request to remove authorized key '%s'," - " although it should have." % (node_name, mc_key)) - else: - for node_idx in sample_nodes: - key = "key%s" % node_idx - self.assertFalse(self._KeyRemoved( - calls_per_node, node_name, - constants.SSHS_SSH_AUTHORIZED_KEYS, key), - "Node %s did receive request to remove authorized key '%s'," - " although it should not have." % (node_name, key)) - - if clear_public_keys: - # Checks if the node is cleared of all other potential master candidate - # nodes' public keys - for node_idx in pot_sample_nodes: - key = "key%s" % node_idx - self.assertTrue(self._KeyRemoved( - calls_per_node, node_name, - constants.SSHS_SSH_PUBLIC_KEYS, mc_key), - "Node %s did not receive request to remove public key '%s'," - " although it should have." 
% (node_name, key)) + def testRemoveMasterCandidate(self): + self._SetupTestData() + (node_name, node_uuid, node_key, is_potential_master_candidate, + is_master_candidate, is_master) = \ + self._ssh_file_manager.GetAllMasterCandidates()[0] + + backend.RemoveNodeSshKey(node_uuid, node_name, + self._master_candidate_uuids, + self._potential_master_candidates, + self._ssh_port_map, + from_authorized_keys=True, + from_public_keys=True, + clear_authorized_keys=True, + clear_public_keys=True, + pub_key_file=self._pub_key_file, + ssconf_store=self._ssconf_mock, + noded_cert_file=self.noded_cert_file, + run_cmd_fn=self._run_cmd_mock) + + self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey( + node_uuid, node_key)) + self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key)) + self.assertEqual(0, + len(self._ssh_file_manager.GetPublicKeysOfNode(node_name))) + self.assertEqual(0, + len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name))) + + self._TearDownTestData() + + def testRemovePotentialMasterCandidate(self): + self._SetupTestData() + (node_name, node_uuid, node_key, is_potential_master_candidate, + is_master_candidate, is_master) = \ + self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0] + + backend.RemoveNodeSshKey(node_uuid, node_name, + self._master_candidate_uuids, + self._potential_master_candidates, + self._ssh_port_map, + from_authorized_keys=False, + from_public_keys=True, + clear_authorized_keys=True, + clear_public_keys=True, + pub_key_file=self._pub_key_file, + ssconf_store=self._ssconf_mock, + noded_cert_file=self.noded_cert_file, + run_cmd_fn=self._run_cmd_mock) + + self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey( + node_uuid, node_key)) + self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key)) + self.assertEqual(0, + len(self._ssh_file_manager.GetPublicKeysOfNode(node_name))) + self.assertEqual(0, + len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name))) + + self._TearDownTestData() 
+ + def testRemoveNormalNode(self): + self._SetupTestData() + (node_name, node_uuid, node_key, is_potential_master_candidate, + is_master_candidate, is_master) = \ + self._ssh_file_manager.GetAllNormalNodes()[0] + + backend.RemoveNodeSshKey(node_uuid, node_name, + self._master_candidate_uuids, + self._potential_master_candidates, + self._ssh_port_map, + from_authorized_keys=False, + from_public_keys=False, + clear_authorized_keys=True, + clear_public_keys=True, + pub_key_file=self._pub_key_file, + ssconf_store=self._ssconf_mock, + noded_cert_file=self.noded_cert_file, + run_cmd_fn=self._run_cmd_mock) + + self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey( + node_uuid, node_key)) + self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key)) + self.assertEqual(0, + len(self._ssh_file_manager.GetPublicKeysOfNode(node_name))) + self.assertEqual(0, + len(self._ssh_file_manager.GetAuthorizedKeysOfNode(node_name))) + + self._TearDownTestData() + + def testDemoteMasterCandidateToPotentialMasterCandidate(self): + self._SetupTestData() + (node_name, node_uuid, node_key, is_potential_master_candidate, + is_master_candidate, is_master) = \ + self._ssh_file_manager.GetAllMasterCandidates()[0] + self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key, + is_potential_master_candidate, False, + is_master) + + backend.RemoveNodeSshKey(node_uuid, node_name, + self._master_candidate_uuids, + self._potential_master_candidates, + self._ssh_port_map, + from_authorized_keys=True, + from_public_keys=False, + clear_authorized_keys=False, + clear_public_keys=False, + pub_key_file=self._pub_key_file, + ssconf_store=self._ssconf_mock, + noded_cert_file=self.noded_cert_file, + run_cmd_fn=self._run_cmd_mock) + + self._ssh_file_manager.PotentialMasterCandidatesOnlyHavePublicKey(node_name) + self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key)) + + self._TearDownTestData() + + def testDemotePotentialMasterCandidateToNormalNode(self): + 
self._SetupTestData() + (node_name, node_uuid, node_key, is_potential_master_candidate, + is_master_candidate, is_master) = \ + self._ssh_file_manager.GetAllPurePotentialMasterCandidates()[0] + self._ssh_file_manager.SetOrAddNode(node_name, node_uuid, node_key, + False, is_master_candidate, + is_master) + + backend.RemoveNodeSshKey(node_uuid, node_name, + self._master_candidate_uuids, + self._potential_master_candidates, + self._ssh_port_map, + from_authorized_keys=False, + from_public_keys=True, + clear_authorized_keys=False, + clear_public_keys=False, + pub_key_file=self._pub_key_file, + ssconf_store=self._ssconf_mock, + noded_cert_file=self.noded_cert_file, + run_cmd_fn=self._run_cmd_mock) + + self.assertTrue(self._ssh_file_manager.NoNodeHasPublicKey( + node_uuid, node_key)) + self.assertTrue(self._ssh_file_manager.NoNodeHasAuthorizedKey(node_key)) + + self._TearDownTestData() class TestVerifySshSetup(testutils.GanetiTestCase): diff --git a/test/py/testutils_ssh.py b/test/py/testutils_ssh.py index 1e7edf8..0220c50 100644 --- a/test/py/testutils_ssh.py +++ b/test/py/testutils_ssh.py @@ -166,6 +166,28 @@ class FakeSshFileManager(object): for name, (uuid, key, pot_mc, mc, master) in self._all_node_data.items() if pot_mc] + def GetAllPurePotentialMasterCandidates(self): + """Get the potential master candidates which are not master candidates.""" + return [(name, uuid, key, pot_mc, mc, master) + for name, (uuid, key, pot_mc, mc, master) + in self._all_node_data.items() if pot_mc and not mc] + + def GetAllMasterCandidates(self): + return [(name, uuid, key, pot_mc, mc, master) + for name, (uuid, key, pot_mc, mc, master) + in self._all_node_data.items() if mc] + + def GetAllNormalNodes(self): + return [(name, uuid, key, pot_mc, mc, master) + for name, (uuid, key, pot_mc, mc, master) + in self._all_node_data.items() if not mc and not pot_mc] + + def GetPublicKeysOfNode(self, node): + return self._public_keys[node] + + def GetAuthorizedKeysOfNode(self, node): + return 
self._authorized_keys[node] + def SetOrAddNode(self, name, uuid, key, pot_mc, mc, master): """Adds a new node to the state of the file manager. @@ -255,7 +277,8 @@ class FakeSshFileManager(object): for name in self._all_node_data.keys(): node_auth_keys = self._authorized_keys[name] if key in node_auth_keys: - return False + raise Exception("Node '%s' does have the key '%s' in its" + " 'authorized_keys' file." % (name, key)) return True def NoNodeHasPublicKey(self, uuid, key): -- 2.2.0.rc0.207.ga3a616c
