This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e936e7  Initial client handler correctly sets stream id on responses
7e936e7 is described below

commit 7e936e7f2c6ccc73d8e3acd31d7050889ec1efbe
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Wed Jan 13 10:50:17 2021 +0000

    Initial client handler correctly sets stream id on responses
    
    Patch by Sam Tunnicliffe; reviewed by Mick Semb Wever for CASSANDRA-16376
---
 CHANGES.txt                                        |   1 +
 .../transport/InitialConnectionHandler.java        |   1 +
 .../apache/cassandra/transport/SimpleClient.java   |   4 +-
 .../apache/cassandra/transport/BurnTestUtil.java   |   2 +-
 .../transport/ProtocolNegotiationTest.java         | 101 ++++++++++++++++++---
 5 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fbb1bd1..47b47d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Ensure pre-negotiation native protocol responses have correct stream id 
(CASSANDRA-16376)
  * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
  * SSLFactory should initialize SSLContext before setting protocols 
(CASSANDRA-16362)
  * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in 
the cassandra-all pom (CASSANDRA-16303)
diff --git 
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java 
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 70237c5..77e9232 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -89,6 +89,7 @@ public class InitialConnectionHandler extends 
ByteToMessageDecoder
                     supportedOptions.put(StartupMessage.COMPRESSION, 
compressions);
                     supportedOptions.put(StartupMessage.PROTOCOL_VERSIONS, 
ProtocolVersion.supportedVersions());
                     SupportedMessage supported = new 
SupportedMessage(supportedOptions);
+                    supported.setStreamId(inbound.header.streamId);
                     outbound = supported.encode(inbound.header.version);
                     ctx.writeAndFlush(outbound);
                     break;
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 43807a8..5ad4c17 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,7 +201,8 @@ public class SimpleClient implements Closeable
         responseHandler.eventHandler = eventHandler;
     }
 
-    protected void establishConnection() throws IOException
+    @VisibleForTesting
+    void establishConnection() throws IOException
     {
         // Configure the client.
         bootstrap = new Bootstrap()
diff --git a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java 
b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
index e7798e1..e7bf6b8 100644
--- a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
+++ b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
@@ -62,7 +62,7 @@ public class BurnTestUtil
         for (int i = 0; i < sizeCaps.columnCountCap; i++)
             values[i] = bytes(rnd, sizeCaps.valueMinSize, 
sizeCaps.valueMaxSize);
 
-        return new SimpleStatement(Integer.toString(idx), values);
+        return new SimpleStatement(Integer.toString(idx), (Object[]) values);
     }
 
     public static QueryMessage generateQueryMessage(int idx, SizeCaps sizeCaps)
diff --git 
a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java 
b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
index 54a68bc..f33d8e6 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
@@ -18,17 +18,36 @@
 
 package org.apache.cassandra.transport;
 
+import java.io.IOException;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.messages.OptionsMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
 
+import static com.datastax.driver.core.ProtocolVersion.NEWEST_BETA;
+import static com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED;
+import static com.datastax.driver.core.ProtocolVersion.V1;
+import static com.datastax.driver.core.ProtocolVersion.V2;
+import static com.datastax.driver.core.ProtocolVersion.V3;
+import static com.datastax.driver.core.ProtocolVersion.V4;
+import static com.datastax.driver.core.ProtocolVersion.V5;
+import static org.apache.cassandra.transport.messages.StartupMessage.CQL_VERSION;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+
 public class ProtocolNegotiationTest extends CQLTester
 {
     // to avoid JMX naming clashes between cluster metrics
@@ -40,33 +59,93 @@ public class ProtocolNegotiationTest extends CQLTester
         requireNetwork();
     }
 
+    @Before
+    public void initNetwork()
+    {
+        reinitializeNetwork();
+    }
+
     @Test
     public void serverSupportsV3AndV4ByDefault()
     {
-        reinitializeNetwork();
         // client can explicitly request either V3 or V4
-        testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
-        testConnection(ProtocolVersion.V4, ProtocolVersion.V4);
+        testConnection(V3, V3);
+        testConnection(V4, V4);
 
         // if not specified, V4 is the default
-        testConnection(null, ProtocolVersion.V4);
-        testConnection(ProtocolVersion.NEWEST_SUPPORTED, ProtocolVersion.V4);
+        testConnection(null, V4);
+        testConnection(NEWEST_SUPPORTED, V4);
     }
 
     @Test
     public void supportV5ConnectionWithBetaOption()
     {
-        reinitializeNetwork();
-        testConnection(ProtocolVersion.V5, ProtocolVersion.V5);
-        testConnection(ProtocolVersion.NEWEST_BETA, ProtocolVersion.V5);
+        testConnection(V5, V5);
+        testConnection(NEWEST_BETA, V5);
     }
 
     @Test
     public void olderVersionsAreUnsupported()
     {
+        testConnection(V1, V4);
+        testConnection(V2, V4);
+    }
+
+    @Test
+    public void preNegotiationResponsesHaveCorrectStreamId()
+    {
+        ProtocolVersion.SUPPORTED.forEach(this::testStreamIdsAcrossNegotiation);
+    }
+
+    private void testStreamIdsAcrossNegotiation(ProtocolVersion version)
+    {
+        long seed = System.currentTimeMillis();
+        Random random = new Random(seed);
         reinitializeNetwork();
-        testConnection(ProtocolVersion.V1, ProtocolVersion.V4);
-        testConnection(ProtocolVersion.V2, ProtocolVersion.V4);
+        SimpleClient.Builder builder = SimpleClient.builder(nativeAddr.getHostAddress(), nativePort);
+        if (version.isBeta())
+            builder.useBeta();
+        else
+            builder.protocolVersion(version);
+
+        try (SimpleClient client = builder.build())
+        {
+            client.establishConnection();
+            // Before STARTUP the client hasn't yet negotiated a protocol version.
+            // All OPTIONS messages are received by the initial connection handler.
+            OptionsMessage options = new OptionsMessage();
+            for (int i = 0; i < 100; i++)
+            {
+                int streamId = random.nextInt(254) + 1;
+                options.setStreamId(streamId);
+                Message.Response response = client.execute(options);
+                assertEquals(String.format("StreamId mismatch; version: %s, seed: %s, iter: %s, expected: %s, actual: %s",
+                                           version, seed, i, streamId, response.getStreamId()),
+                             streamId, response.getStreamId());
+            }
+
+            int streamId = random.nextInt(254) + 1;
+            // STARTUP messages are handled by the initial connection handler
+            StartupMessage startup = new StartupMessage(ImmutableMap.of(CQL_VERSION, QueryProcessor.CQL_VERSION.toString()));
+            startup.setStreamId(streamId);
+            Message.Response response = client.execute(startup);
+            assertEquals(String.format("StreamId mismatch after negotiation; version: %s, expected: %s, actual %s",
+                                       version, streamId, response.getStreamId()),
+                         streamId, response.getStreamId());
+
+            // Following STARTUP, the version specific handlers are fully responsible for processing messages
+            QueryMessage query = new QueryMessage("SELECT * FROM system.local", QueryOptions.DEFAULT);
+            query.setStreamId(streamId);
+            response = client.execute(query);
+            assertEquals(String.format("StreamId mismatch after negotiation; version: %s, expected: %s, actual %s",
+                                       version, streamId, response.getStreamId()),
+                         streamId, response.getStreamId());
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+            fail("Error establishing connection");
+        }
     }
 
     private void testConnection(com.datastax.driver.core.ProtocolVersion 
requestedVersion,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to