On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote:

> This makes it possible to distinguish threads that don't have a job id,
> which is needed for answering queries.
>
> Since Python thread IDs aren't guaranteed to be unique, in future it'd
> be preferable to use a different, unique identifier.
>
> Note that this breaks 'gnt-debug locks'.
>
> Signed-off-by: Petr Pudlak <[email protected]>
> ---
>  lib/cmdlib/base.py          | 18 ++++++------
>  lib/mcpu.py                 | 68
> ++++++++++++++++++++-------------------------
>  src/Ganeti/Locking/Locks.hs | 10 ++++---
>  3 files changed, 45 insertions(+), 51 deletions(-)
>
> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py
> index 33f2313..3039791 100644
> --- a/lib/cmdlib/base.py
> +++ b/lib/cmdlib/base.py
> @@ -65,19 +65,19 @@ class LUWConfdClient(object):
>      self.lu = lu
>
>    def TryUpdateLocks(self, req):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
> livelockfile)
> +    self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>    def DownGradeLocksLevel(self, level):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
> livelockfile)
> +    self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext,
> level)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>    def FreeLocksLevel(self, level):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid,
> livelockfile)
> +    self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>
>  class LogicalUnit(object):
> diff --git a/lib/mcpu.py b/lib/mcpu.py
> index 0bd5cbf..d527d27 100644
> --- a/lib/mcpu.py
> +++ b/lib/mcpu.py
> @@ -33,6 +33,7 @@ import logging
>  import random
>  import time
>  import itertools
> +import threading
>  import traceback
>
>  from ganeti import opcodes
> @@ -308,6 +309,9 @@ class Processor(object):
>      self.hmclass = hooksmaster.HooksMaster
>      self._enable_locks = enable_locks
>      self.wconfd = wconfd # Indirection to allow testing
> +    self._wconfdcontext = (ec_id,
> +                           threading.current_thread().ident,
> +                           self.context.livelock.lockfile.name)
>
>    def _CheckLocksEnabled(self):
>      """Checks if locking is enabled.
> @@ -362,14 +366,12 @@ class Processor(object):
>        names = [names]
>
>      levelname = locking.LEVEL_NAMES[level]
> -    jid = int(self.GetECId())
> -    livelockfile = self.context.livelock.lockfile.name
>
>      locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
>
>      if not names:
> -      logging.debug("Acquiring no locks for %d (%s) at level %s",
> -                    jid, livelockfile, levelname)
> +      logging.debug("Acquiring no locks for (%s) at level %s",
> +                    self._wconfdcontext, levelname)
>        return []
>
>      if shared:
> @@ -378,26 +380,26 @@ class Processor(object):
>        request = [[lock, "exclusive"] for lock in locks]
>
>      if opportunistic:
> -      logging.debug("Opportunistically acquring some of %s for %d (%s).",
> -                    locks, jid, livelockfile)
> -      locks = self.wconfd.Client().OpportunisticLockUnion(jid,
> livelockfile,
> +      logging.debug("Opportunistically acquring some of %s for %s.",
> +                    locks, self._wconfdcontext)
> +      locks =
> self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
>                                                            request)
>      elif timeout is None:
>        while True:
>          ## TODO: use asynchronous wait instead of polling
> -        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> +        blockedon =
> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>                                                          request)
> -        logging.debug("Requesting %s for %d (%s) blocked on %s",
> -                      request, jid, livelockfile, blockedon)
> +        logging.debug("Requesting %s for %s blocked on %s",
> +                      request, self._wconfdcontext, blockedon)
>          if not blockedon:
>            break
>          time.sleep(random.random())
>      else:
> -      logging.debug("Trying %ss to request %s for %d (%s)",
> -                    timeout, request, jid, livelockfile)
> +      logging.debug("Trying %ss to request %s for %s",
> +                    timeout, request, self._wconfdcontext)
>        ## TODO: use blocking wait instead of polling
>        blocked = utils.SimpleRetry([],
> self.wconfd.Client().TryUpdateLocks, 0.1,
> -                                  timeout, args=[jid, livelockfile,
> request])
> +                                  timeout, args=[self._wconfdcontext,
> request])
>        if blocked:
>          raise LockAcquireTimeout()
>
> @@ -497,8 +499,7 @@ class Processor(object):
>
>            self._AcquireLocks(level, needed_locks, share, opportunistic,
>                               calc_timeout())
> -          (jid, livelockfile) = lu.wconfdcontext
> -          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
> livelockfile)
> +          lu.wconfdlocks =
> self.wconfd.Client().ListLocks(self._wconfdcontext)
>          else:
>            # Adding locks
>            add_locks = lu.add_locks[level]
> @@ -507,8 +508,6 @@ class Processor(object):
>            lu.remove_locks[level] = add_locks
>
>            try:
> -            jid = int(self.GetECId())
> -            livelockfile = self.context.livelock.lockfile.name
>              levelname = locking.LEVEL_NAMES[level]
>
>              if share:
> @@ -519,13 +518,13 @@ class Processor(object):
>              request = [["%s/%s" % (levelname, lock), mode]
>                         for lock in add_locks]
>
> -            logging.debug("Requesting %s for %d (%s)",
> -                          request, jid, livelockfile)
> -            blocked = \
> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> request)
> +            logging.debug("Requesting %s for %s",
> +                          request, self._wconfdcontext)
> +            blocked =
> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
> +                                                          request)
>              assert blocked == [], "Allocating newly 'created' locks
> failed"
> -            (jid, livelockfile) = lu.wconfdcontext
> -            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid,
> livelockfile)
> +            lu.wconfdlocks = \
> +              self.wconfd.Client().ListLocks(self._wconfdcontext)
>            except errors.GenericError:
>              # TODO: verify what actually caused the error
>              logging.exception("Detected lock error in level %s for locks"
> @@ -539,21 +538,18 @@ class Processor(object):
>            result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>          finally:
>            if level in lu.remove_locks:
> -            jid = int(self.GetECId())
> -            livelockfile = self.context.livelock.lockfile.name
>              levelname = locking.LEVEL_NAMES[level]
>              request = [["%s/%s" % (levelname, lock), "release"]
>                         for lock in lu.remove_locks[level]]
>              blocked = \
> -              self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> request)
> +              self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
> +                                                  request)
>              assert blocked == [], "Release may not fail"
>        finally:
> -        jid = int(self.GetECId())
> -        livelockfile = self.context.livelock.lockfile.name
>          levelname = locking.LEVEL_NAMES[level]
> -        logging.debug("Freeing locks at level %s for %d (%s)",
> -                      levelname, jid, livelockfile)
> -        self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
> +        logging.debug("Freeing locks at level %s for %s",
> +                      levelname, self._wconfdcontext)
> +        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext,
> levelname)
>      else:
>        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>
> @@ -614,11 +610,9 @@ class Processor(object):
>                                       " disabled" % op.OP_ID)
>
>        try:
> -        jid = int(self.GetECId())
> -        livelockfile = self.context.livelock.lockfile.name
> -        lu = lu_class(self, op, self.context, self.rpc, (jid,
> livelockfile),
> +        lu = lu_class(self, op, self.context, self.rpc,
> self._wconfdcontext,
>                        self.wconfd)
> -        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
> +        lu.wconfdlocks =
> self.wconfd.Client().ListLocks(self._wconfdcontext)
>          lu.ExpandNames()
>          assert lu.needed_locks is not None, "needed_locks not set by LU"
>
> @@ -630,11 +624,9 @@ class Processor(object):
>              self.context.cfg.DropECReservations(self._ec_id)
>        finally:
>          # Release BGL if owned
> -        jid = int(self.GetECId())
> -        livelockfile = self.context.livelock.lockfile.name
>          bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
>                               locking.BGL)
> -        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> +        self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>                                              [[bglname, "release"]])
>      finally:
>        self._cbs = None
> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
> index db81b19..9705d9a 100644
> --- a/src/Ganeti/Locking/Locks.hs
> +++ b/src/Ganeti/Locking/Locks.hs
> @@ -44,7 +44,7 @@ import qualified Text.JSON as J
>
>  import Ganeti.BasicTypes
>  import Ganeti.Errors (ResultG, GanetiException)
> -import Ganeti.JSON (readEitherString, fromJResultE)
> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
>  import Ganeti.Locking.Allocation
>  import Ganeti.Locking.Types
>  import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
> @@ -174,14 +174,16 @@ instance Lock GanetiLocks where
>  -- | A client is identified as a job id, thread id and path to its process
>  -- identifier file
>  data ClientId = ClientId
> -  { ciJobId :: JobId
> +  { ciJobId :: Maybe JobId
> +  , ciThreadId :: Integer
>    , ciLockFile :: FilePath
>    }
>    deriving (Ord, Eq, Show)
>
Please add a comment (maybe even a FIXME) on ciThreadId that mentions the
problem with the non-uniqueness of Python thread IDs.


>  instance J.JSON ClientId where
> -  showJSON (ClientId jid lf) = J.showJSON (jid, lf)
> -  readJSON = liftM (uncurry ClientId) . J.readJSON
> +  showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, lf)
> +  readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
> +             . J.readJSON
>
>  -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>  -- locks are jobs.
> --
> 1.9.0.279.gdc9e3eb
>
>

Rest LGTM
Helga
-- 
-- 
Helga Velroyen | Software Engineer | [email protected] |

Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to