This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46ee939  Internode messaging catches OOMs and does not rethrow
46ee939 is described below

commit 46ee939b957528185dc6bbd3028c1d6e695163e7
Author: Yifan Cai <yc25c...@gmail.com>
AuthorDate: Thu Nov 19 09:13:30 2020 -0800

    Internode messaging catches OOMs and does not rethrow
    
    patch by Yifan Cai; reviewed by David Capwell, Jordan West for CASSANDRA-15214
---
 CHANGES.txt                                        |  1 +
 .../cassandra/net/InboundMessageHandler.java       |  6 +--
 .../apache/cassandra/net/OutboundConnection.java   |  6 +--
 .../cassandra/net/OutboundConnectionInitiator.java |  2 +-
 .../org/apache/cassandra/transport/Message.java    |  2 +
 .../cassandra/utils/JVMStabilityInspector.java     | 51 +++++++++++++++++-----
 .../cassandra/utils/JVMStabilityInspectorTest.java | 17 ++++++++
 7 files changed, 66 insertions(+), 19 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0d93b85..beb878a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
  * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
  * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144)
+ * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
 Merged from 3.11:
  * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
 Merged from 3.0:
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java 
b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index 534128e..2cac3eb 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -329,7 +329,7 @@ public class InboundMessageHandler extends 
ChannelInboundHandlerAdapter implemen
         }
         catch (Throwable t)
         {
-            JVMStabilityInspector.inspectThrowable(t, false);
+            JVMStabilityInspector.inspectThrowable(t);
             callbacks.onFailedDeserialize(size, header, t);
             logger.error("{} unexpected exception caught while deserializing a 
message", id(), t);
         }
@@ -648,7 +648,7 @@ public class InboundMessageHandler extends 
ChannelInboundHandlerAdapter implemen
     {
         decoder.discard();
 
-        JVMStabilityInspector.inspectThrowable(cause, false);
+        JVMStabilityInspector.inspectThrowable(cause);
 
         if (cause instanceof Message.InvalidLegacyProtocolMagic)
             logger.error("{} invalid, unrecoverable CRC mismatch detected 
while reading messages - closing the connection", id());
@@ -832,7 +832,7 @@ public class InboundMessageHandler extends 
ChannelInboundHandlerAdapter implemen
             }
             catch (Throwable t)
             {
-                JVMStabilityInspector.inspectThrowable(t, false);
+                JVMStabilityInspector.inspectThrowable(t);
                 callbacks.onFailedDeserialize(size, header, t);
                 logger.error("{} unexpected exception caught while 
deserializing a message", id(), t);
             }
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java 
b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 79c0459..98c034c 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -485,7 +485,7 @@ public class OutboundConnection
      */
     private void onFailedSerialize(Message<?> message, int messagingVersion, 
int bytesWrittenToNetwork, Throwable t)
     {
-        JVMStabilityInspector.inspectThrowable(t, false);
+        JVMStabilityInspector.inspectThrowable(t);
         releaseCapacity(1, canonicalSize(message));
         errorCount += 1;
         errorBytes += message.serializedSize(messagingVersion);
@@ -1047,7 +1047,7 @@ public class OutboundConnection
 
     private void invalidateChannel(Established established, Throwable cause)
     {
-        JVMStabilityInspector.inspectThrowable(cause, false);
+        JVMStabilityInspector.inspectThrowable(cause);
 
         if (state != established)
             return; // do nothing; channel already invalidated
@@ -1093,7 +1093,7 @@ public class OutboundConnection
                 else
                     noSpamLogger.error("{} failed to connect", id(), cause);
 
-                JVMStabilityInspector.inspectThrowable(cause, false);
+                JVMStabilityInspector.inspectThrowable(cause);
 
                 if (hasPending())
                 {
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java 
b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index 2c26005..f1fa6b7 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -382,7 +382,7 @@ public class OutboundConnectionInitiator<SuccessType 
extends OutboundConnectionI
 
             try
             {
-                JVMStabilityInspector.inspectThrowable(cause, false);
+                JVMStabilityInspector.inspectThrowable(cause);
                 resultPromise.tryFailure(cause);
                 if (isCausedByConnectionReset(cause))
                     logger.info("Failed to connect to peer {}", 
settings.connectToId(), cause);
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 09ea2e5..59195c4 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -805,6 +805,8 @@ public abstract class Message
                     });
                 }
             }
+
+            JVMStabilityInspector.inspectThrowable(cause);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java 
b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index ae28410..6f4e4c6 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.utils;
 
 import java.io.FileNotFoundException;
 import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -28,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.nicoulaj.compilecommand.annotations.Exclude;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -60,17 +64,12 @@ public final class JVMStabilityInspector
      */
     public static void inspectThrowable(Throwable t) throws OutOfMemoryError
     {
-        inspectThrowable(t, true);
-    }
-
-    public static void inspectThrowable(Throwable t, boolean 
propagateOutOfMemory) throws OutOfMemoryError
-    {
-        inspectThrowable(t, propagateOutOfMemory, 
JVMStabilityInspector::inspectDiskError);
+        inspectThrowable(t, JVMStabilityInspector::inspectDiskError);
     }
 
     public static void inspectCommitLogThrowable(Throwable t)
     {
-        inspectThrowable(t, true, 
JVMStabilityInspector::inspectCommitLogError);
+        inspectThrowable(t, JVMStabilityInspector::inspectCommitLogError);
     }
 
     private static void inspectDiskError(Throwable t)
@@ -81,7 +80,7 @@ public final class JVMStabilityInspector
             FileUtils.handleFSError((FSError) t);
     }
 
-    public static void inspectThrowable(Throwable t, boolean 
propagateOutOfMemory, Consumer<Throwable> fn) throws OutOfMemoryError
+    public static void inspectThrowable(Throwable t, Consumer<Throwable> fn) 
throws OutOfMemoryError
     {
         boolean isUnstable = false;
         if (t instanceof OutOfMemoryError)
@@ -102,11 +101,11 @@ public final class JVMStabilityInspector
             logger.error("OutOfMemory error letting the JVM handle the 
error:", t);
 
             StorageService.instance.removeShutdownHook();
+
+            forceHeapSpaceOomMaybe((OutOfMemoryError) t);
+
             // We let the JVM handle the error. The startup checks should have 
warned the user if it did not configure
             // the JVM behavior in case of OOM (CASSANDRA-13006).
-            if (!propagateOutOfMemory)
-                return;
-
             throw (OutOfMemoryError) t;
         }
 
@@ -125,7 +124,35 @@ public final class JVMStabilityInspector
             killer.killCurrentJVM(t);
 
         if (t.getCause() != null)
-            inspectThrowable(t.getCause(), propagateOutOfMemory, fn);
+            inspectThrowable(t.getCause(), fn);
+    }
+
+    /**
+     * Intentionally produce a heap space OOM upon seeing a Direct buffer 
memory OOM.
+     * Direct buffer OOM cannot trigger JVM OOM error related options,
+     * e.g. OnOutOfMemoryError, HeapDumpOnOutOfMemoryError, etc.
+     * See CASSANDRA-15214 for more details
+     */
+    @Exclude // Exclude from just in time compilation.
+    private static void forceHeapSpaceOomMaybe(OutOfMemoryError oom)
+    {
+        // See the oom thrown from java.nio.Bits.reserveMemory.
+        // In jdk 13 and up, the message is "Cannot reserve XX bytes of direct 
buffer memory (...)"
+        // In jdk 11 and below, the message is "Direct buffer memory"
+        if ((oom.getMessage() != null && 
oom.getMessage().toLowerCase().contains("direct buffer memory")) ||
+            Arrays.stream(oom.getStackTrace()).anyMatch(x -> 
x.getClassName().equals("java.nio.Bits")
+                                                             && 
x.getMethodName().equals("reserveMemory")))
+        {
+            logger.error("Force heap space OutOfMemoryError in the presence 
of", oom);
+            // Start to produce heap space OOM forcibly.
+            List<long[]> ignored = new ArrayList<>();
+            while (true)
+            {
+                // java.util.AbstractCollection.MAX_ARRAY_SIZE is defined as 
Integer.MAX_VALUE - 8
+                // so Integer.MAX_VALUE / 2 should be a large enough and safe 
size to request.
+                ignored.add(new long[Integer.MAX_VALUE / 2]);
+            }
+        }
     }
 
     private static void inspectCommitLogError(Throwable t)
diff --git 
a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java 
b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
index 3b1056d..109fdb1 100644
--- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
+++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
@@ -31,7 +31,9 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 
 import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -109,6 +111,21 @@ public class JVMStabilityInspectorTest
     }
 
     @Test
+    public void testForceHeapSpaceOom()
+    {
+        try
+        {
+            JVMStabilityInspector.inspectThrowable(new 
OutOfMemoryError("Direct buffer memory"));
+            fail("The JVMStabilityInspector should force trigger a heap space 
OutOfMemoryError and delegate the handling to the JVM");
+        }
+        catch (Throwable e)
+        {
+            assertSame(e.getClass(), OutOfMemoryError.class);
+            assertEquals("Java heap space", e.getMessage());
+        }
+    }
+
+    @Test
     public void fileHandleTest()
     {
         KillerForTests killerForTests = new KillerForTests();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to