OOZIE-2501 ZK reentrant lock doesn't work for few cases
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d330d406 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d330d406 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d330d406 Branch: refs/heads/oya Commit: d330d40665a3b42744db20dfc5d9a80ad5f9b439 Parents: e8a9b24 Author: Purshotam Shah <purus...@yahoo-inc.com> Authored: Tue Sep 27 12:21:26 2016 -0700 Committer: Purshotam Shah <purus...@yahoo-inc.com> Committed: Tue Sep 27 12:21:26 2016 -0700 ---------------------------------------------------------------------- .../oozie/command/wf/ActionStartXCommand.java | 10 ++ .../java/org/apache/oozie/lock/MemoryLocks.java | 82 ++++++------- .../oozie/service/MemoryLocksService.java | 9 +- .../apache/oozie/service/ZKLocksService.java | 85 +++++--------- .../org/apache/oozie/lock/TestMemoryLocks.java | 60 ++++++++-- .../oozie/service/TestZKLocksService.java | 115 ++++++++++++++----- release-log.txt | 1 + 7 files changed, 218 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java index 41f4430..edfac48 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java @@ -21,6 +21,7 @@ package org.apache.oozie.command.wf; import java.util.ArrayList; import java.util.Date; import java.util.List; + import javax.servlet.jsp.el.ELException; import org.apache.hadoop.conf.Configuration; @@ -41,6 +42,7 @@ import org.apache.oozie.client.SLAEvent.Status; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -399,4 +401,12 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), retryDelayMillis); } + protected void queue(XCommand<?> command, long msDelay) { + // ActionStartXCommand is synchronously called from SignalXCommand passing wfJob so that it doesn't have to + //reload wfJob again. We need set wfJob to null, so that it get reloaded when the requeued command executes. + if (command instanceof ActionStartXCommand) { + ((ActionStartXCommand)command).wfJob = null; + } + super.queue(command, msDelay); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java index 7d65ac0..1ef1e41 100644 --- a/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java +++ b/core/src/main/java/org/apache/oozie/lock/MemoryLocks.java @@ -18,33 +18,32 @@ package org.apache.oozie.lock; -import java.util.HashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.Lock; +import org.apache.oozie.service.MemoryLocksService.Type; + +import com.google.common.collect.MapMaker; /** * In memory resource locking that provides READ/WRITE lock capabilities. */ public class MemoryLocks { - final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>(); - private static enum Type { - READ, WRITE - } + final private ConcurrentMap<String, ReentrantReadWriteLock> locks = new MapMaker().weakValues().makeMap(); /** * Implementation of {@link LockToken} for in memory locks. */ class MemoryLockToken implements LockToken { - private final ReentrantReadWriteLock rwLock; - private final java.util.concurrent.locks.Lock lock; - private final String resource; + private final ReentrantReadWriteLock lockEntry; + private final Type type; + + public MemoryLockToken(ReentrantReadWriteLock lockEntry, Type type) { + this.lockEntry = lockEntry; + this.type = type; - private MemoryLockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) { - this.rwLock = rwLock; - this.lock = lock; - this.resource = resource; } /** @@ -52,18 +51,15 @@ public class MemoryLocks { */ @Override public void release() { - lock.unlock(); - if (!isLockHeld()) { - synchronized (locks) { - if (!isLockHeld()) { - locks.remove(resource); - } - } + switch (type) { + case WRITE: + lockEntry.writeLock().unlock(); + break; + case READ: + lockEntry.readLock().unlock(); + break; } } - private boolean isLockHeld(){ - return rwLock.hasQueuedThreads() || rwLock.isWriteLocked() || rwLock.getReadLockCount() > 0; - } } /** @@ -76,41 +72,23 @@ public class MemoryLocks { } /** - * Obtain a READ lock for a source. + * Obtain a lock for a source. * * @param resource resource name. + * @param type lock type. * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. * @throws InterruptedException thrown if the thread was interrupted while waiting. */ - public MemoryLockToken getReadLock(String resource, long wait) throws InterruptedException { - return getLock(resource, Type.READ, wait); - } - - /** - * Obtain a WRITE lock for a source. - * - * @param resource resource name. - * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. - * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. - * @throws InterruptedException thrown if the thread was interrupted while waiting. - */ - public MemoryLockToken getWriteLock(String resource, long wait) throws InterruptedException { - return getLock(resource, Type.WRITE, wait); - } - - private MemoryLockToken getLock(String resource, Type type, long wait) throws InterruptedException { - ReentrantReadWriteLock lockEntry; - synchronized (locks) { - if (locks.containsKey(resource)) { - lockEntry = locks.get(resource); - } - else { - lockEntry = new ReentrantReadWriteLock(true); - locks.put(resource, lockEntry); + public MemoryLockToken getLock(final String resource, Type type, long wait) throws InterruptedException { + ReentrantReadWriteLock lockEntry = locks.get(resource); + if (lockEntry == null) { + ReentrantReadWriteLock newLock = new ReentrantReadWriteLock(true); + lockEntry = locks.putIfAbsent(resource, newLock); + if (lockEntry == null) { + lockEntry = newLock; } } - Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); if (wait == -1) { @@ -133,6 +111,10 @@ public class MemoryLocks { locks.put(resource, lockEntry); } } - return new MemoryLockToken(lockEntry, lock, resource); + return new MemoryLockToken(lockEntry, type); + } + + public ConcurrentMap<String, ReentrantReadWriteLock> getLockMap(){ + return locks; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java index d7c6a89..2ab2abc 100644 --- a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java @@ -29,6 +29,11 @@ import com.google.common.annotations.VisibleForTesting; * Service that provides in-memory locks. Assumes no other Oozie servers are using the database. */ public class MemoryLocksService implements Service, Instrumentable { + + public static enum Type { + READ, WRITE + } + protected static final String INSTRUMENTATION_GROUP = "locks"; private MemoryLocks locks; @@ -83,7 +88,7 @@ public class MemoryLocksService implements Service, Instrumentable { * @throws InterruptedException thrown if the thread was interrupted while waiting. */ public LockToken getReadLock(String resource, long wait) throws InterruptedException { - return locks.getReadLock(resource, wait); + return locks.getLock(resource, Type.READ, wait); } /** @@ -95,7 +100,7 @@ public class MemoryLocksService implements Service, Instrumentable { * @throws InterruptedException thrown if the thread was interrupted while waiting. */ public LockToken getWriteLock(String resource, long wait) throws InterruptedException { - return locks.getWriteLock(resource, wait); + return locks.getLock(resource, Type.WRITE, wait); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 952b90d..8acbad9 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -17,7 +17,7 @@ */ package org.apache.oozie.service; -import java.util.HashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.recipes.locks.InterProcessMutex; @@ -39,6 +39,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.ThreadUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.MapMaker; /** * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be @@ -51,7 +52,8 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr private static XLog LOG = XLog.getLog(ZKLocksService.class); public static final String LOCKS_NODE = "/locks"; - final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>(); + private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap(); + private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; @@ -123,18 +125,7 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getReadLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lockEntry; - synchronized (zkLocks) { - if (zkLocks.containsKey(resource)) { - lockEntry = zkLocks.get(resource); - } - else { - lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - zkLocks.put(resource, lockEntry); - } - } - InterProcessMutex readLock = lockEntry.readLock(); - return acquireLock(wait, readLock, resource); + return acquireLock(resource, Type.READ, wait); } /** @@ -147,29 +138,27 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getWriteLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lockEntry; - synchronized (zkLocks) { - if (zkLocks.containsKey(resource)) { - lockEntry = zkLocks.get(resource); - } - else { - lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - zkLocks.put(resource, lockEntry); - } - } - InterProcessMutex writeLock = lockEntry.writeLock(); - return acquireLock(wait, writeLock, resource); + return acquireLock(resource, Type.WRITE, wait); } - private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) { + private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException { + InterProcessReadWriteLock lockEntry = zkLocks.get(resource); + if (lockEntry == null) { + InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); + lockEntry = zkLocks.putIfAbsent(resource, newLock); + if (lockEntry == null) { + lockEntry = newLock; + } + } + InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); ZKLockToken token = null; try { if (wait == -1) { lock.acquire(); - token = new ZKLockToken(lock, resource); + token = new ZKLockToken(lockEntry, type); } else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { - token = new ZKLockToken(lock, resource); + token = new ZKLockToken(lockEntry, type); } } catch (Exception ex) { @@ -183,12 +172,12 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr * Implementation of {@link LockToken} for zookeeper locks. */ class ZKLockToken implements LockToken { - private final InterProcessMutex lock; - private final String resource; + private final InterProcessReadWriteLock lockEntry; + private final Type type; - private ZKLockToken(InterProcessMutex lock, String resource) { - this.lock = lock; - this.resource = resource; + private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) { + this.lockEntry = lockEntry; + this.type = type; } /** @@ -197,35 +186,23 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr @Override public void release() { try { - lock.release(); - if (zkLocks.get(resource) == null) { - return; - } - if (!isLockHeld()) { - synchronized (zkLocks) { - if (zkLocks.get(resource) != null) { - if (!isLockHeld()) { - zkLocks.remove(resource); - } - } - } + switch (type) { + case WRITE: + lockEntry.writeLock().release(); + break; + case READ: + lockEntry.readLock().release(); + break; } } catch (Exception ex) { LOG.warn("Could not release lock: " + ex.getMessage(), ex); } - } - - private boolean isLockHeld() { - return zkLocks.get(resource).readLock().isAcquiredInThisProcess() - || zkLocks.get(resource).writeLock().isAcquiredInThisProcess(); - } - } @VisibleForTesting - public HashMap<String, InterProcessReadWriteLock> getLocks(){ + public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){ return zkLocks; } http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java index f0a87e5..8c7b58e 100644 --- a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java +++ b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.oozie.service.MemoryLocksService; +import org.apache.oozie.service.MemoryLocksService.Type; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; import org.apache.oozie.test.XTestCase; @@ -31,6 +32,7 @@ import org.apache.oozie.util.XLog; public class TestMemoryLocks extends XTestCase { private static final int LATCH_TIMEOUT = 10; private XLog log = XLog.getLog(getClass()); + public static final int DEFAULT_LOCK_TIMEOUT = 5 * 1000; private MemoryLocks locks; @@ -118,7 +120,7 @@ public class TestMemoryLocks extends XTestCase { } protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { - return locks.getReadLock(name, timeout); + return locks.getLock(name, Type.READ, timeout); } } @@ -129,7 +131,7 @@ public class TestMemoryLocks extends XTestCase { } protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { - return locks.getWriteLock(name, timeout); + return locks.getLock(name, Type.WRITE, timeout); } } @@ -323,7 +325,7 @@ public class TestMemoryLocks extends XTestCase { } protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { - return locks.getWriteLock(name, timeout); + return locks.getLock(name, Type.WRITE, timeout); } } @@ -372,16 +374,16 @@ public class TestMemoryLocks extends XTestCase { MemoryLocksService lockService = new MemoryLocksService(); try { lockService.init(Services.get()); - LockToken lock = lockService.getWriteLock(path, 5000); - lock = (LockToken) lockService.getWriteLock(path, 5000); - lock = (LockToken) lockService.getWriteLock(path, 5000); + LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + lock = (LockToken) lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + lock = (LockToken) lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); assertEquals(lockService.getMemoryLocks().size(), 1); lock.release(); assertEquals(lockService.getMemoryLocks().size(), 1); lock.release(); assertEquals(lockService.getMemoryLocks().size(), 1); lock.release(); - assertEquals(lockService.getMemoryLocks().size(), 0); + checkLockRelease(path, lockService); } catch (Exception e) { fail("Reentrant property, it should have acquired lock"); @@ -391,4 +393,48 @@ public class TestMemoryLocks extends XTestCase { } } + public void testLocksAreGarbageCollected() throws ServiceException, InterruptedException { + String path = new String("a"); + String path1 = new String("a"); + MemoryLocksService lockService = new MemoryLocksService(); + lockService.init(Services.get()); + LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + int oldHash = lockService.getMemoryLocks().getLockMap().get(path).hashCode(); + lock.release(); + lock = lockService.getWriteLock(path1, DEFAULT_LOCK_TIMEOUT); + int newHash = lockService.getMemoryLocks().getLockMap().get(path1).hashCode(); + assertTrue(oldHash == newHash); + lock.release(); + lock = null; + System.gc(); + path = "a"; + lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + newHash = lockService.getMemoryLocks().getLockMap().get(path).hashCode(); + assertFalse(oldHash == newHash); + + } + + public void testLocksAreReused() throws ServiceException, InterruptedException { + String path = "a"; + MemoryLocksService lockService = new MemoryLocksService(); + lockService.init(Services.get()); + LockToken lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + int oldHash = System.identityHashCode(lockService.getMemoryLocks().getLockMap().get(path)); + System.gc(); + lock.release(); + lock = lockService.getWriteLock(path, DEFAULT_LOCK_TIMEOUT); + assertEquals(lockService.getMemoryLocks().size(), 1); + int newHash = System.identityHashCode(lockService.getMemoryLocks().getLockMap().get(path)); + assertTrue(oldHash == newHash); + } + + private void checkLockRelease(String path, MemoryLocksService lockService) { + if (lockService.getMemoryLocks().getLockMap().get(path) == null) { + // good lock is removed from memory after gc. + } + else { + assertFalse(lockService.getMemoryLocks().getLockMap().get(path).isWriteLocked()); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java index d1acadf..d04f04e 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java @@ -21,6 +21,7 @@ package org.apache.oozie.service; import java.util.UUID; import org.apache.oozie.lock.LockToken; +import org.apache.oozie.lock.TestMemoryLocks; import org.apache.oozie.service.ZKLocksService.ZKLockToken; import org.apache.oozie.test.ZKXTestCase; import org.apache.oozie.util.XLog; @@ -132,7 +133,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testWaitWriteLock(zkls, zkls); + checkWaitWriteLock(zkls, zkls); } finally { zkls.destroy(); @@ -146,7 +147,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testWaitWriteLock(zkls1, zkls2); + checkWaitWriteLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -154,7 +155,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1); Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2); @@ -174,7 +175,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testNoWaitWriteLock(zkls, zkls); + checkNoWaitWriteLock(zkls, zkls); } finally { zkls.destroy(); @@ -188,7 +189,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testNoWaitWriteLock(zkls1, zkls2); + checkNoWaitWriteLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -196,7 +197,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testNoWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkNoWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1); Locker l2 = new WriteLocker("a", 2, 0, sb, zkls2); @@ -216,7 +217,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testTimeoutWaitingWriteLock(zkls, zkls); + checkTimeoutWaitingWriteLock(zkls, zkls); } finally { zkls.destroy(); @@ -230,7 +231,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testTimeoutWaitingWriteLock(zkls1, zkls2); + checkTimeoutWaitingWriteLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -238,7 +239,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testTimeoutWaitingWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkTimeoutWaitingWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1); Locker l2 = new WriteLocker("a", 2, (long) (WAITFOR_RATIO * 2000), sb, zkls2); @@ -258,7 +259,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testTimeoutTimingOutWriteLock(zkls, zkls); + checkTimeoutTimingOutWriteLock(zkls, zkls); } finally { zkls.destroy(); @@ -272,7 +273,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testTimeoutTimingOutWriteLock(zkls1, zkls2); + checkTimeoutTimingOutWriteLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -280,7 +281,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testTimeoutTimingOutWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkTimeoutTimingOutWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1); Locker l2 = new WriteLocker("a", 2, 50, sb, zkls2); @@ -300,7 +301,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testReadLock(zkls, zkls); + checkReadLock(zkls, zkls); } finally { zkls.destroy(); @@ -314,7 +315,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testReadLock(zkls1, zkls2); + checkReadLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -322,7 +323,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1); Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2); @@ -342,7 +343,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testReadWriteLock(zkls, zkls); + checkReadWriteLock(zkls, zkls); } finally { zkls.destroy(); @@ -356,7 +357,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testReadWriteLock(zkls1, zkls2); + checkReadWriteLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -364,7 +365,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testReadWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkReadWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1); Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2); @@ -384,7 +385,7 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - _testWriteReadLock(zkls, zkls); + checkWriteReadLock(zkls, zkls); } finally { zkls.destroy(); @@ -398,7 +399,7 @@ public class TestZKLocksService extends ZKXTestCase { try { zkls1.init(Services.get()); zkls2.init(Services.get()); - _testWriteReadLock(zkls1, zkls2); + checkWriteReadLock(zkls1, zkls2); } finally { zkls1.destroy(); @@ -406,7 +407,7 @@ public class TestZKLocksService extends ZKXTestCase { } } - public void _testWriteReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { + public void checkWriteReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception { StringBuffer sb = new StringBuffer(""); Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1); Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2); @@ -427,10 +428,10 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); assertTrue(zkls.getLocks().containsKey(path)); lock.release(); - assertFalse(zkls.getLocks().containsKey(path)); + checkLockRelease(path, zkls); } finally { zkls.destroy(); @@ -442,16 +443,16 @@ public class TestZKLocksService extends ZKXTestCase { ZKLocksService zkls = new ZKLocksService(); try { zkls.init(Services.get()); - ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); - lock = (ZKLockToken) zkls.getWriteLock(path, 5000); - lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); assertTrue(zkls.getLocks().containsKey(path)); lock.release(); assertTrue(zkls.getLocks().containsKey(path)); lock.release(); assertTrue(zkls.getLocks().containsKey(path)); lock.release(); - assertFalse(zkls.getLocks().containsKey(path)); + checkLockRelease(path, zkls); } catch (Exception e) { fail("Reentrant property, it should have acquired lock"); @@ -470,10 +471,10 @@ public class TestZKLocksService extends ZKXTestCase { ThreadLock t2 = new ThreadLock(zkls, path); t1.start(); t1.join(); - assertFalse(zkls.getLocks().containsKey(path)); + checkLockRelease(path, zkls); t2.start(); t2.join(); - assertFalse(zkls.getLocks().containsKey(path)); + checkLockRelease(path, zkls); } finally { zkls.destroy(); @@ -507,6 +508,58 @@ public class TestZKLocksService extends ZKXTestCase { } } + public void testLocksAreGarbageCollected() throws ServiceException, InterruptedException { + String path = new String("a"); + String path1 = new String("a"); + ZKLocksService lockService = new ZKLocksService(); + try { + lockService.init(Services.get()); + LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + lock.release(); + assertEquals(lockService.getLocks().size(), 1); + int oldHash = lockService.getLocks().get(path).hashCode(); + lock = lockService.getWriteLock(path1, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + int newHash = lockService.getLocks().get(path1).hashCode(); + assertTrue(oldHash == newHash); + lock = null; + System.gc(); + lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + newHash = lockService.getLocks().get(path).hashCode(); + assertFalse(oldHash == newHash); + } + finally { + lockService.destroy(); + } + } + + public void testLocksAreReused() throws ServiceException, InterruptedException { + String path = "a"; + ZKLocksService lockService = new ZKLocksService(); + try { + lockService.init(Services.get()); + LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + int oldHash = System.identityHashCode(lockService.getLocks().get(path)); + System.gc(); + lock.release(); + lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); + assertEquals(lockService.getLocks().size(), 1); + int newHash = System.identityHashCode(lockService.getLocks().get(path)); + assertTrue(oldHash == newHash); + } + finally { + lockService.destroy(); + } + } + + private void checkLockRelease(String path, ZKLocksService zkls) { + if (zkls.getLocks().get(path) == null) { + // good, lock is removed from memory after gc. + } + else { + assertFalse(zkls.getLocks().get(path).writeLock().isAcquiredInThisProcess()); + } + } + static class ThreadLock extends Thread { ZKLocksService zkls; String path; @@ -520,9 +573,9 @@ public class TestZKLocksService extends ZKXTestCase { public void run() { try { - lock = zkls.getWriteLock(path, 5000); + lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); if (lock != null) { - lock = zkls.getWriteLock(path, 5000); + lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT); Thread.sleep(1000); lock.release(); Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/oozie/blob/d330d406/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 10a183a..b03a61a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ -- Oozie 4.3.0 release +OOZIE-2501 ZK reentrant lock doesn't work for few cases (puru) OOZIE-2582 Populating external child Ids for action failures (abhishekbafna via rohini) OOZIE-2678 Oozie job -kill doesn't work with tez jobs (abhishekbafna via rohini) OOZIE-2676 Make hadoop-2 as the default profile (gezapeti via rkanter)