GEODE-2024 Deadlock creating a new lock service Grantor

This change-set causes the code in TXLockServiceImpl.release() to
perform periodic checks to see if grantor recovery is being performed.
If so it skips releaseTryLocks, which requires a recovered grantor to
function.  This is in line with the previous attempts to fix this
problem.  The recovery message that is trying to obtain the recovery
write-lock now sets the "recovering" state in TXLockServiceImpl prior
to attempting to get the lock so that it is set when
TXLockServiceImpl.release() checks its state.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f02ea36f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f02ea36f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f02ea36f

Branch: refs/heads/feature/GEODE-288
Commit: f02ea36f2e3a440e8aa39815539f3aa2855ce124
Parents: 69a0877
Author: Bruce Schuchardt <bschucha...@pivotal.io>
Authored: Wed Oct 26 13:51:20 2016 -0700
Committer: Bruce Schuchardt <bschucha...@pivotal.io>
Committed: Wed Oct 26 13:53:00 2016 -0700

----------------------------------------------------------------------
 .../locks/DLockRecoverGrantorProcessor.java     |  16 +-
 .../internal/locks/DLockService.java            | 108 ++++++----
 .../internal/cache/locks/TXLockServiceImpl.java |  35 ++--
 .../locks/TXRecoverGrantorMessageProcessor.java |   8 +-
 .../cache/locks/TXLockServiceDUnitTest.java     | 210 ++++++++++++++-----
 5 files changed, 272 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
