[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527186#comment-15527186 ] Purshotam Shah commented on OOZIE-2501:
---
Committed to trunk and the 4.3 branch.

> ZK reentrant lock doesn't work for few cases
>
> Key: OOZIE-2501
> URL: https://issues.apache.org/jira/browse/OOZIE-2501
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
> Assignee: Purshotam Shah
> Priority: Blocker
> Fix For: 4.3.0
>
> Attachments: OOZIE-2501-V2.patch, OOZIE-2501-V4.patch, OOZIE-2501-V7.patch
>
>
> We have an issue when Oozie is trying to acquire a lock while some other thread is releasing the same lock.
> acquireLock waits up to 5 seconds to acquire the lock. By the time it is called, the caller has already left the synchronized block and taken lockEntry from the hash map.
> While it is waiting for those 5 seconds, the other thread releases the lock and may execute the release code, which removes lockEntry from the map.
> If another command from the same thread then tries to acquire the lock, it creates a new InterProcessReadWriteLock object and uses that to acquire the lock.
> Logic for lock acquiring:
> {code}
> public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
>     InterProcessReadWriteLock lockEntry;
>     synchronized (zkLocks) {
>         if (zkLocks.containsKey(resource)) {
>             lockEntry = zkLocks.get(resource);
>         }
>         else {
>             lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
>             zkLocks.put(resource, lockEntry);
>         }
>     }
>     InterProcessMutex writeLock = lockEntry.writeLock();
>     return acquireLock(wait, writeLock, resource);
> }
> {code}
> Logic for lock releasing:
> {code}
> public void release() {
>     try {
>         lock.release();
>         if (zkLocks.get(resource) == null) {
>             return;
>         }
>         if (!isLockHeld()) {
>             synchronized (zkLocks) {
>                 if (zkLocks.get(resource) != null) {
>                     if (!isLockHeld()) {
>                         zkLocks.remove(resource);
>                     }
>                 }
>             }
>         }
>     }
>     catch (Exception ex) {
>         LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>     }
> }
> {code}
> Curator code to acquire lock:
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
> {
>     /*
>        Note on concurrency: a given lockData instance
>        can be only acted on by a single thread so locking isn't necessary
>     */
>
>     Thread currentThread = Thread.currentThread();
>
>     LockData lockData = threadData.get(currentThread);
>     if ( lockData != null )
>     {
>         // re-entering
>         lockData.lockCount.incrementAndGet();
>         return true;
>     }
>
>     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
>     if ( lockPath != null )
>     {
>         LockData newLockData = new LockData(currentThread, lockPath);
>         threadData.put(currentThread, newLockData);
>         return true;
>     }
>
>     return false;
> }
> {code}
> The approach we have followed is to use a map with weak values. Once a lock is unreachable, GC removes it from the map, so we don't have to remove it explicitly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
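The failure mode follows from the Curator code quoted in the description: re-entrance is recorded in threadData, which belongs to a single InterProcessMutex instance. Once release() has removed lockEntry from zkLocks, a re-entrant acquire from the same thread goes through a brand-new object and is treated as a fresh acquire that waits on the lock the thread already holds. The Java sketch below models this without Curator or ZooKeeper, under stated assumptions: ToyMutex is a hypothetical stand-in for InterProcessMutex, a shared java.util.concurrent.Semaphore plays the role of the ZooKeeper lock node, and WeakValueRegistry is a hypothetical illustration of the "map with weak values" idea built on WeakReference and ReferenceQueue. It is not the map implementation used in the patch.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class WeakLockRegistryDemo {
    // Hypothetical stand-in for Curator's InterProcessMutex: re-entrance is tracked per
    // instance in threadData, and the shared Semaphore plays the role of the ZK lock node.
    static final class ToyMutex {
        private final Semaphore zkNode;
        private final Map<Thread, Integer> threadData = new ConcurrentHashMap<>();

        ToyMutex(Semaphore zkNode) { this.zkNode = zkNode; }

        boolean acquire(long time, TimeUnit unit) {
            Thread current = Thread.currentThread();
            Integer count = threadData.get(current);
            if (count != null) {
                threadData.put(current, count + 1); // re-entering through the same instance
                return true;
            }
            try {
                // Fresh acquire: times out while the node is held, even by this thread via another instance.
                if (zkNode.tryAcquire(time, unit)) {
                    threadData.put(current, 1);
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            return false;
        }

        void release() {
            Thread current = Thread.currentThread();
            Integer count = threadData.get(current);
            if (count == null) throw new IllegalMonitorStateException("not held by this thread");
            if (count > 1) {
                threadData.put(current, count - 1);
            } else {
                threadData.remove(current);
                zkNode.release();
            }
        }
    }

    // Hypothetical weak-value map: an entry is reused while some caller still holds a strong
    // reference to its value, and is purged on a later call once GC has cleared that reference.
    static final class WeakValueRegistry<V> {
        private final Map<String, KeyedRef<V>> map = new ConcurrentHashMap<>();
        private final ReferenceQueue<V> queue = new ReferenceQueue<>();

        private static final class KeyedRef<T> extends WeakReference<T> {
            final String key;
            KeyedRef(String key, T value, ReferenceQueue<T> queue) {
                super(value, queue);
                this.key = key;
            }
        }

        synchronized V getOrCreate(String key, Supplier<V> factory) {
            for (Object cleared; (cleared = queue.poll()) != null; ) {
                KeyedRef<?> stale = (KeyedRef<?>) cleared;
                map.remove(stale.key, stale); // no-op if the key already maps to a newer ref
            }
            KeyedRef<V> ref = map.get(key);
            V value = (ref == null) ? null : ref.get();
            if (value == null) {
                value = factory.get();
                map.put(key, new KeyedRef<>(key, value, queue));
            }
            return value;
        }
    }

    public static void main(String[] args) {
        Semaphore zkNode = new Semaphore(1);
        // Before the fix: the map entry was removed, so the same thread got a new object.
        ToyMutex first = new ToyMutex(zkNode);
        ToyMutex second = new ToyMutex(zkNode);
        System.out.println(first.acquire(50, TimeUnit.MILLISECONDS));  // true
        System.out.println(second.acquire(50, TimeUnit.MILLISECONDS)); // false
        first.release();

        // Weak-value registry: the live object is handed out again, so re-entry works.
        WeakValueRegistry<ToyMutex> registry = new WeakValueRegistry<>();
        ToyMutex outer = registry.getOrCreate("wf-1", () -> new ToyMutex(zkNode));
        ToyMutex inner = registry.getOrCreate("wf-1", () -> new ToyMutex(zkNode));
        System.out.println(outer == inner);                            // true
        System.out.println(outer.acquire(50, TimeUnit.MILLISECONDS)); // true
        System.out.println(inner.acquire(50, TimeUnit.MILLISECONDS)); // true
        inner.release();
        outer.release();
    }
}
```

In this sketch the registry only creates a new object for a key once the previous one is no longer strongly reachable, so there is no explicit remove() whose timing can race with an acquire. The idea depends on the lock holder keeping a strong reference to the lock object (here, the local variables outer and inner); an object nobody references any more can be collected and replaced.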
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15514773#comment-15514773 ] Rohini Palaniswamy commented on OOZIE-2501:
---
+1
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15514615#comment-15514615 ] Peter Bacsko commented on OOZIE-2501:
---
Thanks, I think we're good to go.
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15514609#comment-15514609 ] Hadoop QA commented on OOZIE-2501:
---
Testing JIRA OOZIE-2501

Cleaning local git workspace

{color:green}+1 PATCH_APPLIES{color}
{color:green}+1 CLEAN{color}
{color:green}+1 RAW_PATCH_ANALYSIS{color}
.{color:green}+1{color} the patch does not introduce any @author tags
.{color:green}+1{color} the patch does not introduce any tabs
.{color:green}+1{color} the patch does not introduce any trailing spaces
.{color:green}+1{color} the patch does not introduce any line longer than 132
.{color:green}+1{color} the patch does adds/modifies 2 testcase(s)
{color:green}+1 RAT{color}
.{color:green}+1{color} the patch does not seem to introduce new RAT warnings
{color:green}+1 JAVADOC{color}
.{color:green}+1{color} the patch does not seem to introduce new Javadoc warnings
{color:green}+1 COMPILE{color}
.{color:green}+1{color} HEAD compiles
.{color:green}+1{color} patch compiles
.{color:green}+1{color} the patch does not seem to introduce new javac warnings
{color:green}+1 BACKWARDS_COMPATIBILITY{color}
.{color:green}+1{color} the patch does not change any JPA Entity/Colum/Basic/Lob/Transient annotations
.{color:green}+1{color} the patch does not modify JPA files
{color:green}+1 TESTS{color}
.Tests run: 1812
{color:green}+1 DISTRO{color}
.{color:green}+1{color} distro tarball builds with the patch

{color:green}*+1 Overall result, good!, no -1s*{color}

The full output of the test-patch run is available at
https://builds.apache.org/job/oozie-trunk-precommit-build/3320/
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513993#comment-15513993 ] Purshotam Shah commented on OOZIE-2501:
---
[~pbacsko] I have uploaded a new patch addressing your comment.
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513773#comment-15513773 ] Abhishek Bafna commented on OOZIE-2501:
---
Ping: [~rohini], [~rkanter], [~puru], [~jaydeepvishwakarma]. Pinging from the Oozie 4.3.0 release perspective.
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406818#comment-15406818 ] Purshotam Shah commented on OOZIE-2501:
---
Thanks [~pbacsko]. I will go through your comments and upload a new patch if needed.
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406804#comment-15406804 ] Peter Bacsko commented on OOZIE-2501:
---
Before you commit, please see my review comments regarding the tests. You might want to consider them.
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406587#comment-15406587 ] Hadoop QA commented on OOZIE-2501:
---
Testing JIRA OOZIE-2501

Cleaning local git workspace

{color:green}+1 PATCH_APPLIES{color}
{color:green}+1 CLEAN{color}
{color:green}+1 RAW_PATCH_ANALYSIS{color}
.{color:green}+1{color} the patch does not introduce any @author tags
.{color:green}+1{color} the patch does not introduce any tabs
.{color:green}+1{color} the patch does not introduce any trailing spaces
.{color:green}+1{color} the patch does not introduce any line longer than 132
.{color:green}+1{color} the patch does adds/modifies 2 testcase(s)
{color:green}+1 RAT{color}
.{color:green}+1{color} the patch does not seem to introduce new RAT warnings
{color:green}+1 JAVADOC{color}
.{color:green}+1{color} the patch does not seem to introduce new Javadoc warnings
{color:red}-1 COMPILE{color}
.{color:red}-1{color} HEAD does not compile
.{color:red}-1{color} patch does not compile
.{color:green}+1{color} the patch does not seem to introduce new javac warnings
{color:green}+1 BACKWARDS_COMPATIBILITY{color}
.{color:green}+1{color} the patch does not change any JPA Entity/Colum/Basic/Lob/Transient annotations
.{color:green}+1{color} the patch does not modify JPA files
{color:red}-1 TESTS{color}
.Tests run: 1796
.Tests failed: 3
.Tests errors: 0

.The patch failed the following testcases:

. testBundleRerunInPausedWithError(org.apache.oozie.command.bundle.TestBundleRerunXCommand)
. testBundleStatusNotTransitionFromKilled(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusCoordSubmitFails(org.apache.oozie.service.TestStatusTransitService)

{color:green}+1 DISTRO{color}
.{color:green}+1{color} distro tarball builds with the patch

{color:red}*-1 Overall result, please check the reported -1(s)*{color}

The full output of the test-patch run is available at
https://builds.apache.org/job/oozie-trunk-precommit-build/3183/
[jira] [Commented] (OOZIE-2501) ZK reentrant lock doesn't work for few cases
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406315#comment-15406315 ] Rohini Palaniswamy commented on OOZIE-2501:
---
+1 for https://reviews.apache.org/r/47837/diff/4/ pending Jenkins.
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402827#comment-15402827 ] Purshotam Shah commented on OOZIE-2501: --- ActionStartXCommand is called synchronously from SignalXCommand, which passes in wfJob so that it doesn't have to be reloaded.
{code}
@Override
protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
            LogUtils.setLogInfo(wfJob);
            LogUtils.setLogInfo(wfAction);
        }
        else {
            throw new CommandException(ErrorCode.E0610);
        }
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
}
{code}
If there is any error, the command gets queued with wfJob != null. When the requeued command executes, it doesn't reload wfJob and ends up writing a stale entry to the database. We have also fixed that in this patch: when the command is queued, we set wfJob to null so that it gets loaded from the database when the command executes.
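The requeue fix described in this comment can be sketched outside Oozie as a command that caches a job handed to it by its caller and drops that cache before being queued. This is a minimal illustration under simplifying assumptions: `JobStore`, `StartCommand`, and `prepareForQueue()` are hypothetical names, a job is reduced to a status string, and none of this is the actual `ActionStartXCommand` code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the workflow-job table: job id -> status.
final class JobStore {
    private final Map<String, String> rows = new HashMap<>();

    String load(String jobId) {
        return rows.get(jobId);
    }

    void save(String jobId, String status) {
        rows.put(jobId, status);
    }
}

// Hypothetical command that may be handed an already-loaded job by its caller,
// similar in spirit to SignalXCommand passing wfJob to ActionStartXCommand.
final class StartCommand {
    private final JobStore store;
    private final String jobId;
    private String job; // cached copy; null means "load it from the store"

    StartCommand(JobStore store, String jobId, String preloadedJob) {
        this.store = store;
        this.jobId = jobId;
        this.job = preloadedJob;
    }

    // Same shape as loadState(): only read the store when nothing was passed in.
    void loadState() {
        if (job == null) {
            job = store.load(jobId);
        }
    }

    // The fix: forget the cached copy before the command is queued, so the
    // queued execution reloads current state instead of reusing a stale copy.
    StartCommand prepareForQueue() {
        job = null;
        return this;
    }

    String currentJob() {
        return job;
    }
}
```

Clearing the cached copy costs one extra read when the queued command runs, which is the price of never writing stale state back.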
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402776#comment-15402776 ] Hadoop QA commented on OOZIE-2501: --
Testing JIRA OOZIE-2501
Cleaning local git workspace
{color:green}+1 PATCH_APPLIES{color}
{color:green}+1 CLEAN{color}
{color:green}+1 RAW_PATCH_ANALYSIS{color}
. {color:green}+1{color} the patch does not introduce any @author tags
. {color:green}+1{color} the patch does not introduce any tabs
. {color:green}+1{color} the patch does not introduce any trailing spaces
. {color:green}+1{color} the patch does not introduce any line longer than 132
. {color:green}+1{color} the patch does adds/modifies 2 testcase(s)
{color:green}+1 RAT{color}
. {color:green}+1{color} the patch does not seem to introduce new RAT warnings
{color:green}+1 JAVADOC{color}
. {color:green}+1{color} the patch does not seem to introduce new Javadoc warnings
{color:red}-1 COMPILE{color}
. {color:red}-1{color} HEAD does not compile
. {color:red}-1{color} patch does not compile
. {color:green}+1{color} the patch does not seem to introduce new javac warnings
{color:green}+1 BACKWARDS_COMPATIBILITY{color}
. {color:green}+1{color} the patch does not change any JPA Entity/Colum/Basic/Lob/Transient annotations
. {color:green}+1{color} the patch does not modify JPA files
{color:red}-1 TESTS{color}
. Tests run: 1795
. Tests failed: 1
. Tests errors: 0
. The patch failed the following testcases:
. testMessage_withMixedStatus(org.apache.oozie.command.coord.TestAbandonedCoordChecker)
{color:green}+1 DISTRO{color}
. {color:green}+1{color} distro tarball builds with the patch
{color:red}*-1 Overall result, please check the reported -1(s)*{color}
The full output of the test-patch run is available at
. https://builds.apache.org/job/oozie-trunk-precommit-build/3168/
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400259#comment-15400259 ] Hadoop QA commented on OOZIE-2501: --
Testing JIRA OOZIE-2501
Cleaning local git workspace
{color:green}+1 PATCH_APPLIES{color}
{color:green}+1 CLEAN{color}
{color:green}+1 RAW_PATCH_ANALYSIS{color}
. {color:green}+1{color} the patch does not introduce any @author tags
. {color:green}+1{color} the patch does not introduce any tabs
. {color:green}+1{color} the patch does not introduce any trailing spaces
. {color:green}+1{color} the patch does not introduce any line longer than 132
. {color:green}+1{color} the patch does adds/modifies 2 testcase(s)
{color:green}+1 RAT{color}
. {color:green}+1{color} the patch does not seem to introduce new RAT warnings
{color:green}+1 JAVADOC{color}
. {color:green}+1{color} the patch does not seem to introduce new Javadoc warnings
{color:red}-1 COMPILE{color}
. {color:red}-1{color} HEAD does not compile
. {color:red}-1{color} patch does not compile
. {color:green}+1{color} the patch does not seem to introduce new javac warnings
{color:green}+1 BACKWARDS_COMPATIBILITY{color}
. {color:green}+1{color} the patch does not change any JPA Entity/Colum/Basic/Lob/Transient annotations
. {color:green}+1{color} the patch does not modify JPA files
{color:red}-1 TESTS{color}
. Tests run: 1795
. Tests failed: 130
. Tests errors: 323
. The patch failed the following testcases:
. testCoordKillXCommandUniqueness(org.apache.oozie.command.coord.TestCoordKillXCommand)
. testCoordKillRemovePushMissingDeps(org.apache.oozie.command.coord.TestCoordKillXCommand)
. testTimeoutTimingOutWriteLockThreads(org.apache.oozie.service.TestZKLocksService)
. testTimeoutTimingOutWriteLockOozies(org.apache.oozie.service.TestZKLocksService)
. testReadLockThreads(org.apache.oozie.service.TestZKLocksService)
. testReadLockOozies(org.apache.oozie.service.TestZKLocksService)
. testReadWriteLockThreads(org.apache.oozie.service.TestZKLocksService)
. testReadWriteLockOozies(org.apache.oozie.service.TestZKLocksService)
. testWriteReadLockThreads(org.apache.oozie.service.TestZKLocksService)
. testWriteReadLockOozies(org.apache.oozie.service.TestZKLocksService)
. testReentrantMultipleCall(org.apache.oozie.service.TestZKLocksService)
. testWaitWriteLockThreads(org.apache.oozie.service.TestZKLocksService)
. testWaitWriteLockOozies(org.apache.oozie.service.TestZKLocksService)
. testNoWaitWriteLockThreads(org.apache.oozie.service.TestZKLocksService)
. testNoWaitWriteLockOozies(org.apache.oozie.service.TestZKLocksService)
. testTimeoutWaitingWriteLockThreads(org.apache.oozie.service.TestZKLocksService)
. testTimeoutWaitingWriteLockOozies(org.apache.oozie.service.TestZKLocksService)
. testInterrupt(org.apache.oozie.service.TestCallableQueueService)
. testInterruptsWithCompositeCallable(org.apache.oozie.service.TestCallableQueueService)
. testInterruptsInCompositeCallable(org.apache.oozie.service.TestCallableQueueService)
. testInterruptsWithDistinguishedLockKeys(org.apache.oozie.service.TestCallableQueueService)
. testMaxInterruptMapSize(org.apache.oozie.service.TestCallableQueueService)
. testFsFailover(org.apache.oozie.action.TestActionFailover)
. testTimeOutWithException1(org.apache.oozie.command.coord.TestCoordPushDependencyCheckXCommand)
. testRequeueOnException(org.apache.oozie.command.coord.TestCoordPushDependencyCheckXCommand)
. testEngine(org.apache.oozie.command.coord.TestFutureActionsTimeOut)
. testCoordStatusTransitServiceSuspendedBottomUp(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServiceBackwardSupport(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServiceRunning3(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusTransitServiceForTerminalStates(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServiceSuspendedByUser(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServiceRunning1(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServiceKilledByUser2(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusTransitServicePausedWithError(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusTransitServiceKilled2(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusTransitServiceSucceeded1(org.apache.oozie.service.TestStatusTransitService)
. testCoordStatusTransitServicePaused(org.apache.oozie.service.TestStatusTransitService)
. testBundleStatusTransitServicePaused(org.apache.oozie.service.TestStatusTransitService)
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302574#comment-15302574 ] Purshotam Shah commented on OOZIE-2501: --- Uploaded a new patch. [~rkanter], do you want to take a look?
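The issue description proposes dropping the explicit `zkLocks.remove(resource)` in `release()` and instead keeping lock entries in a map with weak values, so that an entry disappears only after the garbage collector finds its value unreachable. A minimal, JDK-only sketch of such a map follows. `WeakValueRegistry` and `getOrCreate()` are hypothetical names, and the patch itself may rely on a library-provided weak-value map rather than anything like this.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a registry whose values are weakly referenced. An entry
// is reused while any caller still holds its value strongly, and is purged
// only after the garbage collector has cleared it. Not Oozie's actual code.
final class WeakValueRegistry<K, V> {

    // Weak reference that remembers its key, so cleared entries can be located.
    private static final class KeyedRef<K, V> extends WeakReference<V> {
        final K key;

        KeyedRef(K key, V value, ReferenceQueue<? super V> queue) {
            super(value, queue);
            this.key = key;
        }
    }

    private final Map<K, KeyedRef<K, V>> entries = new HashMap<>();
    private final ReferenceQueue<V> cleared = new ReferenceQueue<>();
    private final Function<K, V> factory;

    WeakValueRegistry(Function<K, V> factory) {
        this.factory = factory;
    }

    // Returns the live value for the key; creates one only if none is reachable.
    synchronized V getOrCreate(K key) {
        purgeCleared();
        KeyedRef<K, V> ref = entries.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            value = factory.apply(key);
            entries.put(key, new KeyedRef<>(key, value, cleared));
        }
        return value;
    }

    synchronized int size() {
        purgeCleared();
        return entries.size();
    }

    // No release-time removal: entries go away once the GC has cleared their values.
    private void purgeCleared() {
        Reference<? extends V> polled;
        while ((polled = cleared.poll()) != null) {
            KeyedRef<?, ?> ref = (KeyedRef<?, ?>) polled;
            // Remove only if the key still maps to this exact cleared reference.
            entries.remove(ref.key, ref);
        }
    }
}
```

As long as some caller holds a strong reference to the value, `getOrCreate` returns that same object for the key, so there is no window in which a still-used entry is deleted and replaced by a fresh object.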
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237932#comment-15237932 ] Purshotam Shah commented on OOZIE-2501: --- I removed it because it had some issues. Will upload a new one.
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237862#comment-15237862 ] Robert Kanter commented on OOZIE-2501: -- [~puru], can you reupload the patch? It's somehow missing...
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227065#comment-15227065 ] Hadoop QA commented on OOZIE-2501: --
Testing JIRA OOZIE-2501
Cleaning local git workspace
{color:green}+1 PATCH_APPLIES{color}
{color:green}+1 CLEAN{color}
{color:red}-1 RAW_PATCH_ANALYSIS{color}
. {color:green}+1{color} the patch does not introduce any @author tags
. {color:green}+1{color} the patch does not introduce any tabs
. {color:green}+1{color} the patch does not introduce any trailing spaces
. {color:green}+1{color} the patch does not introduce any line longer than 132
. {color:red}-1{color} the patch does not add/modify any testcase
{color:green}+1 RAT{color}
. {color:green}+1{color} the patch does not seem to introduce new RAT warnings
{color:green}+1 JAVADOC{color}
. {color:green}+1{color} the patch does not seem to introduce new Javadoc warnings
{color:green}+1 COMPILE{color}
. {color:green}+1{color} HEAD compiles
. {color:green}+1{color} patch compiles
. {color:green}+1{color} the patch does not seem to introduce new javac warnings
{color:green}+1 BACKWARDS_COMPATIBILITY{color}
. {color:green}+1{color} the patch does not change any JPA Entity/Colum/Basic/Lob/Transient annotations
. {color:green}+1{color} the patch does not modify JPA files
{color:red}-1 TESTS{color} - patch does not compile, cannot run testcases
{color:green}+1 DISTRO{color}
. {color:green}+1{color} distro tarball builds with the patch
{color:red}*-1 Overall result, please check the reported -1(s)*{color}
The full output of the test-patch run is available at
. https://builds.apache.org/job/oozie-trunk-precommit-build/2810/
[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226728#comment-15226728 ] Purshotam Shah commented on OOZIE-2501: --- We already have an existing test case for this behavior. It's very difficult to write a reproducible test case for this kind of scenario.
{code:title=TestZKLocksService.java}
public void testReentrantMultipleThread() throws ServiceException, InterruptedException {
    final String path = UUID.randomUUID().toString();
    final ZKLocksService zkls = new ZKLocksService();
    zkls.init(Services.get());
    try {
        ThreadLock t1 = new ThreadLock(zkls, path);
        ThreadLock t2 = new ThreadLock(zkls, path);
        t1.start();
        t1.join();
        assertFalse(zkls.getLocks().containsKey(path));
        t2.start();
        t2.join();
        assertFalse(zkls.getLocks().containsKey(path));
    }
    finally {
        zkls.destroy();
    }
}
{code}
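The exact interleaving is timing-dependent, which is why it is hard to reproduce with real threads. The underlying cause, however, can be shown deterministically in a single thread: as the Curator `internalLock` excerpt in the issue shows, reentrancy is tracked in the `threadData` of one particular lock object, so a freshly created object for the same path does not see the existing hold. The sketch below is a simplified model, not Curator's implementation: `FakeLockServer` and `SimulatedMutex` are hypothetical classes, and acquisition fails immediately instead of waiting up to a timeout.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ZooKeeper side: at most one holder per lock path.
final class FakeLockServer {
    private final Map<String, Object> holders = new HashMap<>();

    synchronized boolean tryAcquire(String path, Object holder) {
        return holders.putIfAbsent(path, holder) == null;
    }

    synchronized void release(String path, Object holder) {
        holders.remove(path, holder);
    }
}

// Simplified model of a Curator-style mutex: the per-thread hold count lives in
// each lock object, so re-entry only works through the same object.
final class SimulatedMutex {
    private final FakeLockServer server;
    private final String path;
    private final Map<Thread, Integer> holdCounts = new HashMap<>();

    SimulatedMutex(FakeLockServer server, String path) {
        this.server = server;
        this.path = path;
    }

    // Unlike Curator, this returns false immediately instead of waiting.
    synchronized boolean acquire() {
        Thread me = Thread.currentThread();
        Integer count = holdCounts.get(me);
        if (count != null) { // re-entering through the same object
            holdCounts.put(me, count + 1);
            return true;
        }
        if (server.tryAcquire(path, this)) {
            holdCounts.put(me, 1);
            return true;
        }
        return false;
    }

    synchronized void release() {
        Thread me = Thread.currentThread();
        Integer count = holdCounts.get(me);
        if (count == null) {
            throw new IllegalMonitorStateException("lock not held by this thread");
        }
        if (count > 1) {
            holdCounts.put(me, count - 1);
        } else {
            holdCounts.remove(me);
            server.release(path, this);
        }
    }
}
```

In this model, a thread that re-acquires through a newly created `SimulatedMutex` for a path it already holds is refused, which mirrors what happens when the map entry is removed and recreated while the original lock is still held.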