This is an automated email from the ASF dual-hosted git repository.

sureshanaparti pushed a commit to branch 4.22
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/4.22 by this push:
     new 0c86899cc19 Added VDDK support in VMware to KVM migrations (#12970)
0c86899cc19 is described below

commit 0c86899cc198ee5dd75ded9cffa7eb4adc48332e
Author: Harikrishna <[email protected]>
AuthorDate: Tue Apr 14 22:33:01 2026 +0530

    Added VDDK support in VMware to KVM migrations (#12970)
---
 agent/conf/agent.properties                        |  13 +
 .../cloud/agent/properties/AgentProperties.java    |  24 ++
 .../com/cloud/agent/api/to/RemoteInstanceTO.java   |  20 +-
 api/src/main/java/com/cloud/host/Host.java         |   3 +
 .../org/apache/cloudstack/api/ApiConstants.java    |   1 +
 .../api/command/admin/vm/ImportVmCmd.java          |  12 +
 .../agent/api/CheckConvertInstanceCommand.java     |  23 ++
 .../cloud/agent/api/ConvertInstanceCommand.java    |  36 +++
 .../com/cloud/agent/manager/AgentManagerImpl.java  |  25 +-
 .../kvm/resource/LibvirtComputingResource.java     | 162 +++++++++++
 .../LibvirtCheckConvertInstanceCommandWrapper.java |  10 +-
 .../LibvirtConvertInstanceCommandWrapper.java      | 307 ++++++++++++++++++---
 .../wrapper/LibvirtReadyCommandWrapper.java        |   4 +
 ...virtCheckConvertInstanceCommandWrapperTest.java |  25 ++
 .../LibvirtConvertInstanceCommandWrapperTest.java  | 124 +++++++++
 .../cloudstack/vm/UnmanagedVMsManagerImpl.java     | 143 ++++++++--
 .../cloudstack/vm/UnmanagedVMsManagerImplTest.java | 160 +++++++++--
 ui/public/locales/en.json                          |   3 +
 ui/src/views/infra/HostInfo.vue                    |  16 ++
 ui/src/views/tools/ImportUnmanagedInstance.vue     | 100 ++++++-
 20 files changed, 1116 insertions(+), 95 deletions(-)

diff --git a/agent/conf/agent.properties b/agent/conf/agent.properties
index 777bd4bf5cd..013c2e5bf2a 100644
--- a/agent/conf/agent.properties
+++ b/agent/conf/agent.properties
@@ -460,3 +460,16 @@ iscsi.session.cleanup.enabled=false
 
 # Time, in seconds, to wait before retrying to rebase during the incremental 
snapshot process.
 # incremental.snapshot.retry.rebase.wait=60
+
+# Path to the VDDK library directory for VMware to KVM conversion via VDDK,
+# passed to virt-v2v as -io vddk-libdir=<path>
+#vddk.lib.dir=
+
+# Ordered VDDK transport preference for VMware to KVM conversion via VDDK, 
passed as
+# -io vddk-transports=<value> to virt-v2v. Example: nbd:nbdssl
+#vddk.transports=
+
+# Optional vCenter SHA1 thumbprint for VMware to KVM conversion via VDDK, 
passed as
+# -io vddk-thumbprint=<value>. If unset, CloudStack computes it on the KVM 
host via openssl.
+#vddk.thumbprint=
+
diff --git 
a/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java 
b/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java
index 9d3f9f7df55..1cb9232eec1 100644
--- a/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java
+++ b/agent/src/main/java/com/cloud/agent/properties/AgentProperties.java
@@ -808,6 +808,30 @@ public class AgentProperties{
      */
     public static final Property<String> CONVERT_ENV_VIRTV2V_TMPDIR = new 
Property<>("convert.instance.env.virtv2v.tmpdir", null, String.class);
 
+    /**
+     * Path to the VDDK library directory on the KVM conversion host, used 
when converting VMs from VMware to KVM via VDDK.
+     * This directory is passed to virt-v2v as <code>-io 
vddk-libdir=&lt;path&gt;</code>.
+     * Data type: String.<br>
+     * Default value: <code>null</code>
+     */
+    public static final Property<String> VDDK_LIB_DIR = new 
Property<>("vddk.lib.dir", null, String.class);
+
+    /**
+     * Ordered list of VDDK transports for virt-v2v, passed as <code>-io 
vddk-transports=&lt;value&gt;</code>.
+     * Example: <code>nbd:nbdssl</code>.
+     * Data type: String.<br>
+     * Default value: <code>null</code>
+     */
+    public static final Property<String> VDDK_TRANSPORTS = new 
Property<>("vddk.transports", null, String.class);
+
+    /**
+     * vCenter TLS certificate thumbprint used by virt-v2v VDDK mode, passed 
as <code>-io vddk-thumbprint=&lt;value&gt;</code>.
+     * If unset, the KVM host computes it at runtime from the vCenter endpoint.
+     * Data type: String.<br>
+     * Default value: <code>null</code>
+     */
+    public static final Property<String> VDDK_THUMBPRINT = new 
Property<>("vddk.thumbprint", null, String.class);
+
     /**
      * BGP controll CIDR
      * Data type: String.<br>
diff --git a/api/src/main/java/com/cloud/agent/api/to/RemoteInstanceTO.java 
b/api/src/main/java/com/cloud/agent/api/to/RemoteInstanceTO.java
index 18737c584b3..7daeb964917 100644
--- a/api/src/main/java/com/cloud/agent/api/to/RemoteInstanceTO.java
+++ b/api/src/main/java/com/cloud/agent/api/to/RemoteInstanceTO.java
@@ -36,13 +36,17 @@ public class RemoteInstanceTO implements Serializable {
     private String vcenterPassword;
     private String vcenterHost;
     private String datacenterName;
+    private String clusterName;
+    private String hostName;
 
     public RemoteInstanceTO() {
     }
 
-    public RemoteInstanceTO(String instanceName) {
+    public RemoteInstanceTO(String instanceName, String clusterName, String 
hostName) {
         this.hypervisorType = Hypervisor.HypervisorType.VMware;
         this.instanceName = instanceName;
+        this.clusterName = clusterName;
+        this.hostName = hostName;
     }
 
     public RemoteInstanceTO(String instanceName, String instancePath, String 
vcenterHost, String vcenterUsername, String vcenterPassword, String 
datacenterName) {
@@ -55,6 +59,12 @@ public class RemoteInstanceTO implements Serializable {
         this.datacenterName = datacenterName;
     }
 
+    public RemoteInstanceTO(String instanceName, String instancePath, String 
vcenterHost, String vcenterUsername, String vcenterPassword, String 
datacenterName, String clusterName, String hostName) {
+        this(instanceName, instancePath, vcenterHost, vcenterUsername, 
vcenterPassword, datacenterName);
+        this.clusterName = clusterName;
+        this.hostName = hostName;
+    }
+
     public Hypervisor.HypervisorType getHypervisorType() {
         return this.hypervisorType;
     }
@@ -82,4 +92,12 @@ public class RemoteInstanceTO implements Serializable {
     public String getDatacenterName() {
         return datacenterName;
     }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
 }
diff --git a/api/src/main/java/com/cloud/host/Host.java 
b/api/src/main/java/com/cloud/host/Host.java
index 9c011bac319..b5234820151 100644
--- a/api/src/main/java/com/cloud/host/Host.java
+++ b/api/src/main/java/com/cloud/host/Host.java
@@ -57,6 +57,9 @@ public interface Host extends StateObject<Status>, Identity, 
Partition, HAResour
     String HOST_UEFI_ENABLE = "host.uefi.enable";
     String HOST_VOLUME_ENCRYPTION = "host.volume.encryption";
     String HOST_INSTANCE_CONVERSION = "host.instance.conversion";
+    String HOST_VDDK_SUPPORT = "host.vddk.support";
+    String HOST_VDDK_LIB_DIR = "vddk.lib.dir";
+    String HOST_VDDK_VERSION = "host.vddk.version";
     String HOST_OVFTOOL_VERSION = "host.ovftool.version";
     String HOST_VIRTV2V_VERSION = "host.virtv2v.version";
     String HOST_SSH_PORT = "host.ssh.port";
diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java 
b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index 43af4b94015..03b73834a94 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -622,6 +622,7 @@ public class ApiConstants {
     public static final String USER_CONFIGURABLE = "userconfigurable";
     public static final String USER_SECURITY_GROUP_LIST = 
"usersecuritygrouplist";
     public static final String USER_SECRET_KEY = "usersecretkey";
+    public static final String USE_VDDK = "usevddk";
     public static final String USE_VIRTUAL_NETWORK = "usevirtualnetwork";
     public static final String USE_VIRTUAL_ROUTER_IP_RESOLVER = 
"userouteripresolver";
     public static final String UPDATE_IN_SEQUENCE = "updateinsequence";
diff --git 
a/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ImportVmCmd.java 
b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ImportVmCmd.java
index 50ccfbd69c5..db7dcc3fb44 100644
--- 
a/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ImportVmCmd.java
+++ 
b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ImportVmCmd.java
@@ -179,6 +179,14 @@ public class ImportVmCmd extends 
ImportUnmanagedInstanceCmd {
             description = "(only for importing VMs from VMware to KVM) 
optional - the ID of the guest OS for the imported VM.")
     private Long guestOsId;
 
+    @Parameter(name = ApiConstants.USE_VDDK,
+            type = CommandType.BOOLEAN,
+            since = "4.22.1",
+            description = "(only for importing VMs from VMware to KVM) 
optional - if true, uses VDDK on the KVM conversion host for converting the VM. 
" +
+                    "This parameter is mutually exclusive with " + 
ApiConstants.FORCE_MS_TO_IMPORT_VM_FILES + ".")
+    private Boolean useVddk;
+
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -255,6 +263,10 @@ public class ImportVmCmd extends 
ImportUnmanagedInstanceCmd {
         return storagePoolId;
     }
 
+    public boolean getUseVddk() {
+        return BooleanUtils.toBooleanDefaultIfNull(useVddk, true);
+    }
+
     public String getTmpPath() {
         return tmpPath;
     }
diff --git 
a/core/src/main/java/com/cloud/agent/api/CheckConvertInstanceCommand.java 
b/core/src/main/java/com/cloud/agent/api/CheckConvertInstanceCommand.java
index fc066e5c589..c46fb697a3c 100644
--- a/core/src/main/java/com/cloud/agent/api/CheckConvertInstanceCommand.java
+++ b/core/src/main/java/com/cloud/agent/api/CheckConvertInstanceCommand.java
@@ -18,6 +18,8 @@ package com.cloud.agent.api;
 
 public class CheckConvertInstanceCommand extends Command {
     boolean checkWindowsGuestConversionSupport = false;
+    boolean useVddk = false;
+    String vddkLibDir;
 
     public CheckConvertInstanceCommand() {
     }
@@ -26,6 +28,11 @@ public class CheckConvertInstanceCommand extends Command {
         this.checkWindowsGuestConversionSupport = 
checkWindowsGuestConversionSupport;
     }
 
+    public CheckConvertInstanceCommand(boolean 
checkWindowsGuestConversionSupport, boolean useVddk) {
+        this.checkWindowsGuestConversionSupport = 
checkWindowsGuestConversionSupport;
+        this.useVddk = useVddk;
+    }
+
     @Override
     public boolean executeInSequence() {
         return false;
@@ -34,4 +41,20 @@ public class CheckConvertInstanceCommand extends Command {
     public boolean getCheckWindowsGuestConversionSupport() {
         return checkWindowsGuestConversionSupport;
     }
+
+    public boolean isUseVddk() {
+        return useVddk;
+    }
+
+    public void setUseVddk(boolean useVddk) {
+        this.useVddk = useVddk;
+    }
+
+    public String getVddkLibDir() {
+        return vddkLibDir;
+    }
+
+    public void setVddkLibDir(String vddkLibDir) {
+        this.vddkLibDir = vddkLibDir;
+    }
 }
diff --git a/core/src/main/java/com/cloud/agent/api/ConvertInstanceCommand.java 
b/core/src/main/java/com/cloud/agent/api/ConvertInstanceCommand.java
index 24336747ccf..38e0dca7736 100644
--- a/core/src/main/java/com/cloud/agent/api/ConvertInstanceCommand.java
+++ b/core/src/main/java/com/cloud/agent/api/ConvertInstanceCommand.java
@@ -31,6 +31,10 @@ public class ConvertInstanceCommand extends Command {
     private boolean exportOvfToConversionLocation;
     private int threadsCountToExportOvf = 0;
     private String extraParams;
+    private boolean useVddk;
+    private String vddkLibDir;
+    private String vddkTransports;
+    private String vddkThumbprint;
 
     public ConvertInstanceCommand() {
     }
@@ -90,6 +94,38 @@ public class ConvertInstanceCommand extends Command {
         this.extraParams = extraParams;
     }
 
+    public boolean isUseVddk() {
+        return useVddk;
+    }
+
+    public void setUseVddk(boolean useVddk) {
+        this.useVddk = useVddk;
+    }
+
+    public String getVddkLibDir() {
+        return vddkLibDir;
+    }
+
+    public void setVddkLibDir(String vddkLibDir) {
+        this.vddkLibDir = vddkLibDir;
+    }
+
+    public String getVddkTransports() {
+        return vddkTransports;
+    }
+
+    public void setVddkTransports(String vddkTransports) {
+        this.vddkTransports = vddkTransports;
+    }
+
+    public String getVddkThumbprint() {
+        return vddkThumbprint;
+    }
+
+    public void setVddkThumbprint(String vddkThumbprint) {
+        this.vddkThumbprint = vddkThumbprint;
+    }
+
     @Override
     public boolean executeInSequence() {
         return false;
diff --git 
a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
index 9e85be87db3..8c69dcdc482 100644
--- 
a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
+++ 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
@@ -805,8 +805,11 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
                 String uefiEnabled = detailsMap.get(Host.HOST_UEFI_ENABLE);
                 String virtv2vVersion = 
detailsMap.get(Host.HOST_VIRTV2V_VERSION);
                 String ovftoolVersion = 
detailsMap.get(Host.HOST_OVFTOOL_VERSION);
+                String vddkSupport = detailsMap.get(Host.HOST_VDDK_SUPPORT);
+                String vddkLibDir = detailsMap.get(Host.HOST_VDDK_LIB_DIR);
+                String vddkVersion = detailsMap.get(Host.HOST_VDDK_VERSION);
                 logger.debug("Got HOST_UEFI_ENABLE [{}] for host [{}]:", 
uefiEnabled, host);
-                if (ObjectUtils.anyNotNull(uefiEnabled, virtv2vVersion, 
ovftoolVersion)) {
+                if (ObjectUtils.anyNotNull(uefiEnabled, virtv2vVersion, 
ovftoolVersion, vddkSupport, vddkLibDir, vddkVersion)) {
                     _hostDao.loadDetails(host);
                     boolean updateNeeded = false;
                     if (StringUtils.isNotBlank(uefiEnabled) && 
!uefiEnabled.equals(host.getDetails().get(Host.HOST_UEFI_ENABLE))) {
@@ -821,6 +824,26 @@ public class AgentManagerImpl extends ManagerBase 
implements AgentManager, Handl
                         host.getDetails().put(Host.HOST_OVFTOOL_VERSION, 
ovftoolVersion);
                         updateNeeded = true;
                     }
+                    if (StringUtils.isNotBlank(vddkSupport) && 
!vddkSupport.equals(host.getDetails().get(Host.HOST_VDDK_SUPPORT))) {
+                        host.getDetails().put(Host.HOST_VDDK_SUPPORT, 
vddkSupport);
+                        updateNeeded = true;
+                    }
+                    if 
(!StringUtils.defaultString(vddkLibDir).equals(StringUtils.defaultString(host.getDetails().get(Host.HOST_VDDK_LIB_DIR))))
 {
+                        if (StringUtils.isBlank(vddkLibDir)) {
+                            host.getDetails().remove(Host.HOST_VDDK_LIB_DIR);
+                        } else {
+                            host.getDetails().put(Host.HOST_VDDK_LIB_DIR, 
vddkLibDir);
+                        }
+                        updateNeeded = true;
+                    }
+                    if 
(!StringUtils.defaultString(vddkVersion).equals(StringUtils.defaultString(host.getDetails().get(Host.HOST_VDDK_VERSION))))
 {
+                        if (StringUtils.isBlank(vddkVersion)) {
+                            host.getDetails().remove(Host.HOST_VDDK_VERSION);
+                        } else {
+                            host.getDetails().put(Host.HOST_VDDK_VERSION, 
vddkVersion);
+                        }
+                        updateNeeded = true;
+                    }
                     if (updateNeeded) {
                         _hostDao.saveDetails(host);
                     }
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
index 64df98f413a..25162ca9b92 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
@@ -18,6 +18,9 @@ package com.cloud.hypervisor.kvm.resource;
 
 import static com.cloud.host.Host.HOST_INSTANCE_CONVERSION;
 import static com.cloud.host.Host.HOST_OVFTOOL_VERSION;
+import static com.cloud.host.Host.HOST_VDDK_LIB_DIR;
+import static com.cloud.host.Host.HOST_VDDK_SUPPORT;
+import static com.cloud.host.Host.HOST_VDDK_VERSION;
 import static com.cloud.host.Host.HOST_VIRTV2V_VERSION;
 import static com.cloud.host.Host.HOST_VOLUME_ENCRYPTION;
 import static org.apache.cloudstack.utils.linux.KVMHostInfo.isHostS390x;
@@ -363,6 +366,7 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
     public static final String WINDOWS_GUEST_CONVERSION_SUPPORTED_CHECK_CMD = 
"rpm -qa | grep -i virtio-win";
     public static final String 
UBUNTU_WINDOWS_GUEST_CONVERSION_SUPPORTED_CHECK_CMD = "dpkg -l virtio-win";
     public static final String UBUNTU_NBDKIT_PKG_CHECK_CMD = "dpkg -l nbdkit";
+    public static final String VDDK_AUTODETECT_PATH_CMD = "find / -type d 
-name 'vmware-vix-disklib-distrib' 2>/dev/null | head -n 1";
 
     public static final int LIBVIRT_CGROUP_CPU_SHARES_MIN = 2;
     public static final int LIBVIRT_CGROUP_CPU_SHARES_MAX = 262144;
@@ -883,10 +887,16 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
 
     private boolean convertInstanceVerboseMode = false;
     private Map<String, String> convertInstanceEnv = null;
+    private String vddkLibDir = null;
+    private static final String libguestfsBackend = "direct";
     protected boolean dpdkSupport = false;
     protected String dpdkOvsPath;
     protected String directDownloadTemporaryDownloadPath;
     protected String cachePath;
+    private String vddkTransports = null;
+    private String vddkThumbprint = null;
+    private String vddkVersion = null;
+    private String detectedPasswordFileOption = null;
     protected String javaTempDir = System.getProperty("java.io.tmpdir");
 
     private String getEndIpFromStartIp(final String startIp, final int numIps) 
{
@@ -951,6 +961,26 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
         return convertInstanceEnv;
     }
 
+    public String getVddkLibDir() {
+        return vddkLibDir;
+    }
+
+    public String getLibguestfsBackend() {
+        return libguestfsBackend;
+    }
+
+    public String getVddkTransports() {
+        return vddkTransports;
+    }
+
+    public String getVddkThumbprint() {
+        return vddkThumbprint;
+    }
+
+    public String getVddkVersion() {
+        return vddkVersion;
+    }
+
     /**
      * Defines resource's public and private network interface according to 
what is configured in agent.properties.
      */
@@ -1151,6 +1181,37 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
 
         setConvertInstanceEnv(convertEnvTmpDir, convertEnvVirtv2vTmpDir);
 
+        vddkLibDir = 
StringUtils.trimToNull(AgentPropertiesFileHandler.getPropertyValue(AgentProperties.VDDK_LIB_DIR));
+        if (StringUtils.isNotBlank(vddkLibDir) && 
!isVddkLibDirValid(vddkLibDir)) {
+            LOGGER.warn("Configured VDDK library dir [{}] is invalid (missing 
lib64/libvixDiskLib.so), attempting auto-detection", vddkLibDir);
+            vddkLibDir = null;
+        }
+        if (StringUtils.isBlank(vddkLibDir)) {
+            vddkLibDir = detectVddkLibDir();
+        }
+        if (StringUtils.isNotBlank(vddkLibDir)) {
+            LOGGER.info("Detected VDDK library dir: {}", vddkLibDir);
+        } else {
+            LOGGER.warn("Could not detect a valid VDDK library dir; VDDK 
conversion will be unavailable");
+        }
+
+        vddkVersion = detectVddkVersion();
+        if (StringUtils.isNotBlank(vddkVersion)) {
+            LOGGER.info("Detected nbdkit VDDK plugin version: {}", 
vddkVersion);
+        }
+
+        vddkTransports = StringUtils.trimToNull(
+                
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.VDDK_TRANSPORTS));
+        vddkThumbprint = StringUtils.trimToNull(
+                
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.VDDK_THUMBPRINT));
+
+        detectedPasswordFileOption = detectPasswordFileOption();
+        if (StringUtils.isNotBlank(detectedPasswordFileOption)) {
+            LOGGER.info("Detected virt-v2v password option: {}", 
detectedPasswordFileOption);
+        } else {
+            LOGGER.warn("Could not detect virt-v2v password option, VDDK 
conversions may fail");
+        }
+
         pool = (String)params.get("pool");
         if (pool == null) {
             pool = "/root";
@@ -4212,6 +4273,13 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
         cmd.setHostTags(getHostTags());
         boolean instanceConversionSupported = hostSupportsInstanceConversion();
         cmd.getHostDetails().put(HOST_INSTANCE_CONVERSION, 
String.valueOf(instanceConversionSupported));
+        cmd.getHostDetails().put(HOST_VDDK_SUPPORT, 
String.valueOf(hostSupportsVddk()));
+        if (StringUtils.isNotBlank(vddkLibDir)) {
+            cmd.getHostDetails().put(HOST_VDDK_LIB_DIR, vddkLibDir);
+        }
+        if (StringUtils.isNotBlank(vddkVersion)) {
+            cmd.getHostDetails().put(HOST_VDDK_VERSION, vddkVersion);
+        }
         if (instanceConversionSupported) {
             cmd.getHostDetails().put(HOST_VIRTV2V_VERSION, 
getHostVirtV2vVersion());
         }
@@ -5927,6 +5995,66 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
         return exitValue == 0;
     }
 
+    public boolean hostSupportsVddk() {
+        return hostSupportsVddk(null);
+    }
+
+    public boolean hostSupportsVddk(String overriddenVddkLibDir) {
+        String effectiveVddkLibDir = 
StringUtils.trimToNull(overriddenVddkLibDir);
+        if (StringUtils.isBlank(effectiveVddkLibDir)) {
+            effectiveVddkLibDir = StringUtils.trimToNull(vddkLibDir);
+        }
+        if (StringUtils.isBlank(effectiveVddkLibDir) || 
!isVddkLibDirValid(effectiveVddkLibDir)) {
+            effectiveVddkLibDir = detectVddkLibDir();
+        }
+        return hostSupportsInstanceConversion() && 
isVddkLibDirValid(effectiveVddkLibDir) && 
StringUtils.isNotBlank(detectVddkVersion());
+    }
+
+    protected boolean isVddkLibDirValid(String path) {
+        if (StringUtils.isBlank(path)) {
+            return false;
+        }
+        File libDir = new File(path, "lib64");
+        if (!libDir.isDirectory()) {
+            return false;
+        }
+        File[] libs = libDir.listFiles((dir, name) -> 
name.startsWith("libvixDiskLib.so"));
+        return libs != null && libs.length > 0;
+    }
+
+    protected String detectVddkLibDir() {
+        String detectedPath = 
StringUtils.trimToNull(Script.runSimpleBashScript(VDDK_AUTODETECT_PATH_CMD));
+        if (StringUtils.isNotBlank(detectedPath) && 
isVddkLibDirValid(detectedPath)) {
+            return detectedPath;
+        }
+        return null;
+    }
+
+    protected String detectVddkVersion() {
+        try {
+            ProcessBuilder pb = new ProcessBuilder("nbdkit", "vddk", 
"--version");
+            Process process = pb.start();
+
+            String output = new 
String(process.getInputStream().readAllBytes());
+            process.waitFor();
+
+            if (StringUtils.isBlank(output)) {
+                return null;
+            }
+
+            for (String line : output.split("\\R")) {
+                String trimmed = StringUtils.trimToEmpty(line);
+                if (trimmed.startsWith("vddk ")) {
+                    return StringUtils.trimToNull(trimmed.substring("vddk 
".length()));
+                }
+            }
+            return null;
+        } catch (Exception e) {
+            LOGGER.error("Failed to detect vddk version: {}", e.getMessage());
+            return null;
+        }
+    }
+
     public boolean hostSupportsWindowsGuestConversion() {
         if (isUbuntuOrDebianHost()) {
             int exitValue = 
Script.runSimpleBashScriptForExitValue(UBUNTU_WINDOWS_GUEST_CONVERSION_SUPPORTED_CHECK_CMD);
@@ -5941,6 +6069,40 @@ public class LibvirtComputingResource extends 
ServerResourceBase implements Serv
         return exitValue == 0;
     }
 
+    /**
+     * Detect which password option virt-v2v supports by examining its --help 
output
+     * @return "-ip" if supported (virt-v2v >= 2.8.1), "--password-file" if 
older version, or null if detection fails
+     */
+    protected String detectPasswordFileOption() {
+        try {
+            ProcessBuilder pb = new ProcessBuilder("virt-v2v", "--help");
+            Process process = pb.start();
+
+            String output = new 
String(process.getInputStream().readAllBytes());
+            process.waitFor();
+
+            if (output.contains("-ip <filename>")) {
+                return "-ip";
+            } else if (output.contains("--password-file")) {
+                return "--password-file";
+            } else {
+                LOGGER.error("virt-v2v does not support -ip or 
--password-file");
+                return null;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to detect virt-v2v password option: {}", 
e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Get the detected password file option for virt-v2v
+     * @return the password option ("-ip" or "--password-file") or null if not 
detected
+     */
+    public String getDetectedPasswordFileOption() {
+        return detectedPasswordFileOption;
+    }
+
     public String getHostVirtV2vVersion() {
         if (!hostSupportsInstanceConversion()) {
             return "";
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapper.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapper.java
index b94b4830003..de9341715f0 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapper.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapper.java
@@ -30,7 +30,15 @@ public class LibvirtCheckConvertInstanceCommandWrapper 
extends CommandWrapper<Ch
 
     @Override
     public Answer execute(CheckConvertInstanceCommand cmd, 
LibvirtComputingResource serverResource) {
-        if (!serverResource.hostSupportsInstanceConversion()) {
+        if (cmd.isUseVddk()) {
+            if (!serverResource.hostSupportsVddk(cmd.getVddkLibDir())) {
+                String msg = String.format("Cannot convert the instance from 
VMware using VDDK on host %s. " +
+                                "Please make sure virt-v2v%s, nbdkit-vddk and 
a valid VDDK library directory are available on the host.",
+                        serverResource.getPrivateIp(), 
serverResource.isUbuntuOrDebianHost() ? ", nbdkit" : "");
+                logger.info(msg);
+                return new CheckConvertInstanceAnswer(cmd, false, msg);
+            }
+        } else if (!serverResource.hostSupportsInstanceConversion()) {
             String msg = String.format("Cannot convert the instance from 
VMware as the virt-v2v binary is not found on host %s. " +
                     "Please install virt-v2v%s on the host before attempting 
the instance conversion.", serverResource.getPrivateIp(), 
serverResource.isUbuntuOrDebianHost()? ", nbdkit" : "");
             logger.info(msg);
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapper.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapper.java
index 66a5f5dd7d2..eaa8b790439 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapper.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapper.java
@@ -20,10 +20,17 @@ package com.cloud.hypervisor.kvm.resource.wrapper;
 
 import java.net.URLEncoder;
 import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Locale;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 import org.apache.commons.collections4.MapUtils;
@@ -51,6 +58,7 @@ public class LibvirtConvertInstanceCommandWrapper extends 
CommandWrapper<Convert
 
     private static final List<Hypervisor.HypervisorType> 
supportedInstanceConvertSourceHypervisors =
             List.of(Hypervisor.HypervisorType.VMware);
+    private static final Pattern SHA1_FINGERPRINT_PATTERN = 
Pattern.compile("(?i)(?:SHA1\\s+)?Fingerprint\\s*=\\s*([0-9A-F:]+)");
 
     @Override
     public Answer execute(ConvertInstanceCommand cmd, LibvirtComputingResource 
serverResource) {
@@ -61,7 +69,8 @@ public class LibvirtConvertInstanceCommandWrapper extends 
CommandWrapper<Convert
         DataStoreTO conversionTemporaryLocation = 
cmd.getConversionTemporaryLocation();
         long timeout = (long) cmd.getWait() * 1000;
         String extraParams = cmd.getExtraParams();
-        String originalVMName = cmd.getOriginalVMName(); // For logging 
purposes, as the sourceInstance may have been cloned
+        boolean useVddk = cmd.isUseVddk();
+        String originalVMName = cmd.getOriginalVMName();
 
         if (cmd.getCheckConversionSupport() && 
!serverResource.hostSupportsInstanceConversion()) {
             String msg = String.format("Cannot convert the instance %s from 
VMware as the virt-v2v binary is not found. " +
@@ -84,61 +93,75 @@ public class LibvirtConvertInstanceCommandWrapper extends 
CommandWrapper<Convert
         logger.info(String.format("(%s) Attempting to convert the instance %s 
from %s to KVM",
                 originalVMName, sourceInstanceName, sourceHypervisorType));
         final String temporaryConvertPath = 
temporaryStoragePool.getLocalPath();
+        final String temporaryConvertUuid = UUID.randomUUID().toString();
+        boolean verboseModeEnabled = 
serverResource.isConvertInstanceVerboseModeEnabled();
 
-        String ovfTemplateDirOnConversionLocation;
-        String sourceOVFDirPath;
+        boolean cleanupSecondaryStorage = false;
         boolean ovfExported = false;
-        if (cmd.getExportOvfToConversionLocation()) {
-            String exportInstanceOVAUrl = 
getExportInstanceOVAUrl(sourceInstance, originalVMName);
-            if (StringUtils.isBlank(exportInstanceOVAUrl)) {
-                String err = String.format("Couldn't export OVA for the VM %s, 
due to empty url", sourceInstanceName);
-                logger.error(String.format("(%s) %s", originalVMName, err));
-                return new Answer(cmd, false, err);
-            }
+        String ovfTemplateDirOnConversionLocation = null;
 
-            int noOfThreads = cmd.getThreadsCountToExportOvf();
-            if (noOfThreads > 1 && 
!serverResource.ovfExportToolSupportsParallelThreads()) {
-                noOfThreads = 0;
-            }
-            ovfTemplateDirOnConversionLocation = UUID.randomUUID().toString();
-            
temporaryStoragePool.createFolder(ovfTemplateDirOnConversionLocation);
-            sourceOVFDirPath = String.format("%s/%s/", temporaryConvertPath, 
ovfTemplateDirOnConversionLocation);
-            ovfExported = exportOVAFromVMOnVcenter(exportInstanceOVAUrl, 
sourceOVFDirPath, noOfThreads, originalVMName, timeout);
-            if (!ovfExported) {
-                String err = String.format("Export OVA for the VM %s failed", 
sourceInstanceName);
-                logger.error(String.format("(%s) %s", originalVMName, err));
-                return new Answer(cmd, false, err);
-            }
-            sourceOVFDirPath = String.format("%s%s/", sourceOVFDirPath, 
sourceInstanceName);
-        } else {
-            ovfTemplateDirOnConversionLocation = 
cmd.getTemplateDirOnConversionLocation();
-            sourceOVFDirPath = String.format("%s/%s/", temporaryConvertPath, 
ovfTemplateDirOnConversionLocation);
-        }
+        try {
+            boolean result;
+            if (useVddk) {
+                logger.info("({}) Using VDDK-based conversion (direct from 
VMware)", originalVMName);
+                String vddkLibDir = resolveVddkSetting(cmd.getVddkLibDir(), 
serverResource.getVddkLibDir());
+                if (StringUtils.isBlank(vddkLibDir)) {
+                    String err = String.format("VDDK lib dir is not configured 
on the host. " +
+                            "Set '%s' in agent.properties or in details 
parameter of the import api calll to use VDDK-based conversion.", 
"vddk.lib.dir");
+                    logger.error("({}) {}", originalVMName, err);
+                    return new Answer(cmd, false, err);
+                }
+                String vddkTransports = 
resolveVddkSetting(cmd.getVddkTransports(), serverResource.getVddkTransports());
+                String configuredVddkThumbprint = 
resolveVddkSetting(cmd.getVddkThumbprint(), serverResource.getVddkThumbprint());
+                String passwordOption = 
serverResource.getDetectedPasswordFileOption();
+                result = performInstanceConversionUsingVddk(sourceInstance, 
originalVMName, temporaryConvertPath,
+                        vddkLibDir, serverResource.getLibguestfsBackend(), 
vddkTransports, configuredVddkThumbprint,
+                        timeout, verboseModeEnabled, extraParams, 
temporaryConvertUuid, passwordOption);
+            } else {
+                logger.info("({}) Using OVF-based conversion (export + local 
convert)", originalVMName);
+                String sourceOVFDirPath;
+                if (cmd.getExportOvfToConversionLocation()) {
+                    String exportInstanceOVAUrl = 
getExportInstanceOVAUrl(sourceInstance, originalVMName);
 
-        logger.info(String.format("(%s) Attempting to convert the OVF %s of 
the instance %s from %s to KVM",
-                originalVMName, ovfTemplateDirOnConversionLocation, 
sourceInstanceName, sourceHypervisorType));
+                    if (StringUtils.isBlank(exportInstanceOVAUrl)) {
+                        String err = String.format("Couldn't export OVA for 
the VM %s, due to empty url", sourceInstanceName);
+                        logger.error("({}) {}", originalVMName, err);
+                        return new Answer(cmd, false, err);
+                    }
 
-        final String temporaryConvertUuid = UUID.randomUUID().toString();
-        boolean verboseModeEnabled = 
serverResource.isConvertInstanceVerboseModeEnabled();
+                    int noOfThreads = cmd.getThreadsCountToExportOvf();
+                    if (noOfThreads > 1 && 
!serverResource.ovfExportToolSupportsParallelThreads()) {
+                        noOfThreads = 0;
+                    }
+                    ovfTemplateDirOnConversionLocation = 
UUID.randomUUID().toString();
+                    
temporaryStoragePool.createFolder(ovfTemplateDirOnConversionLocation);
+                    sourceOVFDirPath = String.format("%s/%s/", 
temporaryConvertPath, ovfTemplateDirOnConversionLocation);
+                    ovfExported = 
exportOVAFromVMOnVcenter(exportInstanceOVAUrl, sourceOVFDirPath, noOfThreads, 
originalVMName, timeout);
+
+                    if (!ovfExported) {
+                        String err = String.format("Export OVA for the VM %s 
failed", sourceInstanceName);
+                        logger.error("({}) {}", originalVMName, err);
+                        return new Answer(cmd, false, err);
+                    }
+                    sourceOVFDirPath = String.format("%s%s/", 
sourceOVFDirPath, sourceInstanceName);
+                } else {
+                    ovfTemplateDirOnConversionLocation = 
cmd.getTemplateDirOnConversionLocation();
+                    sourceOVFDirPath = String.format("%s/%s/", 
temporaryConvertPath, ovfTemplateDirOnConversionLocation);
+                }
+
+                result = performInstanceConversion(originalVMName, 
sourceOVFDirPath, temporaryConvertPath, temporaryConvertUuid,
+                        timeout, verboseModeEnabled, extraParams, 
serverResource);
+            }
 
-        boolean cleanupSecondaryStorage = false;
-        try {
-            boolean result = performInstanceConversion(originalVMName, 
sourceOVFDirPath, temporaryConvertPath, temporaryConvertUuid,
-                    timeout, verboseModeEnabled, extraParams, serverResource);
             if (!result) {
-                String err = String.format(
-                        "The virt-v2v conversion for the OVF %s failed. Please 
check the agent logs " +
-                                "for the virt-v2v output. Please try on a 
different kvm host which " +
-                                "has a different virt-v2v version.",
-                        ovfTemplateDirOnConversionLocation);
-                logger.error(String.format("(%s) %s", originalVMName, err));
+                String err = String.format("Instance conversion failed for VM 
%s. Please check virt-v2v logs.", sourceInstanceName);
+                logger.error("({}) {}", originalVMName, err);
                 return new Answer(cmd, false, err);
             }
             return new ConvertInstanceAnswer(cmd, temporaryConvertUuid);
         } catch (Exception e) {
-            String error = String.format("Error converting instance %s from 
%s, due to: %s",
-                    sourceInstanceName, sourceHypervisorType, e.getMessage());
-            logger.error(String.format("(%s) %s", originalVMName, error), e);
+            String error = String.format("Error converting instance %s from 
%s, due to: %s", sourceInstanceName, sourceHypervisorType, e.getMessage());
+            logger.error("({}) {}", originalVMName, error, e);
             cleanupSecondaryStorage = true;
             return new Answer(cmd, false, error);
         } finally {
@@ -275,4 +298,198 @@ public class LibvirtConvertInstanceCommandWrapper extends 
CommandWrapper<Convert
     protected String encodeUsername(String username) {
         return URLEncoder.encode(username, Charset.defaultCharset());
     }
+
+    private String resolveVddkSetting(String commandValue, String agentValue) {
+        return 
StringUtils.defaultIfBlank(StringUtils.trimToNull(commandValue), 
StringUtils.trimToNull(agentValue));
+    }
+
+    protected boolean performInstanceConversionUsingVddk(RemoteInstanceTO 
vmwareInstance, String originalVMName,
+                                                         String 
temporaryConvertFolder, String vddkLibDir,
+                                                         String 
libguestfsBackend, String vddkTransports,
+                                                         String 
configuredVddkThumbprint,
+                                                         long timeout, boolean 
verboseModeEnabled, String extraParams,
+                                                         String 
temporaryConvertUuid, String passwordOption) {
+
+        String vcenterPassword = vmwareInstance.getVcenterPassword();
+        if (StringUtils.isBlank(vcenterPassword)) {
+            logger.error("({}) Could not determine vCenter password for {}", 
originalVMName, vmwareInstance.getVcenterHost());
+            return false;
+        }
+
+        String passwordFilePath = String.format("/tmp/v2v.pass.cloud.%s.%s",
+                StringUtils.defaultIfBlank(vmwareInstance.getVcenterHost(), 
"unknown"),
+                UUID.randomUUID());
+        try {
+            Files.writeString(Path.of(passwordFilePath), vcenterPassword);
+            Files.setPosixFilePermissions(Path.of(passwordFilePath), 
Set.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE));
+            logger.debug("({}) Written vCenter password to {}", 
originalVMName, passwordFilePath);
+        } catch (Exception e) {
+            logger.error("({}) Failed to write vCenter password file {}: {}", 
originalVMName, passwordFilePath, e.getMessage());
+            return false;
+        }
+
+        try {
+            String vpxUrl = buildVpxUrl(vmwareInstance);
+
+            StringBuilder cmd = new StringBuilder();
+
+            cmd.append("export 
LIBGUESTFS_BACKEND=").append(libguestfsBackend).append(" && ");
+
+            cmd.append("virt-v2v ");
+            cmd.append("--root first ");
+            cmd.append("-ic '").append(vpxUrl).append("' ");
+            if (StringUtils.isBlank(passwordOption)) {
+                logger.error("({}) Could not determine supported password file 
option for virt-v2v", originalVMName);
+                return false;
+            }
+
+            cmd.append(passwordOption).append(" 
").append(passwordFilePath).append(" ");
+            cmd.append("-it vddk ");
+            cmd.append("-io vddk-libdir=").append(vddkLibDir).append(" ");
+            String vddkThumbprint = 
StringUtils.trimToNull(configuredVddkThumbprint);
+            if (StringUtils.isBlank(vddkThumbprint)) {
+                vddkThumbprint = 
getVcenterThumbprint(vmwareInstance.getVcenterHost(), timeout, originalVMName);
+            }
+            if (StringUtils.isBlank(vddkThumbprint)) {
+                logger.error("({}) Could not determine vCenter thumbprint for 
{}", originalVMName, vmwareInstance.getVcenterHost());
+                return false;
+            }
+            cmd.append("-io vddk-thumbprint=").append(vddkThumbprint).append(" 
");
+            if (StringUtils.isNotBlank(vddkTransports)) {
+                cmd.append("-io 
vddk-transports=").append(vddkTransports).append(" ");
+            }
+            cmd.append(vmwareInstance.getInstanceName()).append(" ");
+            cmd.append("-o local ");
+            cmd.append("-os ").append(temporaryConvertFolder).append(" ");
+            cmd.append("-of qcow2 ");
+            cmd.append("-on ").append(temporaryConvertUuid).append(" ");
+
+            if (verboseModeEnabled) {
+                cmd.append("-v ");
+            }
+
+            if (StringUtils.isNotBlank(extraParams)) {
+                cmd.append(extraParams).append(" ");
+            }
+
+            Script script = new Script("/bin/bash", timeout, logger);
+            script.add("-c");
+            script.add(cmd.toString());
+
+            String logPrefix = String.format("(%s) virt-v2v vddk import", 
originalVMName);
+            OutputInterpreter.LineByLineOutputLogger outputLogger =
+                    new OutputInterpreter.LineByLineOutputLogger(logger, 
logPrefix);
+
+            logger.info("({}) Starting virt-v2v VDDK conversion", 
originalVMName);
+            script.execute(outputLogger);
+
+            int exitValue = script.getExitValue();
+            if (exitValue != 0) {
+                logger.error("({}) virt-v2v failed with exit code {}", 
originalVMName, exitValue);
+            }
+
+            return exitValue == 0;
+        } finally {
+            try {
+                Files.deleteIfExists(Path.of(passwordFilePath));
+                logger.debug("({}) Deleted password file {}", originalVMName, 
passwordFilePath);
+            } catch (Exception e) {
+                logger.warn("({}) Failed to delete password file {}: {}", 
originalVMName, passwordFilePath, e.getMessage());
+            }
+        }
+    }
+
+    protected String getVcenterThumbprint(String vcenterHost, long timeout, 
String originalVMName) {
+        if (StringUtils.isBlank(vcenterHost)) {
+            return null;
+        }
+
+        String endpoint = String.format("%s:443", vcenterHost);
+        String command = String.format("openssl s_client -connect '%s' 
</dev/null 2>/dev/null | " +
+                "openssl x509 -fingerprint -sha1 -noout", endpoint);
+
+        Script script = new Script("/bin/bash", timeout, logger);
+        script.add("-c");
+        script.add(command);
+
+        OutputInterpreter.AllLinesParser parser = new 
OutputInterpreter.AllLinesParser();
+        script.execute(parser);
+
+        String output = parser.getLines();
+        if (script.getExitValue() != 0) {
+            logger.error("({}) Failed to fetch vCenter thumbprint for {}", 
originalVMName, vcenterHost);
+            return null;
+        }
+
+        String thumbprint = extractSha1Fingerprint(output);
+        if (StringUtils.isBlank(thumbprint)) {
+            logger.error("({}) Failed to parse vCenter thumbprint from output 
for {}", originalVMName, vcenterHost);
+            return null;
+        }
+        return thumbprint;
+    }
+
+    private String extractSha1Fingerprint(String output) {
+        String parsedOutput = StringUtils.trimToEmpty(output);
+        if (StringUtils.isBlank(parsedOutput)) {
+            return null;
+        }
+
+        for (String line : parsedOutput.split("\\R")) {
+            String trimmedLine = StringUtils.trimToEmpty(line);
+            if (StringUtils.isBlank(trimmedLine)) {
+                continue;
+            }
+
+            Matcher matcher = SHA1_FINGERPRINT_PATTERN.matcher(trimmedLine);
+            if (matcher.find()) {
+                return matcher.group(1).toUpperCase(Locale.ROOT);
+            }
+
+            // Fallback for raw fingerprint-only output.
+            if (trimmedLine.matches("(?i)[0-9a-f]{2}(:[0-9a-f]{2})+")) {
+                return trimmedLine.toUpperCase(Locale.ROOT);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Build vpx:// URL for virt-v2v
+     *
+     * Format:
+     * vpx://user@vcenter/DC/cluster/host?no_verify=1
+     */
+    private String buildVpxUrl(RemoteInstanceTO vmwareInstance) {
+
+        String vmName = vmwareInstance.getInstanceName();
+        String vcenter = vmwareInstance.getVcenterHost();
+        String username = vmwareInstance.getVcenterUsername();
+        String datacenter = vmwareInstance.getDatacenterName();
+        String cluster = vmwareInstance.getClusterName();
+        String host = vmwareInstance.getHostName();
+
+        String encodedUsername = encodeUsername(username);
+
+        StringBuilder url = new StringBuilder();
+        url.append("vpx://")
+                .append(encodedUsername)
+                .append("@")
+                .append(vcenter)
+                .append("/")
+                .append(datacenter);
+
+        if (StringUtils.isNotBlank(cluster)) {
+            url.append("/").append(cluster);
+        }
+
+        if (StringUtils.isNotBlank(host)) {
+            url.append("/").append(host);
+        }
+
+        url.append("?no_verify=1");
+
+        logger.info("({}) Using VPX URL: {}", vmName, url);
+        return url.toString();
+    }
 }
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtReadyCommandWrapper.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtReadyCommandWrapper.java
index e74923b281f..5a7d6d2c203 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtReadyCommandWrapper.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtReadyCommandWrapper.java
@@ -34,6 +34,7 @@ import 
com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
 import com.cloud.resource.CommandWrapper;
 import com.cloud.resource.ResourceWrapper;
 import com.cloud.utils.script.Script;
+import org.apache.commons.lang3.StringUtils;
 
 @ResourceWrapper(handles =  ReadyCommand.class)
 public final class LibvirtReadyCommandWrapper extends 
CommandWrapper<ReadyCommand, Answer, LibvirtComputingResource> {
@@ -50,6 +51,9 @@ public final class LibvirtReadyCommandWrapper extends 
CommandWrapper<ReadyComman
         if (libvirtComputingResource.hostSupportsInstanceConversion()) {
             hostDetails.put(Host.HOST_VIRTV2V_VERSION, 
libvirtComputingResource.getHostVirtV2vVersion());
         }
+        hostDetails.put(Host.HOST_VDDK_SUPPORT, 
Boolean.toString(libvirtComputingResource.hostSupportsVddk()));
+        hostDetails.put(Host.HOST_VDDK_LIB_DIR, 
StringUtils.defaultString(libvirtComputingResource.getVddkLibDir()));
+        hostDetails.put(Host.HOST_VDDK_VERSION, 
StringUtils.defaultString(libvirtComputingResource.getVddkVersion()));
 
         if (libvirtComputingResource.hostSupportsOvfExport()) {
             hostDetails.put(Host.HOST_OVFTOOL_VERSION, 
libvirtComputingResource.getHostOvfToolVersion());
diff --git 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapperTest.java
 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapperTest.java
index 3cad9c27a68..74090723331 100644
--- 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapperTest.java
+++ 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCheckConvertInstanceCommandWrapperTest.java
@@ -52,6 +52,7 @@ public class LibvirtCheckConvertInstanceCommandWrapperTest {
 
     @Test
     public void testCheckInstanceCommand_success() {
+        
Mockito.when(checkConvertInstanceCommandMock.isUseVddk()).thenReturn(false);
         
Mockito.when(libvirtComputingResourceMock.hostSupportsInstanceConversion()).thenReturn(true);
         Answer answer = 
checkConvertInstanceCommandWrapper.execute(checkConvertInstanceCommandMock, 
libvirtComputingResourceMock);
         assertTrue(answer.getResult());
@@ -59,9 +60,33 @@ public class LibvirtCheckConvertInstanceCommandWrapperTest {
 
     @Test
     public void testCheckInstanceCommand_failure() {
+        
Mockito.when(checkConvertInstanceCommandMock.isUseVddk()).thenReturn(false);
         
Mockito.when(libvirtComputingResourceMock.hostSupportsInstanceConversion()).thenReturn(false);
         Answer answer = 
checkConvertInstanceCommandWrapper.execute(checkConvertInstanceCommandMock, 
libvirtComputingResourceMock);
         assertFalse(answer.getResult());
         assertTrue(StringUtils.isNotBlank(answer.getDetails()));
     }
+
+    @Test
+    public void testCheckInstanceCommand_vddkSuccess() {
+        
Mockito.when(checkConvertInstanceCommandMock.isUseVddk()).thenReturn(true);
+        
Mockito.when(checkConvertInstanceCommandMock.getVddkLibDir()).thenReturn("/opt/vmware-vddk/vmware-vix-disklib-distrib");
+        
Mockito.when(libvirtComputingResourceMock.hostSupportsVddk("/opt/vmware-vddk/vmware-vix-disklib-distrib")).thenReturn(true);
+
+        Answer answer = 
checkConvertInstanceCommandWrapper.execute(checkConvertInstanceCommandMock, 
libvirtComputingResourceMock);
+
+        assertTrue(answer.getResult());
+    }
+
+    @Test
+    public void testCheckInstanceCommand_vddkFailure() {
+        
Mockito.when(checkConvertInstanceCommandMock.isUseVddk()).thenReturn(true);
+        
Mockito.when(checkConvertInstanceCommandMock.getVddkLibDir()).thenReturn("/opt/vmware-vddk/vmware-vix-disklib-distrib");
+        
Mockito.when(libvirtComputingResourceMock.hostSupportsVddk("/opt/vmware-vddk/vmware-vix-disklib-distrib")).thenReturn(false);
+
+        Answer answer = 
checkConvertInstanceCommandWrapper.execute(checkConvertInstanceCommandMock, 
libvirtComputingResourceMock);
+
+        assertFalse(answer.getResult());
+        assertTrue(StringUtils.isNotBlank(answer.getDetails()));
+    }
 }
diff --git 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapperTest.java
 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapperTest.java
index 4d55ac2bc73..c30fa2f4948 100644
--- 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapperTest.java
+++ 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtConvertInstanceCommandWrapperTest.java
@@ -18,6 +18,7 @@
 //
 package com.cloud.hypervisor.kvm.resource.wrapper;
 
+import java.nio.file.Files;
 import java.util.List;
 import java.util.UUID;
 
@@ -189,4 +190,127 @@ public class LibvirtConvertInstanceCommandWrapperTest {
         Mockito.verify(script).add("-x");
         Mockito.verify(script).add("-v");
     }
+
+    @Test
+    public void 
testPerformInstanceConversionUsingVddkUsesConfiguredLibguestfsBackend() {
+        RemoteInstanceTO remoteInstanceTO = 
Mockito.mock(RemoteInstanceTO.class);
+        
Mockito.when(remoteInstanceTO.getVcenterHost()).thenReturn("vcenter.local");
+        
Mockito.when(remoteInstanceTO.getVcenterUsername()).thenReturn("[email protected]");
+        
Mockito.when(remoteInstanceTO.getVcenterPassword()).thenReturn("secret");
+        Mockito.when(remoteInstanceTO.getDatacenterName()).thenReturn("dc1");
+        Mockito.when(remoteInstanceTO.getClusterName()).thenReturn("cluster1");
+        Mockito.when(remoteInstanceTO.getHostName()).thenReturn("host1");
+        
Mockito.doReturn("28:19:A6:1C:90:ED:46:D7:1C:86:BC:F6:13:52:F0:B9:19:81:0D:81")
+                
.when(convertInstanceCommandWrapper).getVcenterThumbprint(Mockito.anyString(), 
Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Files> filesMock = Mockito.mockStatic(Files.class);
+             MockedConstruction<Script> ignored = 
Mockito.mockConstruction(Script.class, (mock, context) -> {
+                 Mockito.when(mock.execute(Mockito.any())).thenReturn("");
+                 Mockito.when(mock.getExitValue()).thenReturn(0);
+             })) {
+            filesMock.when(() -> Files.writeString(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local.")), 
Mockito.eq("secret")))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+            filesMock.when(() -> Files.deleteIfExists(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local."))))
+                    .thenReturn(true);
+
+            boolean result = 
convertInstanceCommandWrapper.performInstanceConversionUsingVddk(
+                    remoteInstanceTO, vmName, "/tmp/convert", "/opt/vddk", 
"libvirt", null, null, 1000L, false, null, "tmp-uuid", "-ip");
+
+            Assert.assertTrue(result);
+            Script scriptMock = ignored.constructed().get(0);
+            Mockito.verify(scriptMock).add("-c");
+            Mockito.verify(scriptMock).add(Mockito.contains("export 
LIBGUESTFS_BACKEND=libvirt &&"));
+            Mockito.verify(scriptMock).add(Mockito.contains("-ip 
/tmp/v2v.pass.cloud.vcenter.local."));
+            Mockito.verify(scriptMock).add(Mockito.contains(" -on tmp-uuid "));
+            Mockito.verify(scriptMock).add(Mockito.contains("-io 
vddk-thumbprint=28:19:A6:1C:90:ED:46:D7:1C:86:BC:F6:13:52:F0:B9:19:81:0D:81 "));
+        }
+    }
+
+    @Test
+    public void 
testPerformInstanceConversionUsingVddkUsesConfiguredTransportsOrder() {
+        RemoteInstanceTO remoteInstanceTO = 
Mockito.mock(RemoteInstanceTO.class);
+        
Mockito.when(remoteInstanceTO.getVcenterHost()).thenReturn("vcenter.local");
+        
Mockito.when(remoteInstanceTO.getVcenterUsername()).thenReturn("[email protected]");
+        
Mockito.when(remoteInstanceTO.getVcenterPassword()).thenReturn("secret");
+        Mockito.when(remoteInstanceTO.getDatacenterName()).thenReturn("dc1");
+        Mockito.when(remoteInstanceTO.getClusterName()).thenReturn("cluster1");
+        Mockito.when(remoteInstanceTO.getHostName()).thenReturn("host1");
+        
Mockito.doReturn("28:19:A6:1C:90:ED:46:D7:1C:86:BC:F6:13:52:F0:B9:19:81:0D:81")
+                
.when(convertInstanceCommandWrapper).getVcenterThumbprint(Mockito.anyString(), 
Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Files> filesMock = Mockito.mockStatic(Files.class);
+             MockedConstruction<Script> ignored = 
Mockito.mockConstruction(Script.class, (mock, context) -> {
+                 Mockito.when(mock.execute(Mockito.any())).thenReturn("");
+                 Mockito.when(mock.getExitValue()).thenReturn(0);
+             })) {
+            filesMock.when(() -> Files.writeString(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local.")), 
Mockito.eq("secret")))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+            filesMock.when(() -> Files.deleteIfExists(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local."))))
+                    .thenReturn(true);
+
+            boolean result = 
convertInstanceCommandWrapper.performInstanceConversionUsingVddk(
+                    remoteInstanceTO, vmName, "/tmp/convert", "/opt/vddk", 
"direct", "nbd:nbdssl", null, 1000L, false, null, "tmp-uuid", "-ip");
+
+            Assert.assertTrue(result);
+            Script scriptMock = ignored.constructed().get(0);
+            Mockito.verify(scriptMock).add(Mockito.contains("-io 
vddk-transports=nbd:nbdssl "));
+        }
+    }
+
+    @Test
+    public void 
testPerformInstanceConversionUsingVddkFailsWhenThumbprintUnavailable() {
+        RemoteInstanceTO remoteInstanceTO = 
Mockito.mock(RemoteInstanceTO.class);
+        
Mockito.when(remoteInstanceTO.getVcenterHost()).thenReturn("vcenter.local");
+        
Mockito.when(remoteInstanceTO.getVcenterUsername()).thenReturn("[email protected]");
+        
Mockito.when(remoteInstanceTO.getVcenterPassword()).thenReturn("secret");
+        Mockito.when(remoteInstanceTO.getDatacenterName()).thenReturn("dc1");
+        Mockito.when(remoteInstanceTO.getClusterName()).thenReturn("cluster1");
+        Mockito.when(remoteInstanceTO.getHostName()).thenReturn("host1");
+        Mockito.doReturn(null)
+                
.when(convertInstanceCommandWrapper).getVcenterThumbprint(Mockito.anyString(), 
Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Files> filesMock = Mockito.mockStatic(Files.class)) {
+            filesMock.when(() -> Files.writeString(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local.")), 
Mockito.eq("secret")))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+            filesMock.when(() -> Files.deleteIfExists(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local."))))
+                    .thenReturn(true);
+
+            boolean result = 
convertInstanceCommandWrapper.performInstanceConversionUsingVddk(
+                    remoteInstanceTO, vmName, "/tmp/convert", "/opt/vddk", 
"direct", null, null, 1000L, false, null, "tmp-uuid", "-ip");
+
+            Assert.assertFalse(result);
+        }
+    }
+
+    @Test
+    public void 
testPerformInstanceConversionUsingVddkUsesConfiguredThumbprintFromAgentProperty()
 {
+        RemoteInstanceTO remoteInstanceTO = 
Mockito.mock(RemoteInstanceTO.class);
+        
Mockito.when(remoteInstanceTO.getVcenterHost()).thenReturn("vcenter.local");
+        
Mockito.when(remoteInstanceTO.getVcenterUsername()).thenReturn("[email protected]");
+        
Mockito.when(remoteInstanceTO.getVcenterPassword()).thenReturn("secret");
+        Mockito.when(remoteInstanceTO.getDatacenterName()).thenReturn("dc1");
+        Mockito.when(remoteInstanceTO.getClusterName()).thenReturn("cluster1");
+        Mockito.when(remoteInstanceTO.getHostName()).thenReturn("host1");
+
+        try (MockedStatic<Files> filesMock = Mockito.mockStatic(Files.class);
+             MockedConstruction<Script> ignored = 
Mockito.mockConstruction(Script.class, (mock, context) -> {
+                 Mockito.when(mock.execute(Mockito.any())).thenReturn("");
+                 Mockito.when(mock.getExitValue()).thenReturn(0);
+             })) {
+            filesMock.when(() -> Files.writeString(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local.")), 
Mockito.eq("secret")))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+            filesMock.when(() -> Files.deleteIfExists(Mockito.argThat(path -> 
path.toString().contains("/tmp/v2v.pass.cloud.vcenter.local."))))
+                    .thenReturn(true);
+
+            boolean result = 
convertInstanceCommandWrapper.performInstanceConversionUsingVddk(
+                    remoteInstanceTO, vmName, "/tmp/convert", "/opt/vddk", 
"direct", null,
+                    "AA:BB:CC:DD:EE", 1000L, false, null, "tmp-uuid", "-ip");
+
+            Assert.assertTrue(result);
+            Script scriptMock = ignored.constructed().get(0);
+            Mockito.verify(scriptMock).add(Mockito.contains("-io 
vddk-thumbprint=AA:BB:CC:DD:EE "));
+            Mockito.verify(convertInstanceCommandWrapper, Mockito.never())
+                    .getVcenterThumbprint(Mockito.anyString(), 
Mockito.anyLong(), Mockito.anyString());
+        }
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImpl.java 
b/server/src/main/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImpl.java
index 4c9962c4bc1..20bd463c049 100644
--- a/server/src/main/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImpl.java
+++ b/server/src/main/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImpl.java
@@ -212,6 +212,8 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
     private static final List<Storage.StoragePoolType> 
forceConvertToPoolAllowedTypes =
             Arrays.asList(Storage.StoragePoolType.NetworkFilesystem, 
Storage.StoragePoolType.Filesystem,
                     Storage.StoragePoolType.SharedMountPoint);
+    private static final String DETAIL_VDDK_TRANSPORTS = "vddk.transports";
+    private static final String DETAIL_VDDK_THUMBPRINT = "vddk.thumbprint";
 
     ConfigKey<Boolean> ConvertVmwareInstanceToKvmExtraParamsAllowed = new 
ConfigKey<>(Boolean.class,
             "convert.vmware.instance.to.kvm.extra.params.allowed",
@@ -1626,6 +1628,8 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         String extraParams = cmd.getExtraParams();
         boolean forceConvertToPool = cmd.getForceConvertToPool();
         Long guestOsId = cmd.getGuestOsId();
+        boolean forceMsToImportVmFiles = 
Boolean.TRUE.equals(cmd.getForceMsToImportVmFiles());
+        boolean useVddk = cmd.getUseVddk();
 
         if ((existingVcenterId == null && vcenter == null) || 
(existingVcenterId != null && vcenter != null)) {
             throw new ServerApiException(ApiErrorCode.PARAM_ERROR,
@@ -1635,8 +1639,14 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
             throw new ServerApiException(ApiErrorCode.PARAM_ERROR,
                     "Please set all the information for a vCenter IP/Name, 
datacenter, username and password");
         }
+        if (forceMsToImportVmFiles && useVddk) {
+            throw new ServerApiException(ApiErrorCode.PARAM_ERROR,
+                    String.format("Parameters %s and %s are mutually 
exclusive",
+                            ApiConstants.FORCE_MS_TO_IMPORT_VM_FILES, 
ApiConstants.USE_VDDK));
+        }
 
         checkConversionStoragePool(convertStoragePoolId, forceConvertToPool);
+        validateSelectedConversionStoragePoolForVddk(useVddk, 
convertStoragePoolId, serviceOffering, dataDiskOfferingMap);
 
         checkExtraParamsAllowed(extraParams);
 
@@ -1659,9 +1669,16 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         String ovfTemplateOnConvertLocation = null;
         ImportVmTask importVMTask = null;
         try {
-            HostVO convertHost = 
selectKVMHostForConversionInCluster(destinationCluster, convertInstanceHostId);
-            HostVO importHost = 
selectKVMHostForImportingInCluster(destinationCluster, importInstanceHostId);
-            CheckConvertInstanceAnswer conversionSupportAnswer = 
checkConversionSupportOnHost(convertHost, sourceVMName, false);
+            HostVO convertHost = 
selectKVMHostForConversionInCluster(destinationCluster, convertInstanceHostId, 
useVddk);
+            HostVO importHost = (useVddk && importInstanceHostId == null)
+                    ? convertHost
+                    : selectKVMHostForImportingInCluster(destinationCluster, 
importInstanceHostId);
+
+            boolean isOvfExportSupported = false;
+            CheckConvertInstanceAnswer conversionSupportAnswer = 
checkConversionSupportOnHost(convertHost, sourceVMName, false, useVddk, 
details);
+            if (!useVddk) {
+                isOvfExportSupported = 
conversionSupportAnswer.isOvfExportSupported();
+            }
             logger.debug("The host {}  is selected to execute the conversion 
of the " +
                     "instance {} from VMware to KVM ", convertHost, 
sourceVMName);
 
@@ -1682,12 +1699,12 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
             isClonedInstance = sourceInstanceDetails.second();
             boolean isWindowsVm = 
sourceVMwareInstance.getOperatingSystem().toLowerCase().contains("windows");
             if (isWindowsVm) {
-                checkConversionSupportOnHost(convertHost, sourceVMName, true);
+                checkConversionSupportOnHost(convertHost, sourceVMName, true, 
useVddk, details);
             }
 
             checkNetworkingBeforeConvertingVmwareInstance(zone, owner, 
displayName, hostName, sourceVMwareInstance, nicNetworkMap, nicIpAddressMap, 
forced);
             UnmanagedInstanceTO convertedInstance;
-            if (cmd.getForceMsToImportVmFiles() || 
!conversionSupportAnswer.isOvfExportSupported()) {
+            if (!useVddk && (forceMsToImportVmFiles || !isOvfExportSupported)) 
{
                 // Uses MS for OVF export to temporary conversion location
                 int noOfThreads = 
UnmanagedVMsManager.ThreadsOnMSToImportVMwareVMFiles.value();
                 importVmTasksManager.updateImportVMTaskStep(importVMTask, 
zone, owner, convertHost, importHost, null, ConvertingInstance);
@@ -1699,12 +1716,12 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
                         serviceOffering, dataDiskOfferingMap, 
temporaryConvertLocation,
                         ovfTemplateOnConvertLocation, forceConvertToPool, 
extraParams);
             } else {
-                // Uses KVM Host for OVF export to temporary conversion 
location, through ovftool
+                // Uses KVM Host for direct conversion using VDDK, or for OVF 
export to temporary conversion location through ovftool
                 importVmTasksManager.updateImportVMTaskStep(importVMTask, 
zone, owner, convertHost, importHost, null, ConvertingInstance);
-                convertedInstance = 
convertVmwareInstanceToKVMAfterExportingOVFToConvertLocation(
+                convertedInstance = 
convertVmwareInstanceToKVMUsingVDDKOrAfterExportingOVFToConvertLocation(
                         sourceVMName, sourceVMwareInstance, convertHost, 
importHost,
                         convertStoragePools, serviceOffering, 
dataDiskOfferingMap,
-                        temporaryConvertLocation, vcenter, username, password, 
datacenterName, forceConvertToPool, extraParams);
+                        temporaryConvertLocation, vcenter, username, password, 
datacenterName, forceConvertToPool, extraParams, useVddk, details);
             }
 
             sanitizeConvertedInstance(convertedInstance, sourceVMwareInstance);
@@ -1759,6 +1776,45 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         }
     }
 
+    protected void validateSelectedConversionStoragePoolForVddk(boolean 
useVddk, Long convertStoragePoolId,
+                                                                
ServiceOfferingVO serviceOffering, Map<String, Long> dataDiskOfferingMap) {
+        if (!useVddk || convertStoragePoolId == null) {
+            return;
+        }
+
+        StoragePoolVO selectedStoragePool = 
primaryDataStoreDao.findById(convertStoragePoolId);
+        if (selectedStoragePool == null) {
+            return;
+        }
+
+        if (serviceOffering.getDiskOfferingId() != null) {
+            DiskOfferingVO rootDiskOffering = 
diskOfferingDao.findById(serviceOffering.getDiskOfferingId());
+            if (rootDiskOffering == null) {
+                throw new InvalidParameterValueException(String.format("Cannot 
find disk offering with ID %s that belongs to the service offering %s",
+                        serviceOffering.getDiskOfferingId(), 
serviceOffering.getName()));
+            }
+            if 
(!volumeApiService.doesStoragePoolSupportDiskOffering(selectedStoragePool, 
rootDiskOffering)) {
+                throw new InvalidParameterValueException(String.format("The 
root disk offering '%s' is not supported by the selected conversion storage 
pool '%s'. " +
+                        "When using VDDK, all selected disk offerings must be 
compatible with the conversion storage pool, as it will become the primary 
storage for the imported volumes.",
+                        rootDiskOffering.getName(), 
selectedStoragePool.getName()));
+            }
+        }
+
+        if (MapUtils.isNotEmpty(dataDiskOfferingMap)) {
+            for (Long diskOfferingId : dataDiskOfferingMap.values()) {
+                DiskOfferingVO diskOffering = 
diskOfferingDao.findById(diskOfferingId);
+                if (diskOffering == null) {
+                    throw new 
InvalidParameterValueException(String.format("Cannot find disk offering with ID 
%s", diskOfferingId));
+                }
+                if 
(!volumeApiService.doesStoragePoolSupportDiskOffering(selectedStoragePool, 
diskOffering)) {
+                    throw new 
InvalidParameterValueException(String.format("The data disk offering '%s' is 
not supported by the selected conversion storage pool '%s'. " +
+                            "When using VDDK, all selected disk offerings must 
be compatible with the conversion storage pool, as it will become the primary 
storage for the imported volumes.",
+                            diskOffering.getName(), 
selectedStoragePool.getName()));
+                }
+            }
+        }
+    }
+
     private void checkNetworkingBeforeConvertingVmwareInstance(DataCenter 
zone, Account owner, String displayName,
                                                                String 
hostName, UnmanagedInstanceTO sourceVMwareInstance,
                                                                Map<String, 
Long> nicNetworkMap,
@@ -1921,7 +1977,7 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         throw new CloudRuntimeException(err);
     }
 
-    HostVO selectKVMHostForConversionInCluster(Cluster destinationCluster, 
Long convertInstanceHostId) {
+    HostVO selectKVMHostForConversionInCluster(Cluster destinationCluster, 
Long convertInstanceHostId, boolean useVddk) {
         if (convertInstanceHostId != null) {
             HostVO selectedHost = hostDao.findById(convertInstanceHostId);
             String err = null;
@@ -1955,24 +2011,58 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         // Auto select host with conversion capability
         List<HostVO> hosts = 
hostDao.listByClusterHypervisorTypeAndHostCapability(destinationCluster.getId(),
 destinationCluster.getHypervisorType(), Host.HOST_INSTANCE_CONVERSION);
         if (CollectionUtils.isNotEmpty(hosts)) {
-            return hosts.get(new Random().nextInt(hosts.size()));
+            if (useVddk) {
+                List<HostVO> vddkHosts = filterHostsWithVddkSupport(hosts);
+                if (CollectionUtils.isNotEmpty(vddkHosts)) {
+                    hosts = vddkHosts;
+                }
+            }
+            if (CollectionUtils.isNotEmpty(hosts)) {
+                return hosts.get(new Random().nextInt(hosts.size()));
+            }
         }
 
         // Try without host capability check
         hosts = 
hostDao.listByClusterAndHypervisorType(destinationCluster.getId(), 
destinationCluster.getHypervisorType());
         if (CollectionUtils.isNotEmpty(hosts)) {
-            return hosts.get(new Random().nextInt(hosts.size()));
+            if (useVddk) {
+                List<HostVO> vddkHosts = filterHostsWithVddkSupport(hosts);
+                if (CollectionUtils.isNotEmpty(vddkHosts)) {
+                    hosts = vddkHosts;
+                }
+            }
+            if (CollectionUtils.isNotEmpty(hosts)) {
+                return hosts.get(new Random().nextInt(hosts.size()));
+            }
         }
 
-        String err = String.format("Could not find any suitable %s host in 
cluster %s to perform the instance conversion",
-                destinationCluster.getHypervisorType(), destinationCluster);
+        String err = useVddk
+                ? String.format("Could not find any suitable %s host in 
cluster %s with '%s' configured to perform the VDDK-based instance conversion",
+                        destinationCluster.getHypervisorType(), 
destinationCluster, Host.HOST_VDDK_SUPPORT)
+                : String.format("Could not find any suitable %s host in 
cluster %s to perform the instance conversion",
+                        destinationCluster.getHypervisorType(), 
destinationCluster);
         logger.error(err);
         throw new CloudRuntimeException(err);
     }
 
-    private CheckConvertInstanceAnswer checkConversionSupportOnHost(HostVO 
convertHost, String sourceVM, boolean checkWindowsGuestConversionSupport) {
-        logger.debug(String.format("Checking the %s conversion support on the 
host %s", checkWindowsGuestConversionSupport? "windows guest" : "", 
convertHost));
-        CheckConvertInstanceCommand cmd = new 
CheckConvertInstanceCommand(checkWindowsGuestConversionSupport);
+    private List<HostVO> filterHostsWithVddkSupport(List<HostVO> hosts) {
+        return hosts.stream().filter(h -> {
+            hostDao.loadDetails(h);
+            return Boolean.parseBoolean(h.getDetail(Host.HOST_VDDK_SUPPORT));
+        }).collect(Collectors.toList());
+    }
+
+    private CheckConvertInstanceAnswer checkConversionSupportOnHost(HostVO 
convertHost, String sourceVM,
+                                                                    boolean 
checkWindowsGuestConversionSupport,
+                                                                    boolean 
useVddk, Map<String, String> details) {
+        logger.debug(String.format("Checking the %s%s conversion support on 
the host %s",
+                useVddk ? "VDDK " : "",
+                checkWindowsGuestConversionSupport ? "windows guest " : "",
+                convertHost));
+        CheckConvertInstanceCommand cmd = new 
CheckConvertInstanceCommand(checkWindowsGuestConversionSupport, useVddk);
+        if (MapUtils.isNotEmpty(details)) {
+            
cmd.setVddkLibDir(StringUtils.trimToNull(details.get(Host.HOST_VDDK_LIB_DIR)));
+        }
         int timeoutSeconds = 60;
         cmd.setWait(timeoutSeconds);
 
@@ -2006,7 +2096,7 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         logger.debug("Delegating the conversion of instance {} from VMware to 
KVM to the host {} using OVF {} on conversion datastore",
                 sourceVM, convertHost, ovfTemplateDirConvertLocation);
 
-        RemoteInstanceTO remoteInstanceTO = new RemoteInstanceTO(sourceVM);
+        RemoteInstanceTO remoteInstanceTO = new RemoteInstanceTO(sourceVM, 
sourceVMwareInstance.getClusterName(), sourceVMwareInstance.getHostName());
         List<String> destinationStoragePools = 
selectInstanceConversionStoragePools(convertStoragePools, 
sourceVMwareInstance.getDisks(), serviceOffering, dataDiskOfferingMap);
         ConvertInstanceCommand cmd = new 
ConvertInstanceCommand(remoteInstanceTO,
                 Hypervisor.HypervisorType.KVM, temporaryConvertLocation,
@@ -2021,15 +2111,16 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
                 remoteInstanceTO, destinationStoragePools, 
temporaryConvertLocation, forceConvertToPool);
     }
 
-    private UnmanagedInstanceTO 
convertVmwareInstanceToKVMAfterExportingOVFToConvertLocation(
+    private UnmanagedInstanceTO 
convertVmwareInstanceToKVMUsingVDDKOrAfterExportingOVFToConvertLocation(
             String sourceVM, UnmanagedInstanceTO sourceVMwareInstance, HostVO 
convertHost,
             HostVO importHost, List<StoragePoolVO> convertStoragePools,
             ServiceOfferingVO serviceOffering, Map<String, Long> 
dataDiskOfferingMap,
             DataStoreTO temporaryConvertLocation, String vcenterHost, String 
vcenterUsername,
-            String vcenterPassword, String datacenterName, boolean 
forceConvertToPool, String extraParams) {
+            String vcenterPassword, String datacenterName, boolean 
forceConvertToPool, String extraParams,
+            boolean useVddk, Map<String, String> details) {
         logger.debug("Delegating the conversion of instance {} from VMware to 
KVM to the host {} after OVF export through ovftool", sourceVM, convertHost);
 
-        RemoteInstanceTO remoteInstanceTO = new 
RemoteInstanceTO(sourceVMwareInstance.getName(), 
sourceVMwareInstance.getPath(), vcenterHost, vcenterUsername, vcenterPassword, 
datacenterName);
+        RemoteInstanceTO remoteInstanceTO = new 
RemoteInstanceTO(sourceVMwareInstance.getName(), 
sourceVMwareInstance.getPath(), vcenterHost, vcenterUsername, vcenterPassword, 
datacenterName, sourceVMwareInstance.getClusterName(), 
sourceVMwareInstance.getHostName());
         List<String> destinationStoragePools = 
selectInstanceConversionStoragePools(convertStoragePools, 
sourceVMwareInstance.getDisks(), serviceOffering, dataDiskOfferingMap);
         ConvertInstanceCommand cmd = new 
ConvertInstanceCommand(remoteInstanceTO,
                 Hypervisor.HypervisorType.KVM, temporaryConvertLocation, null, 
false, true, sourceVM);
@@ -2044,10 +2135,22 @@ public class UnmanagedVMsManagerImpl implements 
UnmanagedVMsManager {
         if (StringUtils.isNotBlank(extraParams)) {
             cmd.setExtraParams(extraParams);
         }
+        cmd.setUseVddk(useVddk);
+        applyVddkOverridesFromDetails(cmd, details);
         return convertAndImportToKVM(cmd, convertHost, importHost, sourceVM,
                 remoteInstanceTO, destinationStoragePools, 
temporaryConvertLocation, forceConvertToPool);
     }
 
+    private void applyVddkOverridesFromDetails(ConvertInstanceCommand cmd, 
Map<String, String> details) {
+        if (MapUtils.isEmpty(details)) {
+            return;
+        }
+
+        
cmd.setVddkLibDir(StringUtils.trimToNull(details.get(Host.HOST_VDDK_LIB_DIR)));
+        
cmd.setVddkTransports(StringUtils.trimToNull(details.get(DETAIL_VDDK_TRANSPORTS)));
+        
cmd.setVddkThumbprint(StringUtils.trimToNull(details.get(DETAIL_VDDK_THUMBPRINT)));
+    }
+
     private UnmanagedInstanceTO convertAndImportToKVM(ConvertInstanceCommand 
convertInstanceCommand, HostVO convertHost, HostVO importHost,
                                                       String sourceVM,
                                                       RemoteInstanceTO 
remoteInstanceTO,
diff --git 
a/server/src/test/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImplTest.java
 
b/server/src/test/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImplTest.java
index dfc1829ab5c..0decef7adf9 100644
--- 
a/server/src/test/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImplTest.java
+++ 
b/server/src/test/java/org/apache/cloudstack/vm/UnmanagedVMsManagerImplTest.java
@@ -716,7 +716,17 @@ public class UnmanagedVMsManagerImplTest {
     }
 
     private enum VcenterParameter {
-        EXISTING, EXTERNAL, BOTH, NONE, EXISTING_INVALID, AGENT_UNAVAILABLE, 
CONVERT_FAILURE
+        EXISTING,
+        EXTERNAL,
+        BOTH,
+        NONE,
+        EXISTING_INVALID,
+        AGENT_UNAVAILABLE,
+        CONVERT_FAILURE,
+        FORCE_MS_AND_USE_VDDK,
+        USE_VDDK_OVF_UNSUPPORTED,
+        USE_VDDK_OVF_SUPPORTED,
+        USE_VDDK_DETAILS_OVERRIDES
     }
 
     private void baseTestImportVmFromVmwareToKvm(VcenterParameter 
vcenterParameter, boolean selectConvertHost,
@@ -753,6 +763,34 @@ public class UnmanagedVMsManagerImplTest {
         when(importVmCmd.getConvertInstanceHostId()).thenReturn(null);
         when(importVmCmd.getImportInstanceHostId()).thenReturn(null);
         when(importVmCmd.getConvertStoragePoolId()).thenReturn(null);
+        when(importVmCmd.getExistingVcenterId()).thenReturn(null);
+        when(importVmCmd.getVcenter()).thenReturn(null);
+        when(importVmCmd.getDatacenterName()).thenReturn(null);
+        when(importVmCmd.getUsername()).thenReturn(null);
+        when(importVmCmd.getPassword()).thenReturn(null);
+        when(importVmCmd.getDetails()).thenReturn(new HashMap<>());
+
+        boolean forceMsToImportVmFiles = false;
+        boolean useVddk = false;
+        boolean ovfExportSupported = false;
+        if (VcenterParameter.FORCE_MS_AND_USE_VDDK == vcenterParameter) {
+            forceMsToImportVmFiles = true;
+            useVddk = true;
+        } else if (VcenterParameter.USE_VDDK_OVF_UNSUPPORTED == 
vcenterParameter) {
+            useVddk = true;
+        } else if (VcenterParameter.USE_VDDK_OVF_SUPPORTED == 
vcenterParameter) {
+            useVddk = true;
+            ovfExportSupported = true;
+        } else if (VcenterParameter.USE_VDDK_DETAILS_OVERRIDES == 
vcenterParameter) {
+            useVddk = true;
+            ovfExportSupported = true;
+            when(importVmCmd.getDetails()).thenReturn(Map.of(
+                    "vddk.lib.dir", "/opt/vmware-vddk/override",
+                    "vddk.transports", "nbd:nbdssl",
+                    "vddk.thumbprint", "AA:BB:CC:DD:EE"));
+        }
+        
when(importVmCmd.getForceMsToImportVmFiles()).thenReturn(forceMsToImportVmFiles);
+        when(importVmCmd.getUseVddk()).thenReturn(useVddk);
 
         NetworkVO networkVO = Mockito.mock(NetworkVO.class);
         when(networkVO.getGuestType()).thenReturn(Network.GuestType.L2);
@@ -814,11 +852,6 @@ public class UnmanagedVMsManagerImplTest {
             when(datacenterVO.getPassword()).thenReturn(password);
             
when(importVmCmd.getExistingVcenterId()).thenReturn(existingDatacenterId);
             
when(vmwareDatacenterDao.findById(existingDatacenterId)).thenReturn(datacenterVO);
-        } else if (VcenterParameter.EXTERNAL == vcenterParameter) {
-            when(importVmCmd.getVcenter()).thenReturn(vcenterHost);
-            when(importVmCmd.getDatacenterName()).thenReturn(datacenter);
-            when(importVmCmd.getUsername()).thenReturn(username);
-            when(importVmCmd.getPassword()).thenReturn(password);
         }
 
         if (VcenterParameter.BOTH == vcenterParameter) {
@@ -832,8 +865,20 @@ public class UnmanagedVMsManagerImplTest {
             
when(vmwareDatacenterDao.findById(existingDatacenterId)).thenReturn(null);
         }
 
+        if (VcenterParameter.FORCE_MS_AND_USE_VDDK == vcenterParameter
+                || VcenterParameter.USE_VDDK_OVF_UNSUPPORTED == 
vcenterParameter
+                || VcenterParameter.USE_VDDK_OVF_SUPPORTED == vcenterParameter
+                || VcenterParameter.USE_VDDK_DETAILS_OVERRIDES == 
vcenterParameter) {
+            Mockito.doReturn((Long) 
null).when(importVmCmd).getExistingVcenterId();
+            Mockito.doReturn(vcenterHost).when(importVmCmd).getVcenter();
+            Mockito.doReturn(datacenter).when(importVmCmd).getDatacenterName();
+            Mockito.doReturn(username).when(importVmCmd).getUsername();
+            Mockito.doReturn(password).when(importVmCmd).getPassword();
+        }
+
         CheckConvertInstanceAnswer checkConvertInstanceAnswer = 
mock(CheckConvertInstanceAnswer.class);
         
when(checkConvertInstanceAnswer.getResult()).thenReturn(vcenterParameter != 
VcenterParameter.CONVERT_FAILURE);
+        
when(checkConvertInstanceAnswer.isOvfExportSupported()).thenReturn(ovfExportSupported);
         if (VcenterParameter.AGENT_UNAVAILABLE != vcenterParameter) {
             when(agentManager.send(Mockito.eq(convertHostId), 
Mockito.any(CheckConvertInstanceCommand.class))).thenReturn(checkConvertInstanceAnswer);
         }
@@ -856,9 +901,29 @@ public class UnmanagedVMsManagerImplTest {
              MockedConstruction<CheckedReservation> mockCheckedReservation = 
Mockito.mockConstruction(CheckedReservation.class)) {
             unmanagedVMsManager.importVm(importVmCmd);
             
verify(vmwareGuru).getHypervisorVMOutOfBandAndCloneIfRequired(Mockito.eq(host), 
Mockito.eq(vmName), anyMap());
-            verify(vmwareGuru).createVMTemplateOutOfBand(Mockito.eq(host), 
Mockito.eq(vmName), anyMap(), any(DataStoreTO.class), anyInt());
+            if (VcenterParameter.USE_VDDK_OVF_SUPPORTED == vcenterParameter) {
+                verify(vmwareGuru, 
Mockito.never()).createVMTemplateOutOfBand(anyString(), anyString(), anyMap(), 
any(DataStoreTO.class), anyInt());
+                verify(agentManager).send(Mockito.eq(convertHostId), 
Mockito.<com.cloud.agent.api.Command>argThat(command ->
+                        command instanceof ConvertInstanceCommand && 
((ConvertInstanceCommand) command).isUseVddk()));
+                verify(vmwareGuru, 
Mockito.never()).removeVMTemplateOutOfBand(any(DataStoreTO.class), anyString());
+            } else if (VcenterParameter.USE_VDDK_DETAILS_OVERRIDES == 
vcenterParameter) {
+                verify(vmwareGuru, 
Mockito.never()).createVMTemplateOutOfBand(anyString(), anyString(), anyMap(), 
any(DataStoreTO.class), anyInt());
+                verify(agentManager).send(Mockito.eq(convertHostId), 
Mockito.<com.cloud.agent.api.Command>argThat(command -> {
+                    if (!(command instanceof ConvertInstanceCommand)) {
+                        return false;
+                    }
+                    ConvertInstanceCommand convertCmd = 
(ConvertInstanceCommand) command;
+                    return convertCmd.isUseVddk()
+                            && 
"/opt/vmware-vddk/override".equals(convertCmd.getVddkLibDir())
+                            && 
"nbd:nbdssl".equals(convertCmd.getVddkTransports())
+                            && 
"AA:BB:CC:DD:EE".equals(convertCmd.getVddkThumbprint());
+                }));
+                verify(vmwareGuru, 
Mockito.never()).removeVMTemplateOutOfBand(any(DataStoreTO.class), anyString());
+            } else {
+                verify(vmwareGuru).createVMTemplateOutOfBand(Mockito.eq(host), 
Mockito.eq(vmName), anyMap(), any(DataStoreTO.class), anyInt());
+                
verify(vmwareGuru).removeVMTemplateOutOfBand(any(DataStoreTO.class), 
anyString());
+            }
             
verify(vmwareGuru).removeClonedHypervisorVMOutOfBand(Mockito.eq(host), 
Mockito.eq(vmName), anyMap());
-            
verify(vmwareGuru).removeVMTemplateOutOfBand(any(DataStoreTO.class), 
anyString());
         }
     }
 
@@ -952,6 +1017,49 @@ public class UnmanagedVMsManagerImplTest {
         baseTestImportVmFromVmwareToKvm(VcenterParameter.CONVERT_FAILURE, 
false, false);
     }
 
+    @Test(expected = ServerApiException.class)
+    public void 
testImportVmFromVmwareToKvmForceMsMutuallyExclusiveWithUseVddk() throws 
OperationTimedoutException, AgentUnavailableException {
+        
baseTestImportVmFromVmwareToKvm(VcenterParameter.FORCE_MS_AND_USE_VDDK, false, 
false);
+    }
+
+    @Test(expected = InvalidParameterValueException.class)
+    public void 
testValidateSelectedConversionStoragePoolForVddkFailsWhenPoolDoesNotSupportDiskOfferings()
 {
+        long poolId = 11L;
+        StoragePoolVO selectedPool = mock(StoragePoolVO.class);
+        ServiceOfferingVO serviceOffering = mock(ServiceOfferingVO.class);
+        DiskOfferingVO rootDiskOffering = mock(DiskOfferingVO.class);
+        DiskOfferingVO dataDiskOffering = mock(DiskOfferingVO.class);
+
+        when(serviceOffering.getDiskOfferingId()).thenReturn(21L);
+        when(primaryDataStoreDao.findById(poolId)).thenReturn(selectedPool);
+        when(diskOfferingDao.findById(21L)).thenReturn(rootDiskOffering);
+        when(diskOfferingDao.findById(22L)).thenReturn(dataDiskOffering);
+        when(volumeApiService.doesStoragePoolSupportDiskOffering(selectedPool, 
rootDiskOffering)).thenReturn(true);
+        when(volumeApiService.doesStoragePoolSupportDiskOffering(selectedPool, 
dataDiskOffering)).thenReturn(false);
+
+        unmanagedVMsManager.validateSelectedConversionStoragePoolForVddk(true, 
poolId,
+                serviceOffering, Map.of("1000-2", 22L));
+    }
+
+    @Test
+    public void 
testValidateSelectedConversionStoragePoolForVddkPassesWhenPoolSupportsAllDiskOfferings()
 {
+        long poolId = 12L;
+        StoragePoolVO selectedPool = mock(StoragePoolVO.class);
+        ServiceOfferingVO serviceOffering = mock(ServiceOfferingVO.class);
+        DiskOfferingVO rootDiskOffering = mock(DiskOfferingVO.class);
+        DiskOfferingVO dataDiskOffering = mock(DiskOfferingVO.class);
+
+        when(serviceOffering.getDiskOfferingId()).thenReturn(31L);
+        when(primaryDataStoreDao.findById(poolId)).thenReturn(selectedPool);
+        when(diskOfferingDao.findById(31L)).thenReturn(rootDiskOffering);
+        when(diskOfferingDao.findById(32L)).thenReturn(dataDiskOffering);
+        when(volumeApiService.doesStoragePoolSupportDiskOffering(selectedPool, 
rootDiskOffering)).thenReturn(true);
+        when(volumeApiService.doesStoragePoolSupportDiskOffering(selectedPool, 
dataDiskOffering)).thenReturn(true);
+
+        unmanagedVMsManager.validateSelectedConversionStoragePoolForVddk(true, 
poolId,
+                serviceOffering, Map.of("1000-2", 32L));
+    }
+
     private ClusterVO getClusterForTests() {
         ClusterVO cluster = mock(ClusterVO.class);
         when(cluster.getId()).thenReturn(1L);
@@ -1133,7 +1241,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, hostId);
+        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, hostId, false);
         Assert.assertEquals(host, returnedHost);
     }
 
@@ -1149,7 +1257,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, hostId);
+        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, hostId, false);
         Assert.assertEquals(host, returnedHost);
     }
 
@@ -1161,7 +1269,7 @@ public class UnmanagedVMsManagerImplTest {
         
when(hostDao.listByClusterHypervisorTypeAndHostCapability(cluster.getId(),
                 cluster.getHypervisorType(), 
Host.HOST_INSTANCE_CONVERSION)).thenReturn(List.of(host));
 
-        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null);
+        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null, false);
         Assert.assertEquals(host, returnedHost);
     }
 
@@ -1175,7 +1283,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.listByClusterAndHypervisorType(cluster.getId(), 
cluster.getHypervisorType())).thenReturn(List.of(host));
 
-        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null);
+        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null, false);
         Assert.assertEquals(host, returnedHost);
     }
 
@@ -1188,7 +1296,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.listByClusterAndHypervisorType(cluster.getId(), 
cluster.getHypervisorType())).thenReturn(List.of());
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null, 
false);
     }
 
     @Test(expected = CloudRuntimeException.class)
@@ -1203,7 +1311,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId, false);
     }
 
     @Test(expected = CloudRuntimeException.class)
@@ -1217,7 +1325,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId, false);
     }
 
     @Test(expected = CloudRuntimeException.class)
@@ -1230,7 +1338,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId, false);
     }
 
     @Test(expected = CloudRuntimeException.class)
@@ -1242,7 +1350,7 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(host);
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId, false);
     }
 
     @Test(expected = CloudRuntimeException.class)
@@ -1252,7 +1360,23 @@ public class UnmanagedVMsManagerImplTest {
 
         when(hostDao.findById(hostId)).thenReturn(null);
 
-        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId);
+        unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, 
hostId, false);
+    }
+
+    @Test
+    public void 
testSelectKVMHostForConversionInClusterVddkAutoSelectsHostWithVddkSupport() {
+        ClusterVO cluster = getClusterForTests();
+        HostVO hostWithVddk = Mockito.mock(HostVO.class);
+        HostVO hostWithoutVddk = Mockito.mock(HostVO.class);
+        
when(hostWithVddk.getDetail(Host.HOST_VDDK_SUPPORT)).thenReturn("true");
+        
when(hostWithoutVddk.getDetail(Host.HOST_VDDK_SUPPORT)).thenReturn(null);
+
+        
when(hostDao.listByClusterHypervisorTypeAndHostCapability(cluster.getId(),
+                cluster.getHypervisorType(), Host.HOST_INSTANCE_CONVERSION))
+                .thenReturn(List.of(hostWithoutVddk, hostWithVddk));
+
+        HostVO returnedHost = 
unmanagedVMsManager.selectKVMHostForConversionInCluster(cluster, null, true);
+        Assert.assertEquals(hostWithVddk, returnedHost);
     }
 
     @Test
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 1bfa64fc24f..f5b7cc647dc 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -1217,6 +1217,8 @@
 "label.host.alerts": "Hosts in alert state",
 "label.host.name": "Host name",
 "label.host.ovftool.version": "OVFTool Version",
+"label.host.vddk.support": "VDDK Support",
+"label.host.vddk.version": "VDDK Version",
 "label.host.tag": "Host tag",
 "label.host.virtv2v.version": "Virt-v2v Version",
 "label.hostcontrolstate": "Compute Resource Status",
@@ -2376,6 +2378,7 @@
 "label.user.data.policy.tooltip": "User Data linked to the Template can be 
overridden by User Data provided during Instance deploy. Select the override 
policy as required.",
 "label.user.data": "User Data",
 "label.user.data.library": "User Data Library",
+"label.use.vddk": "Use VDDK",
 "label.ssh.port": "SSH port",
 "label.sshkeypair": "New SSH key pair",
 "label.sshkeypairs": "SSH key pairs",
diff --git a/ui/src/views/infra/HostInfo.vue b/ui/src/views/infra/HostInfo.vue
index 0fea5788a31..cb16a5a8bbe 100644
--- a/ui/src/views/infra/HostInfo.vue
+++ b/ui/src/views/infra/HostInfo.vue
@@ -64,6 +64,22 @@
           </div>
         </div>
       </a-list-item>
+      <a-list-item v-if="host.details && host.details['host.vddk.support']">
+        <div>
+          <strong>{{ $t('label.host.vddk.support') }}</strong>
+          <div>
+            {{ host.details['host.vddk.support'] }}
+          </div>
+        </div>
+      </a-list-item>
+      <a-list-item v-if="host.details && host.details['host.vddk.version']">
+        <div>
+          <strong>{{ $t('label.host.vddk.version') }}</strong>
+          <div>
+            {{ host.details['host.vddk.version'] }}
+          </div>
+        </div>
+      </a-list-item>
       <a-list-item v-if="host.details && host.details['host.ovftool.version']">
         <div>
           <strong>{{ $t('label.host.ovftool.version') }}</strong>
diff --git a/ui/src/views/tools/ImportUnmanagedInstance.vue 
b/ui/src/views/tools/ImportUnmanagedInstance.vue
index cbe0dc7d5a0..ffa0d934433 100644
--- a/ui/src/views/tools/ImportUnmanagedInstance.vue
+++ b/ui/src/views/tools/ImportUnmanagedInstance.vue
@@ -152,6 +152,12 @@
                   </a-row>
                 </a-radio-group>
               </a-form-item>
+              <a-form-item name="usevddk" ref="usevddk" 
v-if="selectedVmwareVcenter">
+                <template #label>
+                  <tooltip-label :title="$t('label.use.vddk')" 
:tooltip="apiParams.usevddk ? apiParams.usevddk.description : ''"/>
+                </template>
+                <a-switch v-model:checked="form.usevddk" 
@change="onUseVddkChange" />
+              </a-form-item>
               <a-form-item name="forceconverttopool" ref="forceconverttopool" 
v-if="selectedVmwareVcenter">
                 <template #label>
                   <tooltip-label :title="$t('label.force.convert.to.pool')" 
:tooltip="apiParams.forceconverttopool.description"/>
@@ -170,7 +176,7 @@
                   
@handle-checkselectpair-change="updateSelectedKvmHostForConversion"
                 />
               </a-form-item>
-              <a-form-item name="importhostid" ref="importhostid">
+              <a-form-item name="importhostid" ref="importhostid" 
v-if="!form.usevddk">
                 <check-box-select-pair
                   layout="vertical"
                   v-if="cluster.hypervisortype === 'KVM' && 
selectedVmwareVcenter"
@@ -184,12 +190,13 @@
               </a-form-item>
               <a-form-item name="convertstorageoption" 
ref="convertstorageoption">
                 <check-box-select-pair
+                  :key="`convertstorageoption-${form.usevddk ? 'vddk' : 
'default'}-${switches.forceConvertToPool ? 'pool' : 'tmp'}`"
                   layout="vertical"
                   v-if="cluster.hypervisortype === 'KVM' && 
selectedVmwareVcenter"
                   :resourceKey="cluster.id"
                   :selectOptions="storageOptionsForConversion"
                   :checkBoxLabel="switches.forceConvertToPool ? 
$t('message.select.destination.storage.instance.conversion') : 
$t('message.select.temporary.storage.instance.conversion')"
-                  :defaultCheckBoxValue="false"
+                  :defaultCheckBoxValue="switches.forceConvertToPool"
                   :reversed="false"
                   
@handle-checkselectpair-change="updateSelectedStorageOptionForConversion"
                 />
@@ -226,7 +233,7 @@
                   :placeholder="$t('label.extra')"
                 />
               </a-form-item>
-              <a-form-item name="forcemstoimportvmfiles" 
ref="forcemstoimportvmfiles" v-if="selectedVmwareVcenter">
+              <a-form-item name="forcemstoimportvmfiles" 
ref="forcemstoimportvmfiles" v-if="selectedVmwareVcenter && !form.usevddk">
                 <template #label>
                   <tooltip-label 
:title="$t('label.force.ms.to.import.vm.files')" 
:tooltip="apiParams.forcemstoimportvmfiles.description"/>
                 </template>
@@ -581,7 +588,8 @@ export default {
       selectedRootDiskSources: [],
       vmwareToKvmExtraParamsAllowed: false,
       vmwareToKvmExtraParamsSelected: false,
-      vmwareToKvmExtraParams: ''
+      vmwareToKvmExtraParams: '',
+      userModifiedVddkSetting: false
     }
   },
   beforeCreate () {
@@ -778,6 +786,7 @@ export default {
       this.formRef = ref()
       this.form = reactive({
         rootdiskid: 0,
+        usevddk: false,
         migrateallowed: this.switches.migrateAllowed,
         forced: this.switches.forced,
         forcemstoimportvmfiles: this.switches.forceMsToImportVmFiles,
@@ -1011,6 +1020,8 @@ export default {
       }).then(json => {
         this.kvmHostsForConversion = json.listhostsresponse.host || []
         this.kvmHostsForConversion = this.kvmHostsForConversion.filter(host => 
['Enabled', 'Disabled'].includes(host.resourcestate))
+        // Check if any host has VDDK support
+        let hasVddkSupport = false
         this.kvmHostsForConversion.map(host => {
           host.name = host.name + ' [Pod=' + host.podname + '] [Cluster=' + 
host.clustername + ']'
           if (host.instanceconversionsupported !== null && 
host.instanceconversionsupported !== undefined && 
host.instanceconversionsupported) {
@@ -1024,7 +1035,29 @@ export default {
           if (host.details['host.ovftool.version']) {
             host.name = host.name + ' (ovftool=' + 
host.details['host.ovftool.version'] + ')'
           }
+          // Check for VDDK support
+          if (host.details['host.vddk.support'] === 'true' || 
host.details['host.vddk.support'] === true) {
+            hasVddkSupport = true
+          }
+
+          if (this.form.usevddk) {
+            if (host.details['host.vddk.support'] === 'true' || 
host.details['host.vddk.support'] === true) {
+              host.name = host.name + ' (VDDK=' + this.$t('label.supported') + 
')'
+            } else {
+              host.name = host.name + ' (VDDK=' + 
this.$t('label.not.supported') + ')'
+            }
+            if (host.details['host.vddk.version']) {
+              host.name = host.name + ' (vddk=' + 
host.details['host.vddk.version'] + ')'
+            }
+          }
         })
+
+        // Enable usevddk by default if at least one host has VDDK support
+        // Only auto-enable if user hasn't manually modified the setting
+        if (hasVddkSupport && !this.form.usevddk && 
!this.userModifiedVddkSetting) {
+          this.form.usevddk = true
+          this.onUseVddkChange(true, false)
+        }
       })
     },
     fetchKvmHostsForImporting () {
@@ -1052,6 +1085,11 @@ export default {
         }
         getAPI('listStoragePools', params).then(json => {
           this.storagePoolsForConversion = 
json.liststoragepoolsresponse.storagepool || []
+          // Keep selected pool state aligned when the value is auto-populated 
by v-model.
+          if (this.form.convertstoragepoolid) {
+            const poolExists = this.storagePoolsForConversion.some(pool => 
pool.id === this.form.convertstoragepoolid)
+            this.selectedStoragePoolForConversion = poolExists ? 
this.form.convertstoragepoolid : null
+          }
         })
       } else if (this.selectedStorageOptionForConversion === 'local') {
         const kvmHost = this.kvmHostsForConversion.filter(x => x.id === 
this.selectedKvmHostForConversion)[0]
@@ -1061,6 +1099,10 @@ export default {
           status: 'Up'
         }).then(json => {
           this.storagePoolsForConversion = 
json.liststoragepoolsresponse.storagepool || []
+          if (this.form.convertstoragepoolid) {
+            const poolExists = this.storagePoolsForConversion.some(pool => 
pool.id === this.form.convertstoragepoolid)
+            this.selectedStoragePoolForConversion = poolExists ? 
this.form.convertstoragepoolid : null
+          }
         })
       }
     },
@@ -1115,6 +1157,34 @@ export default {
     },
     onForceConvertToPoolChange (val) {
       this.switches.forceConvertToPool = val
+      this.form.forceconverttopool = val
+      this.selectedStorageOptionForConversion = null
+      this.selectedStoragePoolForConversion = null
+      this.showStoragePoolsForConversion = false
+      this.resetStorageOptionsForConversion()
+    },
+    onUseVddkChange (val, isUserChange = true) {
+      if (isUserChange) {
+        this.userModifiedVddkSetting = true
+      }
+      if (val) {
+        this.form.forceconverttopool = true
+        this.form.forcemstoimportvmfiles = false
+        this.switches.forceConvertToPool = true
+        this.switches.forceMsToImportVmFiles = false
+        // Reset import host selection when VDDK is enabled
+        this.selectedKvmHostForImporting = null
+        // Refresh host list to show VDDK support details
+        this.fetchKvmHostsForConversion()
+      } else {
+        this.form.forceconverttopool = false
+        this.switches.forceConvertToPool = false
+        this.selectedStorageOptionForConversion = null
+        this.selectedStoragePoolForConversion = null
+        this.showStoragePoolsForConversion = false
+        // Refresh host list to remove VDDK support details
+        this.fetchKvmHostsForConversion()
+      }
       this.resetStorageOptionsForConversion()
     },
     updateSelectedRootDisk () {
@@ -1229,18 +1299,25 @@ export default {
           if (this.selectedKvmHostForImporting) {
             params.importinstancehostid = this.selectedKvmHostForImporting
           }
-          if (this.selectedStoragePoolForConversion) {
-            params.convertinstancepoolid = 
this.selectedStoragePoolForConversion
+          const selectedPoolForConversion = values.convertstoragepoolid || 
this.selectedStoragePoolForConversion
+          if (selectedPoolForConversion) {
+            params.convertinstancepoolid = selectedPoolForConversion
           }
           if (this.vmwareToKvmExtraParams) {
             params.extraparams = this.vmwareToKvmExtraParams
           }
-          params.forcemstoimportvmfiles = values.forcemstoimportvmfiles
-          if (values.forceconverttopool) {
+          if (values.usevddk) {
+            params.usevddk = true
+            params.forcemstoimportvmfiles = false
+          } else {
+            params.usevddk = false
+            params.forcemstoimportvmfiles = values.forcemstoimportvmfiles
+          }
+          if (values.forceconverttopool !== undefined) {
             params.forceconverttopool = values.forceconverttopool
           }
         }
-        var keys = ['hostname', 'domainid', 'projectid', 'account', 
'migrateallowed', 'forced', 'forcemstoimportvmfiles', 'osid']
+        var keys = ['hostname', 'domainid', 'projectid', 'account', 
'migrateallowed', 'forced', 'osid']
         if (this.templateType !== 'auto') {
           keys.push('templateid')
         }
@@ -1354,6 +1431,11 @@ export default {
       this.templateType = this.defaultTemplateType()
       this.updateComputeOffering(undefined)
       this.switches = {}
+      this.form.usevddk = false
+      this.form.forceconverttopool = false
+      this.form.forcemstoimportvmfiles = false
+      this.userModifiedVddkSetting = false
+      this.resetStorageOptionsForConversion()
     },
     closeAction () {
       this.$emit('close-action')

Reply via email to