[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302574#comment-15302574 ]
Purshotam Shah commented on OOZIE-2501: --------------------------------------- uploaded new patch. [~rkanter] do you want to take a look? > ZK reentrant lock doesn't work for few cases > -------------------------------------------- > > Key: OOZIE-2501 > URL: https://issues.apache.org/jira/browse/OOZIE-2501 > Project: Oozie > Issue Type: Bug > Reporter: Purshotam Shah > Assignee: Purshotam Shah > > We will have an issue when oozie trying to acquire a lock and at the same > time, some other thread is releasing the same lock . > acquireLock will wait for 5 sec to acquire the lock. It will bypass the > synchronized block and get lockEntry from the hashmap. > While it waiting for 5 sec to acquire the lock, other thread releases the > lock and may execute the release code which will remove lockEntry from the > map. > If some other command from same thread tries to acquire the lock, it will > create a new InterProcessReadWriteLock object and use that for acquiring the > lock. > Logic for lock acquiring. > {code} > public LockToken getWriteLock(String resource, long wait) throws > InterruptedException { > InterProcessReadWriteLock lockEntry; > synchronized (zkLocks) { > if (zkLocks.containsKey(resource)) { > lockEntry = zkLocks.get(resource); > } > else { > lockEntry = new InterProcessReadWriteLock(zk.getClient(), > LOCKS_NODE + "/" + resource); > zkLocks.put(resource, lockEntry); > } > } > InterProcessMutex writeLock = lockEntry.writeLock(); > return acquireLock(wait, writeLock, resource); > } > {code} > Logic for lock releasing > {code} > public void release() { > try { > lock.release(); > if (zkLocks.get(resource) == null) { > return; > } > if (!isLockHeld()) { > synchronized (zkLocks) { > if (zkLocks.get(resource) != null) { > if (!isLockHeld()) { > zkLocks.remove(resource); > } > } > } > } > } > catch (Exception ex) { > LOG.warn("Could not release lock: " + ex.getMessage(), ex); > } > } > {code} > Curator code to acquire lock. > {code} > private boolean internalLock(long time, TimeUnit unit) throws Exception > { > /* > Note on concurrency: a given lockData instance > can be only acted on by a single thread so locking isn't necessary > */ > Thread currentThread = Thread.currentThread(); > LockData lockData = threadData.get(currentThread); > if ( lockData != null ) > { > // re-entering > lockData.lockCount.incrementAndGet(); > return true; > } > String lockPath = internals.attemptLock(time, unit, > getLockNodeBytes()); > if ( lockPath != null ) > { > LockData newLockData = new LockData(currentThread, > lockPath); > threadData.put(currentThread, newLockData); > return true; > } > return false; > } > {code} > The approach we have followed is to use map with weakvalue. Once the lock is > unreachable. GC will remove it from the map. We don't have to explicitly > remove it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)