This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3ada8fe80f7569874f643c32d585658e98f4615a
Author: Alberto Gomez <alberto.go...@est.tech>
AuthorDate: Thu Aug 18 09:42:27 2022 +0200

    GEODE-10403: Fix distributed deadlock with stop gw sender (#7830)
    
    There is a distributed deadlock that can appear
    when stopping the gateway sender if a race condition
    happens in which the stop gateway sender command, while holding
    the lifecycle lock, gets blocked indefinitely trying to get the
    size of the queue from remote peers
    (ParallelGatewaySenderQueue.size() call) and, at the same time,
    a call that stores an event in the queue tries to acquire
    the lifecycle lock (held by the stop gateway sender command).
    
    These two calls can deadlock each other under heavy load and
    make the system unresponsive to any traffic request (get, put, ...).
    
    To avoid it, when storing the event in the gateway
    sender queue (AbstractGatewaySender.distribute() call),
    instead of blocking on the lifecycle lock without any timeout,
    the lock acquisition is attempted with a (10 ms) timeout. If the
    attempt times out, it is checked whether the gateway sender is running.
    If it is not running, the event is dropped and there is no need to get
    the lock. Otherwise, acquiring the lifecycle lock is retried until it
    succeeds or the gateway sender is stopped.
---
 .../internal/ClusterOperationExecutors.java        |  9 ++-
 .../internal/cache/wan/AbstractGatewaySender.java  | 52 +++++++++----
 .../geode/internal/cache/wan/WANTestBase.java      |  7 ++
 ...llelGatewaySenderOperationsDistributedTest.java | 86 +++++++++++++++++++++-
 4 files changed, 133 insertions(+), 21 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index ba25e3b539..7c45bbb9e3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -25,12 +25,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.internal.MutableForTesting;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import 
org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
 import org.apache.geode.internal.logging.CoreLoggingExecutors;
@@ -167,6 +169,8 @@ public class ClusterOperationExecutors implements 
OperationExecutors {
 
   private SerialQueuedExecutorPool serialQueuedExecutorPool;
 
+  @MutableForTesting // test-only: when > 0, overrides MAX_PR_THREADS (-1 = no override)
+  public static final AtomicInteger maxPrThreadsForTest = new 
AtomicInteger(-1);
 
   ClusterOperationExecutors(DistributionStats stats,
       InternalDistributedSystem system) {
@@ -252,10 +256,11 @@ public class ClusterOperationExecutors implements 
OperationExecutors {
             this::doWaitingThread, stats.getWaitingPoolHelper(),
             threadMonitor);
 
-    if (MAX_PR_THREADS > 1) {
+    int maxPrThreads = maxPrThreadsForTest.get() > 0 ? 
maxPrThreadsForTest.get() : MAX_PR_THREADS;
+    if (maxPrThreads > 1) {
       partitionedRegionPool =
           CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
-              MAX_PR_THREADS, INCOMING_QUEUE_LIMIT, 
stats.getPartitionedRegionQueueHelper(),
+              maxPrThreads, INCOMING_QUEUE_LIMIT, 
stats.getPartitionedRegionQueueHelper(),
               "PartitionedRegion Message Processor",
               thread -> stats.incPartitionedRegionThreadStarts(), 
this::doPartitionRegionThread,
               stats.getPartitionedRegionPoolHelper(),
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index e5fa44a83e..47fa99e4a0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
@@ -238,6 +239,9 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
 
   protected boolean enforceThreadsConnectSameReceiver;
 
+  @MutableForTesting // test-only: when true, distribute() sleeps 5 ms before trying the lifecycle lock
+  public static final AtomicBoolean doSleepForTestingInDistribute = new 
AtomicBoolean(false);
+
   protected AbstractGatewaySender() {
     statisticsClock = disabledClock();
   }
@@ -1122,16 +1126,17 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
       }
 
       // If this gateway is not running, return
-      if (!isRunning()) {
-        if (isPrimary()) {
-          recordDroppedEvent(clonedEvent);
-        }
-        if (isDebugEnabled) {
-          logger.debug("Returning back without putting into the gateway sender 
queue:" + event);
-        }
+      if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, 
clonedEvent)) {
         return;
       }
 
+      if (AbstractGatewaySender.doSleepForTestingInDistribute.get()) {
+        try {
+          Thread.sleep(5); // test-only delay used to reproduce the stop/distribute race
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
+        }
+      }
       if (!getLifeCycleLock().readLock().tryLock()) {
         synchronized (queuedEventsSync) {
           if (!enqueuedAllTempQueueEvents) {
@@ -1148,19 +1153,22 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
           }
         }
         if (enqueuedAllTempQueueEvents) {
-          getLifeCycleLock().readLock().lock();
+          try { // retry in 10 ms slices, re-checking isRunning() between attempts
+            while (!getLifeCycleLock().readLock().tryLock(10, 
TimeUnit.MILLISECONDS)) {
+              if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, 
clonedEvent)) {
+                return; // sender no longer running: the helper already handled the drop
+              }
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return; // NOTE(review): event is dropped without recordDroppedEvent(); confirm intended
+          }
         }
       }
       try {
         // If this gateway is not running, return
         // The sender may have stopped, after we have checked the status in 
the beginning.
-        if (!isRunning()) {
-          if (isDebugEnabled) {
-            logger.debug("Returning back without putting into the gateway 
sender queue:" + event);
-          }
-          if (isPrimary()) {
-            recordDroppedEvent(clonedEvent);
-          }
+        if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, 
clonedEvent)) {
           return;
         }
 
@@ -1205,6 +1213,20 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
     }
   }
 
+  private boolean getIsRunningAndDropEventIfNotRunning(EntryEventImpl event, 
boolean isDebugEnabled,
+      EntryEventImpl clonedEvent) { // true if running; otherwise records the drop (primary only) and returns false
+    if (isRunning()) {
+      return true;
+    }
+    if (isPrimary()) {
+      recordDroppedEvent(clonedEvent);
+    }
+    if (isDebugEnabled) {
+      logger.debug("Returning back without putting into the gateway sender 
queue:" + event);
+    }
+    return false;
+  }
+
   private void recordDroppedEvent(EntryEventImpl event) {
     if (eventProcessor != null) {
       eventProcessor.registerEventDroppedInPrimaryQueue(event);
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index ce7d19abc5..89daabc9bf 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -917,6 +917,13 @@ public class WANTestBase extends DistributedTestCase {
     createCache(true, locPort);
   }
 
+  public static void createCacheConserveSocketsInVMs(Boolean conserveSockets, 
Integer locPort,
+      VM... vms) { // invokes createCacheConserveSockets(conserveSockets, locPort) in each VM
+    for (VM vm : vms) {
+      vm.invoke(() -> createCacheConserveSockets(conserveSockets, locPort));
+    }
+  }
+  }
+
   public static void createCacheConserveSockets(Boolean conserveSockets, 
Integer locPort) {
     WANTestBase test = new WANTestBase();
     Properties props = test.getDistributedSystemProperties();
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDistributedTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDistributedTest.java
index bc7ea22b21..4e3e4850d3 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDistributedTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDistributedTest.java
@@ -59,6 +59,7 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.ClusterOperationExecutors;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -346,6 +347,66 @@ public class 
ParallelGatewaySenderOperationsDistributedTest extends WANTestBase
     vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 
100));
   }
 
+  /**
+   * Verifies that no distributed deadlock occurs when stopping a gateway sender while receiving
+   * traffic.
+   * The deadlock may occur when the gateway sender tries to get the size of the gateway sender
+   * queue (sending a size message to other members) while holding the lifeCycleLock, which is
+   * also taken when an event is to be distributed by the gateway sender.
+   * As this issue has only been observed in the field with a lot of traffic, in order to reproduce
+   * it in a test case, conserve-sockets is set to true (although the deadlock has also been seen
+   * with conserve-sockets=false), the PartitionedRegion thread pool size is set to a small value
+   * and an artificial delay is added in AbstractGatewaySender.distribute(). These test hooks are
+   * reset in every VM when the test ends.
+   */
+  @Test
+  public void testNoDistributedDeadlockWithGatewaySenderStop() throws Exception {
+    addIgnoredException("Broken pipe");
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
+    VM[] senders = {vm4, vm5, vm6, vm7};
+    try {
+      for (VM sender : senders) {
+        sender.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(true));
+        sender.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
+      }
+      vm2.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
+      vm3.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
+
+      createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true, true);
+
+      // make sure all the senders are running before doing any puts
+      waitForSendersRunning();
+
+      // Send a fairly big amount of operations to provoke the deadlock
+      int invocationsPerServer = 4;
+      AsyncInvocation<?>[] invocations = new AsyncInvocation[senders.length * invocationsPerServer];
+      for (int i = 0; i < senders.length; i++) {
+        for (int j = 0; j < invocationsPerServer; j++) {
+          invocations[i * invocationsPerServer + j] =
+              senders[i].invokeAsync(() -> doPuts(getUniqueName() + "_PR", 100));
+        }
+      }
+
+      // Wait for some elements to be replicated before stopping the senders
+      for (VM sender : senders) {
+        sender.invoke(() -> await()
+            .untilAsserted(() -> assertThat(getSenderStats("ln", -1).get(3)).isGreaterThan(1)));
+      }
+
+      stopSendersAsync();
+      for (AsyncInvocation<?> invocation : invocations) {
+        invocation.await();
+      }
+    } finally {
+      for (VM vm : new VM[] {vm2, vm3, vm4, vm5, vm6, vm7}) {
+        vm.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(false));
+        vm.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(-1));
+      }
+    }
+  }
+
   /**
    * Normal scenario in which a sender is stopped and then started again.
    */
@@ -1271,7 +1332,13 @@ public class 
ParallelGatewaySenderOperationsDistributedTest extends WANTestBase
 
   private void createSendersReceiversAndPartitionedRegion(Integer lnPort, 
Integer nyPort,
       boolean createAccessors, boolean startSenders) {
-    createSendersAndReceivers(lnPort, nyPort);
+    createSendersReceiversAndPartitionedRegion(lnPort, nyPort, 
createAccessors, startSenders,
+        false);
+  }
+
+  private void createSendersReceiversAndPartitionedRegion(Integer lnPort, 
Integer nyPort,
+      boolean createAccessors, boolean startSenders, boolean conserveSockets) {
+    createSendersAndReceivers(lnPort, nyPort, conserveSockets);
 
     createPartitionedRegions(createAccessors);
 
@@ -1280,11 +1347,11 @@ public class 
ParallelGatewaySenderOperationsDistributedTest extends WANTestBase
     }
   }
 
-  private void createSendersAndReceivers(Integer lnPort, Integer nyPort) {
-    createCacheInVMs(nyPort, vm2, vm3);
+  private void createSendersAndReceivers(Integer lnPort, Integer nyPort, 
boolean conserveSockets) {
+    createCacheConserveSocketsInVMs(conserveSockets, nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
 
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    createCacheConserveSocketsInVMs(conserveSockets, lnPort, vm4, vm5, vm6, 
vm7);
 
     vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, 
true));
     vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, 
true));
@@ -1578,6 +1645,17 @@ public class 
ParallelGatewaySenderOperationsDistributedTest extends WANTestBase
     vm7.invoke(() -> stopSender("ln"));
   }
 
+  private void stopSendersAsync() throws InterruptedException { // stop "ln" on vm4-vm7 concurrently
+    AsyncInvocation<?> stopInVm4 = vm4.invokeAsync(() -> stopSender("ln"));
+    AsyncInvocation<?> stopInVm5 = vm5.invokeAsync(() -> stopSender("ln"));
+    AsyncInvocation<?> stopInVm6 = vm6.invokeAsync(() -> stopSender("ln"));
+    AsyncInvocation<?> stopInVm7 = vm7.invokeAsync(() -> stopSender("ln"));
+    stopInVm4.await();
+    stopInVm5.await();
+    stopInVm6.await();
+    stopInVm7.await();
+  }
+
   private void waitForSendersRunning() {
     vm4.invoke(() -> waitForSenderRunningState("ln"));
     vm5.invoke(() -> waitForSenderRunningState("ln"));

Reply via email to