index 37fbfbe..2a48308 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
@@ -91,7 +91,7 @@ public class DLockRecoverGrantorProcessor extends 
ReplyProcessor21 {
     // process msg and reply from this VM...
     if (msg.getSender() == null)
       msg.setSender(dm.getId());
-    msg.processMessage(dm);
+    msg.scheduleMessage(dm);
 
     // keep waiting even if interrupted
     try {
@@ -239,6 +239,20 @@ public class DLockRecoverGrantorProcessor extends 
ReplyProcessor21 {
       processMessage(dm);
     }
 
+    /**
+     * For unit testing we need to push the message through scheduleAction so 
that message observers
+     * are invoked
+     * 
+     * @param dm the distribution manager
+     */
+    protected void scheduleMessage(DM dm) {
+      if (dm instanceof DistributionManager) {
+        super.scheduleAction((DistributionManager) dm);
+      } else {
+        processMessage(dm);
+      }
+    }
+
     protected void processMessage(DM dm) {
       MessageProcessor processor = nullServiceProcessor;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
index a859299..ca012d3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
@@ -15,8 +15,18 @@
 
 package org.apache.geode.distributed.internal.locks;
 
-import org.apache.geode.*;
-import org.apache.geode.distributed.*;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.LeaseExpiredException;
+import org.apache.geode.distributed.LockNotHeldException;
+import org.apache.geode.distributed.LockServiceDestroyedException;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -39,8 +49,18 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -290,7 +310,7 @@ public class DLockService extends DistributedLockService {
         statStart = getStats().startGrantorWait();
         if (!ownLockGrantorFutureResult) {
           LockGrantorId lockGrantorIdRef =
-              waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+              waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, 
TimeUnit.MILLISECONDS);
           if (lockGrantorIdRef != null) {
             return lockGrantorIdRef;
           } else {
@@ -366,7 +386,7 @@ public class DLockService extends DistributedLockService {
 
   /**
    * Creates a local {@link DLockGrantor}.
-   * 
+   *
    * if (!createLocalGrantor(xxx)) { theLockGrantorId = this.lockGrantorId; }
    * 
    * @param elder the elder that told us to be the grantor
@@ -727,15 +747,24 @@ public class DLockService extends DistributedLockService {
    * Returns lockGrantorId when lockGrantorFutureResultRef has been set by 
another thread.
    * 
    * @param lockGrantorFutureResultRef FutureResult to wait for
+   * @param timeToWait how many ms to wait, 0 = forever
+   * @param timeUnit the unit of measure for timeToWait
    * @return the LockGrantorId or null if FutureResult was cancelled
    */
-  private LockGrantorId waitForLockGrantorFutureResult(FutureResult 
lockGrantorFutureResultRef) {
+  private LockGrantorId waitForLockGrantorFutureResult(FutureResult 
lockGrantorFutureResultRef,
+      long timeToWait, final TimeUnit timeUnit) {
     LockGrantorId lockGrantorIdRef = null;
     while (lockGrantorIdRef == null) {
       boolean interrupted = Thread.interrupted();
       try {
         checkDestroyed();
-        lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get();
+        if (timeToWait == 0) {
+          lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get();
+        } else {
+          lockGrantorIdRef = (LockGrantorId) 
lockGrantorFutureResultRef.get(timeToWait, timeUnit);
+        }
+      } catch (TimeoutException e) {
+        break;
       } catch (InterruptedException e) {
         interrupted = true;
         this.dm.getCancelCriterion().checkCancelInProgress(e);
@@ -757,7 +786,14 @@ public class DLockService extends DistributedLockService {
     return lockGrantorIdRef;
   }
 
-  private void notLockGrantorId(LockGrantorId notLockGrantorId, boolean 
waitForGrantor) {
+  /**
+   * nulls out grantor to force call to elder
+   * 
+   * @param timeToWait how long to wait for a new grantor. -1 don't wait, 0 no 
time limit
+   * @param timeUnit the unit of measure of timeToWait
+   */
+  private void notLockGrantorId(LockGrantorId notLockGrantorId, long 
timeToWait,
+      final TimeUnit timeUnit) {
     if (notLockGrantorId.isLocal(getSerialNumber())) {
       if (logger.isTraceEnabled(LogMarker.DLS)) {
         logger.trace(LogMarker.DLS,
@@ -793,8 +829,8 @@ public class DLockService extends DistributedLockService {
 
       statStart = getStats().startGrantorWait();
       if (!ownLockGrantorFutureResult) {
-        if (waitForGrantor) { // fix for bug #43708
-          waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+        if (timeToWait >= 0) {
+          waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 
timeToWait, timeUnit);
         }
         return;
       }
@@ -947,7 +983,7 @@ public class DLockService extends DistributedLockService {
           }
         }
         if (!ownLockGrantorFutureResult) {
-          waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+          waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, 
TimeUnit.MILLISECONDS);
           continue;
         }
       }
@@ -1329,7 +1365,7 @@ public class DLockService extends DistributedLockService {
    *        will be ignored if the lock is currently held by another client.
    *
    * @param interruptible true if this lock request is interruptible
-   * 
+   *
    * @param disableAlerts true to disable logging alerts if the dlock is 
taking a long time to
    *        acquired.
    *
@@ -1408,7 +1444,6 @@ public class DLockService extends DistributedLockService {
                   
LocalizedStrings.DLockService_DEBUG_LOCKINTERRUPTIBLY_HAS_GONE_HOT_AND_LOOPED_0_TIMES
                       .toLocalizedString(count);
 
-
               InternalGemFireError e = new InternalGemFireError(s);
               logger.error(LogMarker.DLS,
                   LocalizedMessage.create(
@@ -1516,7 +1551,6 @@ public class DLockService extends DistributedLockService {
             }
           } // else: non-reentrant or reentrant w/ non-infinite lease
 
-
           if (gotLock) {
             // if (processor != null) (cannot be null)
             { // TODO: can be null after restoring above optimization
@@ -1539,9 +1573,7 @@ public class DLockService extends DistributedLockService {
                       this.lockGrantorId);
                 }
                 continue;
-              }
-
-              else if (isDestroyed()) {
+              } else if (isDestroyed()) {
                 // race: dls was destroyed
                 if (isDebugEnabled_DLS) {
                   logger.trace(LogMarker.DLS,
@@ -1549,9 +1581,7 @@ public class DLockService extends DistributedLockService {
                       theLockGrantorId);
                 }
                 needToReleaseOrphanedGrant = true;
-              }
-
-              else {
+              } else {
                 safeExit = true;
                 synchronized (this.tokens) {
                   checkDestroyed();
@@ -1603,7 +1633,7 @@ public class DLockService extends DistributedLockService {
           // grantor replied NOT_GRANTOR or departed (getLock is false)
           else if (processor.repliedNotGrantor() || processor.hadNoResponse()) 
{
             safeExit = true;
-            notLockGrantorId(theLockGrantorId, true);
+            notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
             // keepTrying is still true... loop back around
           } // grantor replied NOT_GRANTOR or departed
 
@@ -1912,7 +1942,7 @@ public class DLockService extends DistributedLockService {
             released = true;
           } finally {
             if (!released) {
-              notLockGrantorId(theLockGrantorId, true);
+              notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
             }
           }
         } // while !released
@@ -1966,7 +1996,7 @@ public class DLockService extends DistributedLockService {
           // loop back around to get next lock grantor
         } finally {
           if (queryReply != null && queryReply.repliedNotGrantor()) {
-            notLockGrantorId(theLockGrantorId, true);
+            notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
           }
         }
       } // while querying
@@ -2076,7 +2106,7 @@ public class DLockService extends DistributedLockService {
     return this.dlockStats;
   }
 
-  public void releaseTryLocks(DLockBatchId batchId, boolean onlyIfSameGrantor) 
{
+  public void releaseTryLocks(DLockBatchId batchId, Callable<Boolean> 
untilCondition) {
     final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS);
     if (isDebugEnabled_DLS) {
       logger.trace(LogMarker.DLS, "[DLockService.releaseTryLocks] enter: {}", 
batchId);
@@ -2088,26 +2118,29 @@ public class DLockService extends 
DistributedLockService {
       boolean lockBatch = true;
       boolean released = false;
       while (!released) {
+        try {
+          boolean quit = untilCondition.call();
+          if (quit) {
+            return;
+          }
+        } catch (Exception e) {
+          throw new InternalGemFireException("unexpected exception", e);
+        }
         checkDestroyed();
         LockGrantorId theLockGrantorId = null;
 
-        if (onlyIfSameGrantor) { // this was a fix for bug #38763, from r19555
-          theLockGrantorId = batchId.getLockGrantorId();
-          synchronized (this.lockGrantorIdLock) {
-            if (!checkLockGrantorId(theLockGrantorId)) {
-              // the grantor is different so break and skip 
DLockReleaseProcessor
-              break;
-            }
+        theLockGrantorId = batchId.getLockGrantorId();
+        synchronized (this.lockGrantorIdLock) {
+          if (!checkLockGrantorId(theLockGrantorId)) {
+            // the grantor is different so break and skip DLockReleaseProcessor
+            break;
           }
-        } else {
-          theLockGrantorId = getLockGrantorId();
         }
 
         released =
             callReleaseProcessor(theLockGrantorId.getLockGrantorMember(), 
batchId, lockBatch, -1);
         if (!released) {
-          final boolean waitForGrantor = onlyIfSameGrantor; // fix for bug 
#43708
-          notLockGrantorId(theLockGrantorId, waitForGrantor);
+          notLockGrantorId(theLockGrantorId, 100, TimeUnit.MILLISECONDS);
         }
       }
     } finally {
@@ -2185,7 +2218,7 @@ public class DLockService extends DistributedLockService {
           // should have thrown LockServiceDestroyedException
           Assert.assertTrue(isDestroyed(), "Grantor reports service " + this + 
" is destroyed");
         } else if (processor.repliedNotGrantor() || processor.hadNoResponse()) 
{
-          notLockGrantorId(theLockGrantorId, true);
+          notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
         } else {
           keyIfFailed[0] = processor.getKeyIfFailed();
           if (keyIfFailed[0] == null) {
@@ -2455,7 +2488,8 @@ public class DLockService extends DistributedLockService {
         if (theLockGrantorId != null && 
!theLockGrantorId.isLocal(getSerialNumber())) {
           if (!NonGrantorDestroyedProcessor.send(this.serviceName, 
theLockGrantorId, dm)) {
             // grantor responded NOT_GRANTOR
-            notLockGrantorId(theLockGrantorId, true); // nulls out grantor to 
force call to elder
+            notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); // 
nulls out grantor to
+                                                                          // 
force call to elder
             retry = true;
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
index f4ab02f..717d878 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
@@ -15,13 +15,6 @@
 
 package org.apache.geode.internal.cache.locks;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
@@ -32,6 +25,12 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import 
org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 /** Provides clean separation of implementation from public facade */
 public class TXLockServiceImpl extends TXLockService {
@@ -50,13 +49,14 @@ public class TXLockServiceImpl extends TXLockService {
   /** Instance of dlock service to use */
   private DLockService dlock;
 
-  /** List of active txLockIds */
+  /**
+   * List of active txLockIds
+   */
   protected List txLockIdList = new ArrayList();
 
   /**
    * True if grantor recovery is in progress; used to keep 
<code>release</code> from waiting for
-   * grantor. TODO: this boolean can probably be removed... it was 
insufficient and new fixes for
-   * bug 38763 have the side effect of making this boolean obsolete (verify 
before removal!)
+   * grantor.
    */
   private volatile boolean recovering = false;
 
@@ -225,10 +225,11 @@ public class TXLockServiceImpl extends TXLockService {
             LocalizedStrings.TXLockServiceImpl_INVALID_TXLOCKID_NOT_FOUND_0
                 .toLocalizedString(txLockId));
       }
-      // only release w/ dlock if not in middle of recovery...
-      if (!this.recovering) {
-        this.dlock.releaseTryLocks(txLockId, true);
-      }
+
+      this.dlock.releaseTryLocks(txLockId, () -> {
+        return this.recovering;
+      });
+
       this.txLockIdList.remove(txLockId);
       releaseRecoveryReadLock();
     }
@@ -243,10 +244,14 @@ public class TXLockServiceImpl extends TXLockService {
   // Internal implementation methods
   // -------------------------------------------------------------------------
 
+  boolean isRecovering() {
+    return this.recovering;
+  }
+
   /** Delays grantor recovery replies until finished with locks */
   void acquireRecoveryWriteLock() throws InterruptedException {
-    this.recoveryLock.writeLock().lockInterruptibly();
     this.recovering = true;
+    this.recoveryLock.writeLock().lockInterruptibly();
   }
 
   void releaseRecoveryWriteLock() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
old mode 100755
new mode 100644
index 77dec94..7ae2d2b
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
@@ -15,10 +15,6 @@
 
 package org.apache.geode.internal.cache.locks;
 
-import java.util.concurrent.RejectedExecutionException;
-
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.ReplyException;
@@ -30,11 +26,13 @@ import org.apache.geode.internal.cache.TXCommitMessage;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * Provides processing of DLockRecoverGrantorProcessor. Reply will not be sent 
until all locks are
  * released.
- *
  */
 public class TXRecoverGrantorMessageProcessor
     implements DLockRecoverGrantorProcessor.MessageProcessor {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
old mode 100755
new mode 100644
index fb16ea9..6a5eae8
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
@@ -14,29 +14,25 @@
  */
 package org.apache.geode.internal.cache.locks;
 
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static com.jayway.awaitility.Awaitility.await;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import 
org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor;
+import 
org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
@@ -44,9 +40,19 @@ import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class tests distributed ownership via the DistributedLockService api.
@@ -75,22 +81,10 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
    */
   @Override
   public final void postSetUp() throws Exception {
-    // Create a DistributedSystem in every VM
+    Invoke.invokeInEveryVM("connectDistributedSystem", () -> 
connectDistributedSystem());
     connectDistributedSystem();
-
-    for (int h = 0; h < Host.getHostCount(); h++) {
-      Host host = Host.getHost(h);
-
-      for (int v = 0; v < host.getVMCount(); v++) {
-        // host.getVM(v).invoke(() -> TXLockServiceDUnitTest.dumpStack());
-        host.getVM(v).invoke(TXLockServiceDUnitTest.class, 
"connectDistributedSystem", null);
-      }
-    }
   }
 
-  public static void dumpStack() {
-    org.apache.geode.internal.OSProcess.printStacks(0);
-  }
 
   @Override
   public final void preTearDown() throws Exception {
@@ -124,16 +118,13 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
      */
   }
 
-  @Ignore("TODO: test is disabled")
   @Test
   public void testGetAndDestroyAgain() {
     testGetAndDestroy();
   }
 
-  @Ignore("TODO: test is disabled")
   @Test
   public void testTXRecoverGrantorMessageProcessor() throws Exception {
-    LogWriterUtils.getLogWriter().info("[testTXOriginatorRecoveryProcessor]");
     TXLockService.createDTLS();
     checkDLockRecoverGrantorMessageProcessor();
 
@@ -162,29 +153,158 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
     msg.setProcessorId(testProc.getProcessorId());
     msg.setSender(dlock.getDistributionManager().getId());
 
-    Thread thread = new Thread(new Runnable() {
-      public void run() {
-        TXRecoverGrantorMessageProcessor proc =
-            (TXRecoverGrantorMessageProcessor) 
dlock.getDLockRecoverGrantorMessageProcessor();
-        proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), 
msg);
-      }
+    Thread thread = new Thread(() -> {
+      TXRecoverGrantorMessageProcessor proc =
+          (TXRecoverGrantorMessageProcessor) 
dlock.getDLockRecoverGrantorMessageProcessor();
+      proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), 
msg);
     });
+    thread.setName("TXLockServiceDUnitTest thread");
+    thread.setDaemon(true);
     thread.start();
 
-    // pause to allow thread to be blocked before we release the lock
-    sleep(999);
+    await("waiting for recovery message to block").atMost(999, 
TimeUnit.MILLISECONDS).until(() -> {
+      return ((TXLockServiceImpl) dtls).isRecovering();
+    });
 
-    // release txLock
     dtls.release(txLockId);
 
-    // check results to verify no locks were provided in reply
-    ThreadUtils.join(thread, 30 * 1000);
+    // check results to verify no locks were provided in the reply
+    await("waiting for thread to exit").atMost(30, TimeUnit.SECONDS).until(() 
-> {
+      return !thread.isAlive();
+    });
+
+    assertFalse(((TXLockServiceImpl) dtls).isRecovering());
+
     assertEquals("testTXRecoverGrantor_replyCode_PASS is false", true,
         testTXRecoverGrantor_replyCode_PASS);
     assertEquals("testTXRecoverGrantor_heldLocks_PASS is false", true,
         testTXRecoverGrantor_heldLocks_PASS);
   }
 
+
+  @Test
+  public void testTXGrantorMigration() throws Exception {
+    // first make sure some other VM is the grantor
+    Host.getHost(0).getVM(0).invoke("become lock grantor", () -> {
+      TXLockService.createDTLS();
+      TXLockService vm0dtls = TXLockService.getDTLS();
+      DLockService vm0dlock = ((TXLockServiceImpl) 
vm0dtls).getInternalDistributedLockService();
+      vm0dlock.becomeLockGrantor();
+    });
+
+    TXLockService.createDTLS();
+    checkDLockRecoverGrantorMessageProcessor();
+
+    /*
+     * call TXRecoverGrantorMessageProcessor.process directly to make sure 
that correct behavior
+     * occurs
+     */
+
+    // get txLock and hold it
+    final List regionLockReqs = new ArrayList();
+    regionLockReqs.add(new 
TXRegionLockRequestImpl("/testTXRecoverGrantorMessageProcessor2",
+        new HashSet(Arrays.asList(new String[] {"KEY-1", "KEY-2", "KEY-3", 
"KEY-4"}))));
+    TXLockService dtls = TXLockService.getDTLS();
+    TXLockId txLockId = dtls.txLock(regionLockReqs, Collections.EMPTY_SET);
+
+    final DLockService dlock = ((TXLockServiceImpl) 
dtls).getInternalDistributedLockService();
+
+    // GEODE-2024: now cause grantor migration while holding the 
recoveryReadLock.
+    // It will lock up in TXRecoverGrantorMessageProcessor until the 
recoveryReadLock
+    // is released. Demonstrate that dtls.release() does not block forever and 
releases the
+    // recoveryReadLock
+    // allowing grantor migration to finish
+
+    // create an observer that will block recovery messages from being 
processed
+    MessageObserver observer = new MessageObserver();
+    DistributionMessageObserver.setInstance(observer);
+
+    try {
+      System.out.println("starting thread to take over being lock grantor from 
vm0");
+
+      // become the grantor - this will block waiting for a reply to the 
message blocked by the
+      // observer
+      Thread thread = new Thread(() -> {
+        dlock.becomeLockGrantor();
+      });
+      thread.setName("TXLockServiceDUnitTest thread2");
+      thread.setDaemon(true);
+      thread.start();
+
+      await("waiting for recovery to begin").atMost(10, 
TimeUnit.SECONDS).until(() -> {
+        return observer.isPreventingProcessing();
+      });
+
+
+      // spawn a thread that will unblock message processing
+      // so that TXLockServiceImpl's "recovering" variable will be set
+      System.out.println("starting a thread to unblock recovery in 5 seconds");
+      Thread unblockThread = new Thread(() -> {
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("sleep interrupted");
+        }
+        System.out.println("releasing block of recovery message processing");
+        observer.releasePreventionOfProcessing();
+      });
+      unblockThread.setName("TXLockServiceDUnitTest unblockThread");
+      unblockThread.setDaemon(true);
+      unblockThread.start();
+
+      // release txLock - this will block until unblockThread tells the 
observer
+      // that it can process its message. Then it should release the recovery 
read-lock
+      // allowing the grantor to finish recovery
+      System.out.println("releasing transaction locks, which should block for 
a bit");
+      dtls.release(txLockId);
+
+      await("waiting for recovery to finish").atMost(10, 
TimeUnit.SECONDS).until(() -> {
+        return !((TXLockServiceImpl) dtls).isRecovering();
+      });
+    } finally {
+      observer.releasePreventionOfProcessing();
+      DistributionMessageObserver.setInstance(null);
+    }
+  }
+
+  static class MessageObserver extends DistributionMessageObserver {
+    final boolean[] preventingMessageProcessing = new boolean[] {false};
+    final boolean[] preventMessageProcessing = new boolean[] {true};
+
+
+    public boolean isPreventingProcessing() {
+      synchronized (preventingMessageProcessing) {
+        return preventingMessageProcessing[0];
+      }
+    }
+
+    public void releasePreventionOfProcessing() {
+      synchronized (preventMessageProcessing) {
+        preventMessageProcessing[0] = false;
+      }
+    }
+
+    @Override
+    public void beforeProcessMessage(DistributionManager dm, 
DistributionMessage message) {
+      if (message instanceof DLockRecoverGrantorMessage) {
+        synchronized (preventingMessageProcessing) {
+          preventingMessageProcessing[0] = true;
+        }
+        synchronized (preventMessageProcessing) {
+          while (preventMessageProcessing[0]) {
+            try {
+              preventMessageProcessing.wait(50);
+            } catch (InterruptedException e) {
+              throw new RuntimeException("sleep interrupted");
+            }
+          }
+        }
+      }
+    }
+
+  }
+
+
   protected static volatile TXLockId testTXLock_TXLockId;
 
   @Test
@@ -384,7 +504,6 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
     });
     Host.getHost(0).getVM(originatorVM).invoke(() -> disconnectFromDS());
 
-
     // grantor sends TXOriginatorRecoveryMessage...
     // TODO: verify processing of message? and have test sleep until finished
     sleep(200);
@@ -456,7 +575,7 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
 
   /**
    * Creates a new DistributedLockService in a remote VM.
-   *
+   * 
    * @param name The name of the newly-created DistributedLockService. It is 
recommended that the
    *        name of the Region be the {@link #getUniqueName()} of the test, or 
at least derive from
    *        it.
@@ -594,9 +713,6 @@ public class TXLockServiceDUnitTest extends 
JUnit4DistributedTestCase {
 
   /**
    * Accessed via reflection. DO NOT REMOVE
-   * 
-   * @param key
-   * @return
    */
   protected static Boolean unlock_DTLS(Object key) {
     TXLockService dtls = TXLockService.getDTLS();

Reply via email to