This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 8e225c5 v4+ protocol did not clean up client warnings, which caused leaking the state 8e225c5 is described below commit 8e225c55c49493f00fc9bc0b5809ab026d60c767 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Oct 25 07:28:08 2021 -0700 v4+ protocol did not clean up client warnings, which caused leaking the state patch by David Capwell; reviewed by Caleb Rackliffe, Jon Meredith, Sam Tunnicliffe for CASSANDRA-17054 --- CHANGES.txt | 1 + .../org/apache/cassandra/transport/Dispatcher.java | 38 ++++----- .../transport/InitialConnectionHandler.java | 2 +- .../org/apache/cassandra/transport/Message.java | 11 ++- .../distributed/test/JavaDriverUtils.java | 9 +++ .../distributed/test/NativeMixedVersionTest.java | 89 ++++++++++++++++++++++ 6 files changed, 130 insertions(+), 20 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 68aeb04..8ef8531 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054) * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023) * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309) * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030) diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index 31b750e..3aff2d2 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -82,9 +82,10 @@ public class Dispatcher } /** - * Note: this method may be executed on the netty event loop, during initial protocol negotiation + * Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is + * responsible for cleaning up any global or thread-local state. (ex. tracing, client warnings, etc.). */ - static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure) + private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure) { long queryStartNanoTime = nanoTime(); if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) @@ -99,7 +100,7 @@ public class Dispatcher { String message = String.format("Request breached global limit of %d requests/second and triggered backpressure.", ClientResourceLimits.getNativeTransportMaxRequestsPerSecond()); - + NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message); ClientWarn.instance.warn(message); } @@ -107,7 +108,7 @@ public class Dispatcher { String message = String.format("Request breached limit(s) on bytes in flight (Endpoint: %d, Global: %d) and triggered backpressure.", ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit()); - + NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message); ClientWarn.instance.warn(message); } @@ -129,39 +130,42 @@ public class Dispatcher } /** - * Note: this method is not expected to execute on the netty event loop. + * Note: this method may be executed on the netty event loop. */ - void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) + static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure) { - final Message.Response response; - final ServerConnection connection; - FlushItem<?> toFlush; try { - assert request.connection() instanceof ServerConnection; - connection = (ServerConnection) request.connection(); - response = processRequest(connection, request, backpressure); - toFlush = forFlusher.toFlushItem(channel, request, response); - Message.logger.trace("Responding: {}, v={}", response, connection.getVersion()); + return processRequest((ServerConnection) request.connection(), request, backpressure); } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); - + if (request.isTrackable()) CoordinatorWarnings.done(); - + Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true); ErrorMessage error = ErrorMessage.fromException(t, handler); error.setStreamId(request.getStreamId()); error.setWarnings(ClientWarn.instance.getWarnings()); - toFlush = forFlusher.toFlushItem(channel, request, error); + return error; } finally { CoordinatorWarnings.reset(); ClientWarn.instance.resetWarnings(); } + } + + /** + * Note: this method is not expected to execute on the netty event loop. + */ + void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) + { + Message.Response response = processRequest(channel, request, backpressure); + FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response); + Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion()); flush(toFlush); } diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java index e122b6e..75cb72e 100644 --- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java +++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java @@ -148,7 +148,7 @@ public class InitialConnectionHandler extends ByteToMessageDecoder promise = new VoidChannelPromise(ctx.channel(), false); } - final Message.Response response = Dispatcher.processRequest((ServerConnection) connection, startup, Overload.NONE); + final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE); outbound = response.encode(inbound.header.version); ctx.writeAndFlush(outbound, promise); logger.trace("Configured pipeline: {}", ctx.pipeline()); diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 0f8002f..c40aa7a 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -330,9 +330,16 @@ public abstract class Message List<String> warnings = message.getWarnings(); if (warnings != null) { + // if cassandra populates warnings for <= v3 protocol, this is a bug if (version.isSmallerThan(ProtocolVersion.V4)) - throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4"); - messageSize += CBUtil.sizeOfStringList(warnings); + { + logger.warn("Warnings present in message with version less than v4 (it is {}); warnings={}", version, warnings); + warnings = null; + } + else + { + messageSize += CBUtil.sizeOfStringList(warnings); + } } if (customPayload != null) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java index bc39ba1..c7c478b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java @@ -18,6 +18,7 @@ package org.apache.cassandra.distributed.test; +import com.datastax.driver.core.ProtocolVersion; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; @@ -30,6 +31,11 @@ public final class JavaDriverUtils public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest) { + return create(dtest, null); + } + + public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest, ProtocolVersion version) + { if (dtest.size() == 0) throw new IllegalArgumentException("Attempted to open java driver for empty cluster"); @@ -45,6 +51,9 @@ public final class JavaDriverUtils //TODO support auth dtest.stream().forEach(i -> builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress())); + if (version != null) + builder.withProtocolVersion(version); + return builder.build(); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java new file mode 100644 index 0000000..b2391e3 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.assertj.core.api.Assertions; + +public class NativeMixedVersionTest extends TestBaseImpl +{ + @Test + public void v4ConnectionCleansUpThreadLocalState() throws IOException + { + // make sure to limit the netty thread pool to size 1, this will make the test determanistic as all work + // will happen on the single thread. + System.setProperty("io.netty.eventLoopThreads", "1"); + try (Cluster cluster = Cluster.build(1) + .withConfig(c -> + c.with(Feature.values()) + .set("track_warnings", ImmutableMap.of( + "enabled", true, + "local_read_size", ImmutableMap.of("warn_threshold_kb", 1) + )) + ) + .start()) + { + init(cluster); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck1 int, value blob, PRIMARY KEY (pk, ck1))")); + IInvokableInstance node = cluster.get(1); + + ByteBuffer blob = ByteBuffer.wrap("This is just some large string to get some number of bytes".getBytes(StandardCharsets.UTF_8)); + + for (int i = 0; i < 100; i++) + node.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck1, value) VALUES (?, ?, ?)"), 0, i, blob); + + // v4+ process STARTUP message on the netty thread. To make sure we do not leak the ClientWarn state, + // make sure a warning will be generated by a query then run on the same threads on the v3 protocol (which + // does not support warnings) + try (com.datastax.driver.core.Cluster driver = JavaDriverUtils.create(cluster, ProtocolVersion.V5); + Session session = driver.connect()) + { + ResultSet rs = session.execute(withKeyspace("SELECT * FROM %s.tbl")); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isNotEmpty(); + } + + try (com.datastax.driver.core.Cluster driver = JavaDriverUtils.create(cluster, ProtocolVersion.V3); + Session session = driver.connect()) + { + ResultSet rs = session.execute(withKeyspace("SELECT * FROM %s.tbl")); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + + // this should not happen; so make sure no logs are found + List<String> result = node.logs().grep("Warnings present in message with version less than").getResult(); + Assertions.assertThat(result).isEmpty(); + } + finally + { + System.clearProperty("io.netty.eventLoopThreads"); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org