[ https://issues.apache.org/jira/browse/CLOUDSTACK-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15188696#comment-15188696 ]
ASF GitHub Bot commented on CLOUDSTACK-9304: -------------------------------------------- Github user sanju1010 commented on a diff in the pull request: https://github.com/apache/cloudstack/pull/1431#discussion_r55635451 --- Diff: test/integration/plugins/nuagevsp/nuageTestCase.py --- @@ -0,0 +1,830 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" Custom base class for NuageVsp SDN Plugin specific Marvin tests +""" +# Import Local Modules +from marvin.cloudstackTestCase import cloudstackTestCase, unittest +from marvin.lib.base import (NetworkServiceProvider, + ServiceOffering, + NetworkOffering, + Network, + Router, + Nuage, + VPC, + VpcOffering, + PublicIPAddress, + VirtualMachine, + StaticNATRule, + NetworkACLList, + NetworkACL, + FireWallRule, + EgressFireWallRule, + Host) +from marvin.lib.common import (get_zone, + get_domain, + get_template, + list_templates, + wait_for_cleanup) +from marvin.lib.utils import cleanup_resources +from marvin.cloudstackAPI import (listPhysicalNetworks, + updateConfiguration, + updateTemplate, + listConfigurations, + listHypervisors, + stopRouter, + startRouter) +# Import nosetests Modules +from nose.plugins.attrib import attr + +# Import System Modules +import socket +import importlib +import logging + + +class nuageTestCase(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.debug("setUpClass nuageTestCase") + + # We want to fail quicker, if it's a failure + socket.setdefaulttimeout(60) + + test_client = super(nuageTestCase, cls).getClsTestClient() + cls.api_client = test_client.getApiClient() + cls.db_client = test_client.getDbConnection() + cls.test_data = test_client.getParsedTestDataConfig() + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.domain = get_domain(cls.api_client) + cls.template = get_template(cls.api_client, + cls.zone.id, + cls.test_data["ostype"] + ) + cls.test_data["virtual_machine"]["zoneid"] = cls.zone.id + cls.test_data["virtual_machine"]["template"] = cls.template.id + + # Create service offering + cls.service_offering = ServiceOffering.create(cls.api_client, + cls.test_data["service_offering"] + ) + cls._cleanup = [cls.service_offering] + + # Get configured Nuage Vsp device details + try: + resp = listPhysicalNetworks.listPhysicalNetworksCmd() + resp.zoneid = cls.zone.id + physical_networks = cls.api_client.listPhysicalNetworks(resp) + for pn in physical_networks: + if pn.isolationmethods == 'VSP': + cls.vsp_physical_network = pn + cls.nuage_vsp_device = Nuage.list(cls.api_client, + physicalnetworkid=cls.vsp_physical_network.id + )[0] + pns = cls.config.zones[0].physical_networks + providers = filter(lambda physical_network: 'VSP' in physical_network.isolationmethods, pns)[0].providers + devices = filter(lambda provider: provider.name == 'NuageVsp', providers)[0].devices + cls.nuage_vsp_device.username = devices[0].username + cls.nuage_vsp_device.password = devices[0].password + listConfigurationsCmd = listConfigurations.listConfigurationsCmd() + listConfigurationsCmd.name = "nuagevsp.cms.id" + listConfigurationsCmd.scopename = "global" + cs_config_dict = cls.api_client.listConfigurations(listConfigurationsCmd) + cls.cms_id = str(cs_config_dict[0].value).split(":")[1] + except Exception as e: + cls.tearDownClass() + raise unittest.SkipTest("Warning: Couldn't get configured Nuage Vsp device details: %s" % e) + + # Check if the host hypervisor type is simulator + resp = listHypervisors.listHypervisorsCmd() + resp.zoneid = cls.zone.id + cls.isSimulator = cls.api_client.listHypervisors(resp)[0].name == 'Simulator' + + # VSD is a Python SDK for Nuage Vsp + try: + vspk_module = "vspk." + cls.nuage_vsp_device.apiversion if int(cls.nuage_vsp_device.apiversion[1]) >= 4 \ + else "vspk.vsdk." + cls.nuage_vsp_device.apiversion + vsdk = importlib.import_module(vspk_module) + vspk_utils_module = "vspk.utils" if int(cls.nuage_vsp_device.apiversion[1]) >= 4 \ + else "vspk.vsdk." + cls.nuage_vsp_device.apiversion + ".utils" + vsdk_utils = importlib.import_module(vspk_utils_module) + set_log_level = getattr(vsdk_utils, "set_log_level") + from cms_vspk_wrapper.cms_vspk_wrapper import Cms_vspk_wrapper + except: + raise unittest.SkipTest("vspk (and/or) cms_vspk_wrapper import failure") + + # Configure VSD session + cls._session = vsdk.NUVSDSession(username=cls.nuage_vsp_device.username, + password=cls.nuage_vsp_device.password, + enterprise="csp", api_url="https://%s:%d" % + (cls.nuage_vsp_device.hostname, + cls.nuage_vsp_device.port) + ) + cls._session.start() + + # Configure cms_vspk_wrapper session + cls.log_handler = logging.getLogger("CSLog").handlers[0] + vsd_info = cls.nuage_vsp_device.__dict__ + vsd_info["port"] = str(vsd_info["port"]) + cls.vsd = Cms_vspk_wrapper(vsd_info, cls.log_handler) + + set_log_level(logging.INFO) + + cls.debug("setUpClass nuageTestCase [DONE]") + + def setUp(self): + self.cleanup = [] + return + + @classmethod + def tearDownClass(cls): + try: + # Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + cls.debug("Warning: Exception during cleanup : %s" % e) + return + + def tearDown(self): + # Cleanup resources used + self.debug("Cleaning up the resources") + for obj in reversed(self.cleanup): + try: + if isinstance(obj, VirtualMachine): + obj.delete(self.api_client, expunge=True) + else: + obj.delete(self.api_client) + except Exception as e: + self.error("Failed to cleanup %s, got %s" % (obj, e)) + # cleanup_resources(self.api_client, self.cleanup) + self.cleanup = [] + self.debug("Cleanup complete!") + return + + def getConfigurationValue(self, name, scope="global"): + listConfigurationsCmd = listConfigurations.listConfigurationsCmd() + listConfigurationsCmd.name = name + listConfigurationsCmd.scopename = scope + if scope is "zone": + listConfigurationsCmd.zoneid = self.zone.id + return self.api_client.listConfigurations(listConfigurationsCmd) + + def setConfigurationValue(self, name, value, scope="global"): + cmd = updateConfiguration.updateConfigurationCmd() + cmd.name = name + cmd.scopename = scope + if scope is "zone": + cmd.zoneid = self.zone.id + cmd.value = value + self.api_client.updateConfiguration(cmd) + + def updateTemplate(self, value): + self.debug("UPDATE TEMPLATE") + cmd = updateTemplate.updateTemplateCmd() + cmd.id = self.template.id + cmd.passwordenabled = value + self.api_client.updateTemplate(cmd) + list_template_response = list_templates(self.api_client, + templatefilter="all", + id=self.template.id + ) + self.template = list_template_response[0] + + # Creates the vpc offering + def create_VpcOffering(self, vpc_offering, suffix=None): + self.debug('Create VpcOffering') + if suffix: + vpc_offering["name"] = "VPC_OFF-" + str(suffix) + vpc_off = VpcOffering.create(self.api_client, + vpc_offering + ) + # Enable VPC offering + vpc_off.update(self.api_client, state='Enabled') + self.cleanup.append(vpc_off) + self.debug('Created and Enabled VpcOffering') + return vpc_off + + # create_Vpc - Takes the vpc offering as arguments and creates the VPC + def create_Vpc(self, vpc_offering, cidr='10.1.1.1/16', cleanup=True): + self.debug("Creating a VPC network in the account: %s" % self.account.name) + self.test_data["vpc"]["cidr"] = cidr + vpc = VPC.create( + self.api_client, + self.test_data["vpc"], + vpcofferingid=vpc_offering.id, + zoneid=self.zone.id, + account=self.account.name, + domainid=self.account.domainid + ) + self.debug("Created VPC with ID: %s" % vpc.id) + if cleanup: + self.cleanup.append(vpc) + return vpc + + # create_NetworkOffering - Takes the network offering as argument and creates the Network Offering + def create_NetworkOffering(self, net_offering, suffix=None, conserve_mode=False): + self.debug('Create NetworkOffering') + if suffix: + net_offering["name"] = "NET_OFF-" + str(suffix) + nw_off = NetworkOffering.create(self.api_client, + net_offering, + conservemode=conserve_mode + ) + # Enable Network offering + nw_off.update(self.api_client, state='Enabled') + self.cleanup.append(nw_off) + self.debug('Created and Enabled NetworkOffering') + return nw_off + + # create_Network - Takes the network offering as argument and nw_key and creates the network + def create_Network(self, nw_off, nw_key="network", gateway='10.1.1.1', netmask='255.255.255.0', vpc=None, acl_list=None): + if not hasattr(nw_off, "id"): + nw_off = self.create_NetworkOffering(nw_off) + self.debug('Adding Network=%s' % self.test_data[nw_key]) + self.test_data[nw_key]["netmask"] = netmask + obj_network = Network.create(self.api_client, + self.test_data[nw_key], + accountid=self.account.name, + domainid=self.account.domainid, + networkofferingid=nw_off.id, + zoneid=self.zone.id, + gateway=gateway, + vpcid=vpc.id if vpc else self.vpc.id if hasattr(self, "vpc") else None, + aclid=acl_list.id if acl_list else None + ) + self.debug("Created network with ID: %s" % obj_network.id) + self.cleanup.append(obj_network) + return obj_network + + # upgrade_Network - Upgrades the given network + def upgrade_Network(self, nw_off, network): + if not hasattr(nw_off, "id"): + nw_off = self.create_NetworkOffering(nw_off, network.gateway) + self.debug('Update Network=%s' % network) + network.update( + self.api_client, + networkofferingid=nw_off.id, + changecidr=False + ) + self.debug("Updated network with ID: %s" % network.id) + + # delete_Network - Deletes the given network + def delete_Network(self, network): + self.debug('Deleting Network - %s' % network.name) + # Wait for network garbage collection before network deletion + wait_for_cleanup(self.api_client, + ["network.gc.interval", "network.gc.wait"] + ) + network.delete(self.api_client) + if network in self.cleanup: + self.cleanup.remove(network) + self.debug('Deleted Network - %s' % network.name) + + # create_VM_in_Network - Creates a VM in the given network, the vm_key - is the key for the services on the vm. + def create_VM_in_Network(self, network, vm_key="virtual_machine", host_id=None, start_vm=True): + self.debug('Creating VM in network=%s' % network.name) + self.debug('Passed vm_key=%s' % vm_key) + self.test_data[vm_key]["zoneid"] = self.zone.id + self.test_data[vm_key]["template"] = self.template.id + vm = VirtualMachine.create( + self.api_client, + self.test_data[vm_key], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(network.id)], + startvm=start_vm, + hostid=host_id + ) + self.debug('Created VM=%s in network=%s' % (vm.id, network.name)) + self.cleanup.append(vm) + return vm + + # starts the given vm + def start(self, apiclient): + """Start the instance""" + cmd = startVirtualMachine.startVirtualMachineCmd() + cmd.id = self.id + apiclient.startVirtualMachine(cmd) + response = self.getState(apiclient, VirtualMachine.RUNNING) + if response[0] == FAIL: + raise Exception(response[1]) + return + + # stops the given vm + def stop(self, apiclient, forced=None): + """Stop the instance""" + cmd = stopVirtualMachine.stopVirtualMachineCmd() + cmd.id = self.id + if forced: + cmd.forced = forced + apiclient.stopVirtualMachine(cmd) + response = self.getState(apiclient, VirtualMachine.STOPPED) + if response[0] == FAIL: + raise Exception(response[1]) + return + + # delete_VM - Deletes the given VM + def delete_VM(self, vm): + self.debug('Deleting VM - %s' % vm.name) + vm.delete(self.api_client) + # Wait for expunge interval to cleanup VM + wait_for_cleanup(self.api_client, + ["expunge.delay", "expunge.interval"] + ) + if vm in self.cleanup: + self.cleanup.remove(vm) + self.debug('Deleted VM - %s' % vm.name) + + # acquire_Public_IP - Acquires a public IP for the given network + def acquire_Public_IP(self, network, vpc=None): + self.debug("Associating public IP for network: %s" % network.name) + public_ip = PublicIPAddress.create(self.api_client, + accountid=self.account.name, + zoneid=self.zone.id, + domainid=self.account.domainid, + networkid=network.id if vpc is None else None, + vpcid=vpc.id if vpc else self.vpc.id if hasattr(self, "vpc") else None + ) + self.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress, + network.id)) + return public_ip + + # create_StaticNatRule_For_VM - Creates static NAT rule for the given network , VM on the given public ip + def create_StaticNatRule_For_VM(self, vm, public_ip, network, vmguestip=None): + self.debug("Enabling static NAT for IP: %s" % + public_ip.ipaddress.ipaddress) + StaticNATRule.enable( + self.api_client, + ipaddressid=public_ip.ipaddress.id, + virtualmachineid=vm.id, + networkid=network.id, + vmguestip=vmguestip + ) + self.debug("Static NAT enabled for IP: %s" % + public_ip.ipaddress.ipaddress) + + # delete_StaticNatRule_For_VM - Deletes the static NAT rule for the given VM + def delete_StaticNatRule_For_VM(self, vm, public_ip): + self.debug("Disabling static NAT for IP: %s" % + public_ip.ipaddress.ipaddress) + StaticNATRule.disable( + self.api_client, + ipaddressid=public_ip.ipaddress.id, + virtualmachineid=vm.id + ) + self.debug("Static NAT disabled for IP: %s" % + public_ip.ipaddress.ipaddress) + + # create_firewall_rule - Creates the Ingress firewall rule on the given public ip + def create_firewall_rule(self, public_ip, rule=None): + if not rule: + rule = self.test_data["ingress_rule"] + self.debug("Adding an Ingress Firewall rule to make Guest VMs accessible through Static NAT") + return FireWallRule.create(self.api_client, + ipaddressid=public_ip.ipaddress.id, + protocol=rule["protocol"], + cidrlist=rule["cidrlist"], + startport=rule["startport"], + endport=rule["endport"] + ) + + # create_egress_firewall_rule - Creates the Egress firewall rule on the given public ip + def create_egress_firewall_rule(self, network, rule): + self.debug("Adding an Egress Firewall rule to allow/deny outgoing traffic from Guest VMs") + return EgressFireWallRule.create(self.api_client, + networkid=network.id, + protocol=rule["protocol"], + cidrlist=rule["cidrlist"], + startport=rule["startport"], + endport=rule["endport"] + ) + + # create_network_acl_list - Creates network ACL list in the given VPC + def create_network_acl_list(self, name, description, vpc): + self.debug("Adding NetworkACL list in VPC: %s" % vpc.id) + return NetworkACLList.create(self.api_client, + services={}, + name=name, + description=description, + vpcid=vpc.id + ) + + # create_network_acl_rule - Creates network ACL rule Ingree/Egress in the given network + def create_network_acl_rule(self, rule, traffic_type="Ingress", network=None, acl_list=None): + self.debug("Adding NetworkACL rule: %s" % rule) + return NetworkACL.create(self.api_client, + networkid=network.id if network else None, + services=rule, + traffictype=traffic_type, + aclid=acl_list.id if acl_list else None + ) + + # migrate_vm - Migrates the VM to a different host if available + def migrate_vm(self, vm): + self.debug("Checking if a host is available for migration?") + hosts = Host.listForMigration(self.api_client) + self.assertEqual(isinstance(hosts, list), True, + "List hosts should return a valid list" + ) + # Remove the host of current VM from the hosts list + hosts[:] = [host for host in hosts if host.id != vm.hostid] + if len(hosts) <= 0: + self.skipTest("No host available for migration. Test requires at-least 2 hosts") + host = hosts[0] + self.debug("Migrating VM-ID: %s to Host: %s" % (vm.id, host.id)) + try: + vm.migrate(self.api_client, hostid=host.id) + except Exception as e: + self.fail("Failed to migrate instance, %s" % e) + + # get_network_router - returns the router for the given network + def get_network_router(self, network): + self.debug("Finding the virtual router for network: %s" % network.name) + routers = Router.list(self.api_client, + networkid=network.id, + listall=True + ) + self.assertEqual(isinstance(routers, list), True, + "List routers should return a valid virtual router for network" + ) + return routers[0] + + # stop_network_router - Stops the given network router + def stop_network_router(self, router): + self.debug("Stopping Router with ID: %s" % router.id) + cmd = stopRouter.stopRouterCmd() + cmd.id = router.id + self.api_client.stopRouter(cmd) + + # start_network_router - Starts the given network router + def start_network_router(self, router): + self.debug("Starting Router with ID: %s" % router.id) + cmd = startRouter.startRouterCmd() + cmd.id = router.id + self.api_client.startRouter(cmd) + + # ssh_into_vm - Gets into the shell of the given VM + def ssh_into_vm(self, vm, public_ip): + self.debug("SSH into VM=%s on public_ip=%s" % (vm.name, public_ip.ipaddress.ipaddress)) + ssh_client = vm.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress) + return ssh_client + + # execute_cmd - Executes the given command on the given ssh client + def execute_cmd(self, ssh_client, cmd): + self.debug("EXECUTE SSH COMMAND: " + cmd) + ret_data = "" + out_list = ssh_client.execute(cmd) + if out_list is not None: + ret_data = ' '.join(map(str, out_list)).strip() + self.debug("ssh execute cmd result=" + ret_data) + else: + self.debug("ssh execute cmd result is None") + return ret_data + + # wget_from_server - fetches the index.html file from the given public Ip + def wget_from_server(self, public_ip): + import urllib + self.debug("wget from a http server on public_ip=%s" % public_ip.ipaddress.ipaddress) + wget_file = urllib.urlretrieve("http://%s/index.html" % public_ip.ipaddress.ipaddress, + filename="index.html" + ) + return wget_file + + # validate_NetworkServiceProvider - Validates the Network Service Provider + # in the Nuage VSP Physical Network - matches the given provider name + # against the list of providers fetched + def validate_NetworkServiceProvider(self, provider_name, state=None): + """Validates the Network Service Provider in the Nuage VSP Physical Network""" + self.debug("Check if the Network Service Provider is created successfully ?") + providers = NetworkServiceProvider.list(self.api_client, + name=provider_name, + physicalnetworkid=self.vsp_physical_network.id) + self.assertEqual(isinstance(providers, list), True, + "List Network Service Provider should return a valid list" + ) + self.assertEqual(provider_name, providers[0].name, + "Name of the Network Service Provider should match with the returned list data" + ) + if state: + self.assertEqual(providers[0].state, state, + "Network Service Provider state should be '%s'" % state + ) + self.debug("Network Service Provider creation successfully validated - %s" % provider_name) + + # validate_vpc_offering - Validates the VPC offering, matches the given VPC off name against the list of VPC offerings fetched + def validate_vpc_offering(self, vpc_offering, state=None): + """Validates the VPC offering""" + self.debug("Check if the VPC offering is created successfully ?") + vpc_offs = VpcOffering.list(self.api_client, + id=vpc_offering.id + ) + self.assertEqual(isinstance(vpc_offs, list), True, + "List VPC offering should return a valid list" + ) + self.assertEqual(vpc_offering.name, vpc_offs[0].name, + "Name of the VPC offering should match with the returned list data" + ) + if state: + self.assertEqual(vpc_offs[0].state, state, + "VPC offering state should be '%s'" % state + ) + self.debug("VPC offering creation successfully validated - %s" % vpc_offering.name) + + # validate_vpc - Validates the given VPC matches, the given VPC name against the list of VPCs fetched + def validate_vpc(self, vpc, state=None): + """Validates the VPC""" + self.debug("Check if the VPC is created successfully ?") + vpcs = VPC.list(self.api_client, + id=vpc.id + ) + self.assertEqual(isinstance(vpcs, list), True, + "List VPC should return a valid list" + ) + self.assertEqual(vpc.name, vpcs[0].name, + "Name of the VPC should match with the returned list data" + ) + if state: + self.assertEqual(vpcs[0].state, state, + "VPC state should be '%s'" % state + ) + self.debug("VPC creation successfully validated - %s" % vpc.name) + + # validate_network_offering - Validates the given Network offering + # matches the given network offering name against the list of network + # offerings fetched + def validate_network_offering(self, net_offering, state=None): + """Validates the Network offering""" + self.debug("Check if the Network offering is created successfully ?") + net_offs = NetworkOffering.list(self.api_client, + id=net_offering.id + ) + self.assertEqual(isinstance(net_offs, list), True, + "List Network offering should return a valid list" + ) + self.assertEqual(net_offering.name, net_offs[0].name, + "Name of the Network offering should match with the returned list data" + ) + if state: + self.assertEqual(net_offs[0].state, state, + "Network offering state should be '%s'" % state + ) + self.debug("Network offering creation successfully validated - %s" % net_offering.name) + + # validate_network - Validates the network - matches the given network name against the list of networks fetched + def validate_network(self, network, state=None): + """Validates the network""" + self.debug("Check if the network is created successfully ?") + networks = Network.list(self.api_client, + id=network.id + ) + self.assertEqual(isinstance(networks, list), True, + "List network should return a valid list" + ) + self.assertEqual(network.name, networks[0].name, + "Name of the network should match with with the returned list data" + ) + if state: + self.assertEqual(networks[0].state, state, + "Network state should be '%s'" % state + ) + self.debug("Network creation successfully validated - %s" % network.name) + + # check_router_state - Fetches the list of routers and their states and matches the given router's state + def check_router_state(self, router, state=None): + self.debug("Check if the virtual router is in state - %s" % state) + routers = Router.list(self.api_client, + id=router.id, + listall=True + ) + self.assertEqual(isinstance(routers, list), True, + "List router should return a valid list" + ) + if state: + self.assertEqual(routers[0].state, state, + "Virtual router is not in the expected state" + ) + self.debug("Virtual router is in the expected state - %s" % state) + + # check_vm_state - Fetches the list of VMs and their states and matches the given VM's state + def check_vm_state(self, vm, state=None): + self.debug("Check if the VM instance is in state - %s" % state) + vms = VirtualMachine.list(self.api_client, + id=vm.id, + listall=True + ) + self.assertEqual(isinstance(vms, list), True, + "List virtual machine should return a valid list" + ) + if state: + self.assertEqual(vms[0].state, state, + "Virtual machine is not in the expected state" + ) + self.debug("Virtual machine is in the expected state - %s" % state) + + # validate_Public_IP - Looks if the given public ip is in the allocated state form the list of fetched public IPs + def validate_Public_IP(self, public_ip, network, static_nat=False, vm=None): + """Validates the Public IP""" + self.debug("Check if the Public IP is successfully assigned to the network ?") + public_ips = PublicIPAddress.list(self.api_client, + id=public_ip.ipaddress.id, + networkid=network.id, + isstaticnat=static_nat, + listall=True + ) + self.assertEqual(isinstance(public_ips, list), True, + "List public Ip for network should return a valid list" + ) + self.assertEqual(public_ips[0].ipaddress, public_ip.ipaddress.ipaddress, + "List public Ip for network should list the assigned public Ip address" + ) + self.assertEqual(public_ips[0].state, "Allocated", + "Assigned public Ip is not in the allocated state" + ) + if static_nat and vm: + self.assertEqual(public_ips[0].virtualmachineid, vm.id, + "Static NAT Rule not enabled for the VM using the assigned public Ip" + ) + self.debug("Assigned Public IP is successfully validated - %s" % public_ip.ipaddress.ipaddress) + + # VSD verifications + + def fetch_by_external_id(self, fetcher, *cs_objects): + """ Fetches a child object by external id using the given fetcher, and uuids of the given cloudstack objects. + E.G. + - fetch_by_external_id(vsdk.NUSubnet(id="954de425-b860-410b-be09-c560e7dbb474").vms, cs_vm) + - fetch_by_external_id(session.user.floating_ips, cs_network, cs_public_ip) + :param fetcher: VSPK Fetcher to use to find the child entity + :param cs_objects: Cloudstack objects to take the UUID from. + :return: the VSPK object having the correct externalID + """ + return fetcher.get_first(filter="externalID BEGINSWITH '%s'" % ":".join([o.id for o in cs_objects])) + + # VSD verifications using cms_vspk_wrapper + + def get_externalID(self, object_id): + return object_id + "@" + self.cms_id + + # verify_vsp_network - Fetches the vsd domain, vsd zone and vsd subnet and Verifies the given network/VPC values match the fetched values + def verify_vsp_network(self, domain_id, network, vpc=None): + vsd_enterprise = self.vsd.get_enterprise(name=domain_id) + if vpc: + ext_network_id = self.get_externalID(vpc.id) + else: + ext_network_id = self.get_externalID(network.id) + ext_subnet_id = self.get_externalID(network.id) + vsd_domain = self.vsd.get_domain(externalID=ext_network_id) + vsd_zone = self.vsd.get_zone(externalID=ext_network_id) + vsd_subnet = self.vsd.get_subnet(externalID=ext_subnet_id) + self.debug("SHOW ENTERPRISE DATA FORMAT IN VSD") + self.debug(vsd_enterprise) + self.assertNotEqual(vsd_enterprise, None, + "VSD Enterprise data format should not be a None type" + ) + self.debug("SHOW NETWORK DATA FORMAT IN VSD") + self.debug(vsd_domain) + self.debug(vsd_zone) + self.debug(vsd_subnet) + if vpc: + self.assertEqual(vsd_domain['description'], "VPC_" + vpc.name, + "VSD domain description should match VPC name in CloudStack" + ) + self.assertEqual(vsd_zone['description'], "VPC_" + vpc.name, + "VSD zone description should match VPC name in CloudStack" + ) + else: + self.assertEqual(vsd_domain['description'], network.name, + "VSD domain description should match Isolated Network name in CloudStack" + ) + self.assertEqual(vsd_zone['description'], network.name, + "VSD zone description should match Isolated Network name in CloudStack" + ) + self.assertEqual(vsd_subnet['description'], network.name, + "VSD subnet description should match Isolated Network name in CloudStack" + ) + + # verify_vsp_vm - Fetches the vsd vport, vsd vm and interface and Verifies the given VM values match the fetched values + def verify_vsp_vm(self, vm, stopped=None): + ext_vm_id = self.get_externalID(vm.id) + for nic in vm.nic: + ext_network_id = self.get_externalID(nic.networkid) + ext_nic_id = self.get_externalID(nic.id) + vsd_vport = self.vsd.get_vport(subnet_externalID=ext_network_id, vport_externalID=ext_nic_id) + vsd_vm_interface = self.vsd.get_vm_interface(externalID=ext_nic_id) + self.debug("SHOW VPORT and VM INTERFACE DATA FORMAT IN VSD") + self.debug(vsd_vport) + self.debug(vsd_vm_interface) + self.assertEqual(vsd_vport['active'], True, + "VSD VM vport should be active" + ) + self.assertEqual(vsd_vm_interface['IPAddress'], nic.ipaddress, + "VSD VM interface IP address should match VM's NIC IP address in CloudStack" + ) + vsd_vm = self.vsd.get_vm(externalID=ext_vm_id) + self.debug("SHOW VM DATA FORMAT IN VSD") + self.debug(vsd_vm) + if not self.isSimulator: + if stopped: + self.assertEqual(vsd_vm['status'], "DELETE_PENDING", + "VM state in VSD should be DELETE_PENDING" + ) + else: + self.assertEqual(vsd_vm['status'], vm.state.upper(), + "VM state in VSD should match its state in CloudStack" + ) + # verify_vsp_router - Fetches the vsd router and Verifies the given router status match the fetched status + + def verify_vsp_router(self, router, stopped=None): + ext_router_id = self.get_externalID(router.id) + vsd_router = self.vsd.get_vm(externalID=ext_router_id) + self.debug("SHOW VIRTUAL ROUTER DATA FORMAT IN VSD") + self.debug(vsd_router) + if not self.isSimulator: + if stopped: + self.assertEqual(vsd_router['status'], "DELETE_PENDING", + "Router state in VSD should be DELETE_PENDING" + ) + else: + self.assertEqual(vsd_router['status'], router.state.upper(), + "Router state in VSD should match its state in CloudStack" + ) + + # verify_vsp_floating_ip - Verifies the floating IPs on the given public IP against the VSD FIP + def verify_vsp_floating_ip(self, network, vm, public_ipaddress, vpc=None): + ext_fip_id = self.get_externalID(network.id + ":" + public_ipaddress.id) + vsd_fip = self.vsd.get_floating_ip(externalID=ext_fip_id) + self.debug("SHOW FLOATING IP DATA FORMAT IN VSD") + self.debug(vsd_fip) + self.assertEqual(vsd_fip['address'], public_ipaddress.ipaddress, + "Floating IP address in VSD should match acquired Public IP address in CloudStack" + ) + if vpc: + ext_network_id = self.get_externalID(vpc.id) + else: + ext_network_id = self.get_externalID(network.id) + vsd_domain = self.vsd.get_domain(externalID=ext_network_id) + self.debug("SHOW NETWORK DATA FORMAT IN VSD") + self.debug(vsd_domain) + self.assertEqual(vsd_domain['ID'], vsd_fip['parentID'], + "Floating IP in VSD should be associated with the correct VSD domain, " + "which in turn should correspond to the correct VPC (or) Isolated network in CloudStack" + ) + ext_subnet_id = self.get_externalID(network.id) + vsd_subnet = self.vsd.get_subnet(externalID=ext_subnet_id) + for nic in vm.nic: + if nic.networkname == vsd_subnet['description']: + ext_network_id = self.get_externalID(nic.networkid) + ext_nic_id = self.get_externalID(nic.id) + vsd_vport = self.vsd.get_vport(subnet_externalID=ext_network_id, vport_externalID=ext_nic_id) + self.debug("SHOW VM VPORT DATA FORMAT IN VSD") + self.debug(vsd_vport) + self.assertEqual(vsd_vport['associatedFloatingIPID'], vsd_fip['ID'], + "Floating IP in VSD should be associated to the correct VSD vport, " + "which in turn should correspond to the correct Static NAT enabled VM " + "and Isolated Network in CloudStack" + ) + + # verify_vsp_firewall_rule - Verifies the start port, destination port, + # protocol of the given firewall rule Ingress/Egress against the VSD + # firewall rule + def verify_vsp_firewall_rule(self, firewall_rule, traffic_type="Ingress"): + ext_fw_id = self.get_externalID(firewall_rule.id) + if traffic_type is "Ingress": + vsd_fw_rule = self.vsd.get_egress_acl_entry(externalID=ext_fw_id) + else: + vsd_fw_rule = self.vsd.get_ingress_acl_entry(externalID=ext_fw_id) + self.debug("SHOW ACL ENTRY IN VSD") + self.debug(vsd_fw_rule) + dest_port = str(firewall_rule.startport) + "-" + str(firewall_rule.endport) + self.assertEqual(vsd_fw_rule['destinationPort'], dest_port, + "Destination Port in VSD should match Destination Port in CloudStack" + ) + vsd_protocol = str(vsd_fw_rule['protocol']) + self.debug("vsd protocol " + vsd_protocol) + protocol = "tcp" + if vsd_protocol == 6: + protocol = "tcp" + elif vsd_protocol == 1: + protocol = "icmp" + elif vsd_protocol == 17: + protocol = "udp" + self.assertEqual(protocol, firewall_rule.protocol.lower(), + "Protocol in VSD should match Protocol in CloudStack" + ) --- End diff -- I don't see any test methods (or marvin test) in this suite file. > Add nuagevsp userdata testcase (Cloudstack-9095) & Refactor existing testcases > ------------------------------------------------------------------------------ > > Key: CLOUDSTACK-9304 > URL: https://issues.apache.org/jira/browse/CLOUDSTACK-9304 > Project: CloudStack > Issue Type: Task > Security Level: Public(Anyone can view this level - this is the > default.) > Components: Automation > Reporter: Sowmya Neeladhuri > Fix For: 4.9.0 > > > 1. Created folder / test / integration / plugins / nuagevsp > 2. Moved test_nuage_vpc_network.py , test_nuage_vsp.py from / test / > integration / component / to---> / test / integration / plugins / nuagevsp/ > 3. Added new testcase test_nuage_password_reset.py (Cloudstack-9095) > 4. Added Nuage class in / tools / marvin / marvin / lib / base.py > 5. Added services in / tools / marvin / marvin / config / test_data.py > > results: > test_01_UserDataPasswordReset > (nuagevsp.test_nuage_password_reset.TestNuagePasswordReset) ... === TestName: > test_01_UserDataPasswordReset | Status : SUCCESS === > ok > Test Basic VPC Network Functionality with NuageVsp network Plugin ... === > TestName: test_nuage_vpc_network | Status : SUCCESS === > ok > Test NuageVsp network plugin with basic Isolated Network functionality ... > === TestName: test_nuage_vsp | Status : SUCCESS === > ok > ---------------------------------------------------------------------- > XML: /root/report_xunit.xml > ---------------------------------------------------------------------- > Ran 3 tests in 2406.256s > OK > ~ -- This message was sent by Atlassian JIRA (v6.3.4#6332)