This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 46ee939 Internode messaging catches OOMs and does not rethrow 46ee939 is described below commit 46ee939b957528185dc6bbd3028c1d6e695163e7 Author: Yifan Cai <yc25c...@gmail.com> AuthorDate: Thu Nov 19 09:13:30 2020 -0800 Internode messaging catches OOMs and does not rethrow patch by Yifan Cai; reviewed by David Capwell, Jordan West for CASSANDRA-15214 --- CHANGES.txt | 1 + .../cassandra/net/InboundMessageHandler.java | 6 +-- .../apache/cassandra/net/OutboundConnection.java | 6 +-- .../cassandra/net/OutboundConnectionInitiator.java | 2 +- .../org/apache/cassandra/transport/Message.java | 2 + .../cassandra/utils/JVMStabilityInspector.java | 51 +++++++++++++++++----- .../cassandra/utils/JVMStabilityInspectorTest.java | 17 ++++++++ 7 files changed, 66 insertions(+), 19 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0d93b85..beb878a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103) * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214) * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144) + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214) Merged from 3.11: * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java index 534128e..2cac3eb 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java @@ -329,7 +329,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen } catch (Throwable t) { - JVMStabilityInspector.inspectThrowable(t, false); + JVMStabilityInspector.inspectThrowable(t); callbacks.onFailedDeserialize(size, header, t); logger.error("{} unexpected exception caught while deserializing a message", id(), t); } @@ -648,7 +648,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen { decoder.discard(); - JVMStabilityInspector.inspectThrowable(cause, false); + JVMStabilityInspector.inspectThrowable(cause); if (cause instanceof Message.InvalidLegacyProtocolMagic) logger.error("{} invalid, unrecoverable CRC mismatch detected while reading messages - closing the connection", id()); @@ -832,7 +832,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen } catch (Throwable t) { - JVMStabilityInspector.inspectThrowable(t, false); + JVMStabilityInspector.inspectThrowable(t); callbacks.onFailedDeserialize(size, header, t); logger.error("{} unexpected exception caught while deserializing a message", id(), t); } diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 79c0459..98c034c 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -485,7 +485,7 @@ public class OutboundConnection */ private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t) { - JVMStabilityInspector.inspectThrowable(t, false); + JVMStabilityInspector.inspectThrowable(t); releaseCapacity(1, canonicalSize(message)); errorCount += 1; errorBytes += message.serializedSize(messagingVersion); @@ -1047,7 +1047,7 @@ public class OutboundConnection private void invalidateChannel(Established established, Throwable cause) { - JVMStabilityInspector.inspectThrowable(cause, false); + JVMStabilityInspector.inspectThrowable(cause); if (state != established) return; // do nothing; channel already invalidated @@ -1093,7 +1093,7 @@ public class OutboundConnection else noSpamLogger.error("{} failed to connect", id(), cause); - JVMStabilityInspector.inspectThrowable(cause, false); + JVMStabilityInspector.inspectThrowable(cause); if (hasPending()) { diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 2c26005..f1fa6b7 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -382,7 +382,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI try { - JVMStabilityInspector.inspectThrowable(cause, false); + JVMStabilityInspector.inspectThrowable(cause); resultPromise.tryFailure(cause); if (isCausedByConnectionReset(cause)) logger.info("Failed to connect to peer {}", settings.connectToId(), cause); diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 09ea2e5..59195c4 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -805,6 +805,8 @@ public abstract class Message }); } } + + JVMStabilityInspector.inspectThrowable(cause); } } diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index ae28410..6f4e4c6 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -19,6 +19,9 @@ package org.apache.cassandra.utils; import java.io.FileNotFoundException; import java.net.SocketException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -28,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import net.nicoulaj.compilecommand.annotations.Exclude; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; @@ -60,17 +64,12 @@ public final class JVMStabilityInspector */ public static void inspectThrowable(Throwable t) throws OutOfMemoryError { - inspectThrowable(t, true); - } - - public static void inspectThrowable(Throwable t, boolean propagateOutOfMemory) throws OutOfMemoryError - { - inspectThrowable(t, propagateOutOfMemory, JVMStabilityInspector::inspectDiskError); + inspectThrowable(t, JVMStabilityInspector::inspectDiskError); } public static void inspectCommitLogThrowable(Throwable t) { - inspectThrowable(t, true, JVMStabilityInspector::inspectCommitLogError); + inspectThrowable(t, JVMStabilityInspector::inspectCommitLogError); } private static void inspectDiskError(Throwable t) @@ -81,7 +80,7 @@ public final class JVMStabilityInspector FileUtils.handleFSError((FSError) t); } - public static void inspectThrowable(Throwable t, boolean propagateOutOfMemory, Consumer<Throwable> fn) throws OutOfMemoryError + public static void inspectThrowable(Throwable t, Consumer<Throwable> fn) throws OutOfMemoryError { boolean isUnstable = false; if (t instanceof OutOfMemoryError) @@ -102,11 +101,11 @@ public final class JVMStabilityInspector logger.error("OutOfMemory error letting the JVM handle the error:", t); StorageService.instance.removeShutdownHook(); + + forceHeapSpaceOomMaybe((OutOfMemoryError) t); + // We let the JVM handle the error. The startup checks should have warned the user if it did not configure // the JVM behavior in case of OOM (CASSANDRA-13006). - if (!propagateOutOfMemory) - return; - throw (OutOfMemoryError) t; } @@ -125,7 +124,35 @@ public final class JVMStabilityInspector killer.killCurrentJVM(t); if (t.getCause() != null) - inspectThrowable(t.getCause(), propagateOutOfMemory, fn); + inspectThrowable(t.getCause(), fn); + } + + /** + * Intentionally produce a heap space OOM upon seeing a Direct buffer memory OOM. + * Direct buffer OOM cannot trigger JVM OOM error related options, + * e.g. OnOutOfMemoryError, HeapDumpOnOutOfMemoryError, etc. + * See CASSANDRA-15214 for more details + */ + @Exclude // Exclude from just in time compilation. + private static void forceHeapSpaceOomMaybe(OutOfMemoryError oom) + { + // See the oom thrown from java.nio.Bits.reserveMemory. + // In jdk 13 and up, the message is "Cannot reserve XX bytes of direct buffer memory (...)" + // In jdk 11 and below, the message is "Direct buffer memory" + if ((oom.getMessage() != null && oom.getMessage().toLowerCase().contains("direct buffer memory")) || + Arrays.stream(oom.getStackTrace()).anyMatch(x -> x.getClassName().equals("java.nio.Bits") + && x.getMethodName().equals("reserveMemory"))) + { + logger.error("Force heap space OutOfMemoryError in the presence of", oom); + // Start to produce heap space OOM forcibly. + List<long[]> ignored = new ArrayList<>(); + while (true) + { + // java.util.AbstractCollection.MAX_ARRAY_SIZE is defined as Integer.MAX_VALUE - 8 + // so Integer.MAX_VALUE / 2 should be a large enough and safe size to request. + ignored.add(new long[Integer.MAX_VALUE / 2]); + } + } } private static void inspectCommitLogError(Throwable t) diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java index 3b1056d..109fdb1 100644 --- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java +++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java @@ -31,7 +31,9 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -109,6 +111,21 @@ public class JVMStabilityInspectorTest } @Test + public void testForceHeapSpaceOom() + { + try + { + JVMStabilityInspector.inspectThrowable(new OutOfMemoryError("Direct buffer memory")); + fail("The JVMStabilityInspector should force trigger a heap space OutOfMemoryError and delegate the handling to the JVM"); + } + catch (Throwable e) + { + assertSame(e.getClass(), OutOfMemoryError.class); + assertEquals("Java heap space", e.getMessage()); + } + } + + @Test public void fileHandleTest() { KillerForTests killerForTests = new KillerForTests(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org