This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new a94fd4d GEODE-3516: Avoid tryResume call to add the thread again into the waiting thread queue a94fd4d is described below commit a94fd4dd288277df54b1b3b90959097c928ff67d Author: eshu <e...@pivotal.io> AuthorDate: Wed Sep 6 14:53:47 2017 -0700 GEODE-3516: Avoid tryResume call to add the thread again into the waiting thread queue Avoid tryResume add a thread multiple times into the waiting queue. Add a unit test that verify the fix. --- .../apache/geode/internal/cache/TXManagerImpl.java | 61 ++++++++++++---------- .../internal/cache/TXManagerImplJUnitTest.java | 59 +++++++++++++++++++++ 2 files changed, 92 insertions(+), 28 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index a0a4d7c..b106546 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -1301,50 +1301,55 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene */ private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<>(); + Queue<Thread> getWaitQueue(TransactionId transactionId) { + return waitMap.get(transactionId); + } + + private Queue<Thread> getOrCreateWaitQueue(TransactionId transactionId) { + Queue<Thread> threadq = getWaitQueue(transactionId); + if (threadq == null) { + threadq = new ConcurrentLinkedQueue<Thread>(); + Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq); + if (oldq != null) { + threadq = oldq; + } + } + return threadq; + } + public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) { if (transactionId == null || getTXState() != null || !exists(transactionId)) { return false; } - Thread currentThread = Thread.currentThread(); - long timeout = unit.toNanos(time); - long startTime = System.nanoTime(); - Queue<Thread> threadq = null; + final Thread currentThread = Thread.currentThread(); + final long endTime = System.nanoTime() + unit.toNanos(time); + final Queue<Thread> threadq = getOrCreateWaitQueue(transactionId); try { while (true) { - threadq = waitMap.get(transactionId); - if (threadq == null) { - threadq = new ConcurrentLinkedQueue<Thread>(); - Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq); - if (oldq != null) { - threadq = oldq; - } + if (!threadq.contains(currentThread)) { + threadq.add(currentThread); } - threadq.add(currentThread); - // after putting this thread in waitMap, we should check for - // an entry in suspendedTXs. if no entry is found in suspendedTXs - // next invocation of suspend() will unblock this thread if (tryResume(transactionId)) { return true; - } else if (!exists(transactionId)) { + } + if (!exists(transactionId)) { return false; } - LockSupport.parkNanos(timeout); - long nowTime = System.nanoTime(); - timeout -= nowTime - startTime; - startTime = nowTime; - if (timeout <= 0) { - break; + long parkTimeout = endTime - System.nanoTime(); + if (parkTimeout <= 0) { + return false; } + parkToRetryResume(parkTimeout); } } finally { - threadq = waitMap.get(transactionId); - if (threadq != null) { - threadq.remove(currentThread); - // the queue itself will be removed at commit/rollback - } + threadq.remove(currentThread); + // the queue itself will be removed at commit/rollback } - return false; + } + + void parkToRetryResume(long timeout) { + LockSupport.parkNanos(timeout); } public boolean exists(TransactionId transactionId) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java index a2c1e70..d252d92 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java @@ -14,6 +14,9 @@ */ package org.apache.geode.internal.cache; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + import org.apache.geode.cache.*; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.test.junit.categories.IntegrationTest; @@ -23,8 +26,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.Properties; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -331,4 +337,57 @@ public class TXManagerImplJUnitTest { protected void callIsDistributed(TXManagerImpl txMgr) { assertFalse(txMgr.isDistributed()); } + + @Test + public void testTryResumeRemoveItselfFromWaitingQueue() throws Exception { + int time = 30; + long timeout = TimeUnit.SECONDS.toNanos(time); + TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager(); + TXManagerImpl spyMgr = spy(txMgr); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(10); + return null; + } + }).when(spyMgr).parkToRetryResume(timeout); + spyMgr.begin(); + region.put("key", "value"); + final TransactionId txId = spyMgr.suspend(); + spyMgr.resume(txId); + final CountDownLatch latch1 = new CountDownLatch(2); + final CountDownLatch latch2 = new CountDownLatch(2); + Thread t1 = new Thread(new Runnable() { + public void run() { + latch1.countDown(); + assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS)); + region.put("key1", "value1"); + assertEquals(txId, spyMgr.suspend()); + latch2.countDown(); + } + }); + Thread t2 = new Thread(new Runnable() { + public void run() { + latch1.countDown(); + assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS)); + region.put("key2", "value1"); + assertEquals(txId, spyMgr.suspend()); + latch2.countDown(); + } + }); + t1.start(); + t2.start(); + Thread.sleep(300); + if (!latch1.await(30, TimeUnit.SECONDS)) { + fail("junit test failed"); + } + spyMgr.suspend(); + if (!latch2.await(30, TimeUnit.SECONDS)) { + fail("junit test failed"); + } + spyMgr.tryResume(txId, time, TimeUnit.SECONDS); + assertEquals(3, region.size()); + assertEquals(0, spyMgr.getWaitQueue(txId).size()); + spyMgr.commit(); + } } -- To stop receiving notification emails like this one, please contact ['"commits@geode.apache.org" <commits@geode.apache.org>'].