GEODE-2024 Deadlock creating a new lock service Grantor. This change-set causes the code in TXLockServiceImpl.release() to perform periodic checks to see if grantor recovery is being performed. If so, it skips releaseTryLocks, which requires a recovered grantor to function. This is in line with the previous attempts to fix this problem. The recovery message that is trying to obtain the recovery write-lock now sets the "recovering" state in TXLockServiceImpl prior to attempting to get the lock, so that the state is already set when TXLockServiceImpl.release() checks it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f02ea36f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f02ea36f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f02ea36f Branch: refs/heads/feature/GEM-983 Commit: f02ea36f2e3a440e8aa39815539f3aa2855ce124 Parents: 69a0877 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Wed Oct 26 13:51:20 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Wed Oct 26 13:53:00 2016 -0700 ---------------------------------------------------------------------- .../locks/DLockRecoverGrantorProcessor.java | 16 +- .../internal/locks/DLockService.java | 108 ++++++---- .../internal/cache/locks/TXLockServiceImpl.java | 35 ++-- .../locks/TXRecoverGrantorMessageProcessor.java | 8 +- .../cache/locks/TXLockServiceDUnitTest.java | 210 ++++++++++++++----- 5 files changed, 272 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java index 37fbfbe..2a48308 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java @@ -91,7 +91,7 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 { // process msg and reply from this VM... 
if (msg.getSender() == null) msg.setSender(dm.getId()); - msg.processMessage(dm); + msg.scheduleMessage(dm); // keep waiting even if interrupted try { @@ -239,6 +239,20 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 { processMessage(dm); } + /** + * For unit testing we need to push the message through scheduleAction so that message observers + * are invoked + * + * @param dm the distribution manager + */ + protected void scheduleMessage(DM dm) { + if (dm instanceof DistributionManager) { + super.scheduleAction((DistributionManager) dm); + } else { + processMessage(dm); + } + } + protected void processMessage(DM dm) { MessageProcessor processor = nullServiceProcessor; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java index a859299..ca012d3 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java @@ -15,8 +15,18 @@ package org.apache.geode.distributed.internal.locks; -import org.apache.geode.*; -import org.apache.geode.distributed.*; +import org.apache.geode.CancelCriterion; +import org.apache.geode.CancelException; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.InternalGemFireException; +import org.apache.geode.StatisticsFactory; +import org.apache.geode.SystemFailure; +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.LeaseExpiredException; +import 
org.apache.geode.distributed.LockNotHeldException; +import org.apache.geode.distributed.LockServiceDestroyedException; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -39,8 +49,18 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -290,7 +310,7 @@ public class DLockService extends DistributedLockService { statStart = getStats().startGrantorWait(); if (!ownLockGrantorFutureResult) { LockGrantorId lockGrantorIdRef = - waitForLockGrantorFutureResult(lockGrantorFutureResultRef); + waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS); if (lockGrantorIdRef != null) { return lockGrantorIdRef; } else { @@ -366,7 +386,7 @@ public class DLockService extends DistributedLockService { /** * Creates a local {@link DLockGrantor}. - * + * * if (!createLocalGrantor(xxx)) { theLockGrantorId = this.lockGrantorId; } * * @param elder the elder that told us to be the grantor @@ -727,15 +747,24 @@ public class DLockService extends DistributedLockService { * Returns lockGrantorId when lockGrantorFutureResultRef has been set by another thread. 
* * @param lockGrantorFutureResultRef FutureResult to wait for + * @param timeToWait how many ms to wait, 0 = forever + * @param timeUnit the unit of measure for timeToWait * @return the LockGrantorId or null if FutureResult was cancelled */ - private LockGrantorId waitForLockGrantorFutureResult(FutureResult lockGrantorFutureResultRef) { + private LockGrantorId waitForLockGrantorFutureResult(FutureResult lockGrantorFutureResultRef, + long timeToWait, final TimeUnit timeUnit) { LockGrantorId lockGrantorIdRef = null; while (lockGrantorIdRef == null) { boolean interrupted = Thread.interrupted(); try { checkDestroyed(); - lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(); + if (timeToWait == 0) { + lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(); + } else { + lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(timeToWait, timeUnit); + } + } catch (TimeoutException e) { + break; } catch (InterruptedException e) { interrupted = true; this.dm.getCancelCriterion().checkCancelInProgress(e); @@ -757,7 +786,14 @@ public class DLockService extends DistributedLockService { return lockGrantorIdRef; } - private void notLockGrantorId(LockGrantorId notLockGrantorId, boolean waitForGrantor) { + /** + * nulls out grantor to force call to elder + * + * @param timeToWait how long to wait for a new grantor. 
-1 don't wait, 0 no time limit + * @param timeUnit the unit of measure of timeToWait + */ + private void notLockGrantorId(LockGrantorId notLockGrantorId, long timeToWait, + final TimeUnit timeUnit) { if (notLockGrantorId.isLocal(getSerialNumber())) { if (logger.isTraceEnabled(LogMarker.DLS)) { logger.trace(LogMarker.DLS, @@ -793,8 +829,8 @@ public class DLockService extends DistributedLockService { statStart = getStats().startGrantorWait(); if (!ownLockGrantorFutureResult) { - if (waitForGrantor) { // fix for bug #43708 - waitForLockGrantorFutureResult(lockGrantorFutureResultRef); + if (timeToWait >= 0) { + waitForLockGrantorFutureResult(lockGrantorFutureResultRef, timeToWait, timeUnit); } return; } @@ -947,7 +983,7 @@ public class DLockService extends DistributedLockService { } } if (!ownLockGrantorFutureResult) { - waitForLockGrantorFutureResult(lockGrantorFutureResultRef); + waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS); continue; } } @@ -1329,7 +1365,7 @@ public class DLockService extends DistributedLockService { * will be ignored if the lock is currently held by another client. * * @param interruptible true if this lock request is interruptible - * + * * @param disableAlerts true to disable logging alerts if the dlock is taking a long time to * acquired. 
* @@ -1408,7 +1444,6 @@ public class DLockService extends DistributedLockService { LocalizedStrings.DLockService_DEBUG_LOCKINTERRUPTIBLY_HAS_GONE_HOT_AND_LOOPED_0_TIMES .toLocalizedString(count); - InternalGemFireError e = new InternalGemFireError(s); logger.error(LogMarker.DLS, LocalizedMessage.create( @@ -1516,7 +1551,6 @@ public class DLockService extends DistributedLockService { } } // else: non-reentrant or reentrant w/ non-infinite lease - if (gotLock) { // if (processor != null) (cannot be null) { // TODO: can be null after restoring above optimization @@ -1539,9 +1573,7 @@ public class DLockService extends DistributedLockService { this.lockGrantorId); } continue; - } - - else if (isDestroyed()) { + } else if (isDestroyed()) { // race: dls was destroyed if (isDebugEnabled_DLS) { logger.trace(LogMarker.DLS, @@ -1549,9 +1581,7 @@ public class DLockService extends DistributedLockService { theLockGrantorId); } needToReleaseOrphanedGrant = true; - } - - else { + } else { safeExit = true; synchronized (this.tokens) { checkDestroyed(); @@ -1603,7 +1633,7 @@ public class DLockService extends DistributedLockService { // grantor replied NOT_GRANTOR or departed (getLock is false) else if (processor.repliedNotGrantor() || processor.hadNoResponse()) { safeExit = true; - notLockGrantorId(theLockGrantorId, true); + notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); // keepTrying is still true... 
loop back around } // grantor replied NOT_GRANTOR or departed @@ -1912,7 +1942,7 @@ public class DLockService extends DistributedLockService { released = true; } finally { if (!released) { - notLockGrantorId(theLockGrantorId, true); + notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); } } } // while !released @@ -1966,7 +1996,7 @@ public class DLockService extends DistributedLockService { // loop back around to get next lock grantor } finally { if (queryReply != null && queryReply.repliedNotGrantor()) { - notLockGrantorId(theLockGrantorId, true); + notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); } } } // while querying @@ -2076,7 +2106,7 @@ public class DLockService extends DistributedLockService { return this.dlockStats; } - public void releaseTryLocks(DLockBatchId batchId, boolean onlyIfSameGrantor) { + public void releaseTryLocks(DLockBatchId batchId, Callable<Boolean> untilCondition) { final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS); if (isDebugEnabled_DLS) { logger.trace(LogMarker.DLS, "[DLockService.releaseTryLocks] enter: {}", batchId); @@ -2088,26 +2118,29 @@ public class DLockService extends DistributedLockService { boolean lockBatch = true; boolean released = false; while (!released) { + try { + boolean quit = untilCondition.call(); + if (quit) { + return; + } + } catch (Exception e) { + throw new InternalGemFireException("unexpected exception", e); + } checkDestroyed(); LockGrantorId theLockGrantorId = null; - if (onlyIfSameGrantor) { // this was a fix for bug #38763, from r19555 - theLockGrantorId = batchId.getLockGrantorId(); - synchronized (this.lockGrantorIdLock) { - if (!checkLockGrantorId(theLockGrantorId)) { - // the grantor is different so break and skip DLockReleaseProcessor - break; - } + theLockGrantorId = batchId.getLockGrantorId(); + synchronized (this.lockGrantorIdLock) { + if (!checkLockGrantorId(theLockGrantorId)) { + // the grantor is different so break and skip DLockReleaseProcessor + 
break; } - } else { - theLockGrantorId = getLockGrantorId(); } released = callReleaseProcessor(theLockGrantorId.getLockGrantorMember(), batchId, lockBatch, -1); if (!released) { - final boolean waitForGrantor = onlyIfSameGrantor; // fix for bug #43708 - notLockGrantorId(theLockGrantorId, waitForGrantor); + notLockGrantorId(theLockGrantorId, 100, TimeUnit.MILLISECONDS); } } } finally { @@ -2185,7 +2218,7 @@ public class DLockService extends DistributedLockService { // should have thrown LockServiceDestroyedException Assert.assertTrue(isDestroyed(), "Grantor reports service " + this + " is destroyed"); } else if (processor.repliedNotGrantor() || processor.hadNoResponse()) { - notLockGrantorId(theLockGrantorId, true); + notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); } else { keyIfFailed[0] = processor.getKeyIfFailed(); if (keyIfFailed[0] == null) { @@ -2455,7 +2488,8 @@ public class DLockService extends DistributedLockService { if (theLockGrantorId != null && !theLockGrantorId.isLocal(getSerialNumber())) { if (!NonGrantorDestroyedProcessor.send(this.serviceName, theLockGrantorId, dm)) { // grantor responded NOT_GRANTOR - notLockGrantorId(theLockGrantorId, true); // nulls out grantor to force call to elder + notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); // nulls out grantor to + // force call to elder retry = true; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java index f4ab02f..717d878 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java @@ 
-15,13 +15,6 @@ package org.apache.geode.internal.cache.locks; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import org.apache.logging.log4j.Logger; - import org.apache.geode.cache.CommitConflictException; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ReplyException; @@ -32,6 +25,12 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; /** Provides clean separation of implementation from public facade */ public class TXLockServiceImpl extends TXLockService { @@ -50,13 +49,14 @@ public class TXLockServiceImpl extends TXLockService { /** Instance of dlock service to use */ private DLockService dlock; - /** List of active txLockIds */ + /** + * List of active txLockIds + */ protected List txLockIdList = new ArrayList(); /** * True if grantor recovery is in progress; used to keep <code>release</code> from waiting for - * grantor. TODO: this boolean can probably be removed... it was insufficient and new fixes for - * bug 38763 have the side effect of making this boolean obsolete (verify before removal!) + * grantor. */ private volatile boolean recovering = false; @@ -225,10 +225,11 @@ public class TXLockServiceImpl extends TXLockService { LocalizedStrings.TXLockServiceImpl_INVALID_TXLOCKID_NOT_FOUND_0 .toLocalizedString(txLockId)); } - // only release w/ dlock if not in middle of recovery... 
- if (!this.recovering) { - this.dlock.releaseTryLocks(txLockId, true); - } + + this.dlock.releaseTryLocks(txLockId, () -> { + return this.recovering; + }); + this.txLockIdList.remove(txLockId); releaseRecoveryReadLock(); } @@ -243,10 +244,14 @@ public class TXLockServiceImpl extends TXLockService { // Internal implementation methods // ------------------------------------------------------------------------- + boolean isRecovering() { + return this.recovering; + } + /** Delays grantor recovery replies until finished with locks */ void acquireRecoveryWriteLock() throws InterruptedException { - this.recoveryLock.writeLock().lockInterruptibly(); this.recovering = true; + this.recoveryLock.writeLock().lockInterruptibly(); } void releaseRecoveryWriteLock() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java old mode 100755 new mode 100644 index 77dec94..7ae2d2b --- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java @@ -15,10 +15,6 @@ package org.apache.geode.internal.cache.locks; -import java.util.concurrent.RejectedExecutionException; - -import org.apache.logging.log4j.Logger; - import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.ReplyException; @@ -30,11 +26,13 @@ import org.apache.geode.internal.cache.TXCommitMessage; import org.apache.geode.internal.i18n.LocalizedStrings; import 
org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.RejectedExecutionException; /** * Provides processing of DLockRecoverGrantorProcessor. Reply will not be sent until all locks are * released. - * */ public class TXRecoverGrantorMessageProcessor implements DLockRecoverGrantorProcessor.MessageProcessor { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java old mode 100755 new mode 100644 index fb16ea9..6a5eae8 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java @@ -14,29 +14,25 @@ */ package org.apache.geode.internal.cache.locks; -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import static com.jayway.awaitility.Awaitility.await; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import org.apache.geode.cache.CommitConflictException; 
import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor; +import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.TXRegionLockRequestImpl; @@ -44,9 +40,19 @@ import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.Invoke; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; /** * This class tests distributed ownership via the DistributedLockService api. 
@@ -75,22 +81,10 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { */ @Override public final void postSetUp() throws Exception { - // Create a DistributedSystem in every VM + Invoke.invokeInEveryVM("connectDistributedSystem", () -> connectDistributedSystem()); connectDistributedSystem(); - - for (int h = 0; h < Host.getHostCount(); h++) { - Host host = Host.getHost(h); - - for (int v = 0; v < host.getVMCount(); v++) { - // host.getVM(v).invoke(() -> TXLockServiceDUnitTest.dumpStack()); - host.getVM(v).invoke(TXLockServiceDUnitTest.class, "connectDistributedSystem", null); - } - } } - public static void dumpStack() { - org.apache.geode.internal.OSProcess.printStacks(0); - } @Override public final void preTearDown() throws Exception { @@ -124,16 +118,13 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { */ } - @Ignore("TODO: test is disabled") @Test public void testGetAndDestroyAgain() { testGetAndDestroy(); } - @Ignore("TODO: test is disabled") @Test public void testTXRecoverGrantorMessageProcessor() throws Exception { - LogWriterUtils.getLogWriter().info("[testTXOriginatorRecoveryProcessor]"); TXLockService.createDTLS(); checkDLockRecoverGrantorMessageProcessor(); @@ -162,29 +153,158 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { msg.setProcessorId(testProc.getProcessorId()); msg.setSender(dlock.getDistributionManager().getId()); - Thread thread = new Thread(new Runnable() { - public void run() { - TXRecoverGrantorMessageProcessor proc = - (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor(); - proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), msg); - } + Thread thread = new Thread(() -> { + TXRecoverGrantorMessageProcessor proc = + (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor(); + proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), msg); }); + thread.setName("TXLockServiceDUnitTest 
thread"); + thread.setDaemon(true); thread.start(); - // pause to allow thread to be blocked before we release the lock - sleep(999); + await("waiting for recovery message to block").atMost(999, TimeUnit.MILLISECONDS).until(() -> { + return ((TXLockServiceImpl) dtls).isRecovering(); + }); - // release txLock dtls.release(txLockId); - // check results to verify no locks were provided in reply - ThreadUtils.join(thread, 30 * 1000); + // check results to verify no locks were provided in the reply + await("waiting for thread to exit").atMost(30, TimeUnit.SECONDS).until(() -> { + return !thread.isAlive(); + }); + + assertFalse(((TXLockServiceImpl) dtls).isRecovering()); + assertEquals("testTXRecoverGrantor_replyCode_PASS is false", true, testTXRecoverGrantor_replyCode_PASS); assertEquals("testTXRecoverGrantor_heldLocks_PASS is false", true, testTXRecoverGrantor_heldLocks_PASS); } + + @Test + public void testTXGrantorMigration() throws Exception { + // first make sure some other VM is the grantor + Host.getHost(0).getVM(0).invoke("become lock grantor", () -> { + TXLockService.createDTLS(); + TXLockService vm0dtls = TXLockService.getDTLS(); + DLockService vm0dlock = ((TXLockServiceImpl) vm0dtls).getInternalDistributedLockService(); + vm0dlock.becomeLockGrantor(); + }); + + TXLockService.createDTLS(); + checkDLockRecoverGrantorMessageProcessor(); + + /* + * call TXRecoverGrantorMessageProcessor.process directly to make sure that correct behavior + * occurs + */ + + // get txLock and hold it + final List regionLockReqs = new ArrayList(); + regionLockReqs.add(new TXRegionLockRequestImpl("/testTXRecoverGrantorMessageProcessor2", + new HashSet(Arrays.asList(new String[] {"KEY-1", "KEY-2", "KEY-3", "KEY-4"})))); + TXLockService dtls = TXLockService.getDTLS(); + TXLockId txLockId = dtls.txLock(regionLockReqs, Collections.EMPTY_SET); + + final DLockService dlock = ((TXLockServiceImpl) dtls).getInternalDistributedLockService(); + + // GEODE-2024: now cause grantor migration while 
holding the recoveryReadLock. + // It will lock up in TXRecoverGrantorMessageProcessor until the recoveryReadLock + // is released. Demonstrate that dtls.release() does not block forever and releases the + // recoveryReadLock + // allowing grantor migration to finish + + // create an observer that will block recovery messages from being processed + MessageObserver observer = new MessageObserver(); + DistributionMessageObserver.setInstance(observer); + + try { + System.out.println("starting thread to take over being lock grantor from vm0"); + + // become the grantor - this will block waiting for a reply to the message blocked by the + // observer + Thread thread = new Thread(() -> { + dlock.becomeLockGrantor(); + }); + thread.setName("TXLockServiceDUnitTest thread2"); + thread.setDaemon(true); + thread.start(); + + await("waiting for recovery to begin").atMost(10, TimeUnit.SECONDS).until(() -> { + return observer.isPreventingProcessing(); + }); + + + // spawn a thread that will unblock message processing + // so that TXLockServiceImpl's "recovering" variable will be set + System.out.println("starting a thread to unblock recovery in 5 seconds"); + Thread unblockThread = new Thread(() -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException("sleep interrupted"); + } + System.out.println("releasing block of recovery message processing"); + observer.releasePreventionOfProcessing(); + }); + unblockThread.setName("TXLockServiceDUnitTest unblockThread"); + unblockThread.setDaemon(true); + unblockThread.start(); + + // release txLock - this will block until unblockThread tells the observer + // that it can process its message. 
Then it should release the recovery read-lock + // allowing the grantor to finish recovery + System.out.println("releasing transaction locks, which should block for a bit"); + dtls.release(txLockId); + + await("waiting for recovery to finish").atMost(10, TimeUnit.SECONDS).until(() -> { + return !((TXLockServiceImpl) dtls).isRecovering(); + }); + } finally { + observer.releasePreventionOfProcessing(); + DistributionMessageObserver.setInstance(null); + } + } + + static class MessageObserver extends DistributionMessageObserver { + final boolean[] preventingMessageProcessing = new boolean[] {false}; + final boolean[] preventMessageProcessing = new boolean[] {true}; + + + public boolean isPreventingProcessing() { + synchronized (preventingMessageProcessing) { + return preventingMessageProcessing[0]; + } + } + + public void releasePreventionOfProcessing() { + synchronized (preventMessageProcessing) { + preventMessageProcessing[0] = false; + } + } + + @Override + public void beforeProcessMessage(DistributionManager dm, DistributionMessage message) { + if (message instanceof DLockRecoverGrantorMessage) { + synchronized (preventingMessageProcessing) { + preventingMessageProcessing[0] = true; + } + synchronized (preventMessageProcessing) { + while (preventMessageProcessing[0]) { + try { + preventMessageProcessing.wait(50); + } catch (InterruptedException e) { + throw new RuntimeException("sleep interrupted"); + } + } + } + } + } + + } + + protected static volatile TXLockId testTXLock_TXLockId; @Test @@ -384,7 +504,6 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { }); Host.getHost(0).getVM(originatorVM).invoke(() -> disconnectFromDS()); - // grantor sends TXOriginatorRecoveryMessage... // TODO: verify processing of message? and have test sleep until finished sleep(200); @@ -456,7 +575,7 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { /** * Creates a new DistributedLockService in a remote VM. 
- * + * * @param name The name of the newly-created DistributedLockService. It is recommended that the * name of the Region be the {@link #getUniqueName()} of the test, or at least derive from * it. @@ -594,9 +713,6 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase { /** * Accessed via reflection. DO NOT REMOVE - * - * @param key - * @return */ protected static Boolean unlock_DTLS(Object key) { TXLockService dtls = TXLockService.getDTLS();