This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 431c692 Failed inbound internode authentication failures generate ugly warning with stack trace 431c692 is described below commit 431c692b884395b16beadac1d10f7e674b3e7ed5 Author: Jon Meredith <https://jonmered...@apache.org> AuthorDate: Tue Mar 15 16:17:43 2022 -0600 Failed inbound internode authentication failures generate ugly warning with stack trace patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17068 --- CHANGES.txt | 1 + .../cassandra/net/InboundConnectionInitiator.java | 48 ++++++++++++++++++---- .../test/InternodeErrorExclusionTest.java | 30 ++------------ .../apache/cassandra/net/MessagingServiceTest.java | 48 ++++++++++++++++++++-- 4 files changed, 90 insertions(+), 37 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3fc2300..2e3478c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Failed inbound internode authentication failures generate ugly warning with stack trace (CASSANDRA-17068) * Expose gossip information in system_views.gossip_info virtual table (CASSANDRA-17002) * Add guardrails for collection items and size (CASSANDRA-17153) * Improve guardrails messages (CASSANDRA-17430) diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java index 15202f3..c5ed064 100644 --- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.Future; import java.util.function.Consumer; @@ -232,20 +233,30 @@ public class InboundConnectionInitiator failHandshake(ctx); }, HandshakeProtocol.TIMEOUT_MILLIS, MILLISECONDS); - authenticate(ctx.channel().remoteAddress()); + if (!authenticate(ctx.channel().remoteAddress())) + { + failHandshake(ctx); + } } - private void authenticate(SocketAddress socketAddress) throws IOException + private boolean authenticate(SocketAddress socketAddress) throws IOException { if (socketAddress.getClass().getSimpleName().equals("EmbeddedSocketAddress")) - return; + return true; if (!(socketAddress instanceof InetSocketAddress)) throw new IOException(String.format("Unexpected SocketAddress type: %s, %s", socketAddress.getClass(), socketAddress)); InetSocketAddress addr = (InetSocketAddress)socketAddress; if (!settings.authenticate(addr.getAddress(), addr.getPort())) - throw new IOException("Authentication failure for inbound connection from peer " + addr); + { + // Log at info level as anything that can reach the inbound port could hit this + // and trigger a log of noise. Failed outbound connections to known cluster endpoints + // still fail with an ERROR message and exception to alert operators that aren't watching logs closely. + logger.info("Authenticate rejected inbound internode connection from {}", addr); + return false; + } + return true; } @Override @@ -376,14 +387,22 @@ public class InboundConnectionInitiator private void exceptionCaught(Channel channel, Throwable cause) { - logger.error("Failed to properly handshake with peer {}. Closing the channel.", channel.remoteAddress(), cause); + final SocketAddress remoteAddress = channel.remoteAddress(); + boolean reportingExclusion = DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(remoteAddress); + + if (reportingExclusion) + logger.debug("Excluding internode exception for {}; address contained in internode_error_reporting_exclusions", remoteAddress, cause); + else + logger.error("Failed to properly handshake with peer {}. Closing the channel.", remoteAddress, cause); + try { failHandshake(channel); } catch (Throwable t) { - logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + if (!reportingExclusion) + logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); } } @@ -394,9 +413,24 @@ public class InboundConnectionInitiator private void failHandshake(Channel channel) { - channel.close(); + // Cancel the handshake timeout as early as possible as it calls this method if (handshakeTimeout != null) handshakeTimeout.cancel(true); + + // prevent further decoding of buffered data by removing this handler before closing + // otherwise the pending bytes will be decoded again on close, throwing further exceptions. + try + { + channel.pipeline().remove(this); + } + catch (NoSuchElementException ex) + { + // possible race with the handshake timeout firing and removing this handler already + } + finally + { + channel.close(); + } } private void setupStreamingPipeline(InetAddressAndPort from, ChannelHandlerContext ctx) diff --git a/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java index e91e167..08fd122 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.distributed.test; import java.io.IOException; -import java.net.InetAddress; import java.util.Arrays; import java.util.concurrent.TimeoutException; @@ -28,12 +27,9 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.Feature; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.net.InboundConnectionInitiator; import org.apache.cassandra.transport.SimpleClient; import static org.assertj.core.api.Assertions.assertThat; @@ -46,13 +42,14 @@ public class InternodeErrorExclusionTest extends TestBaseImpl DatabaseDescriptor.clientInitialization(); } + // Connect a simple native client to the internode port (which fails on the protocol magic check) + // and make sure the exception is swallowed. @Test - public void ignoreAuthErrors() throws IOException, TimeoutException + public void ignoreExcludedInternodeErrors() throws IOException, TimeoutException { try (Cluster cluster = Cluster.build(1) .withConfig(c -> c .with(Feature.NETWORK) - .set("internode_authenticator", AlwaysFailingIInternodeAuthenticator.class.getName()) .set("internode_error_reporting_exclusions", ImmutableMap.of("subnets", Arrays.asList("127.0.0.1")))) .start()) { @@ -68,25 +65,4 @@ public class InternodeErrorExclusionTest extends TestBaseImpl assertThat(cluster.get(1).logs().watchFor("address contained in internode_error_reporting_exclusions").getResult()).hasSize(1); } } - - public static class AlwaysFailingIInternodeAuthenticator implements IInternodeAuthenticator - { - @Override - public boolean authenticate(InetAddress remoteAddress, int remotePort) - { - String klass = InboundConnectionInitiator.class.getName(); - for (StackTraceElement e : Thread.currentThread().getStackTrace()) - { - if (e.getClassName().startsWith(klass)) - return false; - } - return true; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - - } - } } diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 3b5959b..733584b 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -20,8 +20,11 @@ */ package org.apache.cassandra.net; +import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.channels.AsynchronousSocketChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -29,6 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.*; import java.util.regex.Matcher; @@ -44,6 +50,7 @@ import org.apache.cassandra.metrics.MessagingMetrics; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; +import org.awaitility.Awaitility; import org.caffinitas.ohc.histo.EstimatedHistogram; import org.junit.After; import org.junit.Assert; @@ -57,10 +64,12 @@ import static org.junit.Assert.*; public class MessagingServiceTest { private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets(); + public static AtomicInteger rejectedConnections = new AtomicInteger(); public static final IInternodeAuthenticator ALLOW_NOTHING_AUTHENTICATOR = new IInternodeAuthenticator() { public boolean authenticate(InetAddress remoteAddress, int remotePort) { + rejectedConnections.incrementAndGet(); return false; } @@ -92,6 +101,7 @@ public class MessagingServiceTest messagingService.metrics.resetDroppedMessages(); messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.2")); messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.3")); + DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); } @After @@ -216,21 +226,53 @@ public class MessagingServiceTest * @throws Exception */ @Test - public void testFailedInternodeAuth() throws Exception + public void testFailedOutboundInternodeAuth() throws Exception { MessagingService ms = MessagingService.instance(); DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250"); //Should return null - Message messageOut = Message.out(Verb.ECHO_REQ, NoPayload.noPayload); - assertFalse(ms.isConnected(address, messageOut)); + int rejectedBefore = rejectedConnections.get(); + Message<?> messageOut = Message.out(Verb.ECHO_REQ, NoPayload.noPayload); + ms.send(messageOut, address); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> rejectedConnections.get() > rejectedBefore); //Should tolerate null ms.closeOutbound(address); ms.send(messageOut, address); } + @Test + public void testFailedInboundInternodeAuth() throws IOException, InterruptedException + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions() + .withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.none); + + DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); + InetAddress listenAddress = FBUtilities.getJustLocalAddress(); + + InboundConnectionSettings settings = new InboundConnectionSettings().withEncryption(serverEncryptionOptions); + InboundSockets connections = new InboundSockets(settings); + + try (AsynchronousSocketChannel testChannel = AsynchronousSocketChannel.open()) + { + connections.open().await(); + Assert.assertTrue(connections.isListening()); + + int rejectedBefore = rejectedConnections.get(); + Future<Void> connectFuture = testChannel.connect(new InetSocketAddress(listenAddress, DatabaseDescriptor.getStoragePort())); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> rejectedConnections.get() > rejectedBefore); + + connectFuture.cancel(true); + } + finally + { + connections.close().await(); + Assert.assertFalse(connections.isListening()); + } + } + // @Test // public void reconnectWithNewIp() throws Exception // { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org