On Mon, Mar 17, 2014 at 1:46 PM, Petr Pudlak <[email protected]> wrote:
> This allows distinguishing threads that don't have a job id, which is
> needed for answering queries.
>
> Since Python thread IDs aren't guaranteed to be unique, in future it'd
> be preferable to use a different, unique identifier.
>
> Note that this breaks 'gnt-debug locks'.
>
> Signed-off-by: Petr Pudlak <[email protected]>
> ---
>  lib/cmdlib/base.py          | 18 ++++++------
>  lib/mcpu.py                 | 68 ++++++++++++++++++++-------------------
>  src/Ganeti/Locking/Locks.hs | 10 ++++---
>  3 files changed, 45 insertions(+), 51 deletions(-)
>
> diff --git a/lib/cmdlib/base.py b/lib/cmdlib/base.py
> index 33f2313..3039791 100644
> --- a/lib/cmdlib/base.py
> +++ b/lib/cmdlib/base.py
> @@ -65,19 +65,19 @@ class LUWConfdClient(object):
>      self.lu = lu
>
>    def TryUpdateLocks(self, req):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
> +    self.lu.wconfd.Client().TryUpdateLocks(self.lu.wconfdcontext, req)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>    def DownGradeLocksLevel(self, level):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
> +    self.lu.wconfd.Client().DownGradeLocksLevel(self.lu.wconfdcontext, level)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>    def FreeLocksLevel(self, level):
> -    jid, livelockfile = self.lu.wconfdcontext
> -    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
> -    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
> +    self.lu.wconfd.Client().FreeLocksLevel(self.lu.wconfdcontext, level)
> +    self.lu.wconfdlocks = \
> +      self.lu.wconfd.Client().ListLocks(self.lu.wconfdcontext)
>
>
>  class LogicalUnit(object):
> diff --git a/lib/mcpu.py b/lib/mcpu.py
> index 0bd5cbf..d527d27 100644
> --- a/lib/mcpu.py
> +++ b/lib/mcpu.py
> @@ -33,6 +33,7 @@ import logging
>  import random
>  import time
>  import itertools
> +import threading
>  import traceback
>
>  from ganeti import opcodes
> @@ -308,6 +309,9 @@ class Processor(object):
>      self.hmclass = hooksmaster.HooksMaster
>      self._enable_locks = enable_locks
>      self.wconfd = wconfd # Indirection to allow testing
> +    self._wconfdcontext = (ec_id,
> +                           threading.current_thread().ident,
> +                           self.context.livelock.lockfile.name)
>
>    def _CheckLocksEnabled(self):
>      """Checks if locking is enabled.
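As an aside (not part of the patch): the commit message's caveat about
thread IDs is easy to reproduce. CPython may recycle a thread's ident
once the thread exits, so threading.current_thread().ident alone cannot
identify a client for the lifetime of the daemon. A minimal sketch:

  import threading

  def short_lived_ident():
      """Run a throwaway thread and return the ident it had."""
      holder = []
      t = threading.Thread(
          target=lambda: holder.append(threading.current_thread().ident))
      t.start()
      t.join()
      return holder[0]

  # Idents of exited threads may be recycled, so this often prints True:
  # two different threads, same ident.
  print(short_lived_ident() == short_lived_ident())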
> @@ -362,14 +366,12 @@ class Processor(object):
>        names = [names]
>
>      levelname = locking.LEVEL_NAMES[level]
> -    jid = int(self.GetECId())
> -    livelockfile = self.context.livelock.lockfile.name
>
>      locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
>
>      if not names:
> -      logging.debug("Acquiring no locks for %d (%s) at level %s",
> -                    jid, livelockfile, levelname)
> +      logging.debug("Acquiring no locks for (%s) at level %s",
> +                    self._wconfdcontext, levelname)
>        return []
>
>      if shared:
> @@ -378,26 +380,26 @@ class Processor(object):
>        request = [[lock, "exclusive"] for lock in locks]
>
>      if opportunistic:
> -      logging.debug("Opportunistically acquring some of %s for %d (%s).",
> -                    locks, jid, livelockfile)
> -      locks = self.wconfd.Client().OpportunisticLockUnion(jid, livelockfile,
> +      logging.debug("Opportunistically acquring some of %s for %s.",
> +                    locks, self._wconfdcontext)
> +      locks = self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
>                                                            request)
>      elif timeout is None:
>        while True:
>          ## TODO: use asynchronous wait instead of polling
> -        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> +        blockedon = self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>                                                          request)
> -        logging.debug("Requesting %s for %d (%s) blocked on %s",
> -                      request, jid, livelockfile, blockedon)
> +        logging.debug("Requesting %s for %s blocked on %s",
> +                      request, self._wconfdcontext, blockedon)
>          if not blockedon:
>            break
>          time.sleep(random.random())
>      else:
> -      logging.debug("Trying %ss to request %s for %d (%s)",
> -                    timeout, request, jid, livelockfile)
> +      logging.debug("Trying %ss to request %s for %s",
> +                    timeout, request, self._wconfdcontext)
>        ## TODO: use blocking wait instead of polling
>        blocked = utils.SimpleRetry([], self.wconfd.Client().TryUpdateLocks, 0.1,
> -                                  timeout, args=[jid, livelockfile, request])
> +                                  timeout, args=[self._wconfdcontext, request])
>        if blocked:
>          raise LockAcquireTimeout()
>
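Not part of the patch, just to spell out the retry pattern these hunks
use: TryUpdateLocks returns the list of locks the request is still
blocked on, and the caller polls (sleeping a random interval between
attempts) until that list is empty or a timeout expires. A standalone
sketch, with a hypothetical try_update callable standing in for
wconfd's TryUpdateLocks:

  import random
  import time

  def acquire_with_polling(try_update, request, timeout=None):
      """Poll try_update(request) until nothing blocks it or time runs out."""
      deadline = None if timeout is None else time.time() + timeout
      while True:
          blocked_on = try_update(request)
          if not blocked_on:
              return  # all requested locks granted
          if deadline is not None and time.time() >= deadline:
              raise RuntimeError("timed out")  # cf. LockAcquireTimeout
          # random sleep so many waiters don't retry in lock-step
          time.sleep(random.random())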
> @@ -497,8 +499,7 @@ class Processor(object):
>
>          self._AcquireLocks(level, needed_locks, share, opportunistic,
>                             calc_timeout())
> -        (jid, livelockfile) = lu.wconfdcontext
> -        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
> +        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
>        else:
>          # Adding locks
>          add_locks = lu.add_locks[level]
> @@ -507,8 +508,6 @@ class Processor(object):
>          lu.remove_locks[level] = add_locks
>
>          try:
> -          jid = int(self.GetECId())
> -          livelockfile = self.context.livelock.lockfile.name
>            levelname = locking.LEVEL_NAMES[level]
>
>            if share:
> @@ -519,13 +518,13 @@ class Processor(object):
>            request = [["%s/%s" % (levelname, lock), mode]
>                       for lock in add_locks]
>
> -          logging.debug("Requesting %s for %d (%s)",
> -                        request, jid, livelockfile)
> -          blocked = \
> -            self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
> +          logging.debug("Requesting %s for %s",
> +                        request, self._wconfdcontext)
> +          blocked = self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
> +                                                        request)
>            assert blocked == [], "Allocating newly 'created' locks failed"
> -          (jid, livelockfile) = lu.wconfdcontext
> -          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
> +          lu.wconfdlocks = \
> +            self.wconfd.Client().ListLocks(self._wconfdcontext)
>          except errors.GenericError:
>            # TODO: verify what actually caused the error
>            logging.exception("Detected lock error in level %s for locks"
> @@ -539,21 +538,18 @@ class Processor(object):
>
>            result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>        finally:
>          if level in lu.remove_locks:
> -          jid = int(self.GetECId())
> -          livelockfile = self.context.livelock.lockfile.name
>            levelname = locking.LEVEL_NAMES[level]
>            request = [["%s/%s" % (levelname, lock), "release"]
>                       for lock in lu.remove_locks[level]]
>            blocked = \
> -            self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
> +            self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
> +                                                request)
>            assert blocked == [], "Release may not fail"
>      finally:
> -      jid = int(self.GetECId())
> -      livelockfile = self.context.livelock.lockfile.name
>        levelname = locking.LEVEL_NAMES[level]
> -      logging.debug("Freeing locks at level %s for %d (%s)",
> -                    levelname, jid, livelockfile)
> -      self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
> +      logging.debug("Freeing locks at level %s for %s",
> +                    levelname, self._wconfdcontext)
> +      self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
>      else:
>        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
>
> @@ -614,11 +610,9 @@ class Processor(object):
>                                     " disabled" % op.OP_ID)
>
>        try:
> -        jid = int(self.GetECId())
> -        livelockfile = self.context.livelock.lockfile.name
> -        lu = lu_class(self, op, self.context, self.rpc, (jid, livelockfile),
> +        lu = lu_class(self, op, self.context, self.rpc, self._wconfdcontext,
>                        self.wconfd)
> -        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
> +        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
>          lu.ExpandNames()
>          assert lu.needed_locks is not None, "needed_locks not set by LU"
>
> @@ -630,11 +624,9 @@ class Processor(object):
>          self.context.cfg.DropECReservations(self._ec_id)
>        finally:
>          # Release BGL if owned
> -        jid = int(self.GetECId())
> -        livelockfile = self.context.livelock.lockfile.name
>          bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
>                               locking.BGL)
> -        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
> +        self.wconfd.Client().TryUpdateLocks(self._wconfdcontext,
>                                              [[bglname, "release"]])
>      finally:
>        self._cbs = None
> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
> index db81b19..9705d9a 100644
> --- a/src/Ganeti/Locking/Locks.hs
> +++ b/src/Ganeti/Locking/Locks.hs
> @@ -44,7 +44,7 @@ import qualified Text.JSON as J
>
>  import Ganeti.BasicTypes
>  import Ganeti.Errors (ResultG, GanetiException)
> -import Ganeti.JSON (readEitherString, fromJResultE)
> +import Ganeti.JSON (readEitherString, fromJResultE, MaybeForJSON(..))
>  import Ganeti.Locking.Allocation
>  import Ganeti.Locking.Types
>  import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
> @@ -174,14 +174,16 @@ instance Lock GanetiLocks where
>  -- | A client is identified as a job id, thread id and path to its process
>  -- identifier file
>  data ClientId = ClientId
> -  { ciJobId :: JobId
> +  { ciJobId :: Maybe JobId
> +  , ciThreadId :: Integer
>    , ciLockFile :: FilePath
>    }
>    deriving (Ord, Eq, Show)

Please add a comment (maybe even a FIXME) that mentions the problem
with the non-uniqueness of Python threads.

>  instance J.JSON ClientId where
> -  showJSON (ClientId jid lf) = J.showJSON (jid, lf)
> -  readJSON = liftM (uncurry ClientId) . J.readJSON
> +  showJSON (ClientId jid tid lf) = J.showJSON (MaybeForJSON jid, tid, lf)
> +  readJSON = liftM (\(MaybeForJSON jid, tid, lf) -> ClientId jid tid lf)
> +           . J.readJSON
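Not part of the review, but for concreteness: assuming MaybeForJSON
encodes Nothing as JSON null (as the name suggests), a serialized
ClientId presumably becomes a three-element array of job id (or null),
thread id and lock-file path. A Python sketch with made-up values:

  import json

  # Hypothetical values; the lock-file path is invented for illustration.
  no_job   = (None, 139993678817024, "/var/run/example.livelock")
  with_job = (1234, 139993678817024, "/var/run/example.livelock")

  print(json.dumps(no_job))    # [null, 139993678817024, "/var/run/example.livelock"]
  print(json.dumps(with_job))  # [1234, 139993678817024, "/var/run/example.livelock"]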
>
>  -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>  -- locks are jobs.
> --
> 1.9.0.279.gdc9e3eb

Rest LGTM

Helga

--
Helga Velroyen | Software Engineer | [email protected] | Google Germany GmbH
Dienerstr. 12
80331 München
Registration court and number: Hamburg, HRB 86891
Registered office: Hamburg
Managing directors: Graham Law, Christine Elizabeth Flores
