Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 c4e6cd2a1 -> c4982587b refs/heads/cassandra-3.11 aed1b5fdf -> c3a66ab1e refs/heads/trunk dfddeb44c -> d500100a4
Create MessagingService mocking classes Patch by Stefan Podkowinski; reviewed by Tyler Hobbs for CASSANDRA-12016 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e478d38 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e478d38 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e478d38 Branch: refs/heads/cassandra-3.0 Commit: 1e478d380bd497a3fe635b35b6e87e002cabf617 Parents: c4e6cd2 Author: Stefan Podkowinski <s.podkowin...@gmail.com> Authored: Tue Jul 12 12:04:23 2016 -0500 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Tue Jul 3 14:16:25 2018 +0100 ---------------------------------------------------------------------- .../apache/cassandra/gms/FailureDetector.java | 9 +- .../apache/cassandra/net/MessagingService.java | 5 + .../cassandra/service/ActiveRepairService.java | 3 +- .../org/apache/cassandra/utils/ExpiringMap.java | 4 +- test/unit/org/apache/cassandra/net/Matcher.java | 32 +++ .../apache/cassandra/net/MatcherResponse.java | 210 +++++++++++++++++ .../cassandra/net/MockMessagingService.java | 144 ++++++++++++ .../cassandra/net/MockMessagingServiceTest.java | 96 ++++++++ .../apache/cassandra/net/MockMessagingSpy.java | 234 +++++++++++++++++++ 9 files changed, 730 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 70a354c..cda6469 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; /** @@ -52,7 +53,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause(); - private long lastInterpret = System.nanoTime(); + private long lastInterpret = Clock.instance.nanoTime(); private long lastPause = 0L; private static long getMaxLocalPause() @@ -252,7 +253,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public void report(InetAddress ep) { - long now = System.nanoTime(); + long now = Clock.instance.nanoTime(); ArrivalWindow heartbeatWindow = arrivalSamples.get(ep); if (heartbeatWindow == null) { @@ -279,7 +280,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean { return; } - long now = System.nanoTime(); + long now = Clock.instance.nanoTime(); long diff = now - lastInterpret; lastInterpret = now; if (diff > MAX_LOCAL_PAUSE_IN_NANOS) @@ -288,7 +289,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean lastPause = now; return; } - if (System.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS) + if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS) { logger.debug("Still not marking nodes down due to local pause"); return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 047f51f..67c3ba9 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -389,6 +389,11 @@ public final class MessagingService implements MessagingServiceMBean messageSinks.add(sink); } + public void removeMessageSink(IMessageSink sink) + { + messageSinks.remove(sink); + } + public void clearMessageSinks() { messageSinks.clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index adb6fab..f63cb86 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -65,6 +65,7 @@ import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; @@ -282,7 +283,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - long timestamp = System.currentTimeMillis(); + long timestamp = Clock.instance.currentTimeMillis(); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index 8359918..ef013f5 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -48,7 +48,7 @@ public class ExpiringMap<K, V> assert value != null; this.value = value; this.timeout = timeout; - this.createdAt = System.nanoTime(); + this.createdAt = Clock.instance.nanoTime(); } private boolean isReadyToDieAt(long atNano) @@ -85,7 +85,7 @@ public class ExpiringMap<K, V> { public void run() { - long start = System.nanoTime(); + long start = Clock.instance.nanoTime(); int n = 0; for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/Matcher.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java new file mode 100644 index 0000000..cd1b667 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/Matcher.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +/** + * Predicate based on intercepted, outgoing messange and the message's destination address. + */ +public interface Matcher<T> +{ + /** + * @param obj intercepted outgoing message + * @param to destination address + */ + public boolean matches(MessageOut<T> obj, InetAddress to); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MatcherResponse.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java new file mode 100644 index 0000000..12a8d1b --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.apache.cassandra.utils.Clock; + +/** + * Sends a response for an incoming message with a matching {@link Matcher}. + * The actual behavior by any instance of this class can be inspected by + * interacting with the returned {@link MockMessagingSpy}. + */ +public class MatcherResponse +{ + private final Matcher<?> matcher; + private final Set<Integer> sendResponses = new HashSet<>(); + private final MockMessagingSpy spy = new MockMessagingSpy(); + private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE); + private IMessageSink sink; + + MatcherResponse(Matcher<?> matcher) + { + this.matcher = matcher; + } + + /** + * Do not create any responses for intercepted outbound messages. + */ + public MockMessagingSpy dontReply() + { + return respond((MessageIn<?>)null); + } + + /** + * Respond with provided message in reply to each intercepted outbound message. + * @param message the message to use as mock reply from the cluster + */ + public MockMessagingSpy respond(MessageIn<?> message) + { + return respondN(message, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with the provided message in reply to each intercepted outbound message. + * @param response the message to use as mock reply from the cluster + * @param limit number of times to respond with message + */ + public MockMessagingSpy respondN(final MessageIn<?> response, int limit) + { + return respondN((in, to) -> response, limit); + } + + /** + * Respond with the message created by the provided function that will be called with each intercepted outbound message. + * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse) + { + return respondN(fnResponse, Integer.MAX_VALUE); + } + + /** + * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message. + * The target address from the intercepted message will automatically be used as the created message's sender address. + * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb) + { + return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with message wrapping the payload object created by provided function called for + * each intercepted outbound message. The target address from the intercepted message will automatically be used as the + * created message's sender address. + * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit) + { + return respondN((MessageOut<T> msg, InetAddress to) -> { + S payload = fnResponse.apply(msg); + if (payload == null) + return null; + else + return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version); + }, + limit); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. No reply will be send when the queue has been exhausted. + * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. This method will block until queue elements are available. + * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> { + try + { + return cannedResponses.take(); + } + catch (InterruptedException e) + { + return null; + } + }, verb); + } + + /** + * Respond a limited number of times with the message created by the provided function that will be called with + * each intercepted outbound message. + * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit) + { + limitCounter.set(limit); + + assert sink == null: "destroy() must be called first to register new response"; + + sink = new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + // prevent outgoing message from being send in case matcher indicates a match + // and instead send the mocked response + if (matcher.matches(message, to)) + { + spy.matchingMessage(message); + + if (limitCounter.decrementAndGet() < 0) + return false; + + synchronized (sendResponses) + { + // I'm not sure about retry semantics regarding message/ID relationships, but I assume + // sending a message multiple times using the same ID shouldn't happen.. + assert !sendResponses.contains(id) : "ID re-use for outgoing message"; + sendResponses.add(id); + } + MessageIn<?> response = fnResponse.apply(message, to); + if (response != null) + { + CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id); + if (cb != null) + cb.callback.response(response); + else + MessagingService.instance().receive(response, id, Clock.instance.currentTimeMillis(), false); + spy.matchingResponse(response); + } + return false; + } + return true; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return true; + } + }; + MessagingService.instance().addMessageSink(sink); + + return spy; + } + + /** + * Stops currently registered response from being send. + */ + public void destroy() + { + MessagingService.instance().removeMessageSink(sink); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingService.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java new file mode 100644 index 0000000..0412759 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.function.Predicate; + +/** + * Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be + * intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}. + * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)}, + * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be + * nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}. + * After each test, {@link MockMessagingService#cleanup()} must be called for free listeners registered + * in {@link MessagingService}. + */ +public class MockMessagingService +{ + + private MockMessagingService() + { + } + + /** + * Creates a MatcherResponse based on specified matcher. + */ + public static MatcherResponse when(Matcher matcher) + { + return new MatcherResponse(matcher); + } + + /** + * Unsubscribes any handlers added by calling {@link MessagingService#addMessageSink(IMessageSink)}. + * This should be called after each test. + */ + public static void cleanup() + { + MessagingService.instance().clearMessageSinks(); + } + + /** + * Creates a matcher that will indicate if the target address of the outgoing message equals the + * provided address. + */ + public static Matcher<InetAddress> to(String address) + { + try + { + return to(InetAddress.getByName(address)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + /** + * Creates a matcher that will indicate if the target address of the outgoing message equals the + * provided address. + */ + public static Matcher<InetAddress> to(InetAddress address) + { + return (in, to) -> to == address || to.equals(address); + } + + /** + * Creates a matcher that will indicate if the verb of the outgoing message equals the + * provided value. + */ + public static Matcher<MessagingService.Verb> verb(MessagingService.Verb verb) + { + return (in, to) -> in.verb == verb; + } + + /** + * Creates a matcher based on the result of the provided predicate called with the outgoing message. + */ + public static <T> Matcher<T> message(Predicate<MessageOut<T>> fn) + { + return (msg, to) -> fn.test(msg); + } + + /** + * Creates a matcher based on the result of the provided predicate called with the outgoing message's payload. + */ + public static <T> Matcher<T> payload(Predicate<T> fn) + { + return (msg, to) -> fn.test(msg.payload); + } + + /** + * Inverts boolean result of wrapped matcher. + */ + public static <T> Matcher<T> not(Matcher<T> matcher) + { + return (o, to) -> !matcher.matches(o, to); + } + + /** + * Indicates true in case all provided matchers returned true. + */ + public static <T> Matcher<?> all(Matcher<?>... matchers) + { + return (MessageOut<T> out, InetAddress to) -> { + for (Matcher matcher : matchers) + { + if (!matcher.matches(out, to)) + return false; + } + return true; + }; + } + + /** + * Indicates true in case at least a single provided matcher returned true. + */ + public static <T> Matcher<?> any(Matcher<?>... matchers) + { + return (MessageOut<T> out, InetAddress to) -> { + for (Matcher matcher : matchers) + { + if (matcher.matches(out, to)) + return true; + } + return false; + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java new file mode 100644 index 0000000..ed4cce8 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.EchoMessage; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.net.MockMessagingService.all; +import static org.apache.cassandra.net.MockMessagingService.to; +import static org.apache.cassandra.net.MockMessagingService.verb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MockMessagingServiceTest +{ + @BeforeClass + public static void initCluster() throws ConfigurationException + { + SchemaLoader.prepareServer(); + StorageService.instance.initServer(); + } + + @Before + public void cleanup() + { + MockMessagingService.cleanup(); + } + + @Test + public void testRequestResponse() throws InterruptedException, ExecutionException + { + // echo message that we like to mock as incoming reply for outgoing echo message + MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(), + EchoMessage.instance, + Collections.emptyMap(), + MessagingService.Verb.ECHO, + MessagingService.current_version + ); + MockMessagingSpy spy = MockMessagingService + .when( + all( + to(FBUtilities.getBroadcastAddress()), + verb(MessagingService.Verb.ECHO) + ) + ) + .respond(echoMessageIn); + + MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer); + MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback() + { + public void response(MessageIn msg) + { + assertEquals(MessagingService.Verb.ECHO, msg.verb); + assertEquals(echoMessageIn.payload, msg.payload); + } + + public boolean isLatencyForSnitch() + { + return false; + } + }); + + // we must have intercepted the outgoing message at this point + MessageOut<?> msg = spy.captureMessageOut().get(); + assertEquals(1, spy.messagesIntercepted); + assertTrue(msg == echoMessageOut); + + // and return a mocked response + assertEquals(1, spy.mockedMessageResponses); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingSpy.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java new file mode 100644 index 0000000..80bdb39 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import junit.framework.AssertionFailedError; + +/** + * Allows inspecting the behavior of mocked messaging by observing {@link MatcherResponse}. + */ +public class MockMessagingSpy +{ + private static final Logger logger = LoggerFactory.getLogger(MockMessagingSpy.class); + + public int messagesIntercepted = 0; + public int mockedMessageResponses = 0; + + private final BlockingQueue<MessageOut<?>> interceptedMessages = new LinkedBlockingQueue<>(); + private final BlockingQueue<MessageIn<?>> deliveredResponses = new LinkedBlockingQueue<>(); + + private static final Executor executor = Executors.newSingleThreadExecutor(); + + /** + * Returns a future with the first mocked incoming message that has been created and delivered. + */ + public ListenableFuture<MessageIn<?>> captureMockedMessageIn() + { + return Futures.transform(captureMockedMessageInN(1), (List<MessageIn<?>> result) -> result.isEmpty() ? null : result.get(0)); + } + + /** + * Returns a future with the specified number mocked incoming messages that have been created and delivered. + */ + public ListenableFuture<List<MessageIn<?>>> captureMockedMessageInN(int noOfMessages) + { + CapturedResultsFuture<MessageIn<?>> ret = new CapturedResultsFuture<>(noOfMessages, deliveredResponses); + executor.execute(ret); + return ret; + } + + /** + * Returns a future that will indicate if a mocked incoming message has been created and delivered. + */ + public ListenableFuture<Boolean> expectMockedMessageIn() + { + return expectMockedMessageIn(1); + } + + /** + * Returns a future that will indicate if the specified number of mocked incoming message have been created and delivered. + */ + public ListenableFuture<Boolean> expectMockedMessageIn(int noOfMessages) + { + ResultsCompletionFuture<MessageIn<?>> ret = new ResultsCompletionFuture<>(noOfMessages, deliveredResponses); + executor.execute(ret); + return ret; + } + + /** + * Returns a future with the first intercepted outbound message that would have been send. + */ + public ListenableFuture<MessageOut<?>> captureMessageOut() + { + return Futures.transform(captureMessageOut(1), (List<MessageOut<?>> result) -> result.isEmpty() ? null : result.get(0)); + } + + /** + * Returns a future with the specified number of intercepted outbound messages that would have been send. + */ + public ListenableFuture<List<MessageOut<?>>> captureMessageOut(int noOfMessages) + { + CapturedResultsFuture<MessageOut<?>> ret = new CapturedResultsFuture<>(noOfMessages, interceptedMessages); + executor.execute(ret); + return ret; + } + + /** + * Returns a future that will indicate if an intercepted outbound messages would have been send. + */ + public ListenableFuture<Boolean> interceptMessageOut() + { + return interceptMessageOut(1); + } + + /** + * Returns a future that will indicate if the specified number of intercepted outbound messages would have been send. + */ + public ListenableFuture<Boolean> interceptMessageOut(int noOfMessages) + { + ResultsCompletionFuture<MessageOut<?>> ret = new ResultsCompletionFuture<>(noOfMessages, interceptedMessages); + executor.execute(ret); + return ret; + } + + /** + * Returns a future that will indicate the absence of any intercepted outbound messages with the specifed period. + */ + public ListenableFuture<Boolean> interceptNoMsg(long time, TimeUnit unit) + { + ResultAbsenceFuture<MessageOut<?>> ret = new ResultAbsenceFuture<>(interceptedMessages, time, unit); + executor.execute(ret); + return ret; + } + + void matchingMessage(MessageOut<?> message) + { + messagesIntercepted++; + logger.trace("Received matching message: {}", message); + interceptedMessages.add(message); + } + + void matchingResponse(MessageIn<?> response) + { + mockedMessageResponses++; + logger.trace("Responding to intercepted message: {}", response); + deliveredResponses.add(response); + } + + + private static class CapturedResultsFuture<T> extends AbstractFuture<List<T>> implements Runnable + { + private final int waitForResults; + private final List<T> results; + private final BlockingQueue<T> queue; + + CapturedResultsFuture(int waitForResponses, BlockingQueue<T> queue) + { + this.waitForResults = waitForResponses; + results = new ArrayList<T>(waitForResponses); + this.queue = queue; + } + + public void run() + { + try + { + while (results.size() < waitForResults) + results.add(queue.take()); + + set(results); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + } + + private static class ResultsCompletionFuture<T> extends AbstractFuture<Boolean> implements Runnable + { + private final int waitForResults; + private final BlockingQueue<T> queue; + + ResultsCompletionFuture(int waitForResponses, BlockingQueue<T> queue) + { + this.waitForResults = waitForResponses; + this.queue = queue; + } + + public void run() + { + try + { + for (int i = 0; i < waitForResults; i++) + { + queue.take(); + } + set(true); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + } + + private static class ResultAbsenceFuture<T> extends AbstractFuture<Boolean> implements Runnable + { + private final BlockingQueue<T> queue; + private final long time; + private final TimeUnit unit; + + ResultAbsenceFuture(BlockingQueue<T> queue, long time, TimeUnit unit) + { + this.queue = queue; + this.time = time; + this.unit = unit; + } + + public void run() + { + try + { + T result = queue.poll(time, unit); + if (result != null) + setException(new AssertionFailedError("Received unexpected message: " + result)); + else + set(true); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org