This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 431c692  Failed inbound internode authentication failures generate 
ugly warning with stack trace
431c692 is described below

commit 431c692b884395b16beadac1d10f7e674b3e7ed5
Author: Jon Meredith <https://jonmered...@apache.org>
AuthorDate: Tue Mar 15 16:17:43 2022 -0600

    Failed inbound internode authentication failures generate ugly warning with 
stack trace
    
    patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17068
---
 CHANGES.txt                                        |  1 +
 .../cassandra/net/InboundConnectionInitiator.java  | 48 ++++++++++++++++++----
 .../test/InternodeErrorExclusionTest.java          | 30 ++------------
 .../apache/cassandra/net/MessagingServiceTest.java | 48 ++++++++++++++++++++--
 4 files changed, 90 insertions(+), 37 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3fc2300..2e3478c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Failed inbound internode authentication failures generate ugly warning with 
stack trace (CASSANDRA-17068)
  * Expose gossip information in system_views.gossip_info virtual table 
(CASSANDRA-17002)
  * Add guardrails for collection items and size (CASSANDRA-17153)
  * Improve guardrails messages (CASSANDRA-17430)
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java 
b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index 15202f3..c5ed064 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
@@ -232,20 +233,30 @@ public class InboundConnectionInitiator
                 failHandshake(ctx);
             }, HandshakeProtocol.TIMEOUT_MILLIS, MILLISECONDS);
 
-            authenticate(ctx.channel().remoteAddress());
+            if (!authenticate(ctx.channel().remoteAddress()))
+            {
+                failHandshake(ctx);
+            }
         }
 
