This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e225c5  v4+ protocol did not clean up client warnings, which caused 
leaking the state
8e225c5 is described below

commit 8e225c55c49493f00fc9bc0b5809ab026d60c767
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Oct 25 07:28:08 2021 -0700

    v4+ protocol did not clean up client warnings, which caused leaking the 
state
    
    patch by David Capwell; reviewed by Caleb Rackliffe, Jon Meredith, Sam 
Tunnicliffe for CASSANDRA-17054
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/transport/Dispatcher.java | 38 ++++-----
 .../transport/InitialConnectionHandler.java        |  2 +-
 .../org/apache/cassandra/transport/Message.java    | 11 ++-
 .../distributed/test/JavaDriverUtils.java          |  9 +++
 .../distributed/test/NativeMixedVersionTest.java   | 89 ++++++++++++++++++++++
 6 files changed, 130 insertions(+), 20 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 68aeb04..8ef8531 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * v4+ protocol did not clean up client warnings, which caused leaking the 
state (CASSANDRA-17054)
  * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
  * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
  * Allow to GRANT or REVOKE multiple permissions in a single statement 
(CASSANDRA-17030)
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java 
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 31b750e..3aff2d2 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -82,9 +82,10 @@ public class Dispatcher
     }
 
     /**
-     * Note: this method may be executed on the netty event loop, during 
initial protocol negotiation
+     * Note: this method may be executed on the netty event loop, during 
initial protocol negotiation; the caller is
+     * responsible for cleaning up any global or thread-local state. (ex. 
tracing, client warnings, etc.).
      */
-    static Message.Response processRequest(ServerConnection connection, 
Message.Request request, Overload backpressure)
+    private static Message.Response processRequest(ServerConnection 
connection, Message.Request request, Overload backpressure)
     {
         long queryStartNanoTime = nanoTime();
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
@@ -99,7 +100,7 @@ public class Dispatcher
         {
             String message = String.format("Request breached global limit of 
%d requests/second and triggered backpressure.",
                                            
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
-            
+
             NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, 
TimeUnit.MINUTES, message);
             ClientWarn.instance.warn(message);
         }
@@ -107,7 +108,7 @@ public class Dispatcher
         {
             String message = String.format("Request breached limit(s) on bytes 
in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
                                            
ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
-            
+
             NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, 
TimeUnit.MINUTES, message);
             ClientWarn.instance.warn(message);
         }
@@ -129,39 +130,42 @@ public class Dispatcher
     }
 
     /**
-     * Note: this method is not expected to execute on the netty event loop.
+     * Note: this method may be executed on the netty event loop.
      */
-    void processRequest(Channel channel, Message.Request request, 
FlushItemConverter forFlusher, Overload backpressure)
+    static Message.Response processRequest(Channel channel, Message.Request 
request, Overload backpressure)
     {
-        final Message.Response response;
-        final ServerConnection connection;
-        FlushItem<?> toFlush;
         try
         {
-            assert request.connection() instanceof ServerConnection;
-            connection = (ServerConnection) request.connection();
-            response = processRequest(connection, request, backpressure);
-            toFlush = forFlusher.toFlushItem(channel, request, response);
-            Message.logger.trace("Responding: {}, v={}", response, 
connection.getVersion());
+            return processRequest((ServerConnection) request.connection(), 
request, backpressure);
         }
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
-            
+
             if (request.isTrackable())
                 CoordinatorWarnings.done();
-            
+
             Predicate<Throwable> handler = 
ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
             ErrorMessage error = ErrorMessage.fromException(t, handler);
             error.setStreamId(request.getStreamId());
             error.setWarnings(ClientWarn.instance.getWarnings());
-            toFlush = forFlusher.toFlushItem(channel, request, error);
+            return error;
         }
         finally
         {
             CoordinatorWarnings.reset();
             ClientWarn.instance.resetWarnings();
         }
+    }
+
+    /**
+     * Note: this method is not expected to execute on the netty event loop.
+     */
+    void processRequest(Channel channel, Message.Request request, 
FlushItemConverter forFlusher, Overload backpressure)
+    {
+        Message.Response response = processRequest(channel, request, 
backpressure);
+        FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, 
response);
+        Message.logger.trace("Responding: {}, v={}", response, 
request.connection().getVersion());
         flush(toFlush);
     }
 
diff --git 
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java 
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index e122b6e..75cb72e 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -148,7 +148,7 @@ public class InitialConnectionHandler extends 
ByteToMessageDecoder
                         promise = new VoidChannelPromise(ctx.channel(), false);
                     }
 
