This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push: new 6fe63cc GEODE-7920: Geode UDP INT thread found processing cache operations (#7018) 6fe63cc is described below commit 6fe63cc3dc586824ea827300728688fafcd3d002 Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Fri Oct 22 10:43:57 2021 -0700 GEODE-7920: Geode UDP INT thread found processing cache operations (#7018) Modified DistributionMessage to look for JGroups "internal" executor threads. We thought we'd turned off all JGroups thread pools but this one is still around. We don't want to process DistributionMessages in these threads unless absolutely necessary since they're needed when processing incoming messages. (cherry picked from commit 8f24abbc7bade3f8e65a52a1a0402343e12ba2b5) Co-authored-by: Bruce Schuchardt <bschucha...@pivotal.io> --- .../internal/ClusterOperationExecutors.java | 2 +- .../distributed/internal/DistributionMessage.java | 19 +++++++++++++------ .../geode/distributed/internal/ShutdownMessage.java | 2 +- .../internal/ThrottlingMemLinkedQueueWithDMStats.java | 2 +- .../distributed/internal/DistributionMessageTest.java | 12 ++++++++++++ 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java index 586f5dc..ab85e7a 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java @@ -738,7 +738,7 @@ public class ClusterOperationExecutors implements OperationExecutors { // UDP readers are throttled in the FC protocol, which queries // the queue to see if it should throttle if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE - && !DistributionMessage.isPreciousThread()) { + && !DistributionMessage.isMembershipMessengerThread()) { do { boolean interrupted = Thread.interrupted(); try { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java index 4fed3c1..0f11b6b 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java @@ -412,7 +412,8 @@ public abstract class DistributionMessage */ protected void schedule(final ClusterDistributionManager dm) { boolean inlineProcess = INLINE_PROCESS - && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR && !isPreciousThread(); + && getProcessorType() == OperationExecutors.SERIAL_EXECUTOR + && !isMembershipMessengerThread(); boolean forceInline = this.acker != null || getInlineProcess() || Connection.isDominoThread(); @@ -476,13 +477,19 @@ public abstract class DistributionMessage } /** - * returns true if the current thread should not be used for inline processing. i.e., it is a - * "precious" resource + * returns true if the current thread should not be used for inline processing because it + * is responsible for reading geode-membership messages. Blocking such a thread can cause + * a server to be kicked out */ - public static boolean isPreciousThread() { + public static boolean isMembershipMessengerThread() { String thrname = Thread.currentThread().getName(); - // return thrname.startsWith("Geode UDP"); - return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver"); + + return isMembershipMessengerThreadName(thrname); + } + + public static boolean isMembershipMessengerThreadName(String thrname) { + return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver") + || thrname.startsWith("Geode UDP"); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java index 66cadc4..2e1ebc4 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ShutdownMessage.java @@ -79,7 +79,7 @@ public class ShutdownMessage extends HighPriorityDistributionMessage // reply.setRecipient(getSender()); // can't send a response in a UDP receiver thread or we might miss // the other side going away due to blocking receipt of views - // if (DistributionMessage.isPreciousThread()) { + // if (DistributionMessage.isMembershipMessengerThread()) { // dm.getWaitingThreadPool().execute(new Runnable() { // public void run() { // dm.putOutgoing(reply); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java index 690cdfa..40d9509 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ThrottlingMemLinkedQueueWithDMStats.java @@ -103,7 +103,7 @@ public class ThrottlingMemLinkedQueueWithDMStats<E> extends OverflowQueueWithDMS } // only block threads reading from tcp stream sockets. blocking udp // will cause retransmission storms - if (!DistributionMessage.isPreciousThread()) { + if (!DistributionMessage.isMembershipMessengerThread()) { long startTime = DistributionStats.getStatTime(); do { try { diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java index 91027e1..7bc854a 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionMessageTest.java @@ -14,10 +14,14 @@ */ package org.apache.geode.distributed.internal; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.Arrays; +import java.util.List; + import org.junit.Test; import org.junit.experimental.categories.Category; @@ -35,4 +39,12 @@ public class DistributionMessageTest { verify(mockDistributionMessage, times(1)).setReplySender(mockReplySender); } + + @Test + public void membershipMessengerThreadsAreRecognized() { + List<String> threadNames = Arrays.asList("unicast receiver", "multicast receiver", "Geode UDP"); + for (String threadName : threadNames) { + assertThat(DistributionMessage.isMembershipMessengerThreadName(threadName)).isTrue(); + } + } }