[ https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402827#comment-15402827 ]

Purshotam Shah commented on OOZIE-2501:
---------------------------------------

SignalXCommand calls ActionStartXCommand synchronously and passes in wfJob, so
that ActionStartXCommand doesn't have to reload it:
{code}
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // wfJob is already set when SignalXCommand calls this command
                // synchronously; load it from the database only if it is absent.
                if (wfJob == null) {
                    this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
                }
                // The action itself is always loaded from the database.
                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
                LogUtils.setLogInfo(wfJob);
                LogUtils.setLogInfo(wfAction);
            }
            else {
                // JPAService is not available
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }
{code}

If an error occurs, the command is requeued with wfJob still non-null. When the
requeued command executes, it doesn't reload wfJob and ends up writing a stale
entry to the database.

This patch also fixes that: when the command is queued, we set wfJob to null so
that it is loaded from the database when the command executes.
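
A minimal sketch of the requeue fix, under stated assumptions: WorkflowStore, ActionStartCommand and prepareForRequeue() are hypothetical names, not Oozie classes, and the job is reduced to a status string kept in memory. It illustrates why a cached wfJob has to be cleared before the command is queued again: execute() writes back whatever job state the command holds, so a stale copy overwrites newer state.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the workflow job table (not Oozie's JPA layer).
class WorkflowStore {
    private final Map<String, String> statusById = new HashMap<>();

    String load(String jobId) {
        return statusById.get(jobId);
    }

    void save(String jobId, String status) {
        statusById.put(jobId, status);
    }
}

// Hypothetical command modelled on the loadState() pattern above: the caller
// may hand in an already-loaded job so the command can skip the reload.
class ActionStartCommand {
    private final WorkflowStore store;
    private final String jobId;
    private String wfJob; // cached job state; null means "load it from the store"

    ActionStartCommand(WorkflowStore store, String jobId, String preloadedJob) {
        this.store = store;
        this.jobId = jobId;
        this.wfJob = preloadedJob;
    }

    // Load the job only when the caller did not pass one in.
    private void loadState() {
        if (wfJob == null) {
            wfJob = store.load(jobId);
        }
    }

    // The fix: drop the cached job before requeueing, so the next execution
    // reads current state from the store instead of reusing the caller's copy.
    ActionStartCommand prepareForRequeue() {
        wfJob = null;
        return this;
    }

    // Persists the job state the command holds, as a full-entity update would.
    void execute() {
        loadState();
        store.save(jobId, wfJob);
    }
}
{code}

In this model, a command created while the job was RUNNING and executed after the job moved to SUSPENDED writes RUNNING back unless prepareForRequeue() was called; with the call, the requeued run reloads the job and persists SUSPENDED.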


> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
>                 Key: OOZIE-2501
>                 URL: https://issues.apache.org/jira/browse/OOZIE-2501
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Purshotam Shah
>            Assignee: Purshotam Shah
>         Attachments: OOZIE-2501-V2.patch
>
>
> We have an issue when Oozie tries to acquire a lock while another thread is
> releasing the same lock.
> The acquiring thread gets lockEntry from the hashmap inside the synchronized
> block, leaves the block, and then calls acquireLock, which waits up to 5 sec
> to acquire the lock.
> While it is waiting, the other thread releases the lock and may run the
> release code, which removes lockEntry from the map.
> If another command from the same thread then tries to acquire the lock, it
> creates a new InterProcessReadWriteLock object and uses that to acquire the
> lock. Because Curator tracks reentrancy per InterProcessMutex instance (the
> threadData map in the Curator code below), the new object doesn't know that
> this thread already holds the lock, so the thread waits on its own lock
> instead of re-entering it.
> Logic for lock acquisition:
> {code}
>     public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
>         InterProcessReadWriteLock lockEntry;
>         synchronized (zkLocks) {
>             if (zkLocks.containsKey(resource)) {
>                 lockEntry = zkLocks.get(resource);
>             }
>             else {
>                 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
>                 zkLocks.put(resource, lockEntry);
>             }
>         }
>         // Outside the synchronized block: while acquireLock waits, another
>         // thread's release() can remove lockEntry from zkLocks.
>         InterProcessMutex writeLock = lockEntry.writeLock();
>         return acquireLock(wait, writeLock, resource);
>     }
> {code}
> Logic for lock release:
> {code}
>         public void release() {
>             try {
>                 lock.release();
>                 if (zkLocks.get(resource) == null) {
>                     return;
>                 }
>                 if (!isLockHeld()) {
>                     synchronized (zkLocks) {
>                         if (zkLocks.get(resource) != null) {
>                             if (!isLockHeld()) {
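>                                 // A thread that fetched this entry and is still waiting in
>                                 // acquireLock now holds an entry that is no longer in the map.
>                                 zkLocks.remove(resource);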
>                             }
>                         }
>                     }
>                 }
>             }
>             catch (Exception ex) {
>                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>             }
>         }
> {code}
> Curator code to acquire the lock:
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
>     {
>         /*
>            Note on concurrency: a given lockData instance
>            can be only acted on by a single thread so locking isn't necessary
>         */
>         Thread          currentThread = Thread.currentThread();
>         LockData        lockData = threadData.get(currentThread);
>         if ( lockData != null )
>         {
>             // re-entering
>             lockData.lockCount.incrementAndGet();
>             return true;
>         }
>         String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
>         if ( lockPath != null )
>         {
>             LockData        newLockData = new LockData(currentThread, lockPath);
>             threadData.put(currentThread, newLockData);
>             return true;
>         }
>         return false;
>     }
> {code}
> Our approach is to use a map with weak values: once a lock entry is no longer
> strongly reachable, the GC removes it from the map, so we don't have to
> remove it explicitly.
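
The lock race in the issue description can be replayed deterministically on a single thread. In this hypothetical sketch, LockRegistry and its Entry class stand in for zkLocks and InterProcessReadWriteLock, and the lockHeld flag stands in for isLockHeld(); no ZooKeeper or Curator code is involved, only the map bookkeeping is modelled.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of zkLocks; Entry stands in for InterProcessReadWriteLock.
class LockRegistry {
    static final class Entry {}

    private final Map<String, Entry> zkLocks = new HashMap<>();

    // Same get-or-create step getWriteLock() performs inside synchronized (zkLocks).
    Entry getOrCreate(String resource) {
        synchronized (zkLocks) {
            return zkLocks.computeIfAbsent(resource, r -> new Entry());
        }
    }

    // Same removal step release() performs when isLockHeld() returns false.
    void removeIfUnheld(String resource, boolean lockHeld) {
        synchronized (zkLocks) {
            if (!lockHeld) {
                zkLocks.remove(resource);
            }
        }
    }

    // Replays the interleaving step by step; returns true when the second
    // lookup hands the waiting thread a different entry than the first one.
    static boolean replayRace() {
        LockRegistry registry = new LockRegistry();
        // Thread B fetches the entry, leaves the synchronized block and waits in acquireLock.
        Entry waitingOn = registry.getOrCreate("wf-1");
        // Thread A releases; B has not acquired yet, so the lock is not held and the entry is removed.
        registry.removeIfUnheld("wf-1", false);
        // B acquires through waitingOn, then another command on B asks for the same resource.
        Entry reentry = registry.getOrCreate("wf-1");
        return waitingOn != reentry;
    }
}
{code}

Without the removal, repeated lookups for the same resource return the same Entry; with it, the thread that is about to own the lock later receives a fresh, unrelated Entry.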

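Why a second InterProcessReadWriteLock breaks reentrancy follows from the Curator code quoted above: threadData belongs to each mutex instance. The sketch below is a hypothetical in-process model, not Curator. A one-permit java.util.concurrent.Semaphore plays the ZooKeeper lock node, which knows nothing about threads, and ToyMutex copies the per-instance threadData check from internalLock.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-process model of Curator's reentrant mutex (not Curator itself).
class ToyMutex {
    private static final class LockData {
        final AtomicInteger lockCount = new AtomicInteger(1);
    }

    // Shared by every ToyMutex for the same resource; like the ZooKeeper lock
    // node, it does not know which thread holds it.
    private final Semaphore lockNode;
    // Per instance, as in Curator: only this object remembers which threads hold it.
    private final Map<Thread, LockData> threadData = new ConcurrentHashMap<>();

    ToyMutex(Semaphore lockNode) {
        this.lockNode = lockNode;
    }

    // Mirrors internalLock(): re-enter if this instance already records the
    // current thread, otherwise try to take the shared node within the timeout.
    boolean acquire(long time, TimeUnit unit) {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            lockData.lockCount.incrementAndGet(); // re-entering
            return true;
        }
        try {
            if (lockNode.tryAcquire(time, unit)) {
                threadData.put(currentThread, new LockData());
                return true;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    // How many times the current thread has acquired this instance.
    int holdCount() {
        LockData lockData = threadData.get(Thread.currentThread());
        return lockData == null ? 0 : lockData.lockCount.get();
    }
}
{code}

In the model, a thread that holds the lock through one ToyMutex can re-enter it through that same object, but a second ToyMutex over the same Semaphore makes the thread wait on its own lock until the timeout expires.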

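The weak-value approach can be sketched with the JDK alone. WeakValueMap is a hypothetical helper, not the class used in the patch (Guava, for example, offers new MapMaker().weakValues().makeMap() for the same purpose). Values are held through WeakReferences registered with a ReferenceQueue, and cleared entries are purged on each access. The sketch assumes every caller that is waiting on or holding a lock keeps a strong reference to its entry; otherwise the entry could be collected while still in use.

{code:java}
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical weak-value map: an entry disappears once no caller holds its value.
class WeakValueMap<K, V> {
    // WeakReference that remembers its key so the map entry can be purged later.
    private static final class ValueRef<K, V> extends WeakReference<V> {
        final K key;

        ValueRef(K key, V value, ReferenceQueue<V> queue) {
            super(value, queue);
            this.key = key;
        }
    }

    private final Map<K, ValueRef<K, V>> map = new HashMap<>();
    private final ReferenceQueue<V> queue = new ReferenceQueue<>();

    // Returns the live value for key, creating a new one if it is absent or was collected.
    synchronized V getOrCreate(K key, Function<? super K, ? extends V> factory) {
        purgeCollected();
        ValueRef<K, V> ref = map.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            value = factory.apply(key);
            map.put(key, new ValueRef<>(key, value, queue));
        }
        return value;
    }

    synchronized int size() {
        purgeCollected();
        return map.size();
    }

    // Removes entries whose values the GC has cleared; no explicit release is needed.
    @SuppressWarnings("unchecked")
    private void purgeCollected() {
        ValueRef<K, V> ref;
        while ((ref = (ValueRef<K, V>) queue.poll()) != null) {
            // Only remove the entry if it still points at this cleared reference.
            map.remove(ref.key, ref);
        }
    }
}
{code}

While a caller keeps the returned value strongly reachable, every lookup for that key returns the same object. After the last reference is dropped, a later GC clears it and the next access purges the stale mapping; exactly when that happens is up to the garbage collector.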

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
