rajiv-jain-netapp commented on code in PR #13053:
URL: https://github.com/apache/cloudstack/pull/13053#discussion_r3240791778


##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateDiskOnlyVMSnapshotCommandWrapper.java:
##########
@@ -146,21 +150,13 @@ protected Answer 
takeDiskOnlyVmSnapshotOfStoppedVm(CreateDiskOnlyVmSnapshotComma
             }
         } catch (LibvirtException | QemuImgException e) {
             logger.error("Exception while creating disk-only VM snapshot for 
VM [{}]. Deleting leftover deltas.", vmName, e);
-            for (VolumeObjectTO volumeObjectTO : volumeObjectTos) {
-                Pair<Long, String> volSizeAndNewPath = 
mapVolumeToSnapshotSizeAndNewVolumePath.get(volumeObjectTO.getUuid());
-                PrimaryDataStoreTO primaryDataStoreTO = (PrimaryDataStoreTO) 
volumeObjectTO.getDataStore();
-                KVMStoragePool kvmStoragePool = 
storagePoolMgr.getStoragePool(primaryDataStoreTO.getPoolType(), 
primaryDataStoreTO.getUuid());
-
-                if (volSizeAndNewPath == null) {
-                    continue;
-                }
-                try {
-                    
Files.deleteIfExists(Path.of(kvmStoragePool.getLocalPathFor(volSizeAndNewPath.second())));
-                } catch (IOException ex) {
-                    logger.warn("Tried to delete leftover snapshot at [{}] 
failed.", volSizeAndNewPath.second(), ex);
-                }
-            }
+            cleanupLeftoverDeltas(volumeObjectTos, 
mapVolumeToSnapshotSizeAndNewVolumePath, storagePoolMgr);
             return new Answer(cmd, e);
+        } catch (Exception e) {
+            logger.error("Unexpected exception while creating disk-only VM 
snapshot for VM [{}]. Deleting leftover deltas.", vmName, e);
+            cleanupLeftoverDeltas(volumeObjectTos, 
mapVolumeToSnapshotSizeAndNewVolumePath, storagePoolMgr);
+            return new CreateDiskOnlyVmSnapshotAnswer(cmd, false,
+                    String.format("Creation of disk-only VM snapshot for VM 
[%s] failed due to %s.", vmName, e.getMessage()), null);

Review Comment:
   @winterhazel I was inclined to keep the exceptions and their corresponding 
messages distinct. This approach enables both developers and users to clearly 
identify the specific exception encountered and take appropriate triage 
actions, especially in customer-facing scenarios.
   Accordingly, the exception handling has been separated from the existing 
catch block to preserve clarity and improve diagnosability.



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java:
##########
@@ -280,8 +329,96 @@ private String getComponent(String path, int index) {
         return tmp[index].trim();
     }
 
+    /**
+     * Check if there are other LUNs on the same iSCSI target (IQN) that are 
still
+     * visible as block devices. This is needed because ONTAP uses a single 
IQN per
+     * SVM — logging out of the target would kill ALL LUNs, not just the one 
being
+     * disconnected.
+     *
+     * Checks /dev/disk/by-path/ for symlinks matching the same host:port + 
IQN but
+     * with a different LUN number.
+     */
+    private boolean hasOtherActiveLuns(String host, int port, String iqn, 
String lun) {
+        String prefix = "ip-" + host + ":" + port + "-iscsi-" + iqn + "-lun-";
+        java.io.File byPathDir = new java.io.File("/dev/disk/by-path");
+        if (!byPathDir.exists() || !byPathDir.isDirectory()) {
+            return false;
+        }
+        java.io.File[] entries = byPathDir.listFiles();
+        if (entries == null) {
+            return false;
+        }
+        for (java.io.File entry : entries) {
+            String name = entry.getName();
+            // Skip partition entries (e.g. lun-0-part1, lun-0-part2) — these 
are not
+            // independent LUNs, they are partition symlinks for the same LUN 
disk.
+            // Only count actual LUN entries (no "-part" suffix after the lun 
number).
+            if (name.startsWith(prefix) && !name.equals(prefix + lun) && 
!name.contains("-part")) {
+                logger.debug("Found other active LUN on same target: " + name);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Removes a single stale SCSI device from the kernel using the sysfs 
interface.
+     *
+     * When ONTAP unmaps a LUN from the host's igroup, the by-path symlink and 
the
+     * underlying SCSI device (/dev/sdX) remain present in the kernel until 
explicitly
+     * removed — the kernel does not auto-remove devices from live iSCSI 
sessions.
+     *
+     * This method resolves the by-path symlink to the real block device name 
(e.g. sdd),
+     * then writes "1" to /sys/block/<dev>/device/delete — the standard Linux 
kernel SCSI
+     * API for removing a single device without tearing down the entire iSCSI 
session.
+     * Once the kernel processes the delete, it also removes the by-path 
symlink.
+     *
+     * This is used instead of iscsiadm --logout when other LUNs on the same 
IQN are still
+     * active (ONTAP single-IQN-per-SVM model), since logout would tear down 
ALL LUNs.
+     */
+    private void removeStaleScsiDevice(String host, int port, String iqn, 
String lun) {
+        String byPath = getByPath(host, port, "/" + iqn + "/" + lun);
+        java.nio.file.Path byPathLink = java.nio.file.Paths.get(byPath);
+        if (!java.nio.file.Files.exists(byPathLink)) {
+            logger.debug("by-path entry for LUN " + lun + " already gone, 
nothing to remove");
+            return;
+        }
+        try {
+            java.nio.file.Path realDevice = byPathLink.toRealPath();
+            String devName = realDevice.getFileName().toString();
+            java.io.File deleteFile = new java.io.File("/sys/block/" + devName 
+ "/device/delete");
+            if (!deleteFile.exists()) {
+                logger.warn("sysfs delete entry not found for device " + 
devName + " — cannot remove stale SCSI device");
+                return;
+            }
+            try (java.io.FileWriter fw = new java.io.FileWriter(deleteFile)) {
+                fw.write("1");
+            }
+            logger.info("Removed stale SCSI device " + devName + " for LUN /" 
+ iqn + "/" + lun + " via sysfs");
+        } catch (Exception e) {
+            logger.warn("Failed to remove stale SCSI device for LUN /" + iqn + 
"/" + lun + ": " + e.getMessage());
+        }
+    }
+
     private boolean disconnectPhysicalDisk(String host, int port, String iqn, 
String lun) {
-        // use iscsiadm to log out of the iSCSI target and un-discover it
+        // Check if other LUNs on the same IQN target are still in use.
+        // ONTAP (and similar) uses a single IQN per SVM with multiple LUNs.
+        // Doing iscsiadm --logout tears down the ENTIRE target session,
+        // which would destroy access to ALL LUNs — not just the one being 
disconnected.
+        if (hasOtherActiveLuns(host, port, iqn, lun)) {
+            logger.info("Skipping iSCSI logout for /" + iqn + "/" + lun +
+                    " — other LUNs on the same target are still active. 
Removing stale SCSI device for this LUN only.");
+            removeStaleScsiDevice(host, port, iqn, lun);
+            // After removing this LUN's device, re-check: if no other LUNs 
remain active,
+            // If it is the last one then must logout to clean up the iSCSI 
session entirely.
+            if (hasOtherActiveLuns(host, port, iqn, lun)) {
+                logger.info("Other LUNs still active after removing /" + iqn + 
"/" + lun + " — session kept alive.");
+                return true;

Review Comment:
   @winterhazel The same logic applies to both connect and disconnect 
operations. If one or more LUNs associated with the SVM‑level IQN are still in 
use, we should avoid logging out of that IQN; otherwise, it may unintentionally 
disconnect other active LUNs.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/listener/OntapHostListener.java:
##########
@@ -63,14 +69,20 @@ public boolean hostConnect(long hostId, long poolId)  {
             return false;
         }
 
+        // TODO add host type check also since we support only KVM for now, 
host.getHypervisorType().equals(HypervisorType.KVM)

Review Comment:
   @winterhazel, That is correct—there are plans to extend support to 
additional hypervisors, and this TODO is intended to track that future 
enhancement.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/listener/OntapHostListener.java:
##########
@@ -137,6 +156,7 @@ public boolean hostDisconnected(Host host, StoragePool 
pool) {
             logger.error("Failed to add host by HostListener as host was not 
found with id : {}", host.getId());
             return false;
         }
+        // TODO add storage pool get validation

Review Comment:
   @winterhazel This is not in scope for the current PR; it will be addressed 
as part of upcoming feature support. Would you recommend removing the TODO 
intended for future implementation?



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/lifecycle/OntapPrimaryDatastoreLifecycle.java:
##########
@@ -207,70 +212,67 @@ private long validateInitializeInputs(Long capacityBytes, 
Long podId, Long clust
             capacityBytes = ONTAP_MIN_VOLUME_SIZE_IN_BYTES;
         }
 
-        // Scope (pod/cluster/zone) validation
+        // Validate scope
         if (podId == null ^ clusterId == null) {
             throw new CloudRuntimeException("Cluster Id or Pod Id is null, 
cannot create primary storage");
         }
-        if (podId == null && clusterId == null) {
+
+        if (podId == null) {
             if (zoneId != null) {
                 logger.info("Both Pod Id and Cluster Id are null, Primary 
storage pool will be associated with a Zone");
             } else {
                 throw new CloudRuntimeException("Pod Id, Cluster Id and Zone 
Id are all null, cannot create primary storage");
             }
         }

Review Comment:
   @winterhazel We are considering two scenarios:
   1. If zoneId is not null and podId is null, the pool scope will be at the 
zone level.
   2. If both zoneId and podId are null, an exception will be thrown.
   
   Please advise if these should be combined, assuming only a single applicable 
scenario.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/provider/StorageProviderFactory.java:
##########
@@ -36,23 +39,25 @@ public class StorageProviderFactory {
     public static StorageStrategy getStrategy(OntapStorage ontapStorage) {
         ProtocolType protocol = ontapStorage.getProtocol();
         logger.info("Initializing StorageProviderFactory with protocol: " + 
protocol);
+        String decodedPassword = new 
String(java.util.Base64.getDecoder().decode(ontapStorage.getPassword()), 
StandardCharsets.UTF_8);
+        ontapStorage = new OntapStorage(
+            ontapStorage.getUsername(),
+            decodedPassword,
+            ontapStorage.getStorageIP(),
+            ontapStorage.getSvmName(),
+            ontapStorage.getSize(),
+            protocol);
         switch (protocol) {
             case NFS3:
-                if (!ontapStorage.getIsDisaggregated()) {
-                    UnifiedNASStrategy unifiedNASStrategy = new 
UnifiedNASStrategy(ontapStorage);
-                    ComponentContext.inject(unifiedNASStrategy);
-                    unifiedNASStrategy.setOntapStorage(ontapStorage);
-                    return unifiedNASStrategy;
-                }
-                throw new CloudRuntimeException("Unsupported configuration: 
Disaggregated ONTAP is not supported.");
+                UnifiedNASStrategy unifiedNASStrategy = new 
UnifiedNASStrategy(ontapStorage);
+                ComponentContext.inject(unifiedNASStrategy);
+                unifiedNASStrategy.setOntapStorage(ontapStorage);

Review Comment:
   Yes updated



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/provider/StorageProviderFactory.java:
##########
@@ -36,23 +39,25 @@ public class StorageProviderFactory {
     public static StorageStrategy getStrategy(OntapStorage ontapStorage) {
         ProtocolType protocol = ontapStorage.getProtocol();
         logger.info("Initializing StorageProviderFactory with protocol: " + 
protocol);
+        String decodedPassword = new 
String(java.util.Base64.getDecoder().decode(ontapStorage.getPassword()), 
StandardCharsets.UTF_8);
+        ontapStorage = new OntapStorage(
+            ontapStorage.getUsername(),
+            decodedPassword,
+            ontapStorage.getStorageIP(),
+            ontapStorage.getSvmName(),
+            ontapStorage.getSize(),
+            protocol);
         switch (protocol) {
             case NFS3:
-                if (!ontapStorage.getIsDisaggregated()) {
-                    UnifiedNASStrategy unifiedNASStrategy = new 
UnifiedNASStrategy(ontapStorage);
-                    ComponentContext.inject(unifiedNASStrategy);
-                    unifiedNASStrategy.setOntapStorage(ontapStorage);
-                    return unifiedNASStrategy;
-                }
-                throw new CloudRuntimeException("Unsupported configuration: 
Disaggregated ONTAP is not supported.");
+                UnifiedNASStrategy unifiedNASStrategy = new 
UnifiedNASStrategy(ontapStorage);
+                ComponentContext.inject(unifiedNASStrategy);
+                unifiedNASStrategy.setOntapStorage(ontapStorage);
+                return unifiedNASStrategy;
             case ISCSI:
-                if (!ontapStorage.getIsDisaggregated()) {
-                    UnifiedSANStrategy unifiedSANStrategy = new 
UnifiedSANStrategy(ontapStorage);
-                    ComponentContext.inject(unifiedSANStrategy);
-                    unifiedSANStrategy.setOntapStorage(ontapStorage);
-                    return unifiedSANStrategy;
-                }
-                throw new CloudRuntimeException("Unsupported configuration: 
Disaggregated ONTAP is not supported.");
+                UnifiedSANStrategy unifiedSANStrategy = new 
UnifiedSANStrategy(ontapStorage);
+                ComponentContext.inject(unifiedSANStrategy);
+                unifiedSANStrategy.setOntapStorage(ontapStorage);

Review Comment:
   corrected



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -65,14 +108,235 @@ public DataTO getTO(DataObject data) {
     @Override
     public DataStoreTO getStoreTO(DataStore store) { return null; }
 
+    @Override
+    public boolean volumesRequireGrantAccessWhenUsed(){
+        logger.info("OntapPrimaryDatastoreDriver: 
volumesRequireGrantAccessWhenUsed: Called");
+        return true;
+    }
+
+    /**
+     * Creates a volume on the ONTAP storage system.
+     */
     @Override
     public void createAsync(DataStore dataStore, DataObject dataObject, 
AsyncCompletionCallback<CreateCmdResult> callback) {
-        throw new UnsupportedOperationException("Create operation is not 
supported for ONTAP primary storage.");
+        CreateCmdResult createCmdResult = null;
+        String errMsg;
+
+        if (dataObject == null) {
+            throw new InvalidParameterValueException("dataObject should not be 
null");
+        }
+        if (dataStore == null) {
+            throw new InvalidParameterValueException("dataStore should not be 
null");
+        }
+        if (callback == null) {
+            throw new InvalidParameterValueException("callback should not be 
null");
+        }
+
+        try {
+            logger.info("Started for data store name [{}] and data object name 
[{}] of type [{}]",
+                    dataStore.getName(), dataObject.getName(), 
dataObject.getType());
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("createAsync: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+            String storagePoolUuid = dataStore.getUuid();
+
+            Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(dataStore.getId());
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {

Review Comment:
   This was retained from a forward‑looking perspective to accommodate 
additional types of createAsync implementations in the future. As a result, it 
currently appears positioned after the validation logic.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/StorageStrategy.java:
##########
@@ -281,16 +316,32 @@ public Volume createStorageVolume(String volumeName, Long 
size) {
         }
     }
 
+     /**
+     * Updates ONTAP Flex-Volume
+     * Eligible only for Unified ONTAP storage
+     * throw exception in case of disaggregated ONTAP storage
+     *
+     * @param volume the volume to update
+     * @return the updated Volume object
+     */
     public Volume updateStorageVolume(Volume volume) {
         return null;
     }
 
+    /**
+     * Delete ONTAP Flex-Volume
+     * Eligible only for Unified ONTAP storage
+     * throw exception in case of disaggregated ONTAP storage
+     *
+     * @param volume the volume to delete
+     */
     public void deleteStorageVolume(Volume volume) {
         logger.info("Deleting ONTAP volume by name: " + volume.getName() + " 
and uuid: " + volume.getUuid());
         String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
         try {
+            // TODO: Implement lun and file deletion, if any, before deleting 
the volume

Review Comment:
   yes removed



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/UnifiedNASStrategy.java:
##########
@@ -180,11 +212,11 @@ public void disableLogicalAccess(Map<String, String> 
values) {
 
     @Override
     public Map<String, String> getLogicalAccess(Map<String, String> values) {

Review Comment:
   This is a framework method, so an empty implementation is required to comply 
with the parent interface.



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java:
##########
@@ -158,8 +159,58 @@ public boolean connectPhysicalDisk(String volumeUuid, 
KVMStoragePool pool, Map<S
         return true;
     }
 
+    /**
+     * Checks the result of an iscsiadm node-create command.
+     * Returns true if the node was created or already exists, false on 
failure.
+     */
+    boolean handleNodeCreateResult(String result, String volumeUuid) {
+        if (result == null) {
+            logger.debug("Successfully added iSCSI node for target {}", 
volumeUuid);
+            return true;
+        }
+        String msg = result.toLowerCase();
+        if (msg.contains("already exists") || msg.contains("database exists") 
|| msg.contains("exists")) {

Review Comment:
   We need to handle both success and failure scenarios while invoking iSCSI 
session creation. In failure cases, different failure conditions should be 
handled distinctly.
   Given that NetApp storage establishes iSCSI sessions at the SVM level, and 
an SVM can host multiple FlexVols, the same iSCSI session may be reused across 
multiple workflows. This should be treated as a valid and expected (positive) 
scenario.
   Specifically, if a failure occurs due to an “existing session” condition, it 
should be interpreted as success rather than an error. To accommodate this, we 
have adopted a message-based handling approach to correctly process and respond 
to such cases.



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java:
##########
@@ -158,8 +159,58 @@ public boolean connectPhysicalDisk(String volumeUuid, 
KVMStoragePool pool, Map<S
         return true;
     }
 
+    /**
+     * Checks the result of an iscsiadm node-create command.
+     * Returns true if the node was created or already exists, false on 
failure.
+     */
+    boolean handleNodeCreateResult(String result, String volumeUuid) {
+        if (result == null) {
+            logger.debug("Successfully added iSCSI node for target {}", 
volumeUuid);
+            return true;
+        }
+        String msg = result.toLowerCase();
+        if (msg.contains("already exists") || msg.contains("database exists") 
|| msg.contains("exists")) {
+            logger.debug("iSCSI node already exists for target {}, 
proceeding", volumeUuid);
+            return true;
+        }
+        logger.debug("Failed to add iSCSI node for target {}: {}", volumeUuid, 
result);
+        return false;
+    }
+
+    /**
+     * Checks the result of an iscsiadm login command.
+     * Returns true if the login succeeded or session already exists, false on 
failure.
+     */
+    boolean handleLoginResult(String result, String volumeUuid) {
+        if (result == null) {
+            logger.debug("Successfully logged in to iSCSI target {}", 
volumeUuid);
+            return true;
+        }
+        String msg = result.toLowerCase();
+        if (msg.contains("already present") || msg.contains("already logged 
in") || msg.contains("session exists")) {
+            logger.debug("iSCSI session already exists for target {}, 
proceeding", volumeUuid);
+            return true;
+        }
+        logger.debug("Failed to log in to iSCSI target {}: {}", volumeUuid, 
result);
+        return false;
+    }
+
+    private void rescanIscsiSessions(String iqn, String host, int port) {
+        Script rescanCmd = new Script(true, "iscsiadm", 0, logger);
+        rescanCmd.add("-m", "node");
+        rescanCmd.add("-T", iqn);
+        rescanCmd.add("-p", host + ":" + port);
+        rescanCmd.add("--rescan");
+        String rescanResult = rescanCmd.execute();
+        if (rescanResult != null) {
+            logger.warn("iSCSI session rescan returned: {}", rescanResult);
+        } else {
+            logger.debug("iSCSI session rescan completed successfully for 
{}@{}:{}", iqn, host, port);
+        }
+    }
+
     private void waitForDiskToBecomeAvailable(String volumeUuid, 
KVMStoragePool pool) {
-        int numberOfTries = 10;
+        int numberOfTries = 30;

Review Comment:
   Ideally, this parameter should be configured by the administrator. However, 
we have not yet seen this capability being adopted or effectively supported for 
this parameter.
   For now, I can revert this behavior within the ONTAP workflows as a 
temporary approach. In parallel, I will explore this further as part of an 
enhancement/upgrade and come back with a separate PR.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/vmsnapshot/OntapVMSnapshotStrategy.java:
##########
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.vmsnapshot;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
+import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.feign.client.SnapshotFeignClient;
+import org.apache.cloudstack.storage.feign.model.CliSnapshotRestoreRequest;
+import org.apache.cloudstack.storage.feign.model.FlexVolSnapshot;
+import org.apache.cloudstack.storage.feign.model.response.JobResponse;
+import org.apache.cloudstack.storage.feign.model.response.OntapResponse;
+import org.apache.cloudstack.storage.service.StorageStrategy;
+import org.apache.cloudstack.storage.service.model.ProtocolType;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.storage.utils.OntapStorageUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.cloud.agent.api.CreateVMSnapshotAnswer;
+import com.cloud.agent.api.CreateVMSnapshotCommand;
+import com.cloud.agent.api.DeleteVMSnapshotAnswer;
+import com.cloud.agent.api.DeleteVMSnapshotCommand;
+import com.cloud.agent.api.FreezeThawVMAnswer;
+import com.cloud.agent.api.FreezeThawVMCommand;
+import com.cloud.agent.api.RevertToVMSnapshotAnswer;
+import com.cloud.agent.api.RevertToVMSnapshotCommand;
+import com.cloud.agent.api.VMSnapshotTO;
+import com.cloud.event.EventTypes;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.GuestOSVO;
+import com.cloud.storage.VolumeDetailVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.VolumeDetailsDao;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.NoTransitionException;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotDetailsVO;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import org.apache.cloudstack.storage.utils.OntapStorageConstants;
+
+/**
+ * VM Snapshot strategy for NetApp ONTAP managed storage using 
FlexVolume-level snapshots.
+ *
+ * <p>This strategy handles VM-level (instance) snapshots for VMs whose volumes
+ * reside on ONTAP managed primary storage. Instead of creating per-file clones
+ * (the old approach), it takes <b>ONTAP FlexVolume-level snapshots</b> via the
+ * ONTAP REST API ({@code POST /api/storage/volumes/{uuid}/snapshots}).</p>
+ *
+ * <h3>Key Advantage:</h3>
+ * <p>When multiple CloudStack disks (ROOT + DATA) reside on the same ONTAP
+ * FlexVolume, a single FlexVolume snapshot atomically captures all of them.
+ * This is both faster and more storage-efficient than per-file clones.</p>
+ *
+ * <h3>Flow:</h3>
+ * <ol>
+ *   <li>Group all VM volumes by their parent FlexVolume UUID</li>
+ *   <li>Freeze the VM via QEMU guest agent ({@code fsfreeze}) — if quiesce 
requested</li>
+ *   <li>For each unique FlexVolume, create one ONTAP snapshot</li>
+ *   <li>Thaw the VM</li>
+ *   <li>Record FlexVolume → snapshot UUID mappings in {@code 
vm_snapshot_details}</li>
+ * </ol>
+ *
+ * <h3>Metadata in vm_snapshot_details:</h3>
+ * <p>Each FlexVolume snapshot is stored as a detail row with:
+ * <ul>
+ *   <li>name = {@value OntapStorageConstants#ONTAP_FLEXVOL_SNAPSHOT}</li>
+ *   <li>value = {@code 
"<flexVolUuid>::<snapshotUuid>::<snapshotName>::<volumePath>::<poolId>::<protocol>"}</li>
+ * </ul>
+ * One row is persisted per CloudStack volume (not per FlexVolume) so that the
+ * revert operation can restore individual files/LUNs using the ONTAP Snapshot
+ * File Restore API ({@code POST 
/api/storage/volumes/{vol}/snapshots/{snap}/files/{path}/restore}).</p>
+ *
+ * <h3>Strategy Selection:</h3>
+ * <p>Returns {@code StrategyPriority.HIGHEST} when:</p>
+ * <ul>
+ *   <li>Hypervisor is KVM</li>
+ *   <li>Snapshot type is Disk-only (no memory)</li>
+ *   <li>All VM volumes are on ONTAP managed primary storage</li>
+ * </ul>
+ */
+public class OntapVMSnapshotStrategy extends StorageVMSnapshotStrategy {
+
+    private static final Logger logger = 
LogManager.getLogger(OntapVMSnapshotStrategy.class);
+
+    /** Separator used in the vm_snapshot_details value to delimit FlexVol 
UUID, snapshot UUID, snapshot name, and pool ID. */
+    static final String DETAIL_SEPARATOR = "::";
+
+    @Inject
+    private StoragePoolDetailsDao storagePoolDetailsDao;
+
+    @Inject
+    private VolumeDetailsDao volumeDetailsDao;
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws 
ConfigurationException {
+        return super.configure(name, params);
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Strategy Selection
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    @Override
+    public StrategyPriority canHandle(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+
+        // For existing (non-Allocated) snapshots, check if we created them
+        if (!VMSnapshot.State.Allocated.equals(vmSnapshotVO.getState())) {
+            // Check for our FlexVolume snapshot details first
+            List<VMSnapshotDetailsVO> flexVolDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(flexVolDetails)) {
+                // Verify the volumes are still on ONTAP storage
+                if (allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+                    return StrategyPriority.HIGHEST;
+                }
+                return StrategyPriority.CANT_HANDLE;
+            }
+            // Also check legacy STORAGE_SNAPSHOT details for backward 
compatibility
+            List<VMSnapshotDetailsVO> legacyDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(legacyDetails) && 
allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+                return StrategyPriority.HIGHEST;
+            }
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        // For new snapshots (Allocated state), check if we can handle this VM
+        // ONTAP only supports disk-only snapshots, not memory snapshots
+        if (allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+            if (vmSnapshotVO.getType() == VMSnapshot.Type.DiskAndMemory) {
+                logger.debug("canHandle: Memory snapshots (DiskAndMemory) are 
not supported for VMs on ONTAP storage. VMSnapshot [{}]", vmSnapshot.getId());
+                return StrategyPriority.CANT_HANDLE;
+            }
+            return StrategyPriority.HIGHEST;
+        }
+
+        return StrategyPriority.CANT_HANDLE;
+    }
+
+    @Override
+    public StrategyPriority canHandle(Long vmId, Long rootPoolId, boolean 
snapshotMemory) {
+        // ONTAP FlexVolume snapshots only support disk-only 
(crash-consistent) snapshots.
+        // Memory snapshots (snapshotMemory=true) are not supported because:
+        // 1. ONTAP snapshots capture disk state only, not VM memory
+        // 2. Allowing memory snapshots would require falling back to libvirt 
snapshots,
+        //    creating mixed snapshot chains that would cause issues during 
revert
+        // Return CANT_HANDLE so VMSnapshotManagerImpl can provide a clear 
error message.
+        if (snapshotMemory) {
+            logger.debug("canHandle: Memory snapshots (snapshotMemory=true) 
are not supported for VMs on ONTAP storage. VM [{}]", vmId);
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        if (allVolumesOnOntapManagedStorage(vmId)) {
+            return StrategyPriority.HIGHEST;
+        }
+
+        return StrategyPriority.CANT_HANDLE;
+    }
+
+    /**
+     * Checks whether all volumes of a VM reside on ONTAP managed primary 
storage.
+     */
+    boolean allVolumesOnOntapManagedStorage(long vmId) {
+        UserVm userVm = userVmDao.findById(vmId);
+        if (userVm == null) {
+            logger.debug("allVolumesOnOntapManagedStorage: VM with id [{}] not 
found", vmId);
+            return false;
+        }
+
+        if (!Hypervisor.HypervisorType.KVM.equals(userVm.getHypervisorType())) 
{
+            logger.debug("allVolumesOnOntapManagedStorage: ONTAP VM snapshot 
strategy only supports KVM hypervisor, VM [{}] uses [{}]",
+                    vmId, userVm.getHypervisorType());
+            return false;
+        }
+
+        // ONTAP VM snapshots work for both Running and Stopped VMs.
+        // Running VMs may be frozen/thawed (if quiesce is requested).
+        // Stopped VMs don't need freeze/thaw - just take the FlexVol snapshot 
directly.
+        VirtualMachine.State vmState = userVm.getState();
+        if (!VirtualMachine.State.Running.equals(vmState) && 
!VirtualMachine.State.Stopped.equals(vmState)) {
+            logger.info("allVolumesOnOntapManagedStorage: ONTAP VM snapshot 
strategy requires VM to be Running or Stopped, VM [{}] is in state [{}], 
returning false",
+                    vmId, vmState);
+            return false;
+        }
+
+        List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
+        if (volumes == null || volumes.isEmpty()) {
+            logger.debug("allVolumesOnOntapManagedStorage: No volumes found 
for VM [{}]", vmId);
+            return false;
+        }
+
+        for (VolumeVO volume : volumes) {
+            if (volume.getPoolId() == null) {
+                return false;
+            }
+            StoragePoolVO pool = storagePool.findById(volume.getPoolId());
+            if (pool == null) {
+                return false;
+            }
+            if (!pool.isManaged()) {
+                logger.debug("allVolumesOnOntapManagedStorage: Volume [{}] is 
on non-managed storage pool [{}], not ONTAP",
+                        volume.getId(), pool.getName());
+                return false;
+            }
+            if 
(!OntapStorageConstants.ONTAP_PLUGIN_NAME.equals(pool.getStorageProviderName()))
 {
+                logger.debug("allVolumesOnOntapManagedStorage: Volume [{}] is 
on managed pool [{}] with provider [{}], not ONTAP",
+                        volume.getId(), pool.getName(), 
pool.getStorageProviderName());
+                return false;
+            }
+        }
+
+        logger.debug("allVolumesOnOntapManagedStorage: All volumes of VM [{}] 
are on ONTAP managed storage, this strategy can handle", vmId);
+        return true;
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Take VM Snapshot (FlexVolume-level)
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    /**
+     * Takes a VM-level snapshot by freezing the VM, creating ONTAP 
FlexVolume-level
+     * snapshots (one per unique FlexVolume), and then thawing the VM.
+     *
+     * <p>Volumes are grouped by their parent FlexVolume UUID (from storage 
pool details).
+     * For each unique FlexVolume, exactly one ONTAP snapshot is created via
+     * {@code POST /api/storage/volumes/{uuid}/snapshots}. This means if a VM 
has
+     * ROOT and DATA disks on the same FlexVolume, only one snapshot is 
created.</p>
+     *
+     * <p><b>Memory Snapshots Not Supported:</b> This strategy only supports 
disk-only
+     * (crash-consistent) snapshots. Memory snapshots (snapshotmemory=true) 
are rejected
+     * with a clear error message. This is because ONTAP FlexVolume snapshots 
capture disk
+     * state only, and allowing mixed snapshot chains (ONTAP disk + libvirt 
memory) would
+     * cause issues during revert operations.</p>
+     *
+     * @throws CloudRuntimeException if memory snapshot is requested
+     */
+    @Override
+    public VMSnapshot takeVMSnapshot(VMSnapshot vmSnapshot) {
+        Long hostId = vmSnapshotHelper.pickRunningHost(vmSnapshot.getVmId());
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+
+        // Transition to Creating state FIRST - this is required so that the 
finally block
+        // can properly transition to Error state via OperationFailed event if 
anything fails.
+        // (OperationFailed can only transition FROM Creating state, not from 
Allocated)
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, 
VMSnapshot.Event.CreateRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        FreezeThawVMAnswer freezeAnswer = null;
+        FreezeThawVMCommand thawCmd = null;
+        FreezeThawVMAnswer thawAnswer = null;
+        long startFreeze = 0;
+
+        // Track which FlexVolume snapshots were created (for rollback)
+        List<FlexVolSnapshotDetail> createdSnapshots = new ArrayList<>();
+
+        boolean result = false;
+        try {
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            List<VolumeObjectTO> volumeTOs = 
vmSnapshotHelper.getVolumeTOList(userVm.getId());
+
+            long prev_chain_size = 0;
+            long virtual_size = 0;
+
+            // Build snapshot parent chain
+            VMSnapshotTO current = null;
+            VMSnapshotVO currentSnapshot = 
vmSnapshotDao.findCurrentSnapshotByVmId(userVm.getId());
+            if (currentSnapshot != null) {
+                current = 
vmSnapshotHelper.getSnapshotWithParents(currentSnapshot);
+            }
+
+            // Respect the user's quiesce option from the VM snapshot request
+            boolean quiescevm = true; // default to true for safety
+            VMSnapshotOptions options = vmSnapshotVO.getOptions();
+            if (options != null) {
+                quiescevm = options.needQuiesceVM();
+            }
+
+            // Check if VM is actually running - freeze/thaw only makes sense 
for running VMs
+            boolean vmIsRunning = 
VirtualMachine.State.Running.equals(userVm.getState());
+            boolean shouldFreezeThaw = quiescevm && vmIsRunning;
+
+            if (!vmIsRunning) {
+                logger.info("takeVMSnapshot: VM [{}] is in state [{}] (not 
Running). Skipping freeze/thaw - " +
+                        "FlexVolume snapshot will be taken directly.", 
userVm.getInstanceName(), userVm.getState());
+            } else if (quiescevm) {
+                logger.info("takeVMSnapshot: Quiesce option is enabled for 
ONTAP VM Snapshot of VM [{}]. " +
+                        "VM file systems will be frozen/thawed for 
application-consistent snapshots.", userVm.getInstanceName());
+            } else {
+                logger.info("takeVMSnapshot: Quiesce option is disabled for 
ONTAP VM Snapshot of VM [{}]. " +
+                        "Snapshots will be crash-consistent only.", 
userVm.getInstanceName());
+            }
+
+            VMSnapshotTO target = new VMSnapshotTO(vmSnapshot.getId(), 
vmSnapshot.getName(),
+                    vmSnapshot.getType(), null, vmSnapshot.getDescription(), 
false, current, quiescevm);
+
+            if (current == null) {
+                vmSnapshotVO.setParent(null);
+            } else {
+                vmSnapshotVO.setParent(current.getId());
+            }
+
+            CreateVMSnapshotCommand ccmd = new CreateVMSnapshotCommand(
+                    userVm.getInstanceName(), userVm.getUuid(), target, 
volumeTOs, guestOS.getDisplayName());
+
+            logger.info("takeVMSnapshot: Creating ONTAP FlexVolume VM Snapshot 
for VM [{}] with quiesce={}", userVm.getInstanceName(), quiescevm);
+
+            // Prepare volume info list and calculate sizes
+            for (VolumeObjectTO volumeObjectTO : volumeTOs) {
+                virtual_size += volumeObjectTO.getSize();
+                VolumeVO volumeVO = volumeDao.findById(volumeObjectTO.getId());
+                prev_chain_size += volumeVO.getVmSnapshotChainSize() == null ? 
0 : volumeVO.getVmSnapshotChainSize();
+            }
+
+            // ── Group volumes by FlexVolume UUID ──
+            Map<String, FlexVolGroupInfo> flexVolGroups = 
groupVolumesByFlexVol(volumeTOs);
+
+            logger.info("takeVMSnapshot: VM [{}] has {} volumes across {} 
unique FlexVolume(s)",
+                    userVm.getInstanceName(), volumeTOs.size(), 
flexVolGroups.size());
+
+            // ── Step 1: Freeze the VM (only if quiescing is requested AND VM 
is running) ──
+            if (shouldFreezeThaw) {
+                FreezeThawVMCommand freezeCommand = new 
FreezeThawVMCommand(userVm.getInstanceName());
+                freezeCommand.setOption(FreezeThawVMCommand.FREEZE);
+                freezeAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, 
freezeCommand);
+                startFreeze = System.nanoTime();
+
+                thawCmd = new FreezeThawVMCommand(userVm.getInstanceName());
+                thawCmd.setOption(FreezeThawVMCommand.THAW);
+
+                if (freezeAnswer == null || !freezeAnswer.getResult()) {
+                    String detail = (freezeAnswer != null) ? 
freezeAnswer.getDetails() : "no response from agent";
+                    throw new CloudRuntimeException("Could not freeze VM [" + 
userVm.getInstanceName() +
+                            "] for ONTAP snapshot. Ensure qemu-guest-agent is 
installed and running. Details: " + detail);
+                }
+
+                logger.info("takeVMSnapshot: VM [{}] frozen successfully via 
QEMU guest agent", userVm.getInstanceName());
+            } else {
+                logger.info("takeVMSnapshot: Skipping VM freeze for VM [{}] 
(quiesce={}, vmIsRunning={})",
+                        userVm.getInstanceName(), quiescevm, vmIsRunning);
+            }
+
+            // ── Step 2: Create FlexVolume-level snapshots ──
+            try {
+                String snapshotNameBase = buildSnapshotName(vmSnapshot);
+
+                for (Map.Entry<String, FlexVolGroupInfo> entry : 
flexVolGroups.entrySet()) {
+                    String flexVolUuid = entry.getKey();
+                    FlexVolGroupInfo groupInfo = entry.getValue();
+                    long startSnapshot = System.nanoTime();
+
+                    // Build storage strategy from pool details to get the 
feign client
+                    StorageStrategy storageStrategy = 
OntapStorageUtils.getStrategyByStoragePoolDetails(groupInfo.poolDetails);
+                    SnapshotFeignClient snapshotClient = 
storageStrategy.getSnapshotFeignClient();
+                    String authHeader = storageStrategy.getAuthHeader();
+
+                    // Use the same snapshot name for all FlexVolumes in this 
VM snapshot
+                    // (each FlexVolume gets its own independent snapshot with 
this name)
+                    FlexVolSnapshot snapshotRequest = new 
FlexVolSnapshot(snapshotNameBase,
+                            "CloudStack VM snapshot " + vmSnapshot.getName() + 
" for VM " + userVm.getInstanceName());
+
+                    logger.info("takeVMSnapshot: Creating ONTAP FlexVolume 
snapshot [{}] on FlexVol UUID [{}] covering {} volume(s)",
+                            snapshotNameBase, flexVolUuid, 
groupInfo.volumeIds.size());
+
+                    JobResponse jobResponse = 
snapshotClient.createSnapshot(authHeader, flexVolUuid, snapshotRequest);
+                    if (jobResponse == null || jobResponse.getJob() == null) {
+                        throw new CloudRuntimeException("Failed to initiate 
FlexVolume snapshot on FlexVol UUID [" + flexVolUuid + "]");
+                    }
+
+                    // Poll for job completion
+                    Boolean jobSucceeded = 
storageStrategy.jobPollForSuccess(jobResponse.getJob().getUuid(), 30, 2);
+                    if (!jobSucceeded) {
+                        throw new CloudRuntimeException("FlexVolume snapshot 
job failed on FlexVol UUID [" + flexVolUuid + "]");
+                    }
+
+                    // Retrieve the created snapshot UUID by name
+                    String snapshotUuid = resolveSnapshotUuid(snapshotClient, 
authHeader, flexVolUuid, snapshotNameBase);
+
+                    String protocol = 
groupInfo.poolDetails.get(OntapStorageConstants.PROTOCOL);
+
+                    // Create one detail per CloudStack volume in this FlexVol 
group (for single-file restore during revert)
+                    for (Long volumeId : groupInfo.volumeIds) {
+                        String volumePath = resolveVolumePathOnOntap(volumeId, 
protocol, groupInfo.poolDetails);
+                        FlexVolSnapshotDetail detail = new 
FlexVolSnapshotDetail(
+                                flexVolUuid, snapshotUuid, snapshotNameBase, 
volumePath, groupInfo.poolId, protocol);
+                        createdSnapshots.add(detail);
+                    }
+
+                    logger.info("takeVMSnapshot: ONTAP FlexVolume snapshot 
[{}] (uuid={}) on FlexVol [{}] completed in {} ms. Covers volumes: {}",
+                            snapshotNameBase, snapshotUuid, flexVolUuid,
+                            TimeUnit.MILLISECONDS.convert(System.nanoTime() - 
startSnapshot, TimeUnit.NANOSECONDS),
+                            groupInfo.volumeIds);
+                }
+            } finally {
+                // ── Step 3: Thaw the VM (only if it was frozen, always even 
on error) ──
+                if (quiescevm && freezeAnswer != null && 
freezeAnswer.getResult()) {
+                    try {
+                        thawAnswer = (FreezeThawVMAnswer) 
agentMgr.send(hostId, thawCmd);
+                        if (thawAnswer != null && thawAnswer.getResult()) {
+                            logger.info("takeVMSnapshot: VM [{}] thawed 
successfully. Total freeze duration: {} ms",
+                                    userVm.getInstanceName(),
+                                    
TimeUnit.MILLISECONDS.convert(System.nanoTime() - startFreeze, 
TimeUnit.NANOSECONDS));
+                        } else {
+                            logger.warn("takeVMSnapshot: Failed to thaw VM 
[{}]: {}", userVm.getInstanceName(),
+                                    (thawAnswer != null) ? 
thawAnswer.getDetails() : "no response");
+                        }
+                    } catch (Exception thawEx) {
+                        logger.error("takeVMSnapshot: Exception while thawing 
VM [{}]: {}", userVm.getInstanceName(), thawEx.getMessage(), thawEx);
+                    }
+                }
+            }
+
+            // ── Step 4: Persist FlexVolume snapshot details (one row per 
CloudStack volume) ──
+            for (FlexVolSnapshotDetail detail : createdSnapshots) {
+                vmSnapshotDetailsDao.persist(new VMSnapshotDetailsVO(
+                        vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT, detail.toString(), true));
+            }
+
+            // ── Step 5: Finalize via parent processAnswer ──
+            CreateVMSnapshotAnswer answer = new CreateVMSnapshotAnswer(ccmd, 
true, "");
+            answer.setVolumeTOs(volumeTOs);
+
+            processAnswer(vmSnapshotVO, userVm, answer, null);
+            logger.info("takeVMSnapshot: ONTAP FlexVolume VM Snapshot [{}] 
created successfully for VM [{}] ({} FlexVol snapshot(s))",
+                    vmSnapshot.getName(), userVm.getInstanceName(), 
createdSnapshots.size());
+
+            long new_chain_size = 0;
+            for (VolumeObjectTO volumeTo : answer.getVolumeTOs()) {
+                publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_CREATE, 
vmSnapshot, userVm, volumeTo);
+                new_chain_size += volumeTo.getSize();
+            }
+            publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_ON_PRIMARY, 
vmSnapshot, userVm,
+                    new_chain_size - prev_chain_size, virtual_size);
+
+            result = true;
+            return vmSnapshot;
+
+        } catch (OperationTimedoutException e) {
+            logger.error("takeVMSnapshot: ONTAP VM Snapshot [{}] timed out: 
{}", vmSnapshot.getName(), e.getMessage());
+            throw new CloudRuntimeException("Creating Instance Snapshot: " + 
vmSnapshot.getName() + " timed out: " + e.getMessage());
+        } catch (AgentUnavailableException e) {
+            logger.error("takeVMSnapshot: ONTAP VM Snapshot [{}] failed, agent 
unavailable: {}", vmSnapshot.getName(), e.getMessage());
+            throw new CloudRuntimeException("Creating Instance Snapshot: " + 
vmSnapshot.getName() + " failed: " + e.getMessage());
+        } catch (CloudRuntimeException e) {
+            throw e;
+        } finally {
+            if (!result) {
+                // Rollback all FlexVolume snapshots created so far 
(deduplicate by FlexVol+Snapshot)
+                Map<String, Boolean> rolledBack = new HashMap<>();
+                for (FlexVolSnapshotDetail detail : createdSnapshots) {
+                    String dedupeKey = detail.flexVolUuid + "::" + 
detail.snapshotUuid;
+                    if (!rolledBack.containsKey(dedupeKey)) {
+                        try {
+                            rollbackFlexVolSnapshot(detail);
+                            rolledBack.put(dedupeKey, Boolean.TRUE);
+                        } catch (Exception rollbackEx) {
+                            logger.error("takeVMSnapshot: Failed to rollback 
FlexVol snapshot [{}] on FlexVol [{}]: {}",
+                                    detail.snapshotUuid, detail.flexVolUuid, 
rollbackEx.getMessage());
+                        }
+                    }
+                }
+
+                // Ensure VM is thawed if we haven't done so
+                if (thawAnswer == null && freezeAnswer != null && 
freezeAnswer.getResult()) {
+                    try {
+                        logger.info("takeVMSnapshot: Thawing VM [{}] during 
error cleanup", userVm.getInstanceName());
+                        thawAnswer = (FreezeThawVMAnswer) 
agentMgr.send(hostId, thawCmd);
+                    } catch (Exception ex) {
+                        logger.error("takeVMSnapshot: Could not thaw VM during 
cleanup: {}", ex.getMessage());
+                    }
+                }
+
+                // Clean up VM snapshot details and transition state
+                try {
+                    List<VMSnapshotDetailsVO> vmSnapshotDetails = 
vmSnapshotDetailsDao.listDetails(vmSnapshot.getId());
+                    for (VMSnapshotDetailsVO detail : vmSnapshotDetails) {
+                        if 
(OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT.equals(detail.getName())) {
+                            vmSnapshotDetailsDao.remove(detail.getId());
+                        }
+                    }
+                    vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, 
VMSnapshot.Event.OperationFailed);
+                } catch (NoTransitionException e1) {
+                    logger.error("takeVMSnapshot: Cannot set VM Snapshot state 
to OperationFailed: {}", e1.getMessage());
+                }
+            }
+        }
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Delete VM Snapshot
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    @Override
+    public boolean deleteVMSnapshot(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, 
VMSnapshot.Event.ExpungeRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        try {
+            List<VolumeObjectTO> volumeTOs = 
vmSnapshotHelper.getVolumeTOList(userVm.getId());
+            String vmInstanceName = userVm.getInstanceName();
+            VMSnapshotTO parent = 
vmSnapshotHelper.getSnapshotWithParents(vmSnapshotVO).getParent();
+
+            VMSnapshotTO vmSnapshotTO = new VMSnapshotTO(vmSnapshotVO.getId(), 
vmSnapshotVO.getName(), vmSnapshotVO.getType(),
+                    vmSnapshotVO.getCreated().getTime(), 
vmSnapshotVO.getDescription(), vmSnapshotVO.getCurrent(), parent, true);
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            DeleteVMSnapshotCommand deleteSnapshotCommand = new 
DeleteVMSnapshotCommand(vmInstanceName, vmSnapshotTO,
+                    volumeTOs, guestOS.getDisplayName());
+
+            // Check for FlexVolume snapshots (new approach)
+            List<VMSnapshotDetailsVO> flexVolDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(flexVolDetails)) {
+                deleteFlexVolSnapshots(flexVolDetails);
+            }
+
+            // Also handle legacy STORAGE_SNAPSHOT details (backward 
compatibility)

Review Comment:
   Yes, this is incorporated as part of the snapshot deletion workflow to 
ensure a clean removal, particularly deleting the snapshots created at the 
storage to back CLoudStack UI side of snapshot.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -65,14 +108,222 @@ public DataTO getTO(DataObject data) {
     @Override
     public DataStoreTO getStoreTO(DataStore store) { return null; }
 
+    @Override
+    public boolean volumesRequireGrantAccessWhenUsed() {
+        logger.info("OntapPrimaryDatastoreDriver: 
volumesRequireGrantAccessWhenUsed: Called");
+        return true;
+    }
+
+    /**
+     * Creates a volume on the ONTAP storage system.
+     */
     @Override
     public void createAsync(DataStore dataStore, DataObject dataObject, 
AsyncCompletionCallback<CreateCmdResult> callback) {
-        throw new UnsupportedOperationException("Create operation is not 
supported for ONTAP primary storage.");
+        CreateCmdResult createCmdResult = null;
+        String errMsg;
+
+        if (dataObject == null) {
+            throw new InvalidParameterValueException("dataObject should not be 
null");
+        }
+        if (dataStore == null) {
+            throw new InvalidParameterValueException("dataStore should not be 
null");
+        }
+        if (callback == null) {
+            throw new InvalidParameterValueException("callback should not be 
null");
+        }

Review Comment:
   The intent behind keeping them separate was to gain better insight into 
which specific object was null and caused the exception.
   If we consider modularizing this approach, we could structure it more 
cleanly, something along the lines of the sample below ?
   
   requireNotNull(dataObject, "dataObject");
   requireNotNull(dataStore, "dataStore");
   requireNotNull(callback, "callback");
   .
   .
   .
   
    private void requireNotNull(Object value, String parameterName) {
           if (value == null) {
               throw new InvalidParameterValueException(parameterName + " 
should not be null");
           }
       }



##########
engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/KvmFileBasedStorageVmSnapshotStrategy.java:
##########
@@ -325,8 +326,13 @@ public StrategyPriority canHandle(Long vmId, Long 
rootPoolId, boolean snapshotMe
         List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
         for (VolumeVO volume : volumes) {
             StoragePoolVO storagePoolVO = 
storagePool.findById(volume.getPoolId());
+            if (storagePoolVO.isManaged() && 
DataStoreProvider.ONTAP_PLUGIN_NAME.equals(storagePoolVO.getStorageProviderName()))
 {

Review Comment:
   Hi @weizhouapache , we have it pipelined for this task of adding 
NetApp-specific protocols. IN the interest of deadlines for 4.23. We proceeded 
with default protocols.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -98,14 +349,231 @@ public ChapInfo getChapInfo(DataObject dataObject) {
         return null;
     }
 
+    /**
+     * Grants a host access to a volume.
+     */
     @Override
     public boolean grantAccess(DataObject dataObject, Host host, DataStore 
dataStore) {
-       return false;
+        try {
+            if (dataStore == null) {
+                throw new InvalidParameterValueException("dataStore should not 
be null");
+            }
+            if (dataObject == null) {
+                throw new InvalidParameterValueException("dataObject should 
not be null");
+            }
+            if (host == null) {
+                throw new InvalidParameterValueException("host should not be 
null");
+            }
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("grantAccess: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+
+            // ONTAP managed storage only supports cluster and zone scoped 
pools
+            if (storagePool.getScope() != ScopeType.CLUSTER && 
storagePool.getScope() != ScopeType.ZONE) {
+                logger.error("grantAccess: Only Cluster and Zone scoped 
primary storage is supported for storage Pool: " + storagePool.getName());
+                throw new CloudRuntimeException("Only Cluster and Zone scoped 
primary storage is supported for Storage Pool: " + storagePool.getName());
+            }
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                VolumeVO volumeVO = volumeDao.findById(dataObject.getId());
+                if (volumeVO == null) {
+                    logger.error("grantAccess: CloudStack Volume not found for 
id: " + dataObject.getId());
+                    throw new CloudRuntimeException("CloudStack Volume not 
found for id: " + dataObject.getId());
+                }
+
+                Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(storagePool.getId());
+                String svmName = details.get(OntapStorageConstants.SVM_NAME);
+
+                if 
(ProtocolType.ISCSI.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+                    // Only retrieve LUN name for iSCSI volumes
+                    String cloudStackVolumeName = 
volumeDetailsDao.findDetail(volumeVO.getId(), 
OntapStorageConstants.LUN_DOT_NAME).getValue();
+                    UnifiedSANStrategy sanStrategy = (UnifiedSANStrategy) 
OntapStorageUtils.getStrategyByStoragePoolDetails(details);
+                    String accessGroupName = 
OntapStorageUtils.getIgroupName(svmName, host.getName());
+
+                    // Validate if Igroup exist ONTAP for this host as we may 
be using delete_on_unmap= true and igroup may be deleted by ONTAP automatically
+                    Map<String, String> getAccessGroupMap = Map.of(
+                            OntapStorageConstants.NAME, accessGroupName,
+                            OntapStorageConstants.SVM_DOT_NAME, svmName
+                    );
+                    AccessGroup accessGroup = 
sanStrategy.getAccessGroup(getAccessGroupMap);
+                    if(accessGroup == null || accessGroup.getIgroup() == null) 
{
+                        logger.info("grantAccess: Igroup {} does not exist for 
the host {} : Need to create Igroup for the host ", accessGroupName, 
host.getName());
+                        // create the igroup for the host and perform 
lun-mapping
+                        accessGroup = new AccessGroup();
+                        List<HostVO> hosts = new ArrayList<>();
+                        hosts.add((HostVO) host);
+                        accessGroup.setHostsToConnect(hosts);
+                        accessGroup.setStoragePoolId(storagePool.getId());
+                        accessGroup = 
sanStrategy.createAccessGroup(accessGroup);
+                    }else{
+                        logger.info("grantAccess: Igroup {} already exist for 
the host {}: ", accessGroup.getIgroup().getName() ,host.getName());
+                        /* TODO Below cases will be covered later, for now 
they will be a pre-requisite on customer side
+                          1. Igroup exist with the same name but host 
initiator has been rempved
+                          2.  Igroup exist with the same name but host 
initiator has been changed may be due to new NIC or new adapter
+                          In both cases we need to verify current host 
initiator is registered in the igroup before allowing access
+                          Incase it is not , add it and proceed for lun-mapping
+                         */
+                    }
+                    logger.info("grantAccess: Igroup {}  is present now with 
initiators {} ", accessGroup.getIgroup().getName(), 
accessGroup.getIgroup().getInitiators());
+                    // Create or retrieve existing LUN mapping
+                    String lunNumber = sanStrategy.ensureLunMapped(svmName, 
cloudStackVolumeName, accessGroupName);
+
+                    // Update volume path if changed (e.g., after migration or 
re-mapping)
+                    String iscsiPath = OntapStorageConstants.SLASH + 
storagePool.getPath() + OntapStorageConstants.SLASH + lunNumber;
+                    if (volumeVO.getPath() == null || 
!volumeVO.getPath().equals(iscsiPath)) {
+                        volumeVO.set_iScsiName(iscsiPath);
+                        volumeVO.setPath(iscsiPath);
+                    }

Review Comment:
   Sure



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -98,14 +349,231 @@ public ChapInfo getChapInfo(DataObject dataObject) {
         return null;
     }
 
+    /**
+     * Grants a host access to a volume.
+     */
     @Override
     public boolean grantAccess(DataObject dataObject, Host host, DataStore 
dataStore) {
-       return false;
+        try {
+            if (dataStore == null) {
+                throw new InvalidParameterValueException("dataStore should not 
be null");
+            }
+            if (dataObject == null) {
+                throw new InvalidParameterValueException("dataObject should 
not be null");
+            }
+            if (host == null) {
+                throw new InvalidParameterValueException("host should not be 
null");
+            }
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("grantAccess: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+
+            // ONTAP managed storage only supports cluster and zone scoped 
pools
+            if (storagePool.getScope() != ScopeType.CLUSTER && 
storagePool.getScope() != ScopeType.ZONE) {
+                logger.error("grantAccess: Only Cluster and Zone scoped 
primary storage is supported for storage Pool: " + storagePool.getName());
+                throw new CloudRuntimeException("Only Cluster and Zone scoped 
primary storage is supported for Storage Pool: " + storagePool.getName());
+            }
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                VolumeVO volumeVO = volumeDao.findById(dataObject.getId());
+                if (volumeVO == null) {
+                    logger.error("grantAccess: CloudStack Volume not found for 
id: " + dataObject.getId());
+                    throw new CloudRuntimeException("CloudStack Volume not 
found for id: " + dataObject.getId());
+                }
+
+                Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(storagePool.getId());
+                String svmName = details.get(OntapStorageConstants.SVM_NAME);
+
+                if 
(ProtocolType.ISCSI.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+                    // Only retrieve LUN name for iSCSI volumes
+                    String cloudStackVolumeName = 
volumeDetailsDao.findDetail(volumeVO.getId(), 
OntapStorageConstants.LUN_DOT_NAME).getValue();
+                    UnifiedSANStrategy sanStrategy = (UnifiedSANStrategy) 
OntapStorageUtils.getStrategyByStoragePoolDetails(details);
+                    String accessGroupName = 
OntapStorageUtils.getIgroupName(svmName, host.getName());
+
+                    // Validate if Igroup exist ONTAP for this host as we may 
be using delete_on_unmap= true and igroup may be deleted by ONTAP automatically
+                    Map<String, String> getAccessGroupMap = Map.of(
+                            OntapStorageConstants.NAME, accessGroupName,
+                            OntapStorageConstants.SVM_DOT_NAME, svmName
+                    );
+                    AccessGroup accessGroup = 
sanStrategy.getAccessGroup(getAccessGroupMap);
+                    if(accessGroup == null || accessGroup.getIgroup() == null) 
{
+                        logger.info("grantAccess: Igroup {} does not exist for 
the host {} : Need to create Igroup for the host ", accessGroupName, 
host.getName());
+                        // create the igroup for the host and perform 
lun-mapping
+                        accessGroup = new AccessGroup();
+                        List<HostVO> hosts = new ArrayList<>();
+                        hosts.add((HostVO) host);
+                        accessGroup.setHostsToConnect(hosts);
+                        accessGroup.setStoragePoolId(storagePool.getId());
+                        accessGroup = 
sanStrategy.createAccessGroup(accessGroup);
+                    }else{
+                        logger.info("grantAccess: Igroup {} already exist for 
the host {}: ", accessGroup.getIgroup().getName() ,host.getName());
+                        /* TODO Below cases will be covered later, for now 
they will be a pre-requisite on customer side
+                          1. Igroup exist with the same name but host 
initiator has been rempved
+                          2.  Igroup exist with the same name but host 
initiator has been changed may be due to new NIC or new adapter
+                          In both cases we need to verify current host 
initiator is registered in the igroup before allowing access
+                          Incase it is not , add it and proceed for lun-mapping
+                         */
+                    }
+                    logger.info("grantAccess: Igroup {}  is present now with 
initiators {} ", accessGroup.getIgroup().getName(), 
accessGroup.getIgroup().getInitiators());
+                    // Create or retrieve existing LUN mapping
+                    String lunNumber = sanStrategy.ensureLunMapped(svmName, 
cloudStackVolumeName, accessGroupName);
+
+                    // Update volume path if changed (e.g., after migration or 
re-mapping)
+                    String iscsiPath = OntapStorageConstants.SLASH + 
storagePool.getPath() + OntapStorageConstants.SLASH + lunNumber;
+                    if (volumeVO.getPath() == null || 
!volumeVO.getPath().equals(iscsiPath)) {
+                        volumeVO.set_iScsiName(iscsiPath);
+                        volumeVO.setPath(iscsiPath);
+                    }
+                } else if 
(ProtocolType.NFS3.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+                    // For NFS, no access grant needed - file is accessible 
via mount
+                    logger.debug("grantAccess: NFS volume [{}], no igroup 
mapping required", volumeVO.getUuid());
+                    return true;
+                }
+                volumeVO.setPoolType(storagePool.getPoolType());
+                volumeVO.setPoolId(storagePool.getId());
+                volumeDao.update(volumeVO.getId(), volumeVO);
+            } else {
+                logger.error("Invalid DataObjectType (" + dataObject.getType() 
+ ") passed to grantAccess");
+                throw new CloudRuntimeException("Invalid DataObjectType (" + 
dataObject.getType() + ") passed to grantAccess");
+            }
+            return true;
+        } catch (Exception e) {
+            logger.error("grantAccess: Failed for dataObject [{}]: {}", 
dataObject, e.getMessage());
+            throw new CloudRuntimeException("Failed with error: " + 
e.getMessage(), e);
+        }
     }
 
+    /**
+     * Revokes a host's access to a volume.
+     */
     @Override
     public void revokeAccess(DataObject dataObject, Host host, DataStore 
dataStore) {
-        throw new UnsupportedOperationException("Revoke access operation is 
not supported for ONTAP primary storage.");
+        try {
+            if (dataStore == null) {
+                throw new InvalidParameterValueException("dataStore should not 
be null");
+            }
+            if (dataObject == null) {
+                throw new InvalidParameterValueException("dataObject should 
not be null");
+            }
+            if (host == null) {
+                throw new InvalidParameterValueException("host should not be 
null");
+            }
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("revokeAccess: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+
+            if (storagePool.getScope() != ScopeType.CLUSTER && 
storagePool.getScope() != ScopeType.ZONE) {
+                logger.error("revokeAccess: Only Cluster and Zone scoped 
primary storage is supported for storage Pool: " + storagePool.getName());
+                throw new CloudRuntimeException("Only Cluster and Zone scoped 
primary storage is supported for Storage Pool: " + storagePool.getName());
+            }
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                VolumeVO volumeVO = volumeDao.findById(dataObject.getId());
+                if (volumeVO == null) {
+                    logger.error("revokeAccess: CloudStack Volume not found 
for id: " + dataObject.getId());
+                    throw new CloudRuntimeException("CloudStack Volume not 
found for id: " + dataObject.getId());
+                }
+                revokeAccessForVolume(storagePool, volumeVO, host);
+            } else {
+                logger.error("revokeAccess: Invalid DataObjectType (" + 
dataObject.getType() + ") passed to revokeAccess");
+                throw new CloudRuntimeException("Invalid DataObjectType (" + 
dataObject.getType() + ") passed to revokeAccess");
+            }
+        } catch (Exception e) {
+            logger.error("revokeAccess: Failed for dataObject [{}]: {}", 
dataObject, e.getMessage());
+            throw new CloudRuntimeException("Failed with error: " + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Revokes volume access for the specified host.
+     */
+    private void revokeAccessForVolume(StoragePoolVO storagePool, VolumeVO 
volumeVO, Host host) {
+        logger.info("revokeAccessForVolume: Revoking access to volume [{}] for 
host [{}]", volumeVO.getName(), host.getName());
+
+        Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(storagePool.getId());
+        StorageStrategy storageStrategy = 
OntapStorageUtils.getStrategyByStoragePoolDetails(details);
+        String svmName = details.get(OntapStorageConstants.SVM_NAME);
+
+        if 
(ProtocolType.ISCSI.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+            String accessGroupName = OntapStorageUtils.getIgroupName(svmName, 
host.getName());
+
+            // Retrieve LUN name from volume details; if missing, volume may 
not have been fully created
+            VolumeDetailVO lunDetail = 
volumeDetailsDao.findDetail(volumeVO.getId(), 
OntapStorageConstants.LUN_DOT_NAME);
+            String lunName = lunDetail != null ? lunDetail.getValue() : null;
+            if (lunName == null) {
+                logger.warn("revokeAccessForVolume: No LUN name found for 
volume [{}]; skipping revoke", volumeVO.getId());
+                return;
+            }
+
+            // Verify LUN still exists on ONTAP (may have been manually 
deleted)
+            CloudStackVolume cloudStackVolume = 
getCloudStackVolumeByName(storageStrategy, svmName, lunName);
+            if (cloudStackVolume == null || cloudStackVolume.getLun() == null 
|| cloudStackVolume.getLun().getUuid() == null) {
+                logger.warn("revokeAccessForVolume: LUN for volume [{}] not 
found on ONTAP, skipping revoke", volumeVO.getId());
+                return;
+            }
+
+            // Verify igroup still exists on ONTAP
+            AccessGroup accessGroup = getAccessGroupByName(storageStrategy, 
svmName, accessGroupName);
+            if (accessGroup == null || accessGroup.getIgroup() == null || 
accessGroup.getIgroup().getUuid() == null) {
+                logger.warn("revokeAccessForVolume: iGroup [{}] not found on 
ONTAP, skipping revoke", accessGroupName);
+                return;
+            }
+
+            // Verify host initiator is in the igroup before attempting to 
remove mapping
+            SANStrategy sanStrategy = (UnifiedSANStrategy) storageStrategy;
+            if 
(!sanStrategy.validateInitiatorInAccessGroup(host.getStorageUrl(), svmName, 
accessGroup.getIgroup())) {
+                logger.warn("revokeAccessForVolume: Initiator [{}] is not in 
iGroup [{}], skipping revoke",
+                        host.getStorageUrl(), accessGroupName);
+                return;
+            }
+
+            // Remove the LUN mapping from the igroup
+            Map<String, String> disableLogicalAccessMap = new HashMap<>();
+            disableLogicalAccessMap.put(OntapStorageConstants.LUN_DOT_UUID, 
cloudStackVolume.getLun().getUuid());
+            disableLogicalAccessMap.put(OntapStorageConstants.IGROUP_DOT_UUID, 
accessGroup.getIgroup().getUuid());
+            storageStrategy.disableLogicalAccess(disableLogicalAccessMap);
+
+            logger.info("revokeAccessForVolume: Successfully revoked access to 
LUN [{}] for host [{}]",
+                    lunName, host.getName());

Review Comment:
   From a trade-off perspective, I have kept the implementation as is for now. 
That said, I will modularize it further while preserving the existing logic.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -65,14 +108,235 @@ public DataTO getTO(DataObject data) {
     @Override
     public DataStoreTO getStoreTO(DataStore store) { return null; }
 
+    @Override
+    public boolean volumesRequireGrantAccessWhenUsed(){
+        logger.info("OntapPrimaryDatastoreDriver: 
volumesRequireGrantAccessWhenUsed: Called");
+        return true;
+    }
+
+    /**
+     * Creates a volume on the ONTAP storage system.
+     */
     @Override
     public void createAsync(DataStore dataStore, DataObject dataObject, 
AsyncCompletionCallback<CreateCmdResult> callback) {
-        throw new UnsupportedOperationException("Create operation is not 
supported for ONTAP primary storage.");
+        CreateCmdResult createCmdResult = null;
+        String errMsg;
+
+        if (dataObject == null) {
+            throw new InvalidParameterValueException("dataObject should not be 
null");
+        }
+        if (dataStore == null) {
+            throw new InvalidParameterValueException("dataStore should not be 
null");
+        }
+        if (callback == null) {
+            throw new InvalidParameterValueException("callback should not be 
null");
+        }
+
+        try {
+            logger.info("Started for data store name [{}] and data object name 
[{}] of type [{}]",
+                    dataStore.getName(), dataObject.getName(), 
dataObject.getType());
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("createAsync: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+            String storagePoolUuid = dataStore.getUuid();
+
+            Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(dataStore.getId());
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                VolumeInfo volInfo = (VolumeInfo) dataObject;
+
+                // Create the backend storage object (LUN for iSCSI, no-op for 
NFS)
+                CloudStackVolume created = createCloudStackVolume(dataStore, 
volInfo, details);
+
+                // Update CloudStack volume record with storage pool 
association and protocol-specific details
+                VolumeVO volumeVO = volumeDao.findById(volInfo.getId());
+                if (volumeVO != null) {

Review Comment:
   done



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/lifecycle/OntapPrimaryDatastoreLifecycle.java:
##########
@@ -401,7 +410,8 @@ private boolean 
validateProtocolSupportAndFetchHostsIdentifier(List<HostVO> host
                 for (HostVO host : hosts) {
                     if (host == null || host.getStorageUrl() == null || 
host.getStorageUrl().trim().isEmpty()
                             || 
!host.getStorageUrl().startsWith(protocolPrefix)) {
-                        return false;
+                        // TODO we will inform customer through alert for 
excluded host because of protocol enabled on host

Review Comment:
   It would be addressed in future PRs



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -65,14 +108,235 @@ public DataTO getTO(DataObject data) {
     @Override
     public DataStoreTO getStoreTO(DataStore store) { return null; }
 
+    @Override
+    public boolean volumesRequireGrantAccessWhenUsed(){
+        logger.info("OntapPrimaryDatastoreDriver: 
volumesRequireGrantAccessWhenUsed: Called");
+        return true;
+    }
+
+    /**
+     * Creates a volume on the ONTAP storage system.
+     */
     @Override
     public void createAsync(DataStore dataStore, DataObject dataObject, 
AsyncCompletionCallback<CreateCmdResult> callback) {
-        throw new UnsupportedOperationException("Create operation is not 
supported for ONTAP primary storage.");
+        CreateCmdResult createCmdResult = null;
+        String errMsg;
+
+        if (dataObject == null) {
+            throw new InvalidParameterValueException("dataObject should not be 
null");
+        }
+        if (dataStore == null) {
+            throw new InvalidParameterValueException("dataStore should not be 
null");
+        }
+        if (callback == null) {
+            throw new InvalidParameterValueException("callback should not be 
null");
+        }
+
+        try {
+            logger.info("Started for data store name [{}] and data object name 
[{}] of type [{}]",
+                    dataStore.getName(), dataObject.getName(), 
dataObject.getType());
+
+            StoragePoolVO storagePool = 
storagePoolDao.findById(dataStore.getId());
+            if (storagePool == null) {
+                logger.error("createAsync: Storage Pool not found for id: " + 
dataStore.getId());
+                throw new CloudRuntimeException("Storage Pool not found for 
id: " + dataStore.getId());
+            }
+            String storagePoolUuid = dataStore.getUuid();
+
+            Map<String, String> details = 
storagePoolDetailsDao.listDetailsKeyPairs(dataStore.getId());
+
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                VolumeInfo volInfo = (VolumeInfo) dataObject;
+
+                // Create the backend storage object (LUN for iSCSI, no-op for 
NFS)
+                CloudStackVolume created = createCloudStackVolume(dataStore, 
volInfo, details);
+
+                // Update CloudStack volume record with storage pool 
association and protocol-specific details
+                VolumeVO volumeVO = volumeDao.findById(volInfo.getId());
+                if (volumeVO != null) {
+                    volumeVO.setPoolType(storagePool.getPoolType());
+                    volumeVO.setPoolId(storagePool.getId());
+
+                    if 
(ProtocolType.ISCSI.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+                        String lunName = created != null && created.getLun() 
!= null ? created.getLun().getName() : null;
+                        if (lunName == null) {
+                            throw new CloudRuntimeException("Missing LUN name 
for volume " + volInfo.getId());
+                        }
+
+                        // Persist LUN details for future operations (delete, 
grant/revoke access)
+                        volumeDetailsDao.addDetail(volInfo.getId(), 
OntapStorageConstants.LUN_DOT_UUID, created.getLun().getUuid(), false);
+                        volumeDetailsDao.addDetail(volInfo.getId(), 
OntapStorageConstants.LUN_DOT_NAME, lunName, false);
+                        if (created.getLun().getUuid() != null) {
+                            volumeVO.setFolder(created.getLun().getUuid());
+                        }
+
+                        logger.info("createAsync: Created LUN [{}] for volume 
[{}]. LUN mapping will occur during grantAccess() to per-host igroup.",
+                                lunName, volumeVO.getId());
+                        createCmdResult = new CreateCmdResult(lunName, new 
Answer(null, true, null));
+                    } else if 
(ProtocolType.NFS3.name().equalsIgnoreCase(details.get(OntapStorageConstants.PROTOCOL)))
 {
+                        createCmdResult = new 
CreateCmdResult(volInfo.getUuid(), new Answer(null, true, null));
+                        logger.info("createAsync: Managed NFS volume [{}] with 
path [{}] associated with pool {}",
+                                volumeVO.getId(), volInfo.getUuid(), 
storagePool.getId());
+                    }
+                    volumeDao.update(volumeVO.getId(), volumeVO);
+                }
+            } else {
+                errMsg = "Invalid DataObjectType (" + dataObject.getType() + 
") passed to createAsync";
+                logger.error(errMsg);
+                throw new CloudRuntimeException(errMsg);
+            }
+        } catch (Exception e) {
+            errMsg = e.getMessage();
+            logger.error("createAsync: Failed for dataObject name [{}]: {}", 
dataObject.getName(), errMsg);
+            createCmdResult = new CreateCmdResult(null, new Answer(null, 
false, errMsg));
+            createCmdResult.setResult(e.toString());
+        } finally {
+            if (createCmdResult != null && createCmdResult.isSuccess()) {
+                logger.info("createAsync: Operation completed successfully for 
{}", dataObject.getType());
+            }
+            callback.complete(createCmdResult);
+        }
+    }
+
+    /**
+     * Creates a volume on the ONTAP backend.
+     */
+    private CloudStackVolume createCloudStackVolume(DataStore dataStore, 
DataObject dataObject, Map<String, String> details) {

Review Comment:
   Done



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/lifecycle/OntapPrimaryDatastoreLifecycle.java:
##########
@@ -411,18 +421,18 @@ private boolean 
validateProtocolSupportAndFetchHostsIdentifier(List<HostVO> host
                 for (HostVO host : hosts) {
                     if (host != null) {
                         ip =  host.getStorageIpAddress() != null ? 
host.getStorageIpAddress().trim() : "";
-                        if (ip.isEmpty()) {
-                            if (host.getPrivateIpAddress() == null || 
host.getPrivateIpAddress().trim().isEmpty()) {
-                                return false;
-                            }
-                            ip = host.getPrivateIpAddress().trim();
+                        if (ip.isEmpty() && 
StringUtils.isBlank(host.getPrivateIpAddress() )) {
+                            // TODO we will inform customer through alert for 
excluded host because of protocol enabled on host

Review Comment:
   same as earlier response



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/UnifiedNASStrategy.java:
##########
@@ -180,11 +212,11 @@ public void disableLogicalAccess(Map<String, String> 
values) {
 
     @Override
     public Map<String, String> getLogicalAccess(Map<String, String> values) {

Review Comment:
   yes, they are expected as empty implementation to comply with parent 
interface 



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/StorageStrategy.java:
##########
@@ -281,16 +316,32 @@ public Volume createStorageVolume(String volumeName, Long 
size) {
         }
     }
 
+     /**
+     * Updates ONTAP Flex-Volume
+     * Eligible only for Unified ONTAP storage
+     * throw exception in case of disaggregated ONTAP storage
+     *
+     * @param volume the volume to update
+     * @return the updated Volume object
+     */
     public Volume updateStorageVolume(Volume volume) {
         return null;
     }
 
+    /**
+     * Delete ONTAP Flex-Volume
+     * Eligible only for Unified ONTAP storage
+     * throw exception in case of disaggregated ONTAP storage
+     *
+     * @param volume the volume to delete
+     */
     public void deleteStorageVolume(Volume volume) {
         logger.info("Deleting ONTAP volume by name: " + volume.getName() + " 
and uuid: " + volume.getUuid());
         String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
         try {
+            // TODO: Implement lun and file deletion, if any, before deleting 
the volume

Review Comment:
   I believe this is already being handled, so it can be removed.



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java:
##########
@@ -158,8 +159,58 @@ public boolean connectPhysicalDisk(String volumeUuid, 
KVMStoragePool pool, Map<S
         return true;
     }
 
+    /**
+     * Checks the result of an iscsiadm node-create command.
+     * Returns true if the node was created or already exists, false on 
failure.
+     */
+    boolean handleNodeCreateResult(String result, String volumeUuid) {
+        if (result == null) {
+            logger.debug("Successfully added iSCSI node for target {}", 
volumeUuid);
+            return true;
+        }
+        String msg = result.toLowerCase();
+        if (msg.contains("already exists") || msg.contains("database exists") 
|| msg.contains("exists")) {
+            logger.debug("iSCSI node already exists for target {}, 
proceeding", volumeUuid);
+            return true;
+        }
+        logger.debug("Failed to add iSCSI node for target {}: {}", volumeUuid, 
result);
+        return false;
+    }
+
+    /**
+     * Checks the result of an iscsiadm login command.
+     * Returns true if the login succeeded or session already exists, false on 
failure.
+     */
+    boolean handleLoginResult(String result, String volumeUuid) {
+        if (result == null) {
+            logger.debug("Successfully logged in to iSCSI target {}", 
volumeUuid);
+            return true;
+        }
+        String msg = result.toLowerCase();
+        if (msg.contains("already present") || msg.contains("already logged 
in") || msg.contains("session exists")) {

Review Comment:
   yes, Same as above



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java:
##########
@@ -238,6 +289,15 @@ public KVMPhysicalDisk getPhysicalDisk(String volumeUuid, 
KVMStoragePool pool) {
     }
 
     private long getDeviceSize(String deviceByPath) {
+        try {
+            if (!Files.exists(Paths.get(deviceByPath))) {
+                logger.debug("Device by-path does not exist yet: " + 
deviceByPath);
+                return 0L;
+            }
+        } catch (Exception ignore) {
+            // If FS check fails for any reason, fall back to blockdev call

Review Comment:
   Sure



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/UnifiedNASStrategy.java:
##########
@@ -252,19 +285,88 @@ private void assignExportPolicyToVolume(String 
volumeUuid, String policyName) {
                     
Thread.sleep(OntapStorageConstants.CREATE_VOLUME_CHECK_SLEEP_TIME);
                 }
             } catch (Exception e) {
-                logger.error("Exception while updating volume: ", e);
+                logger.error("assignExportPolicyToVolume: Exception while 
updating volume: ", e);
                 throw new CloudRuntimeException("Failed to update volume: " + 
e.getMessage());
             }
-            logger.info("Export policy successfully assigned to volume: {}", 
volumeUuid);
+            logger.info("assignExportPolicyToVolume: Export policy 
successfully assigned to volume: {}", volumeUuid);
         } catch (FeignException e) {
-            logger.error("Failed to assign export policy to volume: {}", 
volumeUuid, e);
+            logger.error("assignExportPolicyToVolume: Failed to assign export 
policy to volume: {}", volumeUuid, e);
             throw new CloudRuntimeException("Failed to assign export policy: " 
+ e.getMessage());
         } catch (Exception e) {
-            logger.error("Exception while assigning export policy to volume: 
{}", volumeUuid, e);
+            logger.error("assignExportPolicyToVolume: Exception while 
assigning export policy to volume: {}", volumeUuid, e);
             throw new CloudRuntimeException("Failed to assign export policy: " 
+ e.getMessage());
         }
     }
 
+    private boolean createFile(String volumeUuid, String filePath, FileInfo 
fileInfo) {
+        logger.info("createFile: Creating file: {} in volume: {}", filePath, 
volumeUuid);
+        try {
+            String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
+            nasFeignClient.createFile(authHeader, volumeUuid, filePath, 
fileInfo);
+            logger.info("createFile: File created successfully: {} in volume: 
{}", filePath, volumeUuid);
+            return true;
+        } catch (FeignException e) {
+            logger.error("createFile: Failed to create file: {} in volume: 
{}", filePath, volumeUuid, e);
+            return false;
+        } catch (Exception e) {
+            logger.error("createFile: Exception while creating file: {} in 
volume: {}", filePath, volumeUuid, e);
+            return false;
+        }
+    }
+
+    private boolean deleteFile(String volumeUuid, String filePath) {
+        logger.info("deleteFile: Deleting file: {} from volume: {}", filePath, 
volumeUuid);
+        try {
+            String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
+            nasFeignClient.deleteFile(authHeader, volumeUuid, filePath);
+            logger.info("deleteFile: File deleted successfully: {} from 
volume: {}", filePath, volumeUuid);
+            return true;
+        } catch (FeignException e) {
+            logger.error("deleteFile: Failed to delete file: {} from volume: 
{}", filePath, volumeUuid, e);
+            return false;
+        } catch (Exception e) {
+            logger.error("deleteFile: Exception while deleting file: {} from 
volume: {}", filePath, volumeUuid, e);
+            return false;
+        }
+    }
+
+    private OntapResponse<FileInfo> getFileInfo(String volumeUuid, String 
filePath) {
+        logger.debug("getFileInfo: Getting file info for: {} in volume: {}", 
filePath, volumeUuid);
+        try {
+            String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
+            OntapResponse<FileInfo> response = 
nasFeignClient.getFileResponse(authHeader, volumeUuid, filePath);
+            logger.debug("getFileInfo: Retrieved file info for: {} in volume: 
{}", filePath, volumeUuid);
+            return response;
+        } catch (FeignException e){
+            if (e.status() == 404) {
+                logger.debug("getFileInfo: File not found: {} in volume: {}", 
filePath, volumeUuid);
+                return null;
+            }
+            logger.error("getFileInfo: Failed to get file info: {} in volume: 
{}", filePath, volumeUuid, e);
+            throw new CloudRuntimeException("Failed to get file info: " + 
e.getMessage());
+        } catch (Exception e){
+            logger.error("getFileInfo: Exception while getting file info: {} 
in volume: {}", filePath, volumeUuid, e);
+            throw new CloudRuntimeException("Failed to get file info: " + 
e.getMessage());
+        }
+    }
+
+    private boolean updateFile(String volumeUuid, String filePath, FileInfo 
fileInfo) {
+        logger.info("updateFile: Updating file: {} in volume: {}", filePath, 
volumeUuid);
+        try {
+            String authHeader = 
OntapStorageUtils.generateAuthHeader(storage.getUsername(), 
storage.getPassword());
+            nasFeignClient.updateFile( authHeader, volumeUuid, filePath, 
fileInfo);
+            logger.info("updateFile: File updated successfully: {} in volume: 
{}", filePath, volumeUuid);
+            return true;
+        } catch (FeignException e) {
+            logger.error("updateFile: Failed to update file: {} in volume: 
{}", filePath, volumeUuid, e);
+            return false;
+        } catch (Exception e){
+            logger.error("updateFile: Exception while updating file: {} in 
volume: {}", filePath, volumeUuid, e);
+            return false;
+        }
+    }
+
+

Review Comment:
   We had planned to use them in future releases and features. For now, I am 
removing them and will reintroduce them when needed.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -65,14 +108,222 @@ public DataTO getTO(DataObject data) {
     @Override
     public DataStoreTO getStoreTO(DataStore store) { return null; }
 
+    @Override
+    public boolean volumesRequireGrantAccessWhenUsed() {
+        logger.info("OntapPrimaryDatastoreDriver: 
volumesRequireGrantAccessWhenUsed: Called");

Review Comment:
   Sure



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/vmsnapshot/OntapVMSnapshotStrategy.java:
##########
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.vmsnapshot;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
+import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.feign.client.SnapshotFeignClient;
+import org.apache.cloudstack.storage.feign.model.CliSnapshotRestoreRequest;
+import org.apache.cloudstack.storage.feign.model.FlexVolSnapshot;
+import org.apache.cloudstack.storage.feign.model.response.JobResponse;
+import org.apache.cloudstack.storage.feign.model.response.OntapResponse;
+import org.apache.cloudstack.storage.service.StorageStrategy;
+import org.apache.cloudstack.storage.service.model.ProtocolType;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.storage.utils.OntapStorageUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.cloud.agent.api.CreateVMSnapshotAnswer;
+import com.cloud.agent.api.CreateVMSnapshotCommand;
+import com.cloud.agent.api.DeleteVMSnapshotAnswer;
+import com.cloud.agent.api.DeleteVMSnapshotCommand;
+import com.cloud.agent.api.FreezeThawVMAnswer;
+import com.cloud.agent.api.FreezeThawVMCommand;
+import com.cloud.agent.api.RevertToVMSnapshotAnswer;
+import com.cloud.agent.api.RevertToVMSnapshotCommand;
+import com.cloud.agent.api.VMSnapshotTO;
+import com.cloud.event.EventTypes;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.GuestOSVO;
+import com.cloud.storage.VolumeDetailVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.VolumeDetailsDao;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.NoTransitionException;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotDetailsVO;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import org.apache.cloudstack.storage.utils.OntapStorageConstants;
+
+/**
+ * VM Snapshot strategy for NetApp ONTAP managed storage using 
FlexVolume-level snapshots.
+ *
+ * <p>This strategy handles VM-level (instance) snapshots for VMs whose volumes
+ * reside on ONTAP managed primary storage. Instead of creating per-file clones
+ * (the old approach), it takes <b>ONTAP FlexVolume-level snapshots</b> via the
+ * ONTAP REST API ({@code POST /api/storage/volumes/{uuid}/snapshots}).</p>
+ *
+ * <h3>Key Advantage:</h3>
+ * <p>When multiple CloudStack disks (ROOT + DATA) reside on the same ONTAP
+ * FlexVolume, a single FlexVolume snapshot atomically captures all of them.
+ * This is both faster and more storage-efficient than per-file clones.</p>
+ *
+ * <h3>Flow:</h3>
+ * <ol>
+ *   <li>Group all VM volumes by their parent FlexVolume UUID</li>
+ *   <li>Freeze the VM via QEMU guest agent ({@code fsfreeze}) — if quiesce 
requested</li>
+ *   <li>For each unique FlexVolume, create one ONTAP snapshot</li>
+ *   <li>Thaw the VM</li>
+ *   <li>Record FlexVolume → snapshot UUID mappings in {@code 
vm_snapshot_details}</li>
+ * </ol>
+ *
+ * <h3>Metadata in vm_snapshot_details:</h3>
+ * <p>Each FlexVolume snapshot is stored as a detail row with:
+ * <ul>
+ *   <li>name = {@value OntapStorageConstants#ONTAP_FLEXVOL_SNAPSHOT}</li>
+ *   <li>value = {@code 
"<flexVolUuid>::<snapshotUuid>::<snapshotName>::<volumePath>::<poolId>::<protocol>"}</li>
+ * </ul>
+ * One row is persisted per CloudStack volume (not per FlexVolume) so that the
+ * revert operation can restore individual files/LUNs using the ONTAP Snapshot
+ * File Restore API ({@code POST 
/api/storage/volumes/{vol}/snapshots/{snap}/files/{path}/restore}).</p>
+ *
+ * <h3>Strategy Selection:</h3>
+ * <p>Returns {@code StrategyPriority.HIGHEST} when:</p>
+ * <ul>
+ *   <li>Hypervisor is KVM</li>
+ *   <li>Snapshot type is Disk-only (no memory)</li>
+ *   <li>All VM volumes are on ONTAP managed primary storage</li>
+ * </ul>
+ */
+public class OntapVMSnapshotStrategy extends StorageVMSnapshotStrategy {
+
+    private static final Logger logger = 
LogManager.getLogger(OntapVMSnapshotStrategy.class);
+
+    /** Separator used in the vm_snapshot_details value to delimit FlexVol 
UUID, snapshot UUID, snapshot name, and pool ID. */
+    static final String DETAIL_SEPARATOR = "::";
+
+    @Inject
+    private StoragePoolDetailsDao storagePoolDetailsDao;
+
+    @Inject
+    private VolumeDetailsDao volumeDetailsDao;
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws 
ConfigurationException {
+        return super.configure(name, params);
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Strategy Selection
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    @Override
+    public StrategyPriority canHandle(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+
+        // For existing (non-Allocated) snapshots, check if we created them
+        if (!VMSnapshot.State.Allocated.equals(vmSnapshotVO.getState())) {
+            // Check for our FlexVolume snapshot details first
+            List<VMSnapshotDetailsVO> flexVolDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(flexVolDetails)) {
+                // Verify the volumes are still on ONTAP storage
+                if (allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+                    return StrategyPriority.HIGHEST;
+                }
+                return StrategyPriority.CANT_HANDLE;
+            }
+            // Also check legacy STORAGE_SNAPSHOT details for backward 
compatibility
+            List<VMSnapshotDetailsVO> legacyDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(legacyDetails) && 
allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+                return StrategyPriority.HIGHEST;
+            }
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        // For new snapshots (Allocated state), check if we can handle this VM
+        // ONTAP only supports disk-only snapshots, not memory snapshots
+        if (allVolumesOnOntapManagedStorage(vmSnapshot.getVmId())) {
+            if (vmSnapshotVO.getType() == VMSnapshot.Type.DiskAndMemory) {
+                logger.debug("canHandle: Memory snapshots (DiskAndMemory) are 
not supported for VMs on ONTAP storage. VMSnapshot [{}]", vmSnapshot.getId());
+                return StrategyPriority.CANT_HANDLE;
+            }
+            return StrategyPriority.HIGHEST;
+        }
+
+        return StrategyPriority.CANT_HANDLE;
+    }
+
+    @Override
+    public StrategyPriority canHandle(Long vmId, Long rootPoolId, boolean 
snapshotMemory) {
+        // ONTAP FlexVolume snapshots only support disk-only 
(crash-consistent) snapshots.
+        // Memory snapshots (snapshotMemory=true) are not supported because:
+        // 1. ONTAP snapshots capture disk state only, not VM memory
+        // 2. Allowing memory snapshots would require falling back to libvirt 
snapshots,
+        //    creating mixed snapshot chains that would cause issues during 
revert
+        // Return CANT_HANDLE so VMSnapshotManagerImpl can provide a clear 
error message.
+        if (snapshotMemory) {
+            logger.debug("canHandle: Memory snapshots (snapshotMemory=true) 
are not supported for VMs on ONTAP storage. VM [{}]", vmId);
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        if (allVolumesOnOntapManagedStorage(vmId)) {
+            return StrategyPriority.HIGHEST;
+        }
+
+        return StrategyPriority.CANT_HANDLE;
+    }
+
+    /**
+     * Checks whether all volumes of a VM reside on ONTAP managed primary 
storage.
+     */
+    boolean allVolumesOnOntapManagedStorage(long vmId) {
+        UserVm userVm = userVmDao.findById(vmId);
+        if (userVm == null) {
+            logger.debug("allVolumesOnOntapManagedStorage: VM with id [{}] not 
found", vmId);
+            return false;
+        }
+
+        if (!Hypervisor.HypervisorType.KVM.equals(userVm.getHypervisorType())) 
{
+            logger.debug("allVolumesOnOntapManagedStorage: ONTAP VM snapshot 
strategy only supports KVM hypervisor, VM [{}] uses [{}]",
+                    vmId, userVm.getHypervisorType());
+            return false;
+        }
+
+        // ONTAP VM snapshots work for both Running and Stopped VMs.
+        // Running VMs may be frozen/thawed (if quiesce is requested).
+        // Stopped VMs don't need freeze/thaw - just take the FlexVol snapshot 
directly.
+        VirtualMachine.State vmState = userVm.getState();
+        if (!VirtualMachine.State.Running.equals(vmState) && 
!VirtualMachine.State.Stopped.equals(vmState)) {
+            logger.info("allVolumesOnOntapManagedStorage: ONTAP VM snapshot 
strategy requires VM to be Running or Stopped, VM [{}] is in state [{}], 
returning false",
+                    vmId, vmState);
+            return false;
+        }
+
+        List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
+        if (volumes == null || volumes.isEmpty()) {
+            logger.debug("allVolumesOnOntapManagedStorage: No volumes found 
for VM [{}]", vmId);
+            return false;
+        }
+
+        for (VolumeVO volume : volumes) {
+            if (volume.getPoolId() == null) {
+                return false;
+            }
+            StoragePoolVO pool = storagePool.findById(volume.getPoolId());
+            if (pool == null) {
+                return false;
+            }
+            if (!pool.isManaged()) {
+                logger.debug("allVolumesOnOntapManagedStorage: Volume [{}] is 
on non-managed storage pool [{}], not ONTAP",
+                        volume.getId(), pool.getName());
+                return false;
+            }
+            if 
(!OntapStorageConstants.ONTAP_PLUGIN_NAME.equals(pool.getStorageProviderName()))
 {
+                logger.debug("allVolumesOnOntapManagedStorage: Volume [{}] is 
on managed pool [{}] with provider [{}], not ONTAP",
+                        volume.getId(), pool.getName(), 
pool.getStorageProviderName());
+                return false;
+            }
+        }
+
+        logger.debug("allVolumesOnOntapManagedStorage: All volumes of VM [{}] 
are on ONTAP managed storage, this strategy can handle", vmId);
+        return true;
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Take VM Snapshot (FlexVolume-level)
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    /**
+     * Takes a VM-level snapshot by freezing the VM, creating ONTAP 
FlexVolume-level
+     * snapshots (one per unique FlexVolume), and then thawing the VM.
+     *
+     * <p>Volumes are grouped by their parent FlexVolume UUID (from storage 
pool details).
+     * For each unique FlexVolume, exactly one ONTAP snapshot is created via
+     * {@code POST /api/storage/volumes/{uuid}/snapshots}. This means if a VM 
has
+     * ROOT and DATA disks on the same FlexVolume, only one snapshot is 
created.</p>
+     *
+     * <p><b>Memory Snapshots Not Supported:</b> This strategy only supports 
disk-only
+     * (crash-consistent) snapshots. Memory snapshots (snapshotmemory=true) 
are rejected
+     * with a clear error message. This is because ONTAP FlexVolume snapshots 
capture disk
+     * state only, and allowing mixed snapshot chains (ONTAP disk + libvirt 
memory) would
+     * cause issues during revert operations.</p>
+     *
+     * @throws CloudRuntimeException if memory snapshot is requested
+     */
+    @Override
+    public VMSnapshot takeVMSnapshot(VMSnapshot vmSnapshot) {
+        Long hostId = vmSnapshotHelper.pickRunningHost(vmSnapshot.getVmId());
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+
+        // Transition to Creating state FIRST - this is required so that the 
finally block
+        // can properly transition to Error state via OperationFailed event if 
anything fails.
+        // (OperationFailed can only transition FROM Creating state, not from 
Allocated)
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, 
VMSnapshot.Event.CreateRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        FreezeThawVMAnswer freezeAnswer = null;
+        FreezeThawVMCommand thawCmd = null;
+        FreezeThawVMAnswer thawAnswer = null;
+        long startFreeze = 0;
+
+        // Track which FlexVolume snapshots were created (for rollback)
+        List<FlexVolSnapshotDetail> createdSnapshots = new ArrayList<>();
+
+        boolean result = false;
+        try {
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            List<VolumeObjectTO> volumeTOs = 
vmSnapshotHelper.getVolumeTOList(userVm.getId());
+
+            long prev_chain_size = 0;
+            long virtual_size = 0;
+
+            // Build snapshot parent chain
+            VMSnapshotTO current = null;
+            VMSnapshotVO currentSnapshot = 
vmSnapshotDao.findCurrentSnapshotByVmId(userVm.getId());
+            if (currentSnapshot != null) {
+                current = 
vmSnapshotHelper.getSnapshotWithParents(currentSnapshot);
+            }
+
+            // Respect the user's quiesce option from the VM snapshot request
+            boolean quiescevm = true; // default to true for safety
+            VMSnapshotOptions options = vmSnapshotVO.getOptions();
+            if (options != null) {
+                quiescevm = options.needQuiesceVM();
+            }
+
+            // Check if VM is actually running - freeze/thaw only makes sense 
for running VMs
+            boolean vmIsRunning = 
VirtualMachine.State.Running.equals(userVm.getState());
+            boolean shouldFreezeThaw = quiescevm && vmIsRunning;
+
+            if (!vmIsRunning) {
+                logger.info("takeVMSnapshot: VM [{}] is in state [{}] (not 
Running). Skipping freeze/thaw - " +
+                        "FlexVolume snapshot will be taken directly.", 
userVm.getInstanceName(), userVm.getState());
+            } else if (quiescevm) {
+                logger.info("takeVMSnapshot: Quiesce option is enabled for 
ONTAP VM Snapshot of VM [{}]. " +
+                        "VM file systems will be frozen/thawed for 
application-consistent snapshots.", userVm.getInstanceName());
+            } else {
+                logger.info("takeVMSnapshot: Quiesce option is disabled for 
ONTAP VM Snapshot of VM [{}]. " +
+                        "Snapshots will be crash-consistent only.", 
userVm.getInstanceName());
+            }
+
+            VMSnapshotTO target = new VMSnapshotTO(vmSnapshot.getId(), 
vmSnapshot.getName(),
+                    vmSnapshot.getType(), null, vmSnapshot.getDescription(), 
false, current, quiescevm);
+
+            if (current == null) {
+                vmSnapshotVO.setParent(null);
+            } else {
+                vmSnapshotVO.setParent(current.getId());
+            }
+
+            CreateVMSnapshotCommand ccmd = new CreateVMSnapshotCommand(
+                    userVm.getInstanceName(), userVm.getUuid(), target, 
volumeTOs, guestOS.getDisplayName());
+
+            logger.info("takeVMSnapshot: Creating ONTAP FlexVolume VM Snapshot 
for VM [{}] with quiesce={}", userVm.getInstanceName(), quiescevm);
+
+            // Prepare volume info list and calculate sizes
+            for (VolumeObjectTO volumeObjectTO : volumeTOs) {
+                virtual_size += volumeObjectTO.getSize();
+                VolumeVO volumeVO = volumeDao.findById(volumeObjectTO.getId());
+                prev_chain_size += volumeVO.getVmSnapshotChainSize() == null ? 
0 : volumeVO.getVmSnapshotChainSize();
+            }
+
+            // ── Group volumes by FlexVolume UUID ──
+            Map<String, FlexVolGroupInfo> flexVolGroups = 
groupVolumesByFlexVol(volumeTOs);
+
+            logger.info("takeVMSnapshot: VM [{}] has {} volumes across {} 
unique FlexVolume(s)",
+                    userVm.getInstanceName(), volumeTOs.size(), 
flexVolGroups.size());
+
+            // ── Step 1: Freeze the VM (only if quiescing is requested AND VM 
is running) ──
+            if (shouldFreezeThaw) {
+                FreezeThawVMCommand freezeCommand = new 
FreezeThawVMCommand(userVm.getInstanceName());
+                freezeCommand.setOption(FreezeThawVMCommand.FREEZE);
+                freezeAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, 
freezeCommand);
+                startFreeze = System.nanoTime();
+
+                thawCmd = new FreezeThawVMCommand(userVm.getInstanceName());
+                thawCmd.setOption(FreezeThawVMCommand.THAW);
+
+                if (freezeAnswer == null || !freezeAnswer.getResult()) {
+                    String detail = (freezeAnswer != null) ? 
freezeAnswer.getDetails() : "no response from agent";
+                    throw new CloudRuntimeException("Could not freeze VM [" + 
userVm.getInstanceName() +
+                            "] for ONTAP snapshot. Ensure qemu-guest-agent is 
installed and running. Details: " + detail);
+                }
+
+                logger.info("takeVMSnapshot: VM [{}] frozen successfully via 
QEMU guest agent", userVm.getInstanceName());
+            } else {
+                logger.info("takeVMSnapshot: Skipping VM freeze for VM [{}] 
(quiesce={}, vmIsRunning={})",
+                        userVm.getInstanceName(), quiescevm, vmIsRunning);
+            }
+
+            // ── Step 2: Create FlexVolume-level snapshots ──
+            try {
+                String snapshotNameBase = buildSnapshotName(vmSnapshot);
+
+                for (Map.Entry<String, FlexVolGroupInfo> entry : 
flexVolGroups.entrySet()) {
+                    String flexVolUuid = entry.getKey();
+                    FlexVolGroupInfo groupInfo = entry.getValue();
+                    long startSnapshot = System.nanoTime();
+
+                    // Build storage strategy from pool details to get the 
feign client
+                    StorageStrategy storageStrategy = 
OntapStorageUtils.getStrategyByStoragePoolDetails(groupInfo.poolDetails);
+                    SnapshotFeignClient snapshotClient = 
storageStrategy.getSnapshotFeignClient();
+                    String authHeader = storageStrategy.getAuthHeader();
+
+                    // Use the same snapshot name for all FlexVolumes in this 
VM snapshot
+                    // (each FlexVolume gets its own independent snapshot with 
this name)
+                    FlexVolSnapshot snapshotRequest = new 
FlexVolSnapshot(snapshotNameBase,
+                            "CloudStack VM snapshot " + vmSnapshot.getName() + 
" for VM " + userVm.getInstanceName());
+
+                    logger.info("takeVMSnapshot: Creating ONTAP FlexVolume 
snapshot [{}] on FlexVol UUID [{}] covering {} volume(s)",
+                            snapshotNameBase, flexVolUuid, 
groupInfo.volumeIds.size());
+
+                    JobResponse jobResponse = 
snapshotClient.createSnapshot(authHeader, flexVolUuid, snapshotRequest);
+                    if (jobResponse == null || jobResponse.getJob() == null) {
+                        throw new CloudRuntimeException("Failed to initiate 
FlexVolume snapshot on FlexVol UUID [" + flexVolUuid + "]");
+                    }
+
+                    // Poll for job completion
+                    Boolean jobSucceeded = 
storageStrategy.jobPollForSuccess(jobResponse.getJob().getUuid(), 30, 2);
+                    if (!jobSucceeded) {
+                        throw new CloudRuntimeException("FlexVolume snapshot 
job failed on FlexVol UUID [" + flexVolUuid + "]");
+                    }
+
+                    // Retrieve the created snapshot UUID by name
+                    String snapshotUuid = resolveSnapshotUuid(snapshotClient, 
authHeader, flexVolUuid, snapshotNameBase);
+
+                    String protocol = 
groupInfo.poolDetails.get(OntapStorageConstants.PROTOCOL);
+
+                    // Create one detail per CloudStack volume in this FlexVol 
group (for single-file restore during revert)
+                    for (Long volumeId : groupInfo.volumeIds) {
+                        String volumePath = resolveVolumePathOnOntap(volumeId, 
protocol, groupInfo.poolDetails);
+                        FlexVolSnapshotDetail detail = new 
FlexVolSnapshotDetail(
+                                flexVolUuid, snapshotUuid, snapshotNameBase, 
volumePath, groupInfo.poolId, protocol);
+                        createdSnapshots.add(detail);
+                    }
+
+                    logger.info("takeVMSnapshot: ONTAP FlexVolume snapshot 
[{}] (uuid={}) on FlexVol [{}] completed in {} ms. Covers volumes: {}",
+                            snapshotNameBase, snapshotUuid, flexVolUuid,
+                            TimeUnit.MILLISECONDS.convert(System.nanoTime() - 
startSnapshot, TimeUnit.NANOSECONDS),
+                            groupInfo.volumeIds);
+                }
+            } finally {
+                // ── Step 3: Thaw the VM (only if it was frozen, always even 
on error) ──
+                if (quiescevm && freezeAnswer != null && 
freezeAnswer.getResult()) {
+                    try {
+                        thawAnswer = (FreezeThawVMAnswer) 
agentMgr.send(hostId, thawCmd);
+                        if (thawAnswer != null && thawAnswer.getResult()) {
+                            logger.info("takeVMSnapshot: VM [{}] thawed 
successfully. Total freeze duration: {} ms",
+                                    userVm.getInstanceName(),
+                                    
TimeUnit.MILLISECONDS.convert(System.nanoTime() - startFreeze, 
TimeUnit.NANOSECONDS));
+                        } else {
+                            logger.warn("takeVMSnapshot: Failed to thaw VM 
[{}]: {}", userVm.getInstanceName(),
+                                    (thawAnswer != null) ? 
thawAnswer.getDetails() : "no response");
+                        }
+                    } catch (Exception thawEx) {
+                        logger.error("takeVMSnapshot: Exception while thawing 
VM [{}]: {}", userVm.getInstanceName(), thawEx.getMessage(), thawEx);
+                    }
+                }
+            }
+
+            // ── Step 4: Persist FlexVolume snapshot details (one row per 
CloudStack volume) ──
+            for (FlexVolSnapshotDetail detail : createdSnapshots) {
+                vmSnapshotDetailsDao.persist(new VMSnapshotDetailsVO(
+                        vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT, detail.toString(), true));
+            }
+
+            // ── Step 5: Finalize via parent processAnswer ──
+            CreateVMSnapshotAnswer answer = new CreateVMSnapshotAnswer(ccmd, 
true, "");
+            answer.setVolumeTOs(volumeTOs);
+
+            processAnswer(vmSnapshotVO, userVm, answer, null);
+            logger.info("takeVMSnapshot: ONTAP FlexVolume VM Snapshot [{}] 
created successfully for VM [{}] ({} FlexVol snapshot(s))",
+                    vmSnapshot.getName(), userVm.getInstanceName(), 
createdSnapshots.size());
+
+            long new_chain_size = 0;
+            for (VolumeObjectTO volumeTo : answer.getVolumeTOs()) {
+                publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_CREATE, 
vmSnapshot, userVm, volumeTo);
+                new_chain_size += volumeTo.getSize();
+            }
+            publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_ON_PRIMARY, 
vmSnapshot, userVm,
+                    new_chain_size - prev_chain_size, virtual_size);
+
+            result = true;
+            return vmSnapshot;
+
+        } catch (OperationTimedoutException e) {
+            logger.error("takeVMSnapshot: ONTAP VM Snapshot [{}] timed out: 
{}", vmSnapshot.getName(), e.getMessage());
+            throw new CloudRuntimeException("Creating Instance Snapshot: " + 
vmSnapshot.getName() + " timed out: " + e.getMessage());
+        } catch (AgentUnavailableException e) {
+            logger.error("takeVMSnapshot: ONTAP VM Snapshot [{}] failed, agent 
unavailable: {}", vmSnapshot.getName(), e.getMessage());
+            throw new CloudRuntimeException("Creating Instance Snapshot: " + 
vmSnapshot.getName() + " failed: " + e.getMessage());
+        } catch (CloudRuntimeException e) {
+            throw e;
+        } finally {
+            if (!result) {
+                // Rollback all FlexVolume snapshots created so far 
(deduplicate by FlexVol+Snapshot)
+                Map<String, Boolean> rolledBack = new HashMap<>();
+                for (FlexVolSnapshotDetail detail : createdSnapshots) {
+                    String dedupeKey = detail.flexVolUuid + "::" + 
detail.snapshotUuid;
+                    if (!rolledBack.containsKey(dedupeKey)) {
+                        try {
+                            rollbackFlexVolSnapshot(detail);
+                            rolledBack.put(dedupeKey, Boolean.TRUE);
+                        } catch (Exception rollbackEx) {
+                            logger.error("takeVMSnapshot: Failed to rollback 
FlexVol snapshot [{}] on FlexVol [{}]: {}",
+                                    detail.snapshotUuid, detail.flexVolUuid, 
rollbackEx.getMessage());
+                        }
+                    }
+                }
+
+                // Ensure VM is thawed if we haven't done so
+                if (thawAnswer == null && freezeAnswer != null && 
freezeAnswer.getResult()) {
+                    try {
+                        logger.info("takeVMSnapshot: Thawing VM [{}] during 
error cleanup", userVm.getInstanceName());
+                        thawAnswer = (FreezeThawVMAnswer) 
agentMgr.send(hostId, thawCmd);
+                    } catch (Exception ex) {
+                        logger.error("takeVMSnapshot: Could not thaw VM during 
cleanup: {}", ex.getMessage());
+                    }
+                }
+
+                // Clean up VM snapshot details and transition state
+                try {
+                    List<VMSnapshotDetailsVO> vmSnapshotDetails = 
vmSnapshotDetailsDao.listDetails(vmSnapshot.getId());
+                    for (VMSnapshotDetailsVO detail : vmSnapshotDetails) {
+                        if 
(OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT.equals(detail.getName())) {
+                            vmSnapshotDetailsDao.remove(detail.getId());
+                        }
+                    }
+                    vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, 
VMSnapshot.Event.OperationFailed);
+                } catch (NoTransitionException e1) {
+                    logger.error("takeVMSnapshot: Cannot set VM Snapshot state 
to OperationFailed: {}", e1.getMessage());
+                }
+            }
+        }
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Delete VM Snapshot
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    @Override
+    public boolean deleteVMSnapshot(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, 
VMSnapshot.Event.ExpungeRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        try {
+            List<VolumeObjectTO> volumeTOs = 
vmSnapshotHelper.getVolumeTOList(userVm.getId());
+            String vmInstanceName = userVm.getInstanceName();
+            VMSnapshotTO parent = 
vmSnapshotHelper.getSnapshotWithParents(vmSnapshotVO).getParent();
+
+            VMSnapshotTO vmSnapshotTO = new VMSnapshotTO(vmSnapshotVO.getId(), 
vmSnapshotVO.getName(), vmSnapshotVO.getType(),
+                    vmSnapshotVO.getCreated().getTime(), 
vmSnapshotVO.getDescription(), vmSnapshotVO.getCurrent(), parent, true);
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            DeleteVMSnapshotCommand deleteSnapshotCommand = new 
DeleteVMSnapshotCommand(vmInstanceName, vmSnapshotTO,
+                    volumeTOs, guestOS.getDisplayName());
+
+            // Check for FlexVolume snapshots (new approach)
+            List<VMSnapshotDetailsVO> flexVolDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(flexVolDetails)) {
+                deleteFlexVolSnapshots(flexVolDetails);
+            }
+
+            // Also handle legacy STORAGE_SNAPSHOT details (backward 
compatibility)
+            List<VMSnapshotDetailsVO> legacyDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(legacyDetails)) {
+                deleteDiskSnapshot(vmSnapshot);
+            }
+
+            processAnswer(vmSnapshotVO, userVm, new 
DeleteVMSnapshotAnswer(deleteSnapshotCommand, volumeTOs), null);
+            long full_chain_size = 0;
+            for (VolumeObjectTO volumeTo : volumeTOs) {
+                publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_DELETE, 
vmSnapshot, userVm, volumeTo);
+                full_chain_size += volumeTo.getSize();
+            }
+            publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_OFF_PRIMARY, 
vmSnapshot, userVm, full_chain_size, 0L);
+            return true;
+        } catch (CloudRuntimeException err) {
+            String errMsg = String.format("Delete of ONTAP VM Snapshot [%s] of 
VM [%s] failed: %s",
+                    vmSnapshot.getName(), userVm.getInstanceName(), 
err.getMessage());
+            logger.error(errMsg, err);
+            throw new CloudRuntimeException(errMsg, err);
+        }
+    }
+
+    // 
──────────────────────────────────────────────────────────────────────────
+    // Revert VM Snapshot
+    // 
──────────────────────────────────────────────────────────────────────────
+
+    @Override
+    public boolean revertVMSnapshot(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, 
VMSnapshot.Event.RevertRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        boolean result = false;
+        try {
+            List<VolumeObjectTO> volumeTOs = 
vmSnapshotHelper.getVolumeTOList(userVm.getId());
+            String vmInstanceName = userVm.getInstanceName();
+            VMSnapshotTO parent = 
vmSnapshotHelper.getSnapshotWithParents(vmSnapshotVO).getParent();
+
+            VMSnapshotTO vmSnapshotTO = new VMSnapshotTO(vmSnapshotVO.getId(), 
vmSnapshotVO.getName(), vmSnapshotVO.getType(),
+                    vmSnapshotVO.getCreated().getTime(), 
vmSnapshotVO.getDescription(), vmSnapshotVO.getCurrent(), parent, true);
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            RevertToVMSnapshotCommand revertToSnapshotCommand = new 
RevertToVMSnapshotCommand(vmInstanceName,
+                    userVm.getUuid(), vmSnapshotTO, volumeTOs, 
guestOS.getDisplayName());
+
+            // Check for FlexVolume snapshots (new approach)
+            List<VMSnapshotDetailsVO> flexVolDetails = 
vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), 
OntapStorageConstants.ONTAP_FLEXVOL_SNAPSHOT);
+            if (CollectionUtils.isNotEmpty(flexVolDetails)) {
+                revertFlexVolSnapshots(flexVolDetails);
+            }
+
+            // Also handle legacy STORAGE_SNAPSHOT details (backward 
compatibility)

Review Comment:
   Yes, this will be required as part of enabling snapshot revert support 
within the plugin in future.



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/driver/OntapPrimaryDatastoreDriver.java:
##########
@@ -98,14 +349,231 @@ public ChapInfo getChapInfo(DataObject dataObject) {
         return null;
     }
 
+    /**
+     * Grants a host access to a volume.
+     */
     @Override
     public boolean grantAccess(DataObject dataObject, Host host, DataStore 
dataStore) {
-       return false;
+        try {
+            if (dataStore == null) {
+                throw new InvalidParameterValueException("dataStore should not 
be null");
+            }
+            if (dataObject == null) {
+                throw new InvalidParameterValueException("dataObject should 
not be null");
+            }
+            if (host == null) {
+                throw new InvalidParameterValueException("host should not be 
null");
+            }

Review Comment:
   Same as above, please suggest?



##########
plugins/storage/volume/ontap/src/main/java/org/apache/cloudstack/storage/service/UnifiedSANStrategy.java:
##########
@@ -286,24 +370,226 @@ public AccessGroup getAccessGroup(Map<String, String> 
values) {
             AccessGroup accessGroup = new AccessGroup();
             accessGroup.setIgroup(igroup);
             return accessGroup;
-        } catch (Exception e) {
-            String errMsg = e.getMessage();
-            if (errMsg != null && errMsg.contains("not found")) {
-                logger.warn("getAccessGroup: Igroup '{}' not found on SVM '{}' 
({}). Returning null.", igroupName, svmName, errMsg);
+        } catch (FeignException e) {
+            if (e.status() == 404) {
+                logger.warn("getAccessGroup: Igroup '{}' not found on SVM '{}' 
(status 404). Returning null.", igroupName, svmName);
                 return null;
             }
-            logger.error("Exception occurred while fetching Igroup, Exception: 
{}", errMsg);
-            throw new CloudRuntimeException("Failed to fetch Igroup details: " 
+ errMsg);
+            logger.error("FeignException occurred while fetching Igroup, 
Status: {}, Exception: {}", e.status(), e.getMessage());
+            throw new CloudRuntimeException("Failed to fetch Igroup details: " 
+ e.getMessage());
+        } catch (Exception e) {
+            logger.error("Exception occurred while fetching Igroup, Exception: 
{}", e.getMessage());
+            throw new CloudRuntimeException("Failed to fetch Igroup details: " 
+ e.getMessage());
         }
     }
 
-    public Map<String, String> enableLogicalAccess(Map<String, String> values) 
{
-        return null;
+    public String enableLogicalAccess(Map<String, String> values) {
+        logger.info("enableLogicalAccess : Create LunMap");

Review Comment:
   Yes corrected



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to