Good idea, I propose the interdiff:
diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
index 5a6b404..5c41d75 100644
--- a/src/Ganeti/Locking/Locks.hs
+++ b/src/Ganeti/Locking/Locks.hs
@@ -171,8 +171,19 @@ instance Lock GanetiLocks where
lockImplications (Network _) = [NetworkLockSet]
lockImplications _ = []
--- | A client is identified as a job id and path to its process
+-- | A client is identified as a job id, thread id and path to its process
-- identifier file.
+--
+-- The JobId isn't enough to identify a client as the master daemon
+-- also handles RPC calls that aren't jobs, but which use the
configuration.
+-- Therefore it's needed to include the identification for threads.
+-- An alternative would be to use something like @Either JobId RpcCallId@.
+--
+-- FIXME: Python threads are only unique wrt running threads, so it's
possible
+-- that a new thread will get a thread id that has been used before by
another
+-- finished thread. Since we rely on threads releasing their locks anyway,
+-- this isn't a big issue, but in the future it'd be better to have a
unique
+-- identifier for each operation.
data ClientId = ClientId
{ ciJobId :: Maybe JobId
, ciThreadId :: Integer
On Tue, Mar 18, 2014 at 11:16 AM, Helga Velroyen <[email protected]> wrote:
>
>
>
> On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote:
>
>> This allows to distinguish threads that don't have a job id, which is
>> needed for answering queries.
>>
>> Since Python thread IDs aren't guaranteed to be unique, in future it'd
>> be preferable to use a different, unique identifier.
>>
>> Note that this breaks 'gnt-debug locks'.
>>
>> Signed-off-by: Petr Pudlak <[email protected]>
>> ---
>> lib/cmdlib/base.py | 18 ++++++------
>> lib/mcpu.py | 68
>> ++++++++++++++++++++-------------------------
>> src/Ganeti/Locking/Locks.hs | 10 ++++---
>> 3 files changed, 45 insertions(+), 51 deletions(-)
>>
>> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py
>> index 33f2313..3039791 100644
>> --- a/lib/cmdlib/base.py
>> +++ b/lib/cmdlib/base.py
>> @@ -65,19 +65,19 @@ class LUWConfdClient(object):
>> self.lu = lu
>>
>> def TryUpdateLocks(self, req):
>> - jid, livelockfile = self.lu.wconfdcontext
>> - self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
>> + self.lu.wconfdlocks = \
>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>> def DownGradeLocksLevel(self, level):
>> - jid, livelockfile = self.lu.wconfdcontext
>> - self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext,
>> level)
>> + self.lu.wconfdlocks = \
>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>> def FreeLocksLevel(self, level):
>> - jid, livelockfile = self.lu.wconfdcontext
>> - self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
>> + self.lu.wconfdlocks = \
>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>>
>> class LogicalUnit(object):
>> diff --git a/lib/mcpu.py b/lib/mcpu.py
>> index 0bd5cbf..d527d27 100644
>> --- a/lib/mcpu.py
>> +++ b/lib/mcpu.py
>> @@ -33,6 +33,7 @@ import logging
>> import random
>> import time
>> import itertools
>> +import threading
>> import traceback
>>
>> from ganeti import opcodes
>> @@ -308,6 +309,9 @@ class Processor(object):
>> self.hmclass = hooksmaster.HooksMaster
>> self._enable_locks = enable_locks
>> self.wconfd = wconfd # Indirection to allow testing
>> + self._wconfdcontext = (ec_id,
>> + threading.current_thread().ident,
>> + self.context.livelock.lockfile.name)
>>
>> def _CheckLocksEnabled(self):
>> """Checks if locking is enabled.
>> @@ -362,14 +366,12 @@ class Processor(object):
>> names = [names]
>>
>> levelname = locking.LEVEL_NAMES[level]
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>>
>> locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
>>
>> if not names:
>> - logging.debug("Acquiring no locks for %d (%s) at level %s",
>> - jid, livelockfile, levelname)
>> + logging.debug("Acquiring no locks for (%s) at level %s",
>> + self._wconfdcontext, levelname)
>> return []
>>
>> if shared:
>> @@ -378,26 +380,26 @@ class Processor(object):
>> request = [[lock, "exclusive"] for lock in locks]
>>
>> if opportunistic:
>> - logging.debug("Opportunistically acquring some of %s for %d (%s).",
>> - locks, jid, livelockfile)
>> - locks = self.wconfd.Client().OpportunisticLockUnion(jid,
>> livelockfile,
>> + logging.debug("Opportunistically acquring some of %s for %s.",
>> + locks, self._wconfdcontext)
>> + locks =
>> self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
>> request)
>> elif timeout is None:
>> while True:
>> ## TODO: use asynchronous wait instead of polling
>> - blockedon = self.wconfd.Client().TryUpdateLocks(jid,
>> livelockfile,
>> + blockedon =
>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> request)
>> -          logging.debug("Requesting %s for %d (%s) blocked on %s",
>> -                        request, jid, livelockfile, blockedon)
>> + logging.debug("Requesting %s for %s blocked on %s",
>> + request, self._wconfdcontext, blockedon)
>> if not blockedon:
>> break
>> time.sleep(random.random())
>> else:
>> - logging.debug("Trying %ss to request %s for %d (%s)",
>> - timeout, request, jid, livelockfile)
>> + logging.debug("Trying %ss to request %s for %s",
>> + timeout, request, self._wconfdcontext)
>> ## TODO: use blocking wait instead of polling
>> blocked = utils.SimpleRetry([],
>> self.wconfd.Client().TryUpdateLocks, 0.1,
>> - timeout, args=[jid, livelockfile,
>> request])
>> + timeout, args=[self._wconfdcontext,
>> request])
>> if blocked:
>> raise LockAcquireTimeout()
>>
>> @@ -497,8 +499,7 @@ class Processor(object):
>>
>> self._AcquireLocks(level, needed_locks, share, opportunistic,
>> calc_timeout())
>> - (jid, livelockfile) = lu.wconfdcontext
>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + lu.wconfdlocks =
>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>> else:
>> # Adding locks
>> add_locks = lu.add_locks[level]
>> @@ -507,8 +508,6 @@ class Processor(object):
>> lu.remove_locks[level] = add_locks
>>
>> try:
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>> levelname = locking.LEVEL_NAMES[level]
>>
>> if share:
>> @@ -519,13 +518,13 @@ class Processor(object):
>> request = [["%s/%s" % (levelname, lock), mode]
>> for lock in add_locks]
>>
>> - logging.debug("Requesting %s for %d (%s)",
>> - request, jid, livelockfile)
>> - blocked = \
>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> request)
>> + logging.debug("Requesting %s for %s",
>> + request, self._wconfdcontext)
>> + blocked =
>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> + request)
>> assert blocked == [], "Allocating newly 'created' locks
>> failed"
>> - (jid, livelockfile) = lu.wconfdcontext
>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + lu.wconfdlocks = \
>> + self.wconfd.Client().ListLocks(self._wconfdcontext)
>> except errors.GenericError:
>> # TODO: verify what actually caused the error
>> logging.exception("Detected lock error in level %s for locks"
>> @@ -539,21 +538,18 @@ class Processor(object):
>> result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>> finally:
>> if level in lu.remove_locks:
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>> levelname = locking.LEVEL_NAMES[level]
>> request = [["%s/%s" % (levelname, lock), "release"]
>> for lock in lu.remove_locks[level]]
>> blocked = \
>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> request)
>> + self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> + request)
>> assert blocked == [], "Release may not fail"
>> finally:
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>> levelname = locking.LEVEL_NAMES[level]
>> - logging.debug("Freeing locks at level %s for %d (%s)",
>> - levelname, jid, livelockfile)
>> - self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
>> + logging.debug("Freeing locks at level %s for %s",
>> + levelname, self._wconfdcontext)
>> + self.wconfd.Client().FreeLocksLevel(self._wconfdcontext,
>> levelname)
>> else:
>> result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>>
>> @@ -614,11 +610,9 @@ class Processor(object):
>> " disabled" % op.OP_ID)
>>
>> try:
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>> - lu = lu_class(self, op, self.context, self.rpc, (jid,
>> livelockfile),
>> + lu = lu_class(self, op, self.context, self.rpc,
>> self._wconfdcontext,
>> self.wconfd)
>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> + lu.wconfdlocks =
>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>> lu.ExpandNames()
>> assert lu.needed_locks is not None, "needed_locks not set by LU"
>>
>> @@ -630,11 +624,9 @@ class Processor(object):
>> self.context.cfg.DropECReservations(self._ec_id)
>> finally:
>> # Release BGL if owned
>> - jid = int(self.GetECId())
>> - livelockfile = self.context.livelock.lockfile.name
>> bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
>> locking.BGL)
>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> + self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> [[bglname, "release"]])
>>     finally:
>>
>> self._cbs = None
>> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
>> index db81b19..9705d9a 100644
>> --- a/src/Ganeti/Locking/Locks.hs
>> +++ b/src/Ganeti/Locking/Locks.hs
>> @@ -44,7 +44,7 @@ import qualified Text.JSON as J
>>
>> import Ganeti.BasicTypes
>> import Ganeti.Errors (ResultG, GanetiException)
>> -import Ganeti.JSON (readEitherString, fromJResultE)
>> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
>> import Ganeti.Locking.Allocation
>> import Ganeti.Locking.Types
>> import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
>> @@ -174,14 +174,16 @@ instance Lock GanetiLocks where
>>
>> -- | A client is identified as a job id, thread id and path to its
>> process
>> -- identifier file
>> data ClientId = ClientId
>> - { ciJobId :: JobId
>> + { ciJobId :: Maybe JobId
>> + , ciThreadId :: Integer
>> , ciLockFile :: FilePath
>> }
>> deriving (Ord, Eq, Show)
>>
>> Please add a comment (maybe even a FIXME) that mentions the problem with
> the non-uniqueness of python threads.
>
>
>> instance J.JSON ClientId where
>> - showJSON (ClientId jid lf) = J.showJSON (jid, lf)
>> - readJSON = liftM (uncurry ClientId) . J.readJSON
>> + showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, lf)
>> + readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
>> + . J.readJSON
>>
>> -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>> -- locks are jobs.
>> --
>> 1.9.0.279.gdc9e3eb
>>
>>
>
> Rest LGTM
> Helga
> --
> --
> Helga Velroyen | Software Engineer | [email protected] |
>
> Google Germany GmbH
> Dienerstr. 12
> 80331 München
>
> Registergericht und -nummer: Hamburg, HRB 86891
> Sitz der Gesellschaft: Hamburg
> Geschäftsführer: Graham Law, Christine Elizabeth Flores
>