-        private void authenticate(SocketAddress socketAddress) throws 
IOException
+        private boolean authenticate(SocketAddress socketAddress) throws 
IOException
         {
             if 
(socketAddress.getClass().getSimpleName().equals("EmbeddedSocketAddress"))
-                return;
+                return true;
 
             if (!(socketAddress instanceof InetSocketAddress))
                 throw new IOException(String.format("Unexpected SocketAddress 
type: %s, %s", socketAddress.getClass(), socketAddress));
 
             InetSocketAddress addr = (InetSocketAddress)socketAddress;
             if (!settings.authenticate(addr.getAddress(), addr.getPort()))
-                throw new IOException("Authentication failure for inbound 
connection from peer " + addr);
+            {
+                // Log at info level as anything that can reach the inbound 
port could hit this
+                // and trigger a lot of noise.  Failed outbound connections to 
known cluster endpoints
+                // still fail with an ERROR message and exception to alert 
operators that aren't watching logs closely.
+                logger.info("Authenticate rejected inbound internode 
connection from {}", addr);
+                return false;
+            }
+            return true;
         }
 
         @Override
@@ -376,14 +387,22 @@ public class InboundConnectionInitiator
 
         private void exceptionCaught(Channel channel, Throwable cause)
         {
-            logger.error("Failed to properly handshake with peer {}. Closing 
the channel.", channel.remoteAddress(), cause);
+            final SocketAddress remoteAddress = channel.remoteAddress();
+            boolean reportingExclusion = 
DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(remoteAddress);
+
+            if (reportingExclusion)
+                logger.debug("Excluding internode exception for {}; address 
contained in internode_error_reporting_exclusions", remoteAddress, cause);
+            else
+                logger.error("Failed to properly handshake with peer {}. 
Closing the channel.", remoteAddress, cause);
+
             try
             {
                 failHandshake(channel);
             }
             catch (Throwable t)
             {
-                logger.error("Unexpected exception in {}.exceptionCaught", 
this.getClass().getSimpleName(), t);
+                if (!reportingExclusion)
+                    logger.error("Unexpected exception in {}.exceptionCaught", 
this.getClass().getSimpleName(), t);
             }
         }
 
@@ -394,9 +413,24 @@ public class InboundConnectionInitiator
 
         private void failHandshake(Channel channel)
         {
-            channel.close();
+            // Cancel the handshake timeout as early as possible as it calls 
this method
             if (handshakeTimeout != null)
                 handshakeTimeout.cancel(true);
+
+            // prevent further decoding of buffered data by removing this 
handler before closing
+            // otherwise the pending bytes will be decoded again on close, 
throwing further exceptions.
+            try
+            {
+                channel.pipeline().remove(this);
+            }
+            catch (NoSuchElementException ex)
+            {
+                // possible race with the handshake timeout firing and 
removing this handler already
+            }
+            finally
+            {
+                channel.close();
+            }
         }
 
         private void setupStreamingPipeline(InetAddressAndPort from, 
ChannelHandlerContext ctx)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java
index e91e167..08fd122 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 
@@ -28,12 +27,9 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.InboundConnectionInitiator;
 import org.apache.cassandra.transport.SimpleClient;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -46,13 +42,14 @@ public class InternodeErrorExclusionTest extends 
TestBaseImpl
         DatabaseDescriptor.clientInitialization();
     }
 
+    // Connect a simple native client to the internode port (which fails on 
the protocol magic check)
+    // and make sure the exception is swallowed.
     @Test
-    public void ignoreAuthErrors() throws IOException, TimeoutException
+    public void ignoreExcludedInternodeErrors() throws IOException, 
TimeoutException
     {
         try (Cluster cluster = Cluster.build(1)
                                       .withConfig(c -> c
                                                        .with(Feature.NETWORK)
-                                                       
.set("internode_authenticator", 
AlwaysFailingIInternodeAuthenticator.class.getName())
                                                        
.set("internode_error_reporting_exclusions", ImmutableMap.of("subnets", 
Arrays.asList("127.0.0.1"))))
                                       .start())
         {
@@ -68,25 +65,4 @@ public class InternodeErrorExclusionTest extends TestBaseImpl
             assertThat(cluster.get(1).logs().watchFor("address contained in 
internode_error_reporting_exclusions").getResult()).hasSize(1);
         }
     }
-
-    public static class AlwaysFailingIInternodeAuthenticator implements 
IInternodeAuthenticator
-    {
-        @Override
-        public boolean authenticate(InetAddress remoteAddress, int remotePort)
-        {
-            String klass = InboundConnectionInitiator.class.getName();
-            for (StackTraceElement e : Thread.currentThread().getStackTrace())
-            {
-                if (e.getClassName().startsWith(klass))
-                    return false;
-            }
-            return true;
-        }
-
-        @Override
-        public void validateConfiguration() throws ConfigurationException
-        {
-
-        }
-    }
 }
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 3b5959b..733584b 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -20,8 +20,11 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.nio.channels.AsynchronousSocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -29,6 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.*;
 import java.util.regex.Matcher;
 
@@ -44,6 +50,7 @@ import org.apache.cassandra.metrics.MessagingMetrics;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
+import org.awaitility.Awaitility;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
 import org.junit.After;
 import org.junit.Assert;
@@ -57,10 +64,12 @@ import static org.junit.Assert.*;
 public class MessagingServiceTest
 {
     private final static long[] bucketOffsets = new 
EstimatedHistogram(160).getBucketOffsets();
+    public static AtomicInteger rejectedConnections = new AtomicInteger();
     public static final IInternodeAuthenticator ALLOW_NOTHING_AUTHENTICATOR = 
new IInternodeAuthenticator()
     {
         public boolean authenticate(InetAddress remoteAddress, int remotePort)
         {
+            rejectedConnections.incrementAndGet();
             return false;
         }
 
@@ -92,6 +101,7 @@ public class MessagingServiceTest
         messagingService.metrics.resetDroppedMessages();
         
messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.2"));
         
messagingService.closeOutbound(InetAddressAndPort.getByName("127.0.0.3"));
+        DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
     }
 
     @After
@@ -216,21 +226,53 @@ public class MessagingServiceTest
      * @throws Exception
      */
     @Test
-    public void testFailedInternodeAuth() throws Exception
+    public void testFailedOutboundInternodeAuth() throws Exception
     {
         MessagingService ms = MessagingService.instance();
         
DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
         InetAddressAndPort address = 
InetAddressAndPort.getByName("127.0.0.250");
 
         //Should return null
-        Message messageOut = Message.out(Verb.ECHO_REQ, NoPayload.noPayload);
-        assertFalse(ms.isConnected(address, messageOut));
+        int rejectedBefore = rejectedConnections.get();
+        Message<?> messageOut = Message.out(Verb.ECHO_REQ, 
NoPayload.noPayload);
+        ms.send(messageOut, address);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
rejectedConnections.get() > rejectedBefore);
 
         //Should tolerate null
         ms.closeOutbound(address);
         ms.send(messageOut, address);
     }
 
+    @Test
+    public void testFailedInboundInternodeAuth() throws IOException, 
InterruptedException
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new 
ServerEncryptionOptions()
+            
.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.none);
+
+        
DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
+        InetAddress listenAddress = FBUtilities.getJustLocalAddress();
+
+        InboundConnectionSettings settings = new 
InboundConnectionSettings().withEncryption(serverEncryptionOptions);
+        InboundSockets connections = new InboundSockets(settings);
+
+        try (AsynchronousSocketChannel testChannel = 
AsynchronousSocketChannel.open())
+        {
+            connections.open().await();
+            Assert.assertTrue(connections.isListening());
+
+            int rejectedBefore = rejectedConnections.get();
+            Future<Void> connectFuture = testChannel.connect(new 
InetSocketAddress(listenAddress, DatabaseDescriptor.getStoragePort()));
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
rejectedConnections.get() > rejectedBefore);
+
+            connectFuture.cancel(true);
+        }
+        finally
+        {
+            connections.close().await();
+            Assert.assertFalse(connections.isListening());
+        }
+    }
+
 //    @Test
 //    public void reconnectWithNewIp() throws Exception
 //    {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to