OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ee359d7b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ee359d7b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ee359d7b Branch: refs/heads/oya Commit: ee359d7be1deb457b59ee12e26fac206d0198182 Parents: fbe5e49 Author: Peter Bacsko <pbac...@cloudera.com> Authored: Wed Apr 19 15:17:14 2017 +0200 Committer: Peter Bacsko <pbac...@cloudera.com> Committed: Wed Apr 19 15:17:14 2017 +0200 ---------------------------------------------------------------------- .../oozie/service/MemoryLocksService.java | 9 +++- .../apache/oozie/service/ZKLocksService.java | 53 +++++++++++++------- .../main/java/org/apache/oozie/util/XLog.java | 12 +++++ release-log.txt | 1 + 4 files changed, 55 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java index 2ab2abc..4fa4d3e 100644 --- a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java @@ -24,12 +24,15 @@ import org.apache.oozie.lock.LockToken; import org.apache.oozie.lock.MemoryLocks; import com.google.common.annotations.VisibleForTesting; +import org.apache.oozie.util.XLog; /** * Service that provides in-memory locks. Assumes no other Oozie servers are using the database. 
*/ public class MemoryLocksService implements Service, Instrumentable { + private static final XLog LOG = XLog.getLog(MemoryLocksService.class); + public static enum Type { READ, WRITE } @@ -87,7 +90,8 @@ public class MemoryLocksService implements Service, Instrumentable { * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. * @throws InterruptedException thrown if the thread was interrupted while waiting. */ - public LockToken getReadLock(String resource, long wait) throws InterruptedException { + public LockToken getReadLock(final String resource, final long wait) throws InterruptedException { + LOG.trace("Acquiring in-memory read lock. [resource={0};wait={1}]", resource, wait); return locks.getLock(resource, Type.READ, wait); } @@ -99,7 +103,8 @@ public class MemoryLocksService implements Service, Instrumentable { * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. * @throws InterruptedException thrown if the thread was interrupted while waiting. */ - public LockToken getWriteLock(String resource, long wait) throws InterruptedException { + public LockToken getWriteLock(final String resource, final long wait) throws InterruptedException { + LOG.trace("Acquiring in-memory write lock. 
[resource={0};wait={1}]", resource, wait); return locks.getLock(resource, Type.WRITE, wait); } http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 83790cf..2c71c00 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -25,7 +25,6 @@ import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; -import org.apache.oozie.event.listener.ZKConnectionListener; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; @@ -35,7 +34,6 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.curator.framework.recipes.locks.ChildReaper; import org.apache.curator.framework.recipes.locks.Reaper; -import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.ThreadUtils; import com.google.common.annotations.VisibleForTesting; @@ -49,16 +47,15 @@ import com.google.common.collect.MapMaker; public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { private ZKUtils zk; - private static XLog LOG = XLog.getLog(ZKLocksService.class); public static final String LOCKS_NODE = "/locks"; - private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap(); - + private static final XLog LOG = XLog.getLog(ZKLocksService.class); + private final ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap(); + private ChildReaper reaper = null; private static final String 
REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; - public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; - public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; - private ChildReaper reaper = null; + static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; + static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; /** * Initialize the zookeeper locks service @@ -141,30 +138,50 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr return acquireLock(resource, Type.WRITE, wait); } - private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException { - InterProcessReadWriteLock lockEntry = zkLocks.get(resource); - if (lockEntry == null) { - InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - lockEntry = zkLocks.putIfAbsent(resource, newLock); - if (lockEntry == null) { - lockEntry = newLock; - } + private LockToken acquireLock(final String resource, final Type type, final long wait) throws InterruptedException { + LOG.debug("Acquiring ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait); + + InterProcessReadWriteLock lockEntry; + final String zkPath = LOCKS_NODE + "/" + resource; + LOG.debug("Checking existing Curator lock or creating new one. [zkPath={}]", zkPath); + + // Creating a Curator InterProcessReadWriteLock is lightweight - only calling acquire() costs real ZooKeeper calls + final InterProcessReadWriteLock newLockEntry = new InterProcessReadWriteLock(zk.getClient(), zkPath); + final InterProcessReadWriteLock existingLockEntry = zkLocks.putIfAbsent(resource, newLockEntry); + if (existingLockEntry == null) { + lockEntry = newLockEntry; + LOG.debug("No existing Curator lock present, new one created successfully. 
[zkPath={}]", zkPath); } - InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); + else { + // We can't destoy newLockEntry and we don't have to - it's taken care of by Curator and JVM GC + lockEntry = existingLockEntry; + LOG.debug("Reusing existing Curator lock. [zkPath={}]", zkPath); + } + ZKLockToken token = null; try { + LOG.debug("Calling Curator to acquire ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait); + final InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); if (wait == -1) { lock.acquire(); token = new ZKLockToken(lockEntry, type); + LOG.debug("ZooKeeper lock acquired successfully. [resource={};type={}]", resource, type); } else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { token = new ZKLockToken(lockEntry, type); + LOG.debug("ZooKeeper lock acquired successfully waiting. [resource={};type={};wait={}]", resource, type, wait); + } + else { + LOG.warn("Could not acquire ZooKeeper lock, timed out. [resource={};type={};wait={}]", resource, type, wait); } } - catch (Exception ex) { + catch (final Exception ex) { //Not throwing exception. Should return null, so that command can be requeued + LOG.warn("Could not acquire lock due to a ZooKeeper error. " + + "[ex={};resource={};type={};wait={}]", ex, resource, type, wait); LOG.error("Error while acquiring lock", ex); } + return token; } http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/util/XLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/XLog.java b/core/src/main/java/org/apache/oozie/util/XLog.java index cc9abd8..9ecac4d 100644 --- a/core/src/main/java/org/apache/oozie/util/XLog.java +++ b/core/src/main/java/org/apache/oozie/util/XLog.java @@ -672,12 +672,24 @@ public class XLog implements Log { public static String format(String msgTemplate, Object... 
params) { ParamChecker.notEmpty(msgTemplate, "msgTemplate"); msgTemplate = msgTemplate.replace("{E}", System.getProperty("line.separator")); + msgTemplate = replaceEmptyPositions(msgTemplate); if (params != null && params.length > 0) { msgTemplate = MessageFormat.format(msgTemplate, params); } return msgTemplate; } + private static String replaceEmptyPositions(String msgTemplate) { + int pos = 0; + + while (msgTemplate.contains("{}")) { + msgTemplate = msgTemplate.replace("{}", String.format("{%d}", pos)); + pos++; + } + + return msgTemplate; + } + /** * Utility method that extracts the <code>Throwable</code>, if present, from the parameters. * http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 11ae948..23aa9ae 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko) OOZIE-2818 Can't overwrite oozie.action.max.output.data on a per-workflow basis (asasvari via pbacsko) OOZIE-2827 More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko) OOZIE-2864 Maven artifacts for package com.codahale.metrics have inconsistent groupId (andras.piros via pbacsko)