Great, thank you! LGTM Helga
On Tue, Mar 18, 2014 at 12:48 PM, Petr Pudlák <[email protected]> wrote: > Good idea, I propose the interdiff: > > diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs > index 5a6b404..5c41d75 100644 > --- a/src/Ganeti/Locking/Locks.hs > +++ b/src/Ganeti/Locking/Locks.hs > @@ -171,8 +171,19 @@ instance Lock GanetiLocks where > lockImplications (Network _) = [NetworkLockSet] > lockImplications _ = [] > > --- | A client is identified as a job id and path to its process > +-- | A client is identified as a job id, thread id and path to its process > -- identifier file. > +-- > +-- The JobId isn't enough to identify a client as the master daemon > +-- also handles RPC calls that aren't jobs, but which use the > configuration. > +-- Therefore it's needed to include the identification for threads. > +-- An alternative would be to use something like @Either JobId RpcCallId@ > . > +-- > +-- FIXME: Python threads are only unique wrt running threads, so it's > possible > +-- that a new thread will get a thread id that has been used before by > another > +-- finished thread. Since we rely on threads releasing their locks anyway, > +-- this isn't a big issue, but in the future it'd be better to have a > unique > +-- identifier for each operation. > data ClientId = ClientId > { ciJobId :: Maybe JobId > , ciThreadId :: Integer > > > > On Tue, Mar 18, 2014 at 11:16 AM, Helga Velroyen <[email protected]>wrote: > >> >> >> >> On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote: >> >>> This allows to distinguish threads that don't have a job id, which is >>> needed for answering queries. >>> >>> Since Python thread IDs aren't guaranteed to be unique, in future it'd >>> be preferable to use a different, unique identifier. >>> >>> Note that this breaks 'gnt-debug locks'. 
>>> >>> Signed-off-by: Petr Pudlak <[email protected]> >>> --- >>> lib/cmdlib/base.py | 18 ++++++------ >>> lib/mcpu.py | 68 >>> ++++++++++++++++++++------------------------- >>> src/Ganeti/Locking/Locks.hs | 10 ++++--- >>> 3 files changed, 45 insertions(+), 51 deletions(-) >>> >>> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py >>> index 33f2313..3039791 100644 >>> --- a/lib/cmdlib/base.py >>> +++ b/lib/cmdlib/base.py >>> @@ -65,19 +65,19 @@ class LUWConfdClient(object): >>> self.lu = lu >>> >>> def TryUpdateLocks(self, req): >>> - jid, livelockfile = self.lu.wconfdcontext >>> - self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req) >>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req) >>> + self.lu.wconfdlocks = \ >>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext) >>> >>> def DownGradeLocksLevel(self, level): >>> - jid, livelockfile = self.lu.wconfdcontext >>> - self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, >>> level) >>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext, >>> level) >>> + self.lu.wconfdlocks = \ >>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext) >>> >>> def FreeLocksLevel(self, level): >>> - jid, livelockfile = self.lu.wconfdcontext >>> - self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level) >>> - self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level) >>> + self.lu.wconfdlocks = \ >>> + self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext) >>> >>> >>> class LogicalUnit(object): >>> diff --git a/lib/mcpu.py b/lib/mcpu.py >>> index 0bd5cbf..d527d27 100644 >>> --- a/lib/mcpu.py >>> +++ b/lib/mcpu.py >>> @@ -33,6 +33,7 @@ import logging >>> import random >>> import time >>> import 
itertools >>> +import threading >>> import traceback >>> >>> from ganeti import opcodes >>> @@ -308,6 +309,9 @@ class Processor(object): >>> self.hmclass = hooksmaster.HooksMaster >>> self._enable_locks = enable_locks >>> self.wconfd = wconfd # Indirection to allow testing >>> + self._wconfdcontext = (ec_id, >>> + threading.current_thread().ident, >>> + self.context.livelock.lockfile.name) >>> >>> def _CheckLocksEnabled(self): >>> """Checks if locking is enabled. >>> @@ -362,14 +366,12 @@ class Processor(object): >>> names = [names] >>> >>> levelname = locking.LEVEL_NAMES[level] >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> >>> locks = ["%s/%s" % (levelname, lock) for lock in list(names)] >>> >>> if not names: >>> - logging.debug("Acquiring no locks for %d (%s) at level %s", >>> - jid, livelockfile, levelname) >>> + logging.debug("Acquiring no locks for (%s) at level %s", >>> + self._wconfdcontext, levelname) >>> return [] >>> >>> if shared: >>> @@ -378,26 +380,26 @@ class Processor(object): >>> request = [[lock, "exclusive"] for lock in locks] >>> >>> if opportunistic: >>> - logging.debug("Opportunistically acquring some of %s for %d >>> (%s).", >>> - locks, jid, livelockfile) >>> - locks = self.wconfd.Client().OpportunisticLockUnion(jid, >>> livelockfile, >>> + logging.debug("Opportunistically acquring some of %s for %s.", >>> + locks, self._wconfdcontext) >>> + locks = >>> self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext, >>> request) >>> elif timeout is None: >>> while True: >>> ## TODO: use asynchronous wait instead of polling >>> - blockedon = self.wconfd.Client().TryUpdateLocks(jid, >>> livelockfile, >>> + blockedon = >>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext, >>> request) >>> - logging.debug("Requesting %s for %d >>> (%s) blocked on %s", >>> - request, jid, livelockfile, blockedon) >>> + logging.debug("Requesting %s for %s
blocked on %s", >>> + request, self._wconfdcontext, blockedon) >>> if not blockedon: >>> break >>> time.sleep(random.random()) >>> else: >>> - logging.debug("Trying %ss to request %s for %d (%s)", >>> - timeout, request, jid, livelockfile) >>> + logging.debug("Trying %ss to request %s for %s", >>> + timeout, request, self._wconfdcontext) >>> ## TODO: use blocking wait instead of polling >>> blocked = utils.SimpleRetry([], >>> self.wconfd.Client().TryUpdateLocks, 0.1, >>> - timeout, args=[jid, livelockfile, >>> request]) >>> + timeout, args=[self._wconfdcontext, >>> request]) >>> if blocked: >>> raise LockAcquireTimeout() >>> >>> @@ -497,8 +499,7 @@ class Processor(object): >>> >>> self._AcquireLocks(level, needed_locks, share, opportunistic, >>> calc_timeout()) >>> - (jid, livelockfile) = lu.wconfdcontext >>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + lu.wconfdlocks = >>> self.wconfd.Client().ListLocks(self._wconfdcontext) >>> else: >>> # Adding locks >>> add_locks = lu.add_locks[level] >>> @@ -507,8 +508,6 @@ class Processor(object): >>> lu.remove_locks[level] = add_locks >>> >>> try: >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> levelname = locking.LEVEL_NAMES[level] >>> >>> if share: >>> @@ -519,13 +518,13 @@ class Processor(object): >>> request = [["%s/%s" % (levelname, lock), mode] >>> for lock in add_locks] >>> >>> - logging.debug("Requesting %s for %d (%s)", >>> - request, jid, livelockfile) >>> - blocked = \ >>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile, >>> request) >>> + logging.debug("Requesting %s for %s", >>> + request, self._wconfdcontext) >>> + blocked = >>> self.wconfd.Client().TryUpdateLocks(self._wconfdcontext, >>> + request) >>> assert blocked == [], "Allocating newly 'created' locks >>> failed" >>> - (jid, livelockfile) = lu.wconfdcontext >>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + lu.wconfdlocks = \ >>> + 
self.wconfd.Client().ListLocks(self._wconfdcontext) >>> except errors.GenericError: >>> # TODO: verify what actually caused the error >>> logging.exception("Detected lock error in level %s for >>> locks" >>> @@ -539,21 +538,18 @@ class Processor(object): >>> result = self._LockAndExecLU(lu, level + 1, calc_timeout) >>> finally: >>> if level in lu.remove_locks: >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> levelname = locking.LEVEL_NAMES[level] >>> request = [["%s/%s" % (levelname, lock), "release"] >>> for lock in lu.remove_locks[level]] >>> blocked = \ >>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile, >>> request) >>> + self.wconfd.Client().TryUpdateLocks(self._wconfdcontext, >>> + request) >>> assert blocked == [], "Release may not fail" >>> finally: >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> levelname = locking.LEVEL_NAMES[level] >>> - logging.debug("Freeing locks at level %s for %d (%s)", >>> - levelname, jid, livelockfile) >>> - self.wconfd.Client().FreeLocksLevel(jid, livelockfile, >>> levelname) >>> + logging.debug("Freeing locks at level %s for %s", >>> + levelname, self._wconfdcontext) >>> + self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, >>> levelname) >>> else: >>> result = self._LockAndExecLU(lu, level + 1, calc_timeout) >>> >>> @@ -614,11 +610,9 @@ class Processor(object): >>> " disabled" % op.OP_ID) >>> >>> try: >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> - lu = lu_class(self, op, self.context, self.rpc, (jid, >>> livelockfile), >>> + lu = lu_class(self, op, self.context, self.rpc, >>> self._wconfdcontext, >>> self.wconfd) >>> - lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, >>> livelockfile) >>> + lu.wconfdlocks = >>> self.wconfd.Client().ListLocks(self._wconfdcontext) >>> lu.ExpandNames() >>> assert lu.needed_locks is not None, "needed_locks not set by LU" >>> >>> @@ -630,11 
+624,9 @@ class Processor(object): >>> self.context.cfg.DropECReservations(self._ec_id) >>> finally: >>> # Release BGL if owned >>> - jid = int(self.GetECId()) >>> - livelockfile = self.context.livelock.lockfile.name >>> bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER], >>> locking.BGL) >>> - self.wconfd.Client().TryUpdateLocks(jid, livelockfile, >>> + self.wconfd.Client().TryUpdateLocks(self._wconfdcontext, >>> [[bglname, "release"]]) >>> finally: >>> >>> self._cbs = None >>> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs >>> index db81b19..9705d9a 100644 >>> --- a/src/Ganeti/Locking/Locks.hs >>> +++ b/src/Ganeti/Locking/Locks.hs >>> @@ -44,7 +44,7 @@ import qualified Text.JSON as J >>> >>> import Ganeti.BasicTypes >>> import Ganeti.Errors (ResultG, GanetiException) >>> -import Ganeti.JSON (readEitherString, fromJResultE) >>> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..)) >>> import Ganeti.Locking.Allocation >>> import Ganeti.Locking.Types >>> import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency) >>> @@ -174,14 +174,16 @@ instance Lock GanetiLocks where >>> >>> -- | A client is identified as a job id, thread id and path to its >>> process >>> -- identifier file >>> data ClientId = ClientId >>> - { ciJobId :: JobId >>> + { ciJobId :: Maybe JobId >>> + , ciThreadId :: Integer >>> , ciLockFile :: FilePath >>> } >>> deriving (Ord, Eq, Show) >>> >>> Please add a comment (maybe even a FIXME) that mentions the problem with >> the non-uniqueness of python threads. >> >> >>> instance J.JSON ClientId where >>> - showJSON (ClientId jid lf) = J.showJSON (jid, lf) >>> - readJSON = liftM (uncurry ClientId) . 
J.readJSON >>> + showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, >>> lf) >>> + readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf) >>> + . J.readJSON >>> >>> -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of >>> -- locks are jobs. >>> -- >>> 1.9.0.279.gdc9e3eb >>> >>> >> >> Rest LGTM >> Helga >> -- >> -- >> Helga Velroyen | Software Engineer | [email protected] | >> >> Google Germany GmbH >> Dienerstr. 12 >> 80331 München >> >> Registergericht und -nummer: Hamburg, HRB 86891 >> Sitz der Gesellschaft: Hamburg >> Geschäftsführer: Graham Law, Christine Elizabeth Flores >> > > -- -- Helga Velroyen | Software Engineer | [email protected] | Google Germany GmbH Dienerstr. 12 80331 München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores
