Federico Simoncelli has uploaded a new change for review.

Change subject: task: prevent tasks with the same id from being queued
......................................................................

task: prevent tasks with the same id from being queued

Change-Id: I0b9980de08d10203a75071dae9b16c9b850a91f5
Signed-off-by: Federico Simoncelli <[email protected]>
---
M vdsm/storage/task.py
M vdsm/storage/taskManager.py
2 files changed, 13 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/08/26808/1

diff --git a/vdsm/storage/task.py b/vdsm/storage/task.py
index 222cb2e..d8ca3a6 100644
--- a/vdsm/storage/task.py
+++ b/vdsm/storage/task.py
@@ -537,6 +537,7 @@
         except Exception as e:
             self._setError(e)
             self.stop()
+            raise
 
     def __state_running(self, fromState):
         self._runJobs()
diff --git a/vdsm/storage/taskManager.py b/vdsm/storage/taskManager.py
index 4a7a04f..aeae9d5 100644
--- a/vdsm/storage/taskManager.py
+++ b/vdsm/storage/taskManager.py
@@ -20,6 +20,7 @@
 
 import os
 import logging
+import threading
 
 from vdsm.config import config
 import storage_exception as se
@@ -38,6 +39,7 @@
         self.tp = ThreadPool(tpSize, waitTimeout, maxTasks)
         self._tasks = {}
         self._unqueuedTasks = []
+        self._tasksLock = threading.Lock()
 
     def queue(self, task):
         return self._queueTask(task, task.commit)
@@ -46,17 +48,24 @@
         return self._queueTask(task, task.recover)
 
     def _queueTask(self, task, method):
-        try:
+        with self._tasksLock:
+            if task.id in self._tasks:
+                raise se.AddTaskError(
+                    'Task id already in use: {0}'.format(task.id))
+
             self.log.debug("queuing task: %s", task.id)
             self._tasks[task.id] = task
+
+        try:
             if not self.tp.queueTask(task.id, method):
                 self.log.error("unable to queue task: %s", task.dumpTask())
                 del self._tasks[task.id]
                 raise se.AddTaskError()
             self.log.debug("task queued: %s", task.id)
-        except Exception as ex:
-            self.log.error("Could not queue task, encountered: %s", str(ex))
+        except Exception:
+            self.log.exception('could not queue task %s', task.id)
             raise
+
         return task.id
 
     def scheduleJob(self, type, store, task, jobName, func, *args):


-- 
To view, visit http://gerrit.ovirt.org/26808
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0b9980de08d10203a75071dae9b16c9b850a91f5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to