Good idea, I propose the interdiff:

diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
index 5a6b404..5c41d75 100644
--- a/src/Ganeti/Locking/Locks.hs
+++ b/src/Ganeti/Locking/Locks.hs
@@ -171,8 +171,19 @@ instance Lock GanetiLocks where
   lockImplications (Network _) = [NetworkLockSet]
   lockImplications _ = []

--- | A client is identified as a job id and path to its process
+-- | A client is identified as a job id, thread id and path to its process
 -- identifier file.
+--
+-- The JobId isn't enough to identify a client as the master daemon
+-- also handles RPC calls that aren't jobs, but which use the
configuration.
+-- Therefore it's needed to include the identification for threads.
+-- An alternative would be to use something like @Either JobId RpcCallId@.
+--
+-- FIXME: Python threads are only unique wrt running threads, so it's
possible
+-- that a new thread will get a thread id that has been used before by
another
+-- finished thread. Since we rely on threads releasing their locks anyway,
+-- this isn't a big issue, but in the future it'd be better to have a
unique
+-- identifier for each operation.
 data ClientId = ClientId
   { ciJobId :: Maybe JobId
   , ciThreadId :: Integer



On Tue, Mar 18, 2014 at 11:16 AM, Helga Velroyen <[email protected]> wrote:

>
>
>
> On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote:
>
>> This allows to distinguish threads that don't have a job id, which is
>> needed for answering queries.
>>
>> Since Python thread IDs aren't guaranteed to be unique, in future it'd
>> be preferable to use a different, unique identifier.
>>
>> Note that this breaks 'gnt-debug locks'.
>>
>> Signed-off-by: Petr Pudlak <[email protected]>
>> ---
>>  lib/cmdlib/base.py          | 18 ++++++------
>>  lib/mcpu.py                 | 68
>> ++++++++++++++++++++-------------------------
>>  src/Ganeti/Locking/Locks.hs | 10 ++++---
>>  3 files changed, 45 insertions(+), 51 deletions(-)
>>
>> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py
>> index 33f2313..3039791 100644
>> --- a/lib/cmdlib/base.py
>> +++ b/lib/cmdlib/base.py
>> @@ -65,19 +65,19 @@ class LUWConfdClient(object):
>>      self.lu = lu
>>
>>    def TryUpdateLocks(self, req):
>> -    jid, livelockfile = self.lu.wconfdcontext
>> -    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +    self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
>> +    self.lu.wconfdlocks = \
>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>>    def DownGradeLocksLevel(self, level):
>> -    jid, livelockfile = self.lu.wconfdcontext
>> -    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +    self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext,
>> level)
>> +    self.lu.wconfdlocks = \
>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>>    def FreeLocksLevel(self, level):
>> -    jid, livelockfile = self.lu.wconfdcontext
>> -    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +    self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
>> +    self.lu.wconfdlocks = \
>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>
>>
>>  class LogicalUnit(object):
>> diff --git a/lib/mcpu.py b/lib/mcpu.py
>> index 0bd5cbf..d527d27 100644
>> --- a/lib/mcpu.py
>> +++ b/lib/mcpu.py
>> @@ -33,6 +33,7 @@ import logging
>>  import random
>>  import time
>>  import itertools
>> +import threading
>>  import traceback
>>
>>  from ganeti import opcodes
>> @@ -308,6 +309,9 @@ class Processor(object):
>>      self.hmclass = hooksmaster.HooksMaster
>>      self._enable_locks = enable_locks
>>      self.wconfd = wconfd # Indirection to allow testing
>> +    self._wconfdcontext = (ec_id,
>> +                           threading.current_thread().ident,
>> +                           self.context.livelock.lockfile.name)
>>
>>    def _CheckLocksEnabled(self):
>>      """Checks if locking is enabled.
>> @@ -362,14 +366,12 @@ class Processor(object):
>>        names = [names]
>>
>>      levelname = locking.LEVEL_NAMES[level]
>> -    jid = int(self.GetECId())
>> -    livelockfile = self.context.livelock.lockfile.name
>>
>>      locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
>>
>>      if not names:
>> -      logging.debug("Acquiring no locks for %d (%s) at level %s",
>> -                    jid, livelockfile, levelname)
>> +      logging.debug("Acquiring no locks for (%s) at level %s",
>> +                    self._wconfdcontext, levelname)
>>        return []
>>
>>      if shared:
>> @@ -378,26 +380,26 @@ class Processor(object):
>>        request = [[lock, "exclusive"] for lock in locks]
>>
>>      if opportunistic:
>> -      logging.debug("Opportunistically acquring some of %s for %d (%s).",
>> -                    locks, jid, livelockfile)
>> -      locks = self.wconfd.Client().OpportunisticLockUnion(jid,
>> livelockfile,
>> +      logging.debug("Opportunistically acquring some of %s for %s.",
>> +                    locks, self._wconfdcontext)
>> +      locks =
>> self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
>>                                                            request)
>>      elif timeout is None:
>>        while True:
>>          ## TODO: use asynchronous wait instead of polling
>> -        blockedon = self.wconfd.Client().TryUpdateLocks(jid,
>> livelockfile,
>> +        blockedon =
>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>                                                          request)
>> -        logging.debug("
>> https://memegen.googleplex.com/5618255569879040Requesting %s for %d (%s)
>> blocked on %s",
>>
>> -                      request, jid, livelockfile, blockedon)
>> +        logging.debug("Requesting %s for %s blocked on %s",
>> +                      request, self._wconfdcontext, blockedon)
>>          if not blockedon:
>>            break
>>          time.sleep(random.random())
>>      else:
>> -      logging.debug("Trying %ss to request %s for %d (%s)",
>> -                    timeout, request, jid, livelockfile)
>> +      logging.debug("Trying %ss to request %s for %s",
>> +                    timeout, request, self._wconfdcontext)
>>        ## TODO: use blocking wait instead of polling
>>        blocked = utils.SimpleRetry([],
>> self.wconfd.Client().TryUpdateLocks, 0.1,
>> -                                  timeout, args=[jid, livelockfile,
>> request])
>> +                                  timeout, args=[self._wconfdcontext,
>> request])
>>        if blocked:
>>          raise LockAcquireTimeout()
>>
>> @@ -497,8 +499,7 @@ class Processor(object):
>>
>>            self._AcquireLocks(level, needed_locks, share, opportunistic,
>>                               calc_timeout())
>> -          (jid, livelockfile) = lu.wconfdcontext
>> -          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +          lu.wconfdlocks =
>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>>          else:
>>            # Adding locks
>>            add_locks = lu.add_locks[level]
>> @@ -507,8 +508,6 @@ class Processor(object):
>>            lu.remove_locks[level] = add_locks
>>
>>            try:
>> -            jid = int(self.GetECId())
>> -            livelockfile = self.context.livelock.lockfile.name
>>              levelname = locking.LEVEL_NAMES[level]
>>
>>              if share:
>> @@ -519,13 +518,13 @@ class Processor(object):
>>              request = [["%s/%s" % (levelname, lock), mode]
>>                         for lock in add_locks]
>>
>> -            logging.debug("Requesting %s for %d (%s)",
>> -                          request, jid, livelockfile)
>> -            blocked = \
>> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> request)
>> +            logging.debug("Requesting %s for %s",
>> +                          request, self._wconfdcontext)
>> +            blocked =
>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> +                                                          request)
>>              assert blocked == [], "Allocating newly 'created' locks
>> failed"
>> -            (jid, livelockfile) = lu.wconfdcontext
>> -            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +            lu.wconfdlocks = \
>> +              self.wconfd.Client().ListLocks(self._wconfdcontext)
>>            except errors.GenericError:
>>              # TODO: verify what actually caused the error
>>              logging.exception("Detected lock error in level %s for locks"
>> @@ -539,21 +538,18 @@ class Processor(object):
>>            result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>>          finally:
>>            if level in lu.remove_locks:
>> -            jid = int(self.GetECId())
>> -            livelockfile = self.context.livelock.lockfile.name
>>              levelname = locking.LEVEL_NAMES[level]
>>              request = [["%s/%s" % (levelname, lock), "release"]
>>                         for lock in lu.remove_locks[level]]
>>              blocked = \
>> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> request)
>> +              self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>> +                                                  request)
>>              assert blocked == [], "Release may not fail"
>>        finally:
>> -        jid = int(self.GetECId())
>> -        livelockfile = self.context.livelock.lockfile.name
>>          levelname = locking.LEVEL_NAMES[level]
>> -        logging.debug("Freeing locks at level %s for %d (%s)",
>> -                      levelname, jid, livelockfile)
>> -        self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
>> +        logging.debug("Freeing locks at level %s for %s",
>> +                      levelname, self._wconfdcontext)
>> +        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext,
>> levelname)
>>      else:
>>        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>>
>> @@ -614,11 +610,9 @@ class Processor(object):
>>                                       " disabled" % op.OP_ID)
>>
>>        try:
>> -        jid = int(self.GetECId())
>> -        livelockfile = self.context.livelock.lockfile.name
>> -        lu = lu_class(self, op, self.context, self.rpc, (jid,
>> livelockfile),
>> +        lu = lu_class(self, op, self.context, self.rpc,
>> self._wconfdcontext,
>>                        self.wconfd)
>> -        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>> livelockfile)
>> +        lu.wconfdlocks =
>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>>          lu.ExpandNames()
>>          assert lu.needed_locks is not None, "needed_locks not set by LU"
>>
>> @@ -630,11 +624,9 @@ class Processor(object):
>>              self.context.cfg.DropECReservations(self._ec_id)
>>        finally:
>>          # Release BGL if owned
>> -        jid = int(self.GetECId())
>> -        livelockfile = self.context.livelock.lockfile.name
>>          bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
>>                               locking.BGL)
>> -        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>> +        self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>                                              [[bglname, "release"]])
>>      finally:https://memegen.googleplex.com/5618255569879040
>>
>>        self._cbs = None
>> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
>> index db81b19..9705d9a 100644
>> --- a/src/Ganeti/Locking/Locks.hs
>> +++ b/src/Ganeti/Locking/Locks.hs
>> @@ -44,7 +44,7 @@ import qualified Text.JSON as J
>>
>>  import Ganeti.BasicTypes
>>  import Ganeti.Errors (ResultG, GanetiException)
>> -import Ganeti.JSON (readEitherString, fromJResultE)
>> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
>>  import Ganeti.Locking.Allocation
>>  import Ganeti.Locking.Types
>>  import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
>> @@ -174,14 +174,16 @@ instance Lock GanetiLocks wherhttps://
>> memegen.googleplex.com/5618255569879040https://memegen.googleplex.com/5618255569879040e
>>
>>  -- | A client is identified as a job id, thread id and path to its
>> process
>>  -- identifier file
>>  data ClientId = ClientId
>> -  { ciJobId :: JobId
>> +  { ciJobId :: Maybe JobId
>> +  , ciThreadId :: Integer
>>    , ciLockFile :: FilePath
>>    }
>>    deriving (Ord, Eq, Show)
>>
>> Please add a comment (maybe even a FIXME) that mentions the problem with
> the non-uniqueness of python threads.
>
>
>>  instance J.JSON ClientId where
>> -  showJSON (ClientId jid lf) = J.showJSON (jid, lf)
>> -  readJSON = liftM (uncurry ClientId) . J.readJSON
>> +  showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, lf)
>> +  readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
>> +             . J.readJSON
>>
>>  -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>>  -- locks are jobs.
>> --
>> 1.9.0.279.gdc9e3eb
>>
>>
>
> Rest LGTM
> Helga
> --
> --
> Helga Velroyen | Software Engineer | [email protected] |
>
> Google Germany GmbH
> Dienerstr. 12
> 80331 München
>
> Registergericht und -nummer: Hamburg, HRB 86891
> Sitz der Gesellschaft: Hamburg
> Geschäftsführer: Graham Law, Christine Elizabeth Flores
>

Reply via email to