On 06/29/2012 06:31 AM, Yu Mingfei wrote:
> This patch adds testcases for virt-edit.
>
> Signed-off-by: Yu Mingfei <[email protected]>
> ---
> client/tests/libvirt/tests/virt_edit.py | 283
> +++++++++++++++++++++++++++++++
> 1 files changed, 283 insertions(+), 0 deletions(-)
> create mode 100644 client/tests/libvirt/tests/virt_edit.py
>
> diff --git a/client/tests/libvirt/tests/virt_edit.py
> b/client/tests/libvirt/tests/virt_edit.py
> new file mode 100644
> index 0000000..53d7599
> --- /dev/null
> +++ b/client/tests/libvirt/tests/virt_edit.py
> @@ -0,0 +1,283 @@
> +import logging, time, os, sys, shutil, re
> +from autotest.client.shared import utils, error
> +from autotest.client.virt import virt_vm, virt_remote, virt_env_process,
> libvirt_vm, aexpect
> +from xml.dom.minidom import parse, parseString
> +
^ Yu,
A double line spacing between major blocks (functions, methods, etc) is
usually a good idea, as it conforms to Autotest's style guidelines.
> +def vm_new_name(old_vm, newname, params):
> + oldfile = "/etc/libvirt/qemu/%s.xml" % old_vm.name
> + newfile = "/etc/libvirt/qemu/%s.xml" % newname
> + if oldfile == newfile:
> + logging.info("use old vm.")
> + return old_vm
> + shutil.copy(oldfile, newfile)
> + dom = parse(newfile)
> + root = dom.documentElement
> + node_name = root.getElementsByTagName("name")[0]
> + node_name.firstChild.data = "%s" % newname
> + node_uuid = root.getElementsByTagName("uuid")[0]
> + root.removeChild(node_uuid)
> + f=open(newfile, "w")
> + dom.writexml(f)
> + f.close()
^ Chris, IMHO this is a great opportunity for trying out xml_utils.
> +
> + if not os.path.exists(newfile):
> + raise error.TestError("Failed to create xml file.")
> +
> + if params.has_key("uri"):
> + uri = params.get("uri")
> + status = libvirt_vm.virsh_define(newfile, uri)
> + if not status:
> + os.remove(newfile)
> + raise error.TestError("Failed to define a VM.")
> +
> + new_vm = libvirt_vm.VM(newname, params, old_vm.root_dir,
> old_vm.address_cache)
> + return new_vm
> +
> +def cmd_virt_edit(dom_disk_object, filename, option = "", more_args = "",
> change_lang = "no", uri = ""):
> + """
> + Create command of virt-edit.
> + """
> + cmd = ""
> + if change_lang == "yes":
> + cmd = "LANG=C "
> + else:
> + pass
> +
> + if uri == "":
> + cmd += "virt-edit "
> + else:
> + cmd += "virt-edit -c %s " % uri
> + if option != "":
> + cmd += "%s " % option
> + if dom_disk_object != "":
> + cmd += "%s " % dom_disk_object
> + if filename != "":
> + cmd += "%s " % filename
> + if more_args != "":
> + cmd += "%s " % more_args
> + return cmd
> +
> +def virt_edit(vm_ref, filename, params):
> + """
> + According params to make virt-edit command by cmd_virt_edit.
> + @vm_ref:
> + """
> + change_lang = params.get("change_lang")
> + uri = params.get("uri")
> + option = params.get("option")
> + use_disk_name = params.get("use_disk_name")
> + more_args = params.get("more_args")
> + if vm_ref == "right_name":
> + if use_disk_name == "yes":
> + dom_disk_object = params.get("disk_name")
> + utils.run("dd if=/dev/zero of=%s bs=512M count=2" %
> dom_disk_object)
> + else:
> + dom_disk_object = params.get("new_vm_name")
> + elif vm_ref == "valid_name":
> + dom_disk_object = params.get("valid_name")
> + elif vm_ref == "uuid":
> + if params.has_key("dom_uuid"):
> + dom_disk_object = params.get("dom_uuid")
> + else:
> + logging.info("Get dom uuid failed.")
> + dom_disk_object = ""
> + elif vm_ref == "dom_disk":
> + if params.has_key("dom_disk"):
> + dom_disk_object = params.get("dom_disk")
> + else:
> + logging.info("Get dom disk failed.")
> + dom_disk_object = ""
> + else:
> + dom_disk_object = ""
> +
> + cmd = cmd_virt_edit(dom_disk_object, filename, option, more_args,
> change_lang, uri)
> + logging.info("virt-edit command: %s" % cmd)
> + return cmd
> +
> +def exec_virt_edit(cmd):
> + cmd_result = utils.run(cmd, timeout=30, ignore_status=True)
> + logging.info("Output: %s", cmd_result.stdout.strip())
> + logging.info("Error: %s", cmd_result.stderr.strip())
> + logging.info("Status: %d", cmd_result.exit_status)
> + return cmd_result.exit_status
> +
> +def result_analysis(status, error_opposite = "no"):
> + """
> + According status return a result of PASS(True) or FAIL(False).
> + @status: status of command.
> + @error_opposite(True or False):status good to PASS or bad to PASS.
> + """
> + if status == 0:
> + if error_opposite == "yes":
> + return False
> + else:
> + return True
> + else:
> + if error_opposite == "yes":
> + return True
> + else:
> + return False
> +
> +def clean_up(vm, undefine_vm = "no"):
> + if undefine_vm == "yes":
> + vm.remove()
> + else:
> + pass
> +
> +def run_virt_edit(test, params, env):
> + """
> + Test of virt-edit.
> + """
> + vm_ref = params.get("vm_ref")
> + libvirtd_restart = params.get("libvirtd_restart")
> + fail_flag = 0
> + file_names_type = params.get("file_names_type")
> + addition_edit = params.get("addition_edit")
> + foo_line = params.get("foo_line")
> + create_new_vm = params.get("create_new_vm")
> + vm = env.get_vm(params["main_vm"])
> + shutdown_later = params.get("shutdown_later")
> + wait_time = int(params.get("wait_time"))
> + undefine_vm = params.get("undefine_vm")
> + vm_name = params.get("main_vm")
> +
> + vm.verify_alive()
> + #wait for domain to start
> + time.sleep(wait_time)
> +
> + #check domain ip, if domain can not be connected ,end test
> + if params.get("os_type") != "windows":
> + vm_ip = vm.get_address(index = 0)
> + logging.info("VM IP: %s" % vm_ip)
> + ping_result = utils.run("ping -c 2 %s" % vm_ip, ignore_status = True)
> + if ping_result.exit_status != 0:
> + raise error.TestError("Pinging VM %s has no responding." %
> vm_name)
> + vm.destroy()
> +
> + #create new domain if needed.
> + new_dom_name = params.get("new_vm_name")
> + domain = vm_new_name(vm, new_dom_name, params)
> + params["dom_uuid"] = domain.get_uuid()
> + domain.start()
> + time.sleep(10)
> + domain.verify_alive()
> + time.sleep(wait_time)
> +
> + #if test during domain is runing,shutdown_later is yes.
> + if shutdown_later == "no":
> + domain.destroy()
> + time.sleep(10)
> +
> + if vm_ref == "dom_disk":
> + get_disk_cmd = "domblklist %s" % params.get("new_vm_name")
> + disk_list = libvirt_vm.virsh_cmd(get_disk_cmd).stdout.split('\n')
> + logging.info("Disks on %s:\n%s", params.get("new_vm_name"),
> disk_list)
> + disk_path = ""
> + for disk in disk_list:
> + disk_name = disk.split(' ')[0]
> + if re.search("[v|h|s]da", disk_name):
> + disk_path = disk.split(' ')[-1]
> + break
> + params["dom_disk"] = disk_path.strip()
> + logging.info("disk to edit:%s", params["dom_disk"])
> +
> + #get files to edit in domain or disk
> + if file_names_type == "":
> + file_names = [""]
> + elif file_names_type == "valid_file_name":
> + file_names = ["/etc/foo"]
> + else:
> + if params.get("os_type") == "windows":
> + file_names = params.get("win_file_names").split(" ")
> + else:
> + file_names = params.get("linux_file_names").split(" ")
> +
> + #try to edit files
> + for file_name in file_names:
> + cmd = virt_edit(vm_ref, file_name, params)
> + #if libvirtd_restart is true,stop libvirt now and start later
> + if libvirtd_restart == "yes":
> + libvirt_vm.libvirtd_stop()
> + time.sleep(5)
> + if addition_edit == "yes":
> + try:
> + session = aexpect.Expect(cmd)
> + time.sleep(15)
> + session.send('Go')
> + time.sleep(5)
> + session.send('%s' % foo_line)
> + time.sleep(5)
> + session.send('\x1b')
> + time.sleep(5)
> + session.send('ZZ')
> + time.sleep(5)
> + logging.info("virt-edit to add %s success", foo_line)
> + except Exception, e:
> + fail_flag = 1
> + logging.info("virt-edit returned invalid: add %s failed:
> %s", foo_line, e)
> + else:
> + status = exec_virt_edit(cmd)
> + status_error = params.get("status_error")
> + result = result_analysis(status, status_error)
> + if result:
> + logging.info("virt-edit test pass, result is expected.")
> + else:
> + fail_flag = 1
> + logging.info("virt-edit returned is not expected.")
> + if libvirtd_restart == "yes":
> + libvirt_vm.libvirtd_start()
> + time.sleep(5)
> + if shutdown_later == "yes":
> + domain.destroy()
> +
> + if params.get("use_disk_name") == "yes":
> + utils.run("rm -f %s" % params.get("disk_name"))
> +
> + if fail_flag != 0:
> + clean_up(domain, undefine_vm)
> + raise error.TestFail("Test Failed.")
> + else:
> + if params.get("os_type") == "windows":
> + clean_up(domain, undefine_vm)
> + else:
> + if addition_edit == "yes":
> + domain.start()
> + time.sleep(10)
> + dom_state = domain.state()
> + oper_output = ""
> + if dom_state != "running":
> + fail_flag = 1
> + logging.info("vm guest cannt be started.Test Fail.")
> + else:
> + time.sleep(wait_time)
> + try:
> + passwd = params.get("password")
> + session = virt_remote.remote_login("ssh", vm_ip,
> + "22", "root", passwd, "#")
> + time.sleep(5)
> + oper_output = session.cmd_output('cat /etc/hosts',
> + internal_timeout=10)
> + logging.info("\n%s", oper_output)
> + time.sleep(5)
> + session.sendline('cp -f /etc/hosts /etc/hosts.bk')
> + time.sleep(5)
> + session.sendline('sed -e \'/%s/d\' /etc/hosts.bk >
> /etc/hosts' % foo_line)
> + time.sleep(5)
> + session.sendline('rm -f /etc/hosts.bk')
> + time.sleep(5)
> + session.close()
> + except Exception, e:
> + fail_flag = 1
> + logging.info("vm guest cannt be accessed.Test
> Fail:%s", e)
> + domain.destroy()
> + if not re.search(("%s" % foo_line), oper_output):
> + fail_flag = 1
> + logging.info("Edit file fail.Test Fail.")
> + clean_up(domain, undefine_vm)
> + else:
> + if undefine_vm == "yes":
> + clean_up(domain, undefine_vm)
> + if libvirtd_restart == "yes" and undefine_vm == "yes":
> + libvirt_vm.virsh_undefine(new_dom_name)
> + if fail_flag != 0:
> + raise error.TestFail("Test Failed.")
> -- 1.7.1
>
> _______________________________________________
> Autotest mailing list
> [email protected]
> http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest