Repository: oozie
Updated Branches:
  refs/heads/master 84c6d8854 -> 71eccfb9f


OOZIE-3113 Retry for ZK lock release (satishsaley)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/71eccfb9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/71eccfb9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/71eccfb9

Branch: refs/heads/master
Commit: 71eccfb9f2a929221fa5b0ea15be78b931b06a64
Parents: 84c6d88
Author: satishsaley <satishsa...@apache.org>
Authored: Tue Jan 2 15:08:29 2018 -0800
Committer: satishsaley <satishsa...@apache.org>
Committed: Tue Jan 2 15:08:29 2018 -0800

----------------------------------------------------------------------
 .../apache/oozie/service/ZKLocksService.java    | 41 +++++++++++---
 core/src/main/resources/oozie-default.xml       |  8 +++
 .../oozie/service/TestZKLocksService.java       | 57 ++++++++++++++++++++
 release-log.txt                                 |  1 +
 4 files changed, 99 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/71eccfb9/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java 
b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 2c71c00..6593212 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -38,6 +38,7 @@ import org.apache.curator.utils.ThreadUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.MapMaker;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Service that provides distributed locks via ZooKeeper.  Requires that a 
ZooKeeper ensemble is available.  The locks will be
@@ -56,6 +57,8 @@ public class ZKLocksService extends MemoryLocksService 
implements Service, Instr
     private static final String REAPING_LEADER_PATH = 
ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
     static final String REAPING_THRESHOLD = CONF_PREFIX + 
"ZKLocksService.locks.reaper.threshold";
     static final String REAPING_THREADS = CONF_PREFIX + 
"ZKLocksService.locks.reaper.threads";
+    private static final String RELEASE_RETRY_TIME_LIMIT_MINUTES = CONF_PREFIX 
+ "ZKLocksService.lock.release.retry.time.limit"
+            + ".minutes";
 
     /**
      * Initialize the zookeeper locks service
@@ -203,19 +206,41 @@ public class ZKLocksService extends MemoryLocksService 
implements Service, Instr
         @Override
         public void release() {
             try {
-                switch (type) {
-                    case WRITE:
-                        lockEntry.writeLock().release();
-                        break;
-                    case READ:
-                        lockEntry.readLock().release();
-                        break;
-                }
+                retriableRelease();
             }
             catch (Exception ex) {
                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
             }
         }
+
+        /**
+         * Releases the lock, retrying while ZooKeeper reports a connection loss.
+         * <p>
+         * The release is always attempted at least once. After each {@link KeeperException.ConnectionLossException}
+         * the method sleeps for a fixed 10 seconds and tries again, until the time budget configured through
+         * {@code RELEASE_RETRY_TIME_LIMIT_MINUTES} (default 30 minutes) is used up. Any other exception thrown by
+         * the underlying release is propagated immediately without a retry.
+         *
+         * @throws KeeperException.ConnectionLossException if the connection is still lost when the retry budget is
+         *         exhausted; the last failure is rethrown so the caller can report that the lock was not released
+         * @throws InterruptedException if the thread is interrupted while waiting between attempts; the interrupt
+         *         status is restored before the exception is rethrown
+         * @throws Exception any other failure reported while releasing the lock
+         */
+        private void retriableRelease() throws Exception {
+            final long sleepSeconds = 10;
+            long remainingSeconds = TimeUnit.MINUTES.toSeconds(
+                    ConfigurationService.getLong(RELEASE_RETRY_TIME_LIMIT_MINUTES, 30));
+            for (int retryCount = 1; ; retryCount++) {
+                try {
+                    switch (type) {
+                        case WRITE:
+                            lockEntry.writeLock().release();
+                            break;
+                        case READ:
+                            lockEntry.readLock().release();
+                            break;
+                    }
+                    // Released successfully, nothing left to retry.
+                    return;
+                }
+                catch (KeeperException.ConnectionLossException ex) {
+                    if (remainingSeconds < sleepSeconds) {
+                        // Budget exhausted: surface the failure instead of returning as if the lock was released.
+                        throw ex;
+                    }
+                    LOG.warn("Could not release lock: " + ex.getMessage() + ". Retry will be after " + sleepSeconds
+                            + " seconds", ex);
+                }
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(sleepSeconds));
+                }
+                catch (InterruptedException ie) {
+                    // The caller catches Exception broadly; keep the interrupt visible to code further up the stack.
+                    Thread.currentThread().interrupt();
+                    throw ie;
+                }
+                remainingSeconds -= sleepSeconds;
+                LOG.info("Retrying to release lock. Retry number=" + retryCount);
+            }
+        }
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/oozie/blob/71eccfb9/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 1c34800..f7ce621 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2570,6 +2570,14 @@ will be the requeue interval for the actions which are 
waiting for a long time w
     </property>
 
     <property>
