Great, thank you! LGTM

Helga


On Tue, Mar 18, 2014 at 12:48 PM, Petr Pudlák <[email protected]> wrote:

> Good idea, I propose the interdiff:
>
> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
> index 5a6b404..5c41d75 100644
> --- a/src/Ganeti/Locking/Locks.hs
> +++ b/src/Ganeti/Locking/Locks.hs
> @@ -171,8 +171,19 @@ instance Lock GanetiLocks where
>    lockImplications (Network _) = [NetworkLockSet]
>    lockImplications _ = []
>
> --- | A client is identified as a job id and path to its process
> +-- | A client is identified as a job id, thread id and path to its process
>  -- identifier file.
> +--
> +-- The JobId isn't enough to identify a client as the master daemon
> +-- also handles RPC calls that aren't jobs, but which use the
> configuration.
> +-- Therefore it's needed to include the identification for threads.
> +-- An alternative would be to use something like @Either JobId RpcCallId@
> .
> +--
> +-- FIXME: Python threads are only unique wrt running threads, so it's
> possible
> +-- that a new thread will get a thread id that has been used before by
> another
> +-- finished thread. Since we rely on threads releasing their locks anyway,
> +-- this isn't a big issue, but in the future it'd be better to have a
> unique
> +-- identifier for each operation.
>  data ClientId = ClientId
>    { ciJobId :: Maybe JobId
>    , ciThreadId :: Integer
>
>
>
> On Tue, Mar 18, 2014 at 11:16 AM, Helga Velroyen <[email protected]>wrote:
>
>>
>>
>>
>> On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote:
>>
>>> This allows to distinguish threads that don't have a job id, which is
>>> needed for answering queries.
>>>
>>> Since Python thread IDs aren't guaranteed to be unique, in future it'd
>>> be preferable to use a different, unique identifier.
>>>
>>> Note that this breaks 'gnt-debug locks'.
>>>
>>> Signed-off-by: Petr Pudlak <[email protected]>
>>> ---
>>>  lib/cmdlib/base.py          | 18 ++++++------
>>>  lib/mcpu.py                 | 68
>>> ++++++++++++++++++++-------------------------
>>>  src/Ganeti/Locking/Locks.hs | 10 ++++---
>>>  3 files changed, 45 insertions(+), 51 deletions(-)
>>>
>>> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py
>>> index 33f2313..3039791 100644
>>> --- a/lib/cmdlib/base.py
>>> +++ b/lib/cmdlib/base.py
>>> @@ -65,19 +65,19 @@ class LUWConfdClient(object):
>>>      self.lu = lu
>>>
>>>    def TryUpdateLocks(self, req):
>>> -    jid, livelockfile = self.lu.wconfdcontext
>>> -    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
>>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +    self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
>>> +    self.lu.wconfdlocks = \
>>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>>
>>>    def DownGradeLocksLevel(self, level):
>>> -    jid, livelockfile = self.lu.wconfdcontext
>>> -    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile,
>>> level)
>>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +    self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext,
>>> level)
>>> +    self.lu.wconfdlocks = \
>>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>>
>>>    def FreeLocksLevel(self, level):
>>> -    jid, livelockfile = self.lu.wconfdcontext
>>> -    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
>>> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +    self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
>>> +    self.lu.wconfdlocks = \
>>> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>>>
>>>
>>>  class LogicalUnit(object):
>>> diff --git a/lib/mcpu.py b/lib/mcpu.py
>>> index 0bd5cbf..d527d27 100644
>>> --- a/lib/mcpu.py
>>> +++ b/lib/mcpu.py
>>> @@ -33,6 +33,7 @@ import logging
>>>  import random
>>>  import time
>>>  import itertools
>>> +import threading
>>>  import traceback
>>>
>>>  from ganeti import opcodes
>>> @@ -308,6 +309,9 @@ class Processor(object):
>>>      self.hmclass = hooksmaster.HooksMaster
>>>      self._enable_locks = enable_locks
>>>      self.wconfd = wconfd # Indirection to allow testing
>>> +    self._wconfdcontext = (ec_id,
>>> +                           threading.current_thread().ident,
>>> +                           self.context.livelock.lockfile.name)
>>>
>>>    def _CheckLocksEnabled(self):
>>>      """Checks if locking is enabled.
>>> @@ -362,14 +366,12 @@ class Processor(object):
>>>        names = [names]
>>>
>>>      levelname = locking.LEVEL_NAMES[level]
>>> -    jid = int(self.GetECId())
>>> -    livelockfile = self.context.livelock.lockfile.name
>>>
>>>      locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
>>>
>>>      if not names:
>>> -      logging.debug("Acquiring no locks for %d (%s) at level %s",
>>> -                    jid, livelockfile, levelname)
>>> +      logging.debug("Acquiring no locks for (%s) at level %s",
>>> +                    self._wconfdcontext, levelname)
>>>        return []
>>>
>>>      if shared:
>>> @@ -378,26 +380,26 @@ class Processor(object):
>>>        request = [[lock, "exclusive"] for lock in locks]
>>>
>>>      if opportunistic:
>>> -      logging.debug("Opportunistically acquring some of %s for %d
>>> (%s).",
>>> -                    locks, jid, livelockfile)
>>> -      locks = self.wconfd.Client().OpportunisticLockUnion(jid,
>>> livelockfile,
>>> +      logging.debug("Opportunistically acquring some of %s for %s.",
>>> +                    locks, self._wconfdcontext)
>>> +      locks =
>>> self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
>>>                                                            request)
>>>      elif timeout is None:
>>>        while True:
>>>          ## TODO: use asynchronous wait instead of polling
>>> -        blockedon = self.wconfd.Client().TryUpdateLocks(jid,
>>> livelockfile,
>>> +        blockedon =
>>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>>                                                          request)
>>> -        logging.debug("
>>> https://memegen.googleplex.com/5618255569879040Requesting %s for %d
>>> (%s) blocked on %s",
>>>
>>> -                      request, jid, livelockfile, blockedon)
>>> +        logging.debug("Requesting %s for %s blocked on %s",
>>> +                      request, self._wconfdcontext, blockedon)
>>>          if not blockedon:
>>>            break
>>>          time.sleep(random.random())
>>>      else:
>>> -      logging.debug("Trying %ss to request %s for %d (%s)",
>>> -                    timeout, request, jid, livelockfile)
>>> +      logging.debug("Trying %ss to request %s for %s",
>>> +                    timeout, request, self._wconfdcontext)
>>>        ## TODO: use blocking wait instead of polling
>>>        blocked = utils.SimpleRetry([],
>>> self.wconfd.Client().TryUpdateLocks, 0.1,
>>> -                                  timeout, args=[jid, livelockfile,
>>> request])
>>> +                                  timeout, args=[self._wconfdcontext,
>>> request])
>>>        if blocked:
>>>          raise LockAcquireTimeout()
>>>
>>> @@ -497,8 +499,7 @@ class Processor(object):
>>>
>>>            self._AcquireLocks(level, needed_locks, share, opportunistic,
>>>                               calc_timeout())
>>> -          (jid, livelockfile) = lu.wconfdcontext
>>> -          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +          lu.wconfdlocks =
>>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>>>          else:
>>>            # Adding locks
>>>            add_locks = lu.add_locks[level]
>>> @@ -507,8 +508,6 @@ class Processor(object):
>>>            lu.remove_locks[level] = add_locks
>>>
>>>            try:
>>> -            jid = int(self.GetECId())
>>> -            livelockfile = self.context.livelock.lockfile.name
>>>              levelname = locking.LEVEL_NAMES[level]
>>>
>>>              if share:
>>> @@ -519,13 +518,13 @@ class Processor(object):
>>>              request = [["%s/%s" % (levelname, lock), mode]
>>>                         for lock in add_locks]
>>>
>>> -            logging.debug("Requesting %s for %d (%s)",
>>> -                          request, jid, livelockfile)
>>> -            blocked = \
>>> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>>> request)
>>> +            logging.debug("Requesting %s for %s",
>>> +                          request, self._wconfdcontext)
>>> +            blocked =
>>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>> +                                                          request)
>>>              assert blocked == [], "Allocating newly 'created' locks
>>> failed"
>>> -            (jid, livelockfile) = lu.wconfdcontext
>>> -            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +            lu.wconfdlocks = \
>>> +              self.wconfd.Client().ListLocks(self._wconfdcontext)
>>>            except errors.GenericError:
>>>              # TODO: verify what actually caused the error
>>>              logging.exception("Detected lock error in level %s for
>>> locks"
>>> @@ -539,21 +538,18 @@ class Processor(object):
>>>            result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>>>          finally:
>>>            if level in lu.remove_locks:
>>> -            jid = int(self.GetECId())
>>> -            livelockfile = self.context.livelock.lockfile.name
>>>              levelname = locking.LEVEL_NAMES[level]
>>>              request = [["%s/%s" % (levelname, lock), "release"]
>>>                         for lock in lu.remove_locks[level]]
>>>              blocked = \
>>> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>>> request)
>>> +              self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>> +                                                  request)
>>>              assert blocked == [], "Release may not fail"
>>>        finally:
>>> -        jid = int(self.GetECId())
>>> -        livelockfile = self.context.livelock.lockfile.name
>>>          levelname = locking.LEVEL_NAMES[level]
>>> -        logging.debug("Freeing locks at level %s for %d (%s)",
>>> -                      levelname, jid, livelockfile)
>>> -        self.wconfd.Client().FreeLocksLevel(jid, livelockfile,
>>> levelname)
>>> +        logging.debug("Freeing locks at level %s for %s",
>>> +                      levelname, self._wconfdcontext)
>>> +        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext,
>>> levelname)
>>>      else:
>>>        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>>>
>>> @@ -614,11 +610,9 @@ class Processor(object):
>>>                                       " disabled" % op.OP_ID)
>>>
>>>        try:
>>> -        jid = int(self.GetECId())
>>> -        livelockfile = self.context.livelock.lockfile.name
>>> -        lu = lu_class(self, op, self.context, self.rpc, (jid,
>>> livelockfile),
>>> +        lu = lu_class(self, op, self.context, self.rpc,
>>> self._wconfdcontext,
>>>                        self.wconfd)
>>> -        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
>>> livelockfile)
>>> +        lu.wconfdlocks =
>>> self.wconfd.Client().ListLocks(self._wconfdcontext)
>>>          lu.ExpandNames()
>>>          assert lu.needed_locks is not None, "needed_locks not set by LU"
>>>
>>> @@ -630,11 +624,9 @@ class Processor(object):
>>>              self.context.cfg.DropECReservations(self._ec_id)
>>>        finally:
>>>          # Release BGL if owned
>>> -        jid = int(self.GetECId())
>>> -        livelockfile = self.context.livelock.lockfile.name
>>>          bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
>>>                               locking.BGL)
>>> -        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
>>> +        self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>>>                                              [[bglname, "release"]])
>>>      finally:https://memegen.googleplex.com/5618255569879040
>>>
>>>        self._cbs = None
>>> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
>>> index db81b19..9705d9a 100644
>>> --- a/src/Ganeti/Locking/Locks.hs
>>> +++ b/src/Ganeti/Locking/Locks.hs
>>> @@ -44,7 +44,7 @@ import qualified Text.JSON as J
>>>
>>>  import Ganeti.BasicTypes
>>>  import Ganeti.Errors (ResultG, GanetiException)
>>> -import Ganeti.JSON (readEitherString, fromJResultE)
>>> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
>>>  import Ganeti.Locking.Allocation
>>>  import Ganeti.Locking.Types
>>>  import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
>>> @@ -174,14 +174,16 @@ instance Lock GanetiLocks wherhttps://
>>> memegen.googleplex.com/5618255569879040https://memegen.googleplex.com/5618255569879040e
>>>
>>>  -- | A client is identified as a job id, thread id and path to its
>>> process
>>>  -- identifier file
>>>  data ClientId = ClientId
>>> -  { ciJobId :: JobId
>>> +  { ciJobId :: Maybe JobId
>>> +  , ciThreadId :: Integer
>>>    , ciLockFile :: FilePath
>>>    }
>>>    deriving (Ord, Eq, Show)
>>>
>>> Please add a comment (maybe even a FIXME) that mentions the problem with
>> the non-uniqueness of python threads.
>>
>>
>>>  instance J.JSON ClientId where
>>> -  showJSON (ClientId jid lf) = J.showJSON (jid, lf)
>>> -  readJSON = liftM (uncurry ClientId) . J.readJSON
>>> +  showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid,
>>> lf)
>>> +  readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
>>> +             . J.readJSON
>>>
>>>  -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>>>  -- locks are jobs.
>>> --
>>> 1.9.0.279.gdc9e3eb
>>>
>>>
>>
>> Rest LGTM
>> Helga
>> --
>> --
>> Helga Velroyen | Software Engineer | [email protected] |
>>
>> Google Germany GmbH
>> Dienerstr. 12
>> 80331 München
>>
>> Registergericht und -nummer: Hamburg, HRB 86891
>> Sitz der Gesellschaft: Hamburg
>> Geschäftsführer: Graham Law, Christine Elizabeth Flores
>>
>
>


-- 
-- 
Helga Velroyen | Software Engineer | [email protected] |

Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to