hsato03 commented on code in PR #12086:
URL: https://github.com/apache/cloudstack/pull/12086#discussion_r2884086981


##########
engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java:
##########
@@ -352,26 +374,111 @@ protected Pair<String, Boolean> migrateCompleted(Long 
destDatastoreId, DataStore
     protected Map<Long, Pair<Long, Long>> migrateAway(
             DataObject chosenFileForMigration,
             Map<Long, Pair<Long, Long>> storageCapacities,
-            Map<DataObject,
-            Pair<List<SnapshotInfo>, Long>> snapshotChains,
+            Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains,
             Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChains,
+            Set<Long> snapshotIdsToMigrate,
             DataStore srcDatastore,
             Long destDatastoreId,
             List<Future<DataObjectResult>> futures) {
 
         Long fileSize = migrationHelper.getFileSize(chosenFileForMigration, 
snapshotChains, templateChains);
         storageCapacities = assumeMigrate(storageCapacities, 
srcDatastore.getId(), destDatastoreId, fileSize);
 
-        MigrateDataTask task = new MigrateDataTask(chosenFileForMigration, 
srcDatastore, dataStoreManager.getDataStore(destDatastoreId, 
DataStoreRole.Image));
-        if (chosenFileForMigration instanceof SnapshotInfo ) {
+        DataStore destDataStore = 
dataStoreManager.getDataStore(destDatastoreId, DataStoreRole.Image);
+
+        boolean isKvmIncrementalSnapshot = chosenFileForMigration instanceof 
SnapshotInfo && ((SnapshotInfo) 
chosenFileForMigration).isKvmIncrementalSnapshot() && 
snapshotChains.containsKey(chosenFileForMigration);
+
+        if (isKvmIncrementalSnapshot) {
+            MigrateKvmIncrementalSnapshotTask task = new 
MigrateKvmIncrementalSnapshotTask(chosenFileForMigration, snapshotChains, 
srcDatastore, destDataStore, snapshotIdsToMigrate);
+            
futures.add(submitKvmIncrementalMigration(srcDatastore.getScope().getScopeId(), 
task));
+            logger.debug("Incremental snapshot migration {} submitted to 
incremental pool.", chosenFileForMigration.getUuid());
+        } else {
+            createMigrateDataTask(chosenFileForMigration, snapshotChains, 
templateChains, srcDatastore, destDataStore, futures);
+        }
+
+        return storageCapacities;
+    }
+
+    private AsyncCallFuture<DataObjectResult> 
migrateKvmIncrementalSnapshotChain(DataObject chosenFileForMigration, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains, DataStore 
srcDatastore, DataStore destDataStore, Set<Long> snapshotIdsToMigrate) {
+        return 
Transaction.execute((TransactionCallback<AsyncCallFuture<DataObjectResult>>) 
status -> {
+            MigrateBetweenSecondaryStoragesCommandAnswer answer = null;
+            AsyncCallFuture<DataObjectResult> future = new AsyncCallFuture<>();
+            DataObjectResult result = new 
DataObjectResult(chosenFileForMigration);
+
+            try {
+                List<SnapshotInfo> snapshotChain = 
snapshotChains.get(chosenFileForMigration).first();
+                MigrateSnapshotsBetweenSecondaryStoragesCommand 
migrateBetweenSecondaryStoragesCmd = new 
MigrateSnapshotsBetweenSecondaryStoragesCommand(snapshotChain.stream().map(DataObject::getTO).collect(Collectors.toList()),
 srcDatastore.getTO(), destDataStore.getTO(), snapshotIdsToMigrate);
+
+                HostVO host = getAvailableHost(((SnapshotInfo) 
chosenFileForMigration).getDataCenterId());
+                if (host == null) {
+                    throw new CloudRuntimeException("No suitable hosts found 
to send migrate command.");
+                }
+
+                
migrateBetweenSecondaryStoragesCmd.setWait(StorageManager.AgentMaxDataMigrationWaitTime.valueIn(host.getClusterId()));
+                answer = (MigrateBetweenSecondaryStoragesCommandAnswer) 
agentManager.send(host.getId(), migrateBetweenSecondaryStoragesCmd);
+                if (answer == null || !answer.getResult()) {
+                    if (answer != null) {
+                        logger.warn(answer.getDetails());
+                    }
+                    throw new CloudRuntimeException("Unable to migrate KVM 
incremental snapshots to another secondary storage.");
+                }
+            } catch (final OperationTimedoutException | 
AgentUnavailableException e) {
+                throw new CloudRuntimeException("Error while migrating KVM 
incremental snapshot chain. Check the logs for more information.", e);
+            } finally {
+                if (answer != null) {
+                    updateSnapshotsReference(destDataStore, answer);
+                }
+            }
+            result.setSuccess(true);
+            future.complete(result);
+            return future;
+        });
+    }
+
+    private void updateSnapshotsReference(DataStore destDataStore, 
MigrateBetweenSecondaryStoragesCommandAnswer answer) {
+        for (Pair<Long, String> snapshotIdAndUpdatedCheckpointPath : 
answer.getMigratedResources()) {
+            Long snapshotId = snapshotIdAndUpdatedCheckpointPath.first();
+            String newCheckpointPath = 
snapshotIdAndUpdatedCheckpointPath.second();
+
+            SnapshotDataStoreVO snapshotDataStore = 
snapshotDataStoreDao.findOneBySnapshotAndDatastoreRole(snapshotId, 
DataStoreRole.Image);
+
+            if (snapshotDataStore == null) {
+                logger.warn("Snapshot [{}] not found.", snapshotId);
+                continue;
+            }
+
+            snapshotDataStore.setDataStoreId(destDataStore.getId());
+            snapshotDataStore.setKvmCheckpointPath(newCheckpointPath);
+            snapshotDataStoreDao.update(snapshotDataStore.getId(), 
snapshotDataStore);
+        }
+    }
+
+    protected <T> Future<T> submitKvmIncrementalMigration(Long zoneId, 
Callable<T> task) {
+        if (!zoneKvmIncrementalExecutorMap.containsKey(zoneId)) {
+            zoneKvmIncrementalExecutorMap.put(zoneId, 
Executors.newSingleThreadExecutor());
+        }
+        return zoneKvmIncrementalExecutorMap.get(zoneId).submit(task);
+    }
+
+    private HostVO getAvailableHost(long zoneId) throws 
AgentUnavailableException, OperationTimedoutException {
+        List<HostVO> hosts = 
hostDao.listByDataCenterIdAndHypervisorType(zoneId, 
Hypervisor.HypervisorType.KVM);
+        if (CollectionUtils.isNotEmpty(hosts)) {
+            return hosts.get(new Random().nextInt(hosts.size()));

Review Comment:
   The method `hostDao.listByDataCenterIdAndHypervisorType` already performs
this filtering, so filtering the returned hosts again here is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to