+        
<name>oozie.service.ZKLocksService.lock.release.retry.time.limit.minutes</name>
+        <value>30</value>
+        <description>
+            On ZooKeeper connection loss while releasing a lock, Oozie will retry the release every 10 seconds
+            until the specified number of minutes has elapsed before giving up.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.http.hostname</name>
         <value>0.0.0.0</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/71eccfb9/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index d04f04e..b7dee7e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -20,13 +20,21 @@ package org.apache.oozie.service;
 
 import java.util.UUID;
 
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.lock.TestMemoryLocks;
 import org.apache.oozie.service.ZKLocksService.ZKLockToken;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.data.Stat;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.TimeUnit;
 
 public class TestZKLocksService extends ZKXTestCase {
     private XLog log = XLog.getLog(getClass());
@@ -551,6 +559,55 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
+    /**
+     * Verifies that releasing a ZK lock is retried after a ZooKeeper connection loss.
+     * <p>
+     * The mocked write lock throws {@link ConnectionLossException} on the first {@code release()} call and
+     * succeeds on the second one. The stubbing is installed once, before the lock is used, so the test does not
+     * need a second thread to re-stub the mock while {@code release()} is in progress.
+     */
+    public void testRetriableRelease() throws Exception {
+        final String path = UUID.randomUUID().toString();
+        ZKLocksService zkls = new ZKLocksService();
+        try {
+            zkls.init(Services.get());
+
+            InterProcessReadWriteLock lockEntry = Mockito.mock(InterProcessReadWriteLock.class);
+            final InterProcessMutex writeLock = Mockito.mock(InterProcessMutex.class);
+            Mockito.when(lockEntry.writeLock()).thenReturn(writeLock);
+            Mockito.doNothing().when(writeLock).acquire();
+
+            // Counts release() invocations; only touched on the test thread, so a plain array is sufficient.
+            final int[] releaseAttempts = new int[] {0};
+            Mockito.doAnswer(new Answer<Void>() {
+                @Override
+                public Void answer(InvocationOnMock invocation) throws Throwable {
+                    releaseAttempts[0]++;
+                    if (releaseAttempts[0] == 1) {
+                        // Simulate a lost ZooKeeper connection on the first attempt only.
+                        throw new ConnectionLossException();
+                    }
+                    return null;
+                }
+            }).when(writeLock).release();
+
+            // put mocked lockEntry
+            zkls.getLocks().putIfAbsent(path, lockEntry);
+
+            LockToken lock = zkls.getWriteLock(path, -1);
+
+            // Try to release the lock; the first attempt fails and the retry must succeed.
+            lock.release();
+            assertEquals("Failing the test case. The lock should have been released on the second attempt",
+                    2, releaseAttempts[0]);
+        }
+        finally {
+            zkls.destroy();
+        }
+    }
+
+
     private void checkLockRelease(String path, ZKLocksService zkls) {
         if (zkls.getLocks().get(path) == null) {
             // good, lock is removed from memory after gc.

http://git-wip-us.apache.org/repos/asf/oozie/blob/71eccfb9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3e344b7..dbf69ed 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-3113 Retry for ZK lock release (satishsaley)
 OOZIE-3142 Integer Overflows in Purge retentionTime (Prabhu Joseph via 
andras.piros)
 OOZIE-3143 AG_Install.twiki needs some refinement (kmarton via andras.piros)
 OOZIE-3127 Remove redundant check for user (satishsaley)

Reply via email to