This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a27092926b42a83ec0a1e6188677329737c5a3f4 Merge: cd82046 b2f2c70 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Fri Jan 31 14:37:25 2020 +0100 Merge branch 'cassandra-2.2' into cassandra-3.0 .../apache/cassandra/distributed/api/IMessage.java | 8 +- .../cassandra/distributed/api/IMessageFilters.java | 28 ++- .../distributed/impl/AbstractCluster.java | 17 +- .../distributed/impl/IInvokableInstance.java | 1 - .../cassandra/distributed/impl/Instance.java | 187 ++++++++++-------- .../cassandra/distributed/impl/MessageFilters.java | 79 ++++---- .../distributed/test/MessageFiltersTest.java | 210 +++++++++++++++++++++ 7 files changed, 395 insertions(+), 135 deletions(-) diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 3de5ed8,0647198..5a4dcf4 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -192,7 -192,9 +193,10 @@@ public class Instance extends IsolatedE { BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message); BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> { - if (cluster.filters().permit(this, cluster.get(to), message.verb())) + int fromNum = config().num(); + int toNum = cluster.get(to).config().num(); ++ + if (cluster.filters().permit(fromNum, toNum, message)) deliverToInstance.accept(to, message); }; @@@ -242,46 -265,35 +267,34 @@@ public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to) { - try (DataOutputBuffer out = new DataOutputBuffer(1024)) + InetAddressAndPort from = broadcastAddressAndPort(); - InetAddressAndPort toFull = lookupAddressAndPort.apply(to); + assert from.equals(lookupAddressAndPort.apply(messageOut.from)); - - IMessage serialized = serializeMessage(messageOut, id, broadcastAddressAndPort(), lookupAddressAndPort.apply(messageOut.from)); ++ InetAddressAndPort toFull = lookupAddressAndPort.apply(to); ++ IMessage message = serializeMessage(messageOut, id, from, toFull); + + // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected + byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER); + if (sessionBytes != null) { - InetAddressAndPort from = broadcastAddressAndPort(); - assert from.equals(lookupAddressAndPort.apply(messageOut.from)); - InetAddressAndPort toFull = lookupAddressAndPort.apply(to); - int version = MessagingService.instance().getVersion(to); - - // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected - byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER); - if (sessionBytes != null) + UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); + TraceState state = Tracing.instance.get(sessionId); - String message = String.format("Sending %s message to %s", messageOut.verb, to); ++ String traceMessage = String.format("Sending %s message to %s", messageOut.verb, toFull.address); + // session may have already finished; see CASSANDRA-5668 + if (state == null) { - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - TraceState state = Tracing.instance.get(sessionId); - String message = String.format("Sending %s message to %s", messageOut.verb, toFull.address); - // session may have already finished; see CASSANDRA-5668 - if (state == null) - { - byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE); - Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL()); - } - else - { - state.trace(message); - if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE) - Tracing.instance.doneWithNonLocalSession(state); - } + byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE); + Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL()); ++ TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL()); + } + else + { - state.trace(message); ++ state.trace(traceMessage); + if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE) + Tracing.instance.doneWithNonLocalSession(state); } - - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(id); - long timestamp = System.currentTimeMillis(); - out.writeInt((int) timestamp); - messageOut.serialize(out, version); - deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from)); - } - catch (IOException e) - { - throw new RuntimeException(e); } + - deliver.accept(toFull, serialized); ++ deliver.accept(toFull, message); return false; } @@@ -292,50 -304,46 +305,47 @@@ } } - public void receiveMessage(IMessage imessage) - public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage msg) ++ ++ public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage) { - sync(() -> { - // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage - try (DataInputBuffer input = new DataInputBuffer(imessage.bytes())) + // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage - try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(msg.bytes()))) ++ try (DataInputBuffer input = new DataInputBuffer(imessage.bytes())) + { - int version = msg.version(); ++ int version = imessage.version(); + if (version > MessagingService.current_version) { - int version = imessage.version(); - if (version > MessagingService.current_version) - { - throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d", - this.config.num(), - version, - MessagingService.current_version)); - } + throw new IllegalStateException(String.format("Received message version %d but current version is %d", + version, + MessagingService.current_version)); + } - MessagingService.validateMagic(input.readInt()); - int id; - if (version < MessagingService.VERSION_20) - id = Integer.parseInt(input.readUTF()); - else - id = input.readInt(); + MessagingService.validateMagic(input.readInt()); + int id; + if (version < MessagingService.VERSION_20) + id = Integer.parseInt(input.readUTF()); + else + id = input.readInt(); - if (msg.id() != id) - throw new IllegalStateException(String.format("Message id mismatch: %d != %d", msg.id(), id)); ++ if (imessage.id() != id) ++ throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id)); - long timestamp = System.currentTimeMillis(); - boolean isCrossNodeTimestamp = false; - // make sure to readInt, even if cross_node_to is not enabled - int partial = input.readInt(); - if (DatabaseDescriptor.hasCrossNodeTimeout()) - { - long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); - isCrossNodeTimestamp = (timestamp != crossNodeTimestamp); - timestamp = crossNodeTimestamp; - } + // make sure to readInt, even if cross_node_to is not enabled + int partial = input.readInt(); - MessageIn message = MessageIn.read(input, version, id); - if (message == null) - { - // callback expired; nothing to do - return; - } - if (version <= MessagingService.current_version) - { - MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp); - } - // else ignore message + return Pair.create(MessageIn.read(input, version, id), partial); + } + catch (IOException e) + { + throw new RuntimeException(); + } + } + + public void receiveMessage(IMessage imessage) + { + sync(() -> { + Pair<MessageIn<Object>, Integer> deserialized = null; + try + { + deserialized = deserializeMessage(imessage); } catch (Throwable t) { diff --cc test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java index c1607f8,c92553f..833677b --- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java @@@ -19,16 -19,11 +19,14 @@@ package org.apache.cassandra.distributed.impl; import java.util.Arrays; - import java.util.Set; - import java.util.concurrent.CopyOnWriteArraySet; - import java.util.function.BiConsumer; + import java.util.List; + import java.util.concurrent.CopyOnWriteArrayList; ++import java.util.function.Supplier; ++ ++import com.google.common.annotations.VisibleForTesting; - import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IMessage; import org.apache.cassandra.distributed.api.IMessageFilters; - import org.apache.cassandra.distributed.api.ICluster; - import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.net.MessagingService; public class MessageFilters implements IMessageFilters { diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java index 0000000,96974d8..07e7428 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java @@@ -1,0 -1,210 +1,210 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import java.util.Arrays; -import java.util.HashSet; + import java.util.Set; + import java.util.concurrent.atomic.AtomicInteger; + ++import com.google.common.collect.Sets; + import org.junit.Assert; + import org.junit.Test; + + import org.apache.cassandra.db.ConsistencyLevel; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.IIsolatedExecutor; + import org.apache.cassandra.distributed.api.IMessage; + import org.apache.cassandra.distributed.api.IMessageFilters; + import org.apache.cassandra.distributed.impl.Instance; + import org.apache.cassandra.distributed.impl.MessageFilters; + import org.apache.cassandra.locator.InetAddressAndPort; + import org.apache.cassandra.net.MessageIn; + import org.apache.cassandra.net.MessagingService; + + public class MessageFiltersTest extends DistributedTestBase + { + @Test + public void simpleFiltersTest() throws Throwable + { + int VERB1 = MessagingService.Verb.READ.ordinal(); + int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal(); + int VERB3 = MessagingService.Verb.READ_REPAIR.ordinal(); + int i1 = 1; + int i2 = 2; + int i3 = 3; + String MSG1 = "msg1"; + String MSG2 = "msg2"; + + MessageFilters filters = new MessageFilters(); + MessageFilters.Filter filter = filters.allVerbs().from(1).drop(); + + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1))); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1))); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + filter.off(); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + filters.reset(); + + filters.verbs(VERB1).from(1).to(2).drop(); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1))); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1))); + + filters.reset(); + AtomicInteger counter = new AtomicInteger(); + filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return Arrays.equals(msg.bytes(), MSG1.getBytes()); + }).drop(); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 1); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2))); + Assert.assertEquals(counter.get(), 2); + + // filter chain gets interrupted because a higher level filter returns no match + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 2); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1))); + Assert.assertEquals(counter.get(), 2); + filters.reset(); + + filters.allVerbs().from(3, 2).to(2, 1).drop(); + Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); + filters.reset(); + + counter.set(0); + filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return false; + }).drop(); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(2, counter.get()); + } + + IMessage msg(int verb, String msg) + { + return new IMessage() + { + public int verb() { return verb; } + public byte[] bytes() { return msg.getBytes(); } + public int id() { return 0; } + public int version() { return 0; } + public InetAddressAndPort from() { return null; } + }; + } + + @Test + public void testFilters() throws Throwable + { + String read = "SELECT * FROM " + KEYSPACE + ".tbl"; + String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"; + + try (Cluster cluster = Cluster.create(2)) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + // Reads and writes are going to time out in both directions + cluster.filters().allVerbs().from(1).to(2).drop(); + for (int i : new int[]{ 1, 2 }) + assertTimeOut(() -> cluster.coordinator(i).execute(read, ConsistencyLevel.ALL)); + for (int i : new int[]{ 1, 2 }) + assertTimeOut(() -> cluster.coordinator(i).execute(write, ConsistencyLevel.ALL)); + + cluster.filters().reset(); + // Reads are going to timeout only when 1 serves as a coordinator + cluster.verbs(MessagingService.Verb.RANGE_SLICE).from(1).to(2).drop(); + assertTimeOut(() -> cluster.coordinator(1).execute(read, ConsistencyLevel.ALL)); + cluster.coordinator(2).execute(read, ConsistencyLevel.ALL); + + // Writes work in both directions + for (int i : new int[]{ 1, 2 }) + cluster.coordinator(i).execute(write, ConsistencyLevel.ALL); + } + } + + @Test + public void testMessageMatching() throws Throwable + { + String read = "SELECT * FROM " + KEYSPACE + ".tbl"; + String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"; + + try (Cluster cluster = Cluster.create(2)) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + AtomicInteger counter = new AtomicInteger(); + - Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(), - MessagingService.Verb.MUTATION.ordinal())); ++ Set<Integer> verbs = Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(), ++ MessagingService.Verb.MUTATION.ordinal())); + + // Reads and writes are going to time out in both directions + IMessageFilters.Filter filter = cluster.filters() + .allVerbs() + .from(1) + .to(2) + .messagesMatching((from, to, msg) -> { + // Decode and verify message on instance; return the result back here + Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> { + MessageIn decoded = Instance.deserializeMessage(msg).left; + if (decoded != null) + return (Integer) decoded.verb.ordinal(); + return -1; + }).call(); + if (id > 0) + Assert.assertTrue(verbs.contains(id)); + counter.incrementAndGet(); + return false; + }).drop(); + + for (int i : new int[]{ 1, 2 }) + cluster.coordinator(i).execute(read, ConsistencyLevel.ALL); + for (int i : new int[]{ 1, 2 }) + cluster.coordinator(i).execute(write, ConsistencyLevel.ALL); + + filter.off(); + Assert.assertEquals(4, counter.get()); + } + } + + private static void assertTimeOut(Runnable r) + { + try + { + r.run(); + Assert.fail("Should have timed out"); + } + catch (Throwable t) + { + if (!t.toString().contains("TimeoutException")) + throw t; + // ignore + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org