This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a94fd4d  GEODE-3516: Avoid tryResume call to add the thread again into 
the waiting thread queue
a94fd4d is described below

commit a94fd4dd288277df54b1b3b90959097c928ff67d
Author: eshu <e...@pivotal.io>
AuthorDate: Wed Sep 6 14:53:47 2017 -0700

    GEODE-3516: Avoid tryResume call to add the thread again into the waiting 
thread queue
    
        Prevent tryResume from adding a thread to the waiting queue multiple times.
        Add a unit test that verifies the fix.
---
 .../apache/geode/internal/cache/TXManagerImpl.java | 61 ++++++++++++----------
 .../internal/cache/TXManagerImplJUnitTest.java     | 59 +++++++++++++++++++++
 2 files changed, 92 insertions(+), 28 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index a0a4d7c..b106546 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1301,50 +1301,55 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
    */
   private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new 
ConcurrentHashMap<>();
 
+  Queue<Thread> getWaitQueue(TransactionId transactionId) {
+    return waitMap.get(transactionId);
+  }
+
+  private Queue<Thread> getOrCreateWaitQueue(TransactionId transactionId) {
+    Queue<Thread> threadq = getWaitQueue(transactionId);
+    if (threadq == null) {
+      threadq = new ConcurrentLinkedQueue<Thread>();
+      Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
+      if (oldq != null) {
+        threadq = oldq;
+      }
+    }
+    return threadq;
+  }
+
   public boolean tryResume(TransactionId transactionId, long time, TimeUnit 
unit) {
     if (transactionId == null || getTXState() != null || 
!exists(transactionId)) {
       return false;
     }
-    Thread currentThread = Thread.currentThread();
-    long timeout = unit.toNanos(time);
-    long startTime = System.nanoTime();
-    Queue<Thread> threadq = null;
+    final Thread currentThread = Thread.currentThread();
+    final long endTime = System.nanoTime() + unit.toNanos(time);
+    final Queue<Thread> threadq = getOrCreateWaitQueue(transactionId);
 
     try {
       while (true) {
-        threadq = waitMap.get(transactionId);
-        if (threadq == null) {
-          threadq = new ConcurrentLinkedQueue<Thread>();
-          Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
-          if (oldq != null) {
-            threadq = oldq;
-          }
+        if (!threadq.contains(currentThread)) {
+          threadq.add(currentThread);
         }
-        threadq.add(currentThread);
-        // after putting this thread in waitMap, we should check for
-        // an entry in suspendedTXs. if no entry is found in suspendedTXs
-        // next invocation of suspend() will unblock this thread
         if (tryResume(transactionId)) {
           return true;
-        } else if (!exists(transactionId)) {
+        }
+        if (!exists(transactionId)) {
           return false;
         }
-        LockSupport.parkNanos(timeout);
-        long nowTime = System.nanoTime();
-        timeout -= nowTime - startTime;
-        startTime = nowTime;
-        if (timeout <= 0) {
-          break;
+        long parkTimeout = endTime - System.nanoTime();
+        if (parkTimeout <= 0) {
+          return false;
         }
+        parkToRetryResume(parkTimeout);
       }
     } finally {
-      threadq = waitMap.get(transactionId);
-      if (threadq != null) {
-        threadq.remove(currentThread);
-        // the queue itself will be removed at commit/rollback
-      }
+      threadq.remove(currentThread);
+      // the queue itself will be removed at commit/rollback
     }
-    return false;
+  }
+
+  void parkToRetryResume(long timeout) {
+    LockSupport.parkNanos(timeout);
   }
 
   public boolean exists(TransactionId transactionId) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
index a2c1e70..d252d92 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
@@ -14,6 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 import org.apache.geode.cache.*;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -23,8 +26,11 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Properties;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -331,4 +337,57 @@ public class TXManagerImplJUnitTest {
   protected void callIsDistributed(TXManagerImpl txMgr) {
     assertFalse(txMgr.isDistributed());
   }
+
+  @Test
+  public void testTryResumeRemoveItselfFromWaitingQueue() throws Exception {
+    int time = 30;
+    long timeout = TimeUnit.SECONDS.toNanos(time);
+    TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+    TXManagerImpl spyMgr = spy(txMgr);
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(10);
+        return null;
+      }
+    }).when(spyMgr).parkToRetryResume(timeout);
+    spyMgr.begin();
+    region.put("key", "value");
+    final TransactionId txId = spyMgr.suspend();
+    spyMgr.resume(txId);
+    final CountDownLatch latch1 = new CountDownLatch(2);
+    final CountDownLatch latch2 = new CountDownLatch(2);
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        latch1.countDown();
+        assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS));
+        region.put("key1", "value1");
+        assertEquals(txId, spyMgr.suspend());
+        latch2.countDown();
+      }
+    });
+    Thread t2 = new Thread(new Runnable() {
+      public void run() {
+        latch1.countDown();
+        assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS));
+        region.put("key2", "value1");
+        assertEquals(txId, spyMgr.suspend());
+        latch2.countDown();
+      }
+    });
+    t1.start();
+    t2.start();
+    Thread.sleep(300);
+    if (!latch1.await(30, TimeUnit.SECONDS)) {
+      fail("junit test failed");
+    }
+    spyMgr.suspend();
+    if (!latch2.await(30, TimeUnit.SECONDS)) {
+      fail("junit test failed");
+    }
+    spyMgr.tryResume(txId, time, TimeUnit.SECONDS);
+    assertEquals(3, region.size());
+    assertEquals(0, spyMgr.getWaitQueue(txId).size());
+    spyMgr.commit();
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Reply via email to