The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/3004

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From c15c48df425b49760f71a0472fe8185db64f7271 Mon Sep 17 00:00:00 2001
From: Christian Brauner <[email protected]>
Date: Fri, 3 Mar 2017 12:32:22 +0100
Subject: [PATCH 1/2] zfs: start to fade out methods

Replace them with more generic functions that can be called in other places as
well (e.g. patches.go, api_internal.go etc.).

Signed-off-by: Christian Brauner <[email protected]>
---
 lxd/api_internal.go |   5 +-
 lxd/storage.go      |   9 ++++
 lxd/storage_zfs.go  | 140 +++++++++++++++++++++++++++++++++++++---------------
 3 files changed, 113 insertions(+), 41 deletions(-)

diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index bc9ca00..fff45b8 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -160,11 +160,12 @@ func internalImport(d *Daemon, r *http.Request) Response {
 
                switch pool.Driver {
                case "zfs":
-                       err = zfsMount(poolName, containerSubString[1:])
+                       dataset := fmt.Sprintf("%s/%s", poolName, 
containerSubString[1:])
+                       err = zfsPoolVolumeMount(dataset)
                        if err != nil {
                                return InternalError(err)
                        }
-                       defer zfsUmount(poolName, containerSubString[1:])
+                       defer zfsPoolVolumeUmount(dataset)
                case "lvm":
                        containerLvmName := containerNameToLVName(name)
                        containerLvmPath := getLvmDevPath(poolName, 
storagePoolVolumeApiEndpointContainers, containerLvmName)
diff --git a/lxd/storage.go b/lxd/storage.go
index 188037c..920ac18 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -32,6 +32,8 @@ var lxdStorageMapLock sync.Mutex
 
 // The following functions are used to construct simple operation codes that 
are
 // unique.
+//
+// Pool lock IDs.
 func getPoolMountLockID(poolName string) string {
        return fmt.Sprintf("mount/pool/%s", poolName)
 }
@@ -40,10 +42,16 @@ func getPoolUmountLockID(poolName string) string {
        return fmt.Sprintf("umount/pool/%s", poolName)
 }
 
+// Image lock IDs.
 func getImageCreateLockID(poolName string, fingerprint string) string {
        return fmt.Sprintf("create/image/%s/%s", poolName, fingerprint)
 }
 
+func getImageDeleteLockID(poolName string, fingerprint string) string {
+       return fmt.Sprintf("delete/image/%s/%s", poolName, fingerprint)
+}
+
+// Container lock IDs.
 func getContainerMountLockID(poolName string, containerName string) string {
        return fmt.Sprintf("mount/container/%s/%s", poolName, containerName)
 }
@@ -52,6 +60,7 @@ func getContainerUmountLockID(poolName string, containerName 
string) string {
        return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
 }
 
+// Custom lock IDs.
 func getCustomMountLockID(poolName string, volumeName string) string {
        return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
 }
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index a3f4187..8daaa66 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -490,8 +490,8 @@ func (s *storageZfs) ContainerUmount(name string, path 
string) (bool, error) {
 // Things we do have to care about
 func (s *storageZfs) ContainerStorageReady(name string) bool {
        poolName := s.getOnDiskPoolName()
-       fs := fmt.Sprintf("%s/containers/%s", poolName, name)
-       return s.zfsFilesystemEntityExists(fs, false)
+       containerDataset := fmt.Sprintf("%s/containers/%s", poolName, name)
+       return zfsFilesystemEntityExists(containerDataset)
 }
 
 func (s *storageZfs) ContainerCreate(container container) error {
@@ -545,7 +545,9 @@ func (s *storageZfs) ContainerCreateFromImage(container 
container, fingerprint s
        fs := fmt.Sprintf("containers/%s", containerName)
        containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, 
containerName)
 
-       fsImage := fmt.Sprintf("images/%s", fingerprint)
+       imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
+       poolName := s.getOnDiskPoolName()
+       imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
 
        imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
        lxdStorageMapLock.Lock()
@@ -559,7 +561,7 @@ func (s *storageZfs) ContainerCreateFromImage(container 
container, fingerprint s
                lxdStorageMapLock.Unlock()
 
                var imgerr error
-               if !s.zfsFilesystemEntityExists(fsImage, true) {
+               if !zfsFilesystemEntityExists(imageDataset) {
                        imgerr = s.ImageCreate(fingerprint)
                }
 
@@ -575,7 +577,7 @@ func (s *storageZfs) ContainerCreateFromImage(container 
container, fingerprint s
                }
        }
 
-       err := s.zfsPoolVolumeClone(fsImage, "readonly", fs, 
containerPoolVolumeMntPoint)
+       err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, 
containerPoolVolumeMntPoint)
        if err != nil {
                return err
        }
@@ -1249,18 +1251,13 @@ func (s *storageZfs) 
ContainerSnapshotCreateEmpty(snapshotContainer container) e
        return nil
 }
 
-// - create temporary directory ${LXD_DIR}/images/lxd_images_
-// - create new zfs volume images/<fingerprint>
-// - mount the zfs volume on ${LXD_DIR}/images/lxd_images_
-// - unpack the downloaded image in ${LXD_DIR}/images/lxd_images_
-// - mark new zfs volume images/<fingerprint> readonly
-// - remove mountpoint property from zfs volume images/<fingerprint>
-// - create read-write snapshot from zfs volume images/<fingerprint>
 func (s *storageZfs) ImageCreate(fingerprint string) error {
        shared.LogDebugf("Creating ZFS storage volume for image \"%s\" on 
storage pool \"%s\".", fingerprint, s.pool.Name)
 
-       imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
-       fs := fmt.Sprintf("images/%s", fingerprint)
+       poolName := s.getOnDiskPoolName()
+       imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
+       imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
+
        revert := true
        subrevert := true
 
@@ -1275,8 +1272,9 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
                s.deleteImageDbPoolVolume(fingerprint)
        }()
 
-       if s.zfsFilesystemEntityExists(fmt.Sprintf("deleted/%s", fs), true) {
-               err := s.zfsPoolVolumeRename(fmt.Sprintf("deleted/%s", fs), fs)
+       imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, 
imageBaseDataset)
+       if zfsFilesystemEntityExists(imageDeletedDataset) {
+               err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
                if err != nil {
                        return err
                }
@@ -1290,16 +1288,16 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
 
                // In case this is an image from an older lxd instance, wipe the
                // mountpoint.
-               err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+               err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
                if err != nil {
                        return err
                }
 
                revert = false
-
                return nil
        }
 
+       imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
        if !shared.PathExists(imageMntPoint) {
                err := os.MkdirAll(imageMntPoint, 0700)
                if err != nil {
@@ -1324,7 +1322,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
        imagePath := shared.VarPath("images", fingerprint)
 
        // Create a new storage volume on the storage pool for the image.
-       err = s.zfsPoolVolumeCreate(fs)
+       err = zfsPoolVolumeCreate(imageDataset)
        if err != nil {
                return err
        }
@@ -1337,14 +1335,14 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
        }()
 
        // Set a temporary mountpoint for the image.
-       err = s.zfsPoolVolumeSet(fs, "mountpoint", tmpImageDir)
+       err = zfsPoolVolumeSet(imageDataset, "mountpoint", tmpImageDir)
        if err != nil {
                return err
        }
 
        // Make sure that the image actually got mounted.
        if !shared.IsMountPoint(tmpImageDir) {
-               s.zfsPoolVolumeMount(fs)
+               zfsPoolVolumeMount(imageDataset)
        }
 
        // Unpack the image into the temporary mountpoint.
@@ -1354,25 +1352,25 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
        }
 
        // Mark the new storage volume for the image as readonly.
-       err = s.zfsPoolVolumeSet(fs, "readonly", "on")
+       err = zfsPoolVolumeSet(imageDataset, "readonly", "on")
        if err != nil {
                return err
        }
 
        // Remove the temporary mountpoint from the image storage volume.
-       err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+       err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
        if err != nil {
                return err
        }
 
        // Make sure that the image actually got unmounted.
        if shared.IsMountPoint(tmpImageDir) {
-               s.zfsPoolVolumeUmount(fs)
+               zfsPoolVolumeUmount(imageDataset)
        }
 
        // Create a snapshot of that image on the storage pool which we clone 
for
        // container creation.
-       err = s.zfsPoolVolumeSnapshotCreate(fs, "readonly")
+       err = zfsPoolVolumeSnapshotCreate(imageDataset, "readonly")
        if err != nil {
                return err
        }
@@ -1659,6 +1657,16 @@ func (s *storageZfs) zfsPoolVolumeClone(source string, 
name string, dest string,
        return nil
 }
 
+func zfsPoolVolumeCreate(dataset string) error {
+       msg, err := exec.Command("zfs", "create", "-p", 
dataset).CombinedOutput()
+       if err != nil {
+               shared.LogErrorf("Failed to create ZFS dataset: %s.", 
string(msg))
+               return err
+       }
+
+       return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeCreate(path string) error {
        poolName := s.getOnDiskPoolName()
        output, err := exec.Command(
@@ -1833,6 +1841,36 @@ func (s *storageZfs) zfsFilesystemEntityPropertyGet(path 
string, key string, pre
        return strings.TrimRight(string(output), "\n"), nil
 }
 
+func zfsPoolVolumeRename(sourceDataset string, destDataset string) error {
+       var err error
+       var output []byte
+
+       for i := 0; i < 20; i++ {
+               output, err = exec.Command(
+                       "zfs",
+                       "rename",
+                       "-p",
+                       sourceDataset,
+                       destDataset).CombinedOutput()
+
+               // Success
+               if err == nil {
+                       return nil
+               }
+
+               // zfs rename can fail because of descendants, yet still manage 
the rename
+               if !zfsFilesystemEntityExists(sourceDataset) && 
zfsFilesystemEntityExists(destDataset) {
+                       return nil
+               }
+
+               time.Sleep(500 * time.Millisecond)
+       }
+
+       // Timeout
+       shared.LogErrorf("zfs rename failed: %s.", string(output))
+       return err
+}
+
 func (s *storageZfs) zfsPoolVolumeRename(source string, dest string) error {
        var err error
        var output []byte
@@ -1864,6 +1902,20 @@ func (s *storageZfs) zfsPoolVolumeRename(source string, 
dest string) error {
        return fmt.Errorf("Failed to rename ZFS filesystem: %s", output)
 }
 
+func zfsPoolVolumeSet(dataset string, key string, value string) error {
+       output, err := exec.Command(
+               "zfs",
+               "set",
+               fmt.Sprintf("%s=%s", key, value),
+               dataset).CombinedOutput()
+       if err != nil {
+               shared.LogErrorf("Setting ZFS property failed: %s.", 
string(output))
+               return err
+       }
+
+       return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeSet(path string, key string, value string) 
error {
        poolName := s.getOnDiskPoolName()
        output, err := exec.Command(
@@ -1879,6 +1931,20 @@ func (s *storageZfs) zfsPoolVolumeSet(path string, key 
string, value string) err
        return nil
 }
 
+func zfsPoolVolumeSnapshotCreate(dataset string, name string) error {
+       msg, err := exec.Command(
+               "zfs",
+               "snapshot",
+               "-r",
+               fmt.Sprintf("%s@%s", dataset, name)).CombinedOutput()
+       if err != nil {
+               shared.LogErrorf("zfs snapshot failed: %s.", string(msg))
+               return err
+       }
+
+       return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeSnapshotCreate(path string, name string) 
error {
        poolName := s.getOnDiskPoolName()
        output, err := exec.Command(
@@ -1964,36 +2030,32 @@ func (s *storageZfs) zfsPoolVolumeSnapshotRename(path 
string, oldName string, ne
        return nil
 }
 
-func zfsMount(poolName string, path string) error {
-       output, err := tryExec(
-               "zfs",
-               "mount",
-               fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeMount(dataset string) error {
+       output, err := tryExec("zfs", "mount", dataset)
        if err != nil {
-               return fmt.Errorf("Failed to mount ZFS filesystem: %s", output)
+               return fmt.Errorf("Failed to mount ZFS filesystem: %s.", 
string(output))
        }
 
        return nil
 }
 
 func (s *storageZfs) zfsPoolVolumeMount(path string) error {
-       return zfsMount(s.getOnDiskPoolName(), path)
+       dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+       return zfsPoolVolumeMount(dataset)
 }
 
-func zfsUmount(poolName string, path string) error {
-       output, err := tryExec(
-               "zfs",
-               "unmount",
-               fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeUmount(dataset string) error {
+       msg, err := tryExec("zfs", "unmount", dataset)
        if err != nil {
-               return fmt.Errorf("Failed to unmount ZFS filesystem: %s", 
output)
+               return fmt.Errorf("Failed to unmount ZFS filesystem: %s.", 
string(msg))
        }
 
        return nil
 }
 
 func (s *storageZfs) zfsPoolVolumeUmount(path string) error {
-       return zfsUmount(s.getOnDiskPoolName(), path)
+       dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+       return zfsPoolVolumeUmount(dataset)
 }
 
 func (s *storageZfs) zfsPoolListSubvolumes(path string) ([]string, error) {

From c32f66ab1956dc70cf2e64f699d1fd9a3e594320 Mon Sep 17 00:00:00 2001
From: Christian Brauner <[email protected]>
Date: Fri, 3 Mar 2017 17:43:17 +0100
Subject: [PATCH 2/2] zfs: improve locking around Image{Create,Delete}

Signed-off-by: Christian Brauner <[email protected]>
---
 lxd/storage.go     |  14 +++++
 lxd/storage_zfs.go | 153 +++++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 139 insertions(+), 28 deletions(-)

diff --git a/lxd/storage.go b/lxd/storage.go
index 920ac18..9ae5426 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -27,6 +27,16 @@ import (
 // Note that any access to this map must be done while holding a lock.
 var lxdStorageOngoingOperationMap = map[string]chan bool{}
 
+type containerCreationMap struct {
+       refcount map[string]int
+       wait     map[string]chan bool
+}
+
+var lxdContainerCreationMap = containerCreationMap{
+       refcount: make(map[string]int),
+       wait:     make(map[string]chan bool),
+}
+
 // lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
 var lxdStorageMapLock sync.Mutex
 
@@ -60,6 +70,10 @@ func getContainerUmountLockID(poolName string, containerName 
string) string {
        return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
 }
 
+func getContainerCreateFromImageLockID(poolName string, fingerprint string) 
string {
+       return fmt.Sprintf("create/container/%s/%s", poolName, fingerprint)
+}
+
 // Custom lock IDs.
 func getCustomMountLockID(poolName string, volumeName string) string {
        return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 8daaa66..fdade56 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -544,40 +544,41 @@ func (s *storageZfs) ContainerCreateFromImage(container 
container, fingerprint s
        containerName := container.Name()
        fs := fmt.Sprintf("containers/%s", containerName)
        containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, 
containerName)
-
        imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
-       poolName := s.getOnDiskPoolName()
-       imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
 
-       imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+       // Add the container creation to the map or increase the refcount.
+       containerCreateFromImagePoolLockID := 
getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
        lxdStorageMapLock.Lock()
-       if waitChannel, ok := 
lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
-               lxdStorageMapLock.Unlock()
-               if _, ok := <-waitChannel; ok {
-                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
-               }
-       } else {
-               lxdStorageOngoingOperationMap[imageStoragePoolLockID] = 
make(chan bool)
-               lxdStorageMapLock.Unlock()
-
-               var imgerr error
-               if !zfsFilesystemEntityExists(imageDataset) {
-                       imgerr = s.ImageCreate(fingerprint)
-               }
+       _, ok := 
lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+       if !ok {
+               
lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID] = make(chan 
bool)
+       }
+       lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]++
+       lxdStorageMapLock.Unlock()
 
+       defer func() {
                lxdStorageMapLock.Lock()
-               if waitChannel, ok := 
lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
-                       close(waitChannel)
-                       delete(lxdStorageOngoingOperationMap, 
imageStoragePoolLockID)
+               _, ok := 
lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+               if ok {
+                       
lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]--
+                       if 
lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID] == 0 {
+                               // Close wait channel.
+                               
close(lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID])
+                               // Delete refcount.
+                               delete(lxdContainerCreationMap.refcount, 
containerCreateFromImagePoolLockID)
+                               // Delete wait channel.
+                               delete(lxdContainerCreationMap.wait, 
containerCreateFromImagePoolLockID)
+                       }
                }
                lxdStorageMapLock.Unlock()
+       }()
 
-               if imgerr != nil {
-                       return imgerr
-               }
+       err := s.ImageCreate(fingerprint)
+       if err != nil {
+               return err
        }
 
-       err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, 
containerPoolVolumeMntPoint)
+       err = s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, 
containerPoolVolumeMntPoint)
        if err != nil {
                return err
        }
@@ -1258,10 +1259,50 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
        imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
        imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
 
-       revert := true
-       subrevert := true
+       // Check for any ongoing ImageCreate() operation.
+       imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+       lxdStorageMapLock.Lock()
+       waitChannel, ok := lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+       if ok {
+               lxdStorageMapLock.Unlock()
+               if _, ok := <-waitChannel; ok {
+                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
+               }
+               return nil
+       }
+       lxdStorageOngoingOperationMap[imageCreatePoolLockID] = make(chan bool)
+       lxdStorageMapLock.Unlock()
+
+       // Wait for any operation that might conflict with us.
+       imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+       lxdStorageMapLock.Lock()
+       waitChannel, ok = lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+       lxdStorageMapLock.Unlock()
+       if ok {
+               // Wait for the ImageDelete() operation to finish.
+               if _, ok := <-waitChannel; ok {
+                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
+               }
+       }
+
+       defer func() {
+               lxdStorageMapLock.Lock()
+               waitChannel, ok := 
lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+               if ok {
+                       close(waitChannel)
+                       delete(lxdStorageOngoingOperationMap, 
imageCreatePoolLockID)
+               }
+               lxdStorageMapLock.Unlock()
+       }()
+
+       // Check if image already exists.
+       _, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, 
storagePoolVolumeTypeImage, s.poolID)
+       if err == nil {
+               return nil
+       }
 
-       err := s.createImageDbPoolVolume(fingerprint)
+       subrevert := true
+       err = s.createImageDbPoolVolume(fingerprint)
        if err != nil {
                return err
        }
@@ -1272,6 +1313,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
                s.deleteImageDbPoolVolume(fingerprint)
        }()
 
+       revert := true
        imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, 
imageBaseDataset)
        if zfsFilesystemEntityExists(imageDeletedDataset) {
                err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
@@ -1384,6 +1426,61 @@ func (s *storageZfs) ImageCreate(fingerprint string) 
error {
 func (s *storageZfs) ImageDelete(fingerprint string) error {
        shared.LogDebugf("Deleting ZFS storage volume for image \"%s\" on 
storage pool \"%s\".", fingerprint, s.pool.Name)
 
+       // Take a lock to check whether there is already an ImageDelete()
+       // progress.
+       imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+       lxdStorageMapLock.Lock()
+       waitChannel, ok := lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+       if ok {
+               lxdStorageMapLock.Unlock()
+               if _, ok := <-waitChannel; ok {
+                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
+               }
+               return nil
+       }
+       lxdStorageOngoingOperationMap[imageDeletePoolLockID] = make(chan bool)
+       lxdStorageMapLock.Unlock()
+
+       // Wait for any already ongoing operation that might conflict with us.
+
+       // Check whether an image is created atm.
+       imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+       lxdStorageMapLock.Lock()
+       waitChannel, ok = lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+       lxdStorageMapLock.Unlock()
+       if ok {
+               if _, ok := <-waitChannel; ok {
+                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
+               }
+       }
+
+       // Check whether a container is created from that image.
+       containerCreateFromImagePoolLockID := 
getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
+       lxdStorageMapLock.Lock()
+       waitChannel, ok = 
lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID]
+       lxdStorageMapLock.Unlock()
+       if ok {
+               if _, ok := <-waitChannel; ok {
+                       shared.LogWarnf("Received value over semaphore. This 
should not have happened.")
+               }
+       }
+
+       defer func() {
+               lxdStorageMapLock.Lock()
+               waitChannel, ok := 
lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+               if ok {
+                       close(waitChannel)
+                       delete(lxdStorageOngoingOperationMap, 
imageDeletePoolLockID)
+               }
+               lxdStorageMapLock.Unlock()
+       }()
+
+       // Check if image already is already deleted.
+       _, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, 
storagePoolVolumeTypeImage, s.poolID)
+       if err == NoSuchObjectError {
+               return nil
+       }
+
        fs := fmt.Sprintf("images/%s", fingerprint)
 
        if s.zfsFilesystemEntityExists(fs, true) {
@@ -1410,7 +1507,7 @@ func (s *storageZfs) ImageDelete(fingerprint string) 
error {
                }
        }
 
-       err := s.deleteImageDbPoolVolume(fingerprint)
+       err = s.deleteImageDbPoolVolume(fingerprint)
        if err != nil {
                return err
        }
_______________________________________________
lxc-devel mailing list
[email protected]
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to