This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 7e936e7 Initial client handler correctly sets stream id on responses 7e936e7 is described below commit 7e936e7f2c6ccc73d8e3acd31d7050889ec1efbe Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Wed Jan 13 10:50:17 2021 +0000 Initial client handler correctly sets stream id on responses Patch by Sam Tunnicliffe; reviewed by Mick Semb Wever for CASSANDRA-16376 --- CHANGES.txt | 1 + .../transport/InitialConnectionHandler.java | 1 + .../apache/cassandra/transport/SimpleClient.java | 4 +- .../apache/cassandra/transport/BurnTestUtil.java | 2 +- .../transport/ProtocolNegotiationTest.java | 101 ++++++++++++++++++--- 5 files changed, 96 insertions(+), 13 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index fbb1bd1..47b47d8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta5 + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java index 70237c5..77e9232 100644 --- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java +++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java @@ -89,6 +89,7 @@ public class InitialConnectionHandler extends ByteToMessageDecoder supportedOptions.put(StartupMessage.COMPRESSION, compressions); supportedOptions.put(StartupMessage.PROTOCOL_VERSIONS, ProtocolVersion.supportedVersions()); SupportedMessage supported = new SupportedMessage(supportedOptions); + supported.setStreamId(inbound.header.streamId); outbound = supported.encode(inbound.header.version); 
ctx.writeAndFlush(outbound); break; diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 43807a8..5ad4c17 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,7 +201,8 @@ public class SimpleClient implements Closeable responseHandler.eventHandler = eventHandler; } - protected void establishConnection() throws IOException + @VisibleForTesting + void establishConnection() throws IOException { // Configure the client. bootstrap = new Bootstrap() diff --git a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java index e7798e1..e7bf6b8 100644 --- a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java +++ b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java @@ -62,7 +62,7 @@ public class BurnTestUtil for (int i = 0; i < sizeCaps.columnCountCap; i++) values[i] = bytes(rnd, sizeCaps.valueMinSize, sizeCaps.valueMaxSize); - return new SimpleStatement(Integer.toString(idx), values); + return new SimpleStatement(Integer.toString(idx), (Object[]) values); } public static QueryMessage generateQueryMessage(int idx, SizeCaps sizeCaps) diff --git a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java index 54a68bc..f33d8e6 100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java @@ -18,17 +18,36 @@ package org.apache.cassandra.transport; +import java.io.IOException; +import 
java.util.Random; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.Session; import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.transport.messages.OptionsMessage; +import org.apache.cassandra.transport.messages.QueryMessage; +import org.apache.cassandra.transport.messages.StartupMessage; +import static com.datastax.driver.core.ProtocolVersion.NEWEST_BETA; +import static com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED; +import static com.datastax.driver.core.ProtocolVersion.V1; +import static com.datastax.driver.core.ProtocolVersion.V2; +import static com.datastax.driver.core.ProtocolVersion.V3; +import static com.datastax.driver.core.ProtocolVersion.V4; +import static com.datastax.driver.core.ProtocolVersion.V5; +import static org.apache.cassandra.transport.messages.StartupMessage.CQL_VERSION; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + public class ProtocolNegotiationTest extends CQLTester { // to avoid JMX naming clashes between cluster metrics @@ -40,33 +59,93 @@ public class ProtocolNegotiationTest extends CQLTester requireNetwork(); } + @Before + public void initNetwork() + { + reinitializeNetwork(); + } + @Test public void serverSupportsV3AndV4ByDefault() { - reinitializeNetwork(); // client can explicitly request either V3 or V4 - testConnection(ProtocolVersion.V3, ProtocolVersion.V3); - testConnection(ProtocolVersion.V4, ProtocolVersion.V4); + testConnection(V3, V3); + testConnection(V4, V4); // if not specified, V4 is the default - testConnection(null, ProtocolVersion.V4); - testConnection(ProtocolVersion.NEWEST_SUPPORTED, ProtocolVersion.V4); + 
testConnection(null, V4); + testConnection(NEWEST_SUPPORTED, V4); } @Test public void supportV5ConnectionWithBetaOption() { - reinitializeNetwork(); - testConnection(ProtocolVersion.V5, ProtocolVersion.V5); - testConnection(ProtocolVersion.NEWEST_BETA, ProtocolVersion.V5); + testConnection(V5, V5); + testConnection(NEWEST_BETA, V5); } @Test public void olderVersionsAreUnsupported() { + testConnection(V1, V4); + testConnection(V2, V4); + } + + @Test + public void preNegotiationResponsesHaveCorrectStreamId() + { + ProtocolVersion.SUPPORTED.forEach(this::testStreamIdsAcrossNegotiation); + } + + private void testStreamIdsAcrossNegotiation(ProtocolVersion version) + { + long seed = System.currentTimeMillis(); + Random random = new Random(seed); reinitializeNetwork(); - testConnection(ProtocolVersion.V1, ProtocolVersion.V4); - testConnection(ProtocolVersion.V2, ProtocolVersion.V4); + SimpleClient.Builder builder = SimpleClient.builder(nativeAddr.getHostAddress(), nativePort); + if (version.isBeta()) + builder.useBeta(); + else + builder.protocolVersion(version); + + try (SimpleClient client = builder.build()) + { + client.establishConnection(); + // Before STARTUP the client hasn't yet negotiated a protocol version. + // All OPTIONS messages are received by the initial connection handler. 
+ OptionsMessage options = new OptionsMessage(); + for (int i = 0; i < 100; i++) + { + int streamId = random.nextInt(254) + 1; + options.setStreamId(streamId); + Message.Response response = client.execute(options); + assertEquals(String.format("StreamId mismatch; version: %s, seed: %s, iter: %s, expected: %s, actual: %s", + version, seed, i, streamId, response.getStreamId()), + streamId, response.getStreamId()); + } + + int streamId = random.nextInt(254) + 1; + // STARTUP messages are handled by the initial connection handler + StartupMessage startup = new StartupMessage(ImmutableMap.of(CQL_VERSION, QueryProcessor.CQL_VERSION.toString())); + startup.setStreamId(streamId); + Message.Response response = client.execute(startup); + assertEquals(String.format("StreamId mismatch after negotiation; version: %s, expected: %s, actual %s", + version, streamId, response.getStreamId()), + streamId, response.getStreamId()); + + // Following STARTUP, the version specific handlers are fully responsible for processing messages + QueryMessage query = new QueryMessage("SELECT * FROM system.local", QueryOptions.DEFAULT); + query.setStreamId(streamId); + response = client.execute(query); + assertEquals(String.format("StreamId mismatch after negotiation; version: %s, expected: %s, actual %s", + version, streamId, response.getStreamId()), + streamId, response.getStreamId()); + } + catch (IOException e) + { + e.printStackTrace(); + fail("Error establishing connection"); + } } private void testConnection(com.datastax.driver.core.ProtocolVersion requestedVersion, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org