This is an automated email from the ASF dual-hosted git repository.

harikrishna pushed a commit to branch LiveStoragMigrationScaleIOMain
in repository https://gitbox.apache.org/repos/asf/cloudstack.git

commit 1f407dd14f7e5d59012ab9b21bef8493ae0592a0
Author: Harikrishna Patnala <harikrishna.patn...@gmail.com>
AuthorDate: Mon Mar 13 12:51:18 2023 +0530

    Checking block copy status
---
 .../hypervisor/kvm/resource/LibvirtConnection.java |  2 +
 .../LibvirtMigrateVolumeCommandWrapper.java        | 76 ++++++++++++----------
 .../driver/ScaleIOPrimaryDataStoreDriver.java      | 47 ++++++-------
 3 files changed, 67 insertions(+), 58 deletions(-)

diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtConnection.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtConnection.java
index c70a72f399c..0f8031e3aaa 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtConnection.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtConnection.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.libvirt.Connect;
+import org.libvirt.Library;
 import org.libvirt.LibvirtException;
 
 import com.cloud.hypervisor.Hypervisor;
@@ -44,6 +45,7 @@ public class LibvirtConnection {
         if (conn == null) {
             s_logger.info("No existing libvirtd connection found. Opening a new one");
             conn = new Connect(hypervisorURI, false);
+            Library.initEventLoop();
             s_logger.debug("Successfully connected to libvirt at: " + hypervisorURI);
             s_connections.put(hypervisorURI, conn);
         } else {
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtMigrateVolumeCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtMigrateVolumeCommandWrapper.java
index c78ea453250..a083061d227 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtMigrateVolumeCommandWrapper.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtMigrateVolumeCommandWrapper.java
@@ -46,13 +46,11 @@ import org.apache.cloudstack.storage.to.VolumeObjectTO;
 import org.apache.log4j.Logger;
 import org.libvirt.Connect;
 import org.libvirt.Domain;
+import org.libvirt.DomainBlockJobInfo;
 import org.libvirt.DomainInfo;
 import org.libvirt.TypedParameter;
 import org.libvirt.TypedUlongParameter;
 import org.libvirt.LibvirtException;
-import org.libvirt.event.BlockJobListener;
-import org.libvirt.event.BlockJobStatus;
-import org.libvirt.event.BlockJobType;
 
 @ResourceWrapper(handles =  MigrateVolumeCommand.class)
 public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVolumeCommand, Answer, LibvirtComputingResource> {
@@ -80,15 +78,6 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
         String srcPath = srcVolumeObjectTO.getPath();
         final String srcVolumeId = ScaleIOUtil.getVolumePath(srcVolumeObjectTO.getPath());
         final String vmName = srcVolumeObjectTO.getVmName();
-        Map<String, String> srcDetails = command.getSrcDetails();
-        final String srcSystemId = srcDetails.get(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID);
-
-        final String diskFileName = ScaleIOUtil.DISK_NAME_PREFIX + srcSystemId + "-" + srcVolumeId;
-        final String srcFilePath = ScaleIOUtil.DISK_PATH + File.separator + diskFileName;
-
-        LOGGER.info("HARI Source volume ID: "+ srcVolumeId);
-        LOGGER.info("HARI source volume PATH: "+ srcFilePath);
-        LOGGER.info("HARI source system ID: "+ srcSystemId);
 
         // Destination Details
         VolumeObjectTO destVolumeObjectTO = (VolumeObjectTO)command.getDestData();
@@ -96,13 +85,11 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
         final String destVolumeId = ScaleIOUtil.getVolumePath(destVolumeObjectTO.getPath());
         Map<String, String> destDetails = command.getDestDetails();
         final String destSystemId = destDetails.get(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID);
+        String destDiskLabel = null;
 
         final String destDiskFileName = ScaleIOUtil.DISK_NAME_PREFIX + destSystemId + "-" + destVolumeId;
         final String diskFilePath = ScaleIOUtil.DISK_PATH + File.separator + destDiskFileName;
 
-        LOGGER.info("HARI destination volume ID: "+ destVolumeId);
-        LOGGER.info("HARI destination system ID: "+ destSystemId);
-
         Domain dm = null;
         try {
             final LibvirtUtilitiesHelper libvirtUtilitiesHelper = libvirtComputingResource.getLibvirtUtilitiesHelper();
@@ -123,6 +110,7 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
             pool.connectPhysicalDisk(destVolumeObjectTO.getPath(), null);
 
             LibvirtVMDef.DiskDef diskdef = generateDestinationDiskDefinition(dm, srcVolumeId, srcPath, diskFilePath);
+            destDiskLabel = diskdef.getDiskLabel();
 
             TypedUlongParameter parameter = new TypedUlongParameter("bandwidth", 0);
             TypedParameter[] parameters = new TypedParameter[1];
@@ -130,28 +118,41 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
 
             Domain finalDm = dm;
             final Boolean[] copyStatus = {true};
-            dm.blockCopy(diskdef.getDiskLabel(), diskdef.toString(), parameters, Domain.BlockCopyFlags.REUSE_EXT);
-            BlockJobListener listener = new BlockJobListener() {
-                @Override
-                public void onEvent(Domain domain, String diskPath, BlockJobType type, BlockJobStatus status) {
-                    if (type == BlockJobType.COPY && status == BlockJobStatus.READY) {
-                        try {
-                            finalDm.blockJobAbort(diskFilePath, Domain.BlockJobAbortFlags.PIVOT);
-                            copyStatus[0] = false;
-                        } catch (LibvirtException e) {
-                            throw new RuntimeException(e);
-                        }
+            dm.blockCopy(destDiskLabel, diskdef.toString(), parameters, Domain.BlockCopyFlags.REUSE_EXT);
+            LOGGER.info(String.format("Block copy has started for the volume %s : %s ", diskdef.getDiskLabel(), srcPath));
+
+            int timeBetweenTries = 1000; // Try more frequently (every sec) and return early if disk is found
+            int waitTimeInSec = command.getWait();
+            while (waitTimeInSec > 0) {
+                DomainBlockJobInfo blockJobInfo = dm.getBlockJobInfo(destDiskLabel, 0);
+                if (blockJobInfo != null) {
+                    LOGGER.debug(String.format("Volume %s : %s block copy progress: %s%% ", destDiskLabel, srcPath, 100 * (blockJobInfo.cur / blockJobInfo.end)));
+                    if (blockJobInfo.cur == blockJobInfo.end) {
+                        LOGGER.debug(String.format("Block copy completed for 
the volume %s : %s", destDiskLabel, srcPath));
+                        dm.blockJobAbort(destDiskLabel, 
Domain.BlockJobAbortFlags.PIVOT);
+                        break;
                     }
+                } else {
+                    LOGGER.debug("Failed to get the block copy status, trying 
to abort the job");
+                    dm.blockJobAbort(destDiskLabel, 
Domain.BlockJobAbortFlags.ASYNC);
+                }
+                waitTimeInSec--;
+
+                try {
+                    Thread.sleep(timeBetweenTries);
+                } catch (Exception ex) {
+                    // don't do anything
                 }
-            };
-            dm.addBlockJobListener(listener);
-            while (copyStatus[0]) {
-                LOGGER.info("Waiting for the block copy to complete");
             }
 
-            if (copyStatus[0]) {
-                String msg = "Migrate volume failed due to timeout";
-                LOGGER.warn(msg);
+            if (waitTimeInSec <= 0) {
+                String msg = "Block copy is taking long time, failing the job";
+                LOGGER.error(msg);
+                try {
+                    dm.blockJobAbort(destDiskLabel, Domain.BlockJobAbortFlags.ASYNC);
+                } catch (LibvirtException ex) {
+                    LOGGER.error("Migrate volume failed while aborting the block job due to " + ex.getMessage());
+                }
                 return new MigrateVolumeAnswer(command, false, msg, null);
             }
 
@@ -159,6 +160,13 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
         } catch (Exception e) {
             String msg = "Migrate volume failed due to " + e.toString();
             LOGGER.warn(msg, e);
+            if (destDiskLabel != null) {
+                try {
+                    dm.blockJobAbort(destDiskLabel, Domain.BlockJobAbortFlags.ASYNC);
+                } catch (LibvirtException ex) {
+                    LOGGER.error("Migrate volume failed while aborting the block job due to " + ex.getMessage());
+                }
+            }
             return new MigrateVolumeAnswer(command, false, msg, null);
         } finally {
             if (dm != null) {
@@ -180,7 +188,6 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
         LibvirtVMDef.DiskDef diskdef = null;
         for (final LibvirtVMDef.DiskDef disk : disks) {
             final String file = disk.getDiskPath();
-            LOGGER.info("HARIIII disk: " + file);
             if (file != null && file.contains(srcVolumeId)) {
                 diskdef = disk;
                 break;
@@ -190,7 +197,6 @@ public final class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<Mig
             throw new InternalErrorException("disk: " + srcPath + " is not attached before");
         }
         diskdef.setDiskPath(diskFilePath);
-        LOGGER.info("HARIIII Destination xml : " + diskdef.toString());
 
         return diskdef;
     }
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
index b70ae125f80..9af900f5ee5 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
@@ -793,7 +793,7 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
             if (migrateStatus) {
                 updateVolumeAfterCopyVolume(srcData, destData);
                 updateSnapshotsAfterCopyVolume(srcData, destData);
-
+                deleteSourceVolumeAfterSuccessfulBlockCopy(srcData, host);
                 LOGGER.debug(String.format("Successfully migrated migrate 
PowerFlex volume %d to storage pool %d", srcVolumeId,  destPoolId));
                 answer = new Answer(null, true, null);
             } else {
@@ -807,32 +807,12 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
         }
 
         if (destVolumePath != null && !answer.getResult()) {
-            revertCopyVolumeOperations(srcData, destData, host, destVolumePath);
+            revertBlockCopyVolumeOperations(srcData, destData, host, destVolumePath);
         }
 
         return answer;
     }
 
-    private void checkForDestinationVolumeExistence(DataStore destStore, String destVolumePath) throws Exception {
-        int retryCount = 3;
-        while (retryCount > 0) {
-            try {
-                Thread.sleep(3000); // Try after few secs
-                String destScaleIOVolumeId = ScaleIOUtil.getVolumePath(destVolumePath);
-                final ScaleIOGatewayClient destClient = getScaleIOClient(destStore.getId());
-                org.apache.cloudstack.storage.datastore.api.Volume destScaleIOVolume = destClient.getVolume(destScaleIOVolumeId);
-                if (destScaleIOVolume != null) {
-                    return;
-                }
-            } catch (Exception ex) {
-                LOGGER.error("Exception while checking for existence of the 
volume at " + destVolumePath + " - " + ex.getLocalizedMessage());
-                throw ex;
-            } finally {
-                retryCount--;
-            }
-        }
-    }
-
     private void updateVolumeAfterCopyVolume(DataObject srcData, DataObject destData) {
         // destination volume is already created and volume path is set in database by this time at "CreateObjectAnswer createAnswer = createVolume((VolumeInfo) destData, destStore.getId());"
         final long srcVolumeId = srcData.getId();
@@ -852,6 +832,7 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
             volumeDao.update(srcVolumeId, volume);
         }
     }
+
     private Host findEndpointForVolumeOperation(DataObject srcData) {
         long hostId = 0;
         VMInstanceVO instance = vmInstanceDao.findVMByInstanceName(((VolumeInfo) srcData).getAttachedVmName());
@@ -868,6 +849,7 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
 
         return host;
     }
+
     private void updateSnapshotsAfterCopyVolume(DataObject srcData, DataObject destData) throws Exception {
         final long srcVolumeId = srcData.getId();
         DataStore srcStore = srcData.getDataStore();
@@ -902,7 +884,26 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
         }
     }
 
-    private void revertCopyVolumeOperations(DataObject srcData, DataObject destData, Host host, String destVolumePath) {
+    private void deleteSourceVolumeAfterSuccessfulBlockCopy(DataObject srcData, Host host) {
+        DataStore srcStore = srcData.getDataStore();
+        String srcVolumePath = srcData.getTO().getPath();
+        revokeAccess(srcData, host, srcData.getDataStore());
+        String errMsg;
+        try {
+            String scaleIOVolumeId = ScaleIOUtil.getVolumePath(srcVolumePath);
+            final ScaleIOGatewayClient client = getScaleIOClient(srcStore.getId());
+            Boolean deleteResult =  client.deleteVolume(scaleIOVolumeId);
+            if (!deleteResult) {
+                errMsg = "Failed to delete source PowerFlex volume with id: " 
+ scaleIOVolumeId;
+                LOGGER.warn(errMsg);
+            }
+        } catch (Exception e) {
+            errMsg = "Unable to delete source PowerFlex volume: " + 
srcVolumePath + " due to " + e.getMessage();
+            LOGGER.warn(errMsg);;
+        }
+    }
+
+    private void revertBlockCopyVolumeOperations(DataObject srcData, DataObject destData, Host host, String destVolumePath) {
         final String srcVolumePath = ((VolumeInfo) srcData).getPath();
         final String srcVolumeFolder = ((VolumeInfo) srcData).getFolder();
         DataStore destStore = destData.getDataStore();
