This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c9d2258 Fix client notifications in v5 c9d2258 is described below commit c9d22583d22d566807e76fa10c65af29104ae16c Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Tue Dec 15 17:37:18 2020 +0000 Fix client notifications in v5 Patch by Sam Tunnicliffe; reviewed by Benjamin Lerer for CASSANDRA-16353 --- CHANGES.txt | 1 + .../org/apache/cassandra/transport/Dispatcher.java | 33 +++++- .../cassandra/transport/PipelineConfigurator.java | 5 + .../org/apache/cassandra/transport/Server.java | 37 +++++-- .../transport/ClientNotificiationsTest.java | 117 +++++++++++++++++++++ 5 files changed, 185 insertions(+), 8 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 426de4c..09b02d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta5 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index 65093f3..05b55e8 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -20,15 +20,19 @@ package org.apache.cassandra.transport; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import io.netty.channel.Channel; import io.netty.channel.EventLoop; +import io.netty.util.AttributeKey; import org.apache.cassandra.concurrent.LocalAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.FrameEncoder; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.Flusher.FlushItem; import org.apache.cassandra.transport.messages.ErrorMessage; +import org.apache.cassandra.transport.messages.EventMessage; import org.apache.cassandra.utils.JVMStabilityInspector; import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; @@ -61,7 +65,8 @@ public class Dispatcher this.useLegacyFlusher = useLegacyFlusher; } - public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher) { + public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher) + { requestExecutor.submit(() -> processRequest(channel, request, forFlusher)); } @@ -140,4 +145,30 @@ public class Dispatcher requestExecutor.shutdown(); } } + + + /** + * Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy + * for delivering events to registered clients is dependent on protocol version and the configuration + * of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope, + * wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to + * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter)}. + * It's worth noting that events are not generally fired as a direct response to a client request, + * so this flush item has a null request attribute. The dispatcher itself is created when the + * pipeline is first configured during protocol negotiation and is attached to the channel for + * later retrieval. + * + * Pre-v5 connections simply write the EventMessage directly to the pipeline. + */ + static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP"); + Consumer<EventMessage> eventDispatcher(final Channel channel, + final ProtocolVersion version, + final FrameEncoder.PayloadAllocator allocator) + { + return eventMessage -> flush(new FlushItem.Framed(channel, + eventMessage.encode(version), + null, + allocator, + f -> f.response.release())); + } } diff --git a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java index d2a5cad..82865f2 100644 --- a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java +++ b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java @@ -300,6 +300,11 @@ public class PipelineConfigurator pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor); pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler); pipeline.remove(INITIAL_HANDLER); + + // Handles delivering event messages to registered clients + ctx.channel() + .attr(Dispatcher.EVENT_DISPATCHER) + .set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator)); onNegotiationComplete(pipeline); } diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 2e317e5..5c9e575 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -32,6 +32,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelMatcher; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.GlobalEventExecutor; @@ -97,7 +98,8 @@ public class Server implements CassandraDaemon.Server DatabaseDescriptor.useNativeTransportLegacyFlusher(), builder.tlsEncryptionPolicy); - EventNotifier notifier = new EventNotifier(this); + EventNotifier notifier = builder.eventNotifier != null ? builder.eventNotifier : new EventNotifier(); + notifier.registerConnectionTracker(connectionTracker); StorageService.instance.register(notifier); Schema.instance.registerListener(notifier); } @@ -177,6 +179,7 @@ public class Server implements CassandraDaemon.Server private int port = -1; private InetSocketAddress socket; private PipelineConfigurator pipelineConfigurator; + private EventNotifier eventNotifier; public Builder withTlsEncryptionPolicy(EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy) { @@ -210,6 +213,12 @@ public class Server implements CassandraDaemon.Server return this; } + public Builder withEventNotifier(EventNotifier eventNotifier) + { + this.eventNotifier = eventNotifier; + return this; + } + public Server build() { return new Server(this); @@ -234,6 +243,11 @@ public class Server implements CassandraDaemon.Server public static class ConnectionTracker implements Connection.Tracker { + private static final ChannelMatcher PRE_V5_CHANNEL = channel -> channel.attr(Connection.attributeKey) + .get() + .getVersion() + .isSmallerThan(ProtocolVersion.V5); + // TODO: should we be using the GlobalEventExecutor or defining our own? public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class); @@ -260,7 +274,16 @@ public class Server implements CassandraDaemon.Server public void send(Event event) { - groups.get(event.type).writeAndFlush(new EventMessage(event)); + ChannelGroup registered = groups.get(event.type); + EventMessage message = new EventMessage(event); + + // Deliver event to pre-v5 channels + registered.writeAndFlush(message, PRE_V5_CHANNEL); + + // Deliver event to post-v5 channels + for (Channel c : registered) + if (!PRE_V5_CHANNEL.matches(c)) + c.attr(Dispatcher.EVENT_DISPATCHER).get().accept(message); } void closeAll() @@ -331,9 +354,9 @@ public class Server implements CassandraDaemon.Server } } - private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber + public static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber { - private final Server server; + private ConnectionTracker connectionTracker; // We keep track of the latest status change events we have sent to avoid sending duplicates // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156) @@ -342,9 +365,9 @@ public class Server implements CassandraDaemon.Server // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet(); - private EventNotifier(Server server) + private void registerConnectionTracker(ConnectionTracker connectionTracker) { - this.server = server; + this.connectionTracker = connectionTracker; } private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint) @@ -381,7 +404,7 @@ public class Server implements CassandraDaemon.Server private void send(Event event) { - server.connectionTracker.send(event); + connectionTracker.send(event); } public void onJoinCluster(InetAddressAndPort endpoint) diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java new file mode 100644 index 0000000..bd1ec63 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.util.Collections; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.messages.RegisterMessage; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class ClientNotificiationsTest extends CQLTester +{ + private static Server.EventNotifier notifier = new Server.EventNotifier(); + + @Before + public void setup() + { + requireNetwork(builder -> builder.withEventNotifier(notifier)); + } + + @Parameterized.Parameter(0) + public ProtocolVersion version; + + @Parameterized.Parameters(name = "{index}: protocol version={0}") + public static Iterable<ProtocolVersion> params() + { + return ProtocolVersion.SUPPORTED; + } + + @Test + public void testNotifications() throws Exception + { + SimpleClient.Builder builder = SimpleClient.builder(nativeAddr.getHostAddress(), nativePort) + .protocolVersion(version); + if (version.isBeta()) + builder.useBeta(); + + try (SimpleClient client = builder.build()) + { + EventHandler handler = new EventHandler(); + client.setEventHandler(handler); + client.connect(false); + client.execute(new RegisterMessage(Collections.singletonList(Event.Type.STATUS_CHANGE))); + client.execute(new RegisterMessage(Collections.singletonList(Event.Type.TOPOLOGY_CHANGE))); + client.execute(new RegisterMessage(Collections.singletonList(Event.Type.SCHEMA_CHANGE))); + + InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort(); + InetAddressAndPort nativeAddress = FBUtilities.getBroadcastNativeAddressAndPort(); + + // Necessary or else the NEW_NODE notification is deferred (CASSANDRA-11038) + // (note: this works because the notifications are for the local address) + StorageService.instance.setRpcReady(true); + + notifier.onUp(broadcastAddress); + notifier.onDown(broadcastAddress); + notifier.onJoinCluster(broadcastAddress); + notifier.onMove(broadcastAddress); + notifier.onLeaveCluster(broadcastAddress); + notifier.onCreateKeyspace("ks"); + notifier.onAlterKeyspace("ks"); + notifier.onDropKeyspace("ks"); + + handler.assertNextEvent(Event.StatusChange.nodeUp(nativeAddress)); + handler.assertNextEvent(Event.StatusChange.nodeDown(nativeAddress)); + handler.assertNextEvent(Event.TopologyChange.newNode(nativeAddress)); + handler.assertNextEvent(Event.TopologyChange.movedNode(nativeAddress)); + handler.assertNextEvent(Event.TopologyChange.removedNode(nativeAddress)); + handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, "ks")); + handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, "ks")); + handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, "ks")); + } + } + + static class EventHandler extends SimpleClient.SimpleEventHandler + { + public void assertNextEvent(Event expected) + { + try + { + Event actual = queue.poll(100, TimeUnit.MILLISECONDS); + assertEquals(expected, actual); + } + catch (InterruptedException e) + { + throw new AssertionError(String.format("Expected event %s, but not received withing timeout", expected)); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org