-                    final Message.Response response = 
Dispatcher.processRequest((ServerConnection) connection, startup, 
Overload.NONE);
+                    final Message.Response response = 
Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE);
                     outbound = response.encode(inbound.header.version);
                     ctx.writeAndFlush(outbound, promise);
                     logger.trace("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 0f8002f..c40aa7a 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -330,9 +330,16 @@ public abstract class Message
                 List<String> warnings = message.getWarnings();
                 if (warnings != null)
                 {
+                    // if cassandra populates warnings for <= v3 protocol, 
this is a bug
                     if (version.isSmallerThan(ProtocolVersion.V4))
-                        throw new ProtocolException("Must not send frame with 
WARNING flag for native protocol version < 4");
-                    messageSize += CBUtil.sizeOfStringList(warnings);
+                    {
+                        logger.warn("Warnings present in message with version 
less than v4 (it is {}); warnings={}", version, warnings);
+                        warnings = null;
+                    }
+                    else
+                    {
+                        messageSize += CBUtil.sizeOfStringList(warnings);
+                    }
                 }
                 if (customPayload != null)
                 {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java 
b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
index bc39ba1..c7c478b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.test;
 
+import com.datastax.driver.core.ProtocolVersion;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
@@ -30,6 +31,11 @@ public final class JavaDriverUtils
 
     public static com.datastax.driver.core.Cluster create(ICluster<? extends 
IInstance> dtest)
     {
+        return create(dtest, null);
+    }
+
+    public static com.datastax.driver.core.Cluster create(ICluster<? extends 
IInstance> dtest, ProtocolVersion version)
+    {
         if (dtest.size() == 0)
             throw new IllegalArgumentException("Attempted to open java driver 
for empty cluster");
 
@@ -45,6 +51,9 @@ public final class JavaDriverUtils
         //TODO support auth
         dtest.stream().forEach(i -> 
builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress()));
 
+        if (version != null)
+            builder.withProtocolVersion(version);
+
         return builder.build();
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java
new file mode 100644
index 0000000..b2391e3
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.assertj.core.api.Assertions;
+
+public class NativeMixedVersionTest extends TestBaseImpl
+{
+    @Test
+    public void v4ConnectionCleansUpThreadLocalState() throws IOException
+    {
+        // make sure to limit the netty thread pool to size 1, this will make 
the test deterministic as all work
+        // will happen on the single thread.
+        System.setProperty("io.netty.eventLoopThreads", "1");
+        try (Cluster cluster = Cluster.build(1)
+                                      .withConfig(c ->
+                                                  c.with(Feature.values())
+                                                   .set("track_warnings", 
ImmutableMap.of(
+                                                       "enabled", true,
+                                                       "local_read_size", 
ImmutableMap.of("warn_threshold_kb", 1)
+                                                   ))
+                                      )
+                                      .start())
+        {
+            init(cluster);
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, 
ck1 int, value blob, PRIMARY KEY (pk, ck1))"));
+            IInvokableInstance node = cluster.get(1);
+
+            ByteBuffer blob = ByteBuffer.wrap("This is just some large string 
to get some number of bytes".getBytes(StandardCharsets.UTF_8));
+
+            for (int i = 0; i < 100; i++)
+                node.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, 
ck1, value) VALUES (?, ?, ?)"), 0, i, blob);
+
+            // v4+ processes the STARTUP message on the netty thread.  To make sure 
we do not leak the ClientWarn state,
+            // first generate a warning with a query, then run on 
the same threads on the v3 protocol (which
+            // does not support warnings)
+            try (com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(cluster, ProtocolVersion.V5);
+                 Session session = driver.connect())
+            {
+                ResultSet rs = session.execute(withKeyspace("SELECT * FROM 
%s.tbl"));
+                
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isNotEmpty();
+            }
+
+            try (com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(cluster, ProtocolVersion.V3);
+                 Session session = driver.connect())
+            {
+                ResultSet rs = session.execute(withKeyspace("SELECT * FROM 
%s.tbl"));
+                
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+            }
+
+            // this should not happen; so make sure no logs are found
+            List<String> result = node.logs().grep("Warnings present in 
message with version less than").getResult();
+            Assertions.assertThat(result).isEmpty();
+        }
+        finally
+        {
+            System.clearProperty("io.netty.eventLoopThreads");
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to