Repository: cassandra Updated Branches: refs/heads/trunk 11b93152d -> 7751588f7
Create MessagingService mocking classes Patch by Stefan Podkowinski; reviewed by Tyler Hobbs for CASSANDRA-12016 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7751588f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7751588f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7751588f Branch: refs/heads/trunk Commit: 7751588f7715386db0a92bfc4b5db9f151e15133 Parents: 11b9315 Author: Stefan Podkowinski <s.podkowin...@gmail.com> Authored: Tue Jul 12 12:04:23 2016 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Tue Jul 12 12:04:23 2016 -0500 ---------------------------------------------------------------------- .../apache/cassandra/gms/FailureDetector.java | 9 +- .../apache/cassandra/net/MessagingService.java | 5 + .../cassandra/service/ActiveRepairService.java | 3 +- src/java/org/apache/cassandra/utils/Clock.java | 80 +++++++ .../org/apache/cassandra/utils/ExpiringMap.java | 4 +- test/unit/org/apache/cassandra/net/Matcher.java | 32 +++ .../apache/cassandra/net/MatcherResponse.java | 208 +++++++++++++++++ .../cassandra/net/MockMessagingService.java | 144 ++++++++++++ .../cassandra/net/MockMessagingServiceTest.java | 97 ++++++++ .../apache/cassandra/net/MockMessagingSpy.java | 234 +++++++++++++++++++ .../cassandra/utils/FreeRunningClock.java | 46 ++++ 11 files changed, 855 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 964b4ad..7d8b88b 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -36,6 +36,7 @@ import 
org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; /** @@ -52,7 +53,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause(); - private long lastInterpret = System.nanoTime(); + private long lastInterpret = Clock.instance.nanoTime(); private long lastPause = 0L; private static long getMaxLocalPause() @@ -252,7 +253,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public void report(InetAddress ep) { - long now = System.nanoTime(); + long now = Clock.instance.nanoTime(); ArrivalWindow heartbeatWindow = arrivalSamples.get(ep); if (heartbeatWindow == null) { @@ -279,7 +280,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean { return; } - long now = System.nanoTime(); + long now = Clock.instance.nanoTime(); long diff = now - lastInterpret; lastInterpret = now; if (diff > MAX_LOCAL_PAUSE_IN_NANOS) @@ -288,7 +289,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean lastPause = now; return; } - if (System.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS) + if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS) { logger.debug("Still not marking nodes down due to local pause"); return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java index 954bd9d..54d1183 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -353,6 +353,11 @@ public final class MessagingService implements MessagingServiceMBean messageSinks.add(sink); } + public void removeMessageSink(IMessageSink sink) + { + messageSinks.remove(sink); + } + public void clearMessageSinks() { messageSinks.clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 04e39db..bfa121d 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -64,6 +64,7 @@ import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; @@ -260,7 +261,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - long timestamp = System.currentTimeMillis(); + long timestamp = Clock.instance.currentTimeMillis(); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/src/java/org/apache/cassandra/utils/Clock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java new file mode 100644 index 0000000..eb9822c --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Clock.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper around time related functions that are either implemented by using the default JVM calls + * or by using a custom implementation for testing purposes. + * + * See {@link #instance} for how to use a custom implementation. + * + * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an + * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}. 
+ */ +public class Clock +{ + private static final Logger logger = LoggerFactory.getLogger(Clock.class); + + /** + * Static singleton object that will be instanciated by default with a system clock + * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a + * different implementation instead. + */ + public static Clock instance; + + static + { + String sclock = System.getProperty("cassandra.clock"); + if (sclock == null) + { + instance = new Clock(); + } + else + { + try + { + logger.debug("Using custom clock implementation: {}", sclock); + instance = (Clock) Class.forName(sclock).newInstance(); + } + catch (Exception e) + { + logger.error(e.getMessage(), e); + } + } + } + + /** + * @see System#nanoTime() + */ + public long nanoTime() + { + return System.nanoTime(); + } + + /** + * @see System#currentTimeMillis() + */ + public long currentTimeMillis() + { + return System.currentTimeMillis(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index e7b626c..fc88880 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -48,7 +48,7 @@ public class ExpiringMap<K, V> assert value != null; this.value = value; this.timeout = timeout; - this.createdAt = System.nanoTime(); + this.createdAt = Clock.instance.nanoTime(); } private boolean isReadyToDieAt(long atNano) @@ -85,7 +85,7 @@ public class ExpiringMap<K, V> { public void run() { - long start = System.nanoTime(); + long start = Clock.instance.nanoTime(); int n = 0; for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/net/Matcher.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java new file mode 100644 index 0000000..cd1b667 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/Matcher.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +/** + * Predicate based on intercepted, outgoing message and the message's destination address. 
+ */ +public interface Matcher<T> +{ + /** + * @param obj intercepted outgoing message + * @param to destination address + */ + public boolean matches(MessageOut<T> obj, InetAddress to); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/net/MatcherResponse.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java new file mode 100644 index 0000000..c8984eb --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Sends a response for an incoming message with a matching {@link Matcher}. 
+ * The actual behavior by any instance of this class can be inspected by + * interacting with the returned {@link MockMessagingSpy}. + */ +public class MatcherResponse +{ + private final Matcher<?> matcher; + private final Set<Integer> sendResponses = new HashSet<>(); + private final MockMessagingSpy spy = new MockMessagingSpy(); + private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE); + private IMessageSink sink; + + MatcherResponse(Matcher<?> matcher) + { + this.matcher = matcher; + } + + /** + * Do not create any responses for intercepted outbound messages. + */ + public MockMessagingSpy dontReply() + { + return respond((MessageIn<?>)null); + } + + /** + * Respond with provided message in reply to each intercepted outbound message. + * @param message the message to use as mock reply from the cluster + */ + public MockMessagingSpy respond(MessageIn<?> message) + { + return respondN(message, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with the provided message in reply to each intercepted outbound message. + * @param response the message to use as mock reply from the cluster + * @param limit number of times to respond with message + */ + public MockMessagingSpy respondN(final MessageIn<?> response, int limit) + { + return respondN((in, to) -> response, limit); + } + + /** + * Respond with the message created by the provided function that will be called with each intercepted outbound message. + * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse) + { + return respondN(fnResponse, Integer.MAX_VALUE); + } + + /** + * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message. + * The target address from the intercepted message will automatically be used as the created message's sender address. 
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb) + { + return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE); + } + + /** + * Respond a limited number of times with message wrapping the payload object created by provided function called for + * each intercepted outbound message. The target address from the intercepted message will automatically be used as the + * created message's sender address. + * @param fnResponse function to call for creating payload object based on intercepted message and target address + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit) + { + return respondN((MessageOut<T> msg, InetAddress to) -> { + S payload = fnResponse.apply(msg); + if (payload == null) + return null; + else + return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version, MessageIn.createTimestamp()); + }, + limit); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. No reply will be send when the queue has been exhausted. + * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb); + } + + /** + * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed + * from the provided queue. 
This method will block until queue elements are available. + * @param cannedResponses prepared payload messages to use for responses + * @param verb verb to use for reply message + */ + public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb) + { + return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> { + try + { + return cannedResponses.take(); + } + catch (InterruptedException e) + { + return null; + } + }, verb); + } + + /** + * Respond a limited number of times with the message created by the provided function that will be called with + * each intercepted outbound message. + * @param fnResponse function to call for creating reply based on intercepted message and target address + */ + public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit) + { + limitCounter.set(limit); + + assert sink == null: "destroy() must be called first to register new response"; + + sink = new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + // prevent outgoing message from being send in case matcher indicates a match + // and instead send the mocked response + if (matcher.matches(message, to)) + { + spy.matchingMessage(message); + + if (limitCounter.decrementAndGet() < 0) + return false; + + synchronized (sendResponses) + { + // I'm not sure about retry semantics regarding message/ID relationships, but I assume + // sending a message multiple times using the same ID shouldn't happen.. 
+ assert !sendResponses.contains(id) : "ID re-use for outgoing message"; + sendResponses.add(id); + } + MessageIn<?> response = fnResponse.apply(message, to); + if (response != null) + { + CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id); + if (cb != null) + cb.callback.response(response); + else + MessagingService.instance().receive(response, id); + spy.matchingResponse(response); + } + return false; + } + return true; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return true; + } + }; + MessagingService.instance().addMessageSink(sink); + + return spy; + } + + /** + * Stops currently registered response from being send. + */ + public void destroy() + { + MessagingService.instance().removeMessageSink(sink); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/net/MockMessagingService.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java new file mode 100644 index 0000000..0412759 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.function.Predicate; + +/** + * Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be + * intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}. + * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)}, + * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be + * nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}. + * After each test, {@link MockMessagingService#cleanup()} must be called for free listeners registered + * in {@link MessagingService}. + */ +public class MockMessagingService +{ + + private MockMessagingService() + { + } + + /** + * Creates a MatcherResponse based on specified matcher. + */ + public static MatcherResponse when(Matcher matcher) + { + return new MatcherResponse(matcher); + } + + /** + * Unsubscribes any handlers added by calling {@link MessagingService#addMessageSink(IMessageSink)}. + * This should be called after each test. + */ + public static void cleanup() + { + MessagingService.instance().clearMessageSinks(); + } + + /** + * Creates a matcher that will indicate if the target address of the outgoing message equals the + * provided address. + */ + public static Matcher<InetAddress> to(String address) + { + try + { + return to(InetAddress.getByName(address)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + /** + * Creates a matcher that will indicate if the target address of the outgoing message equals the + * provided address. 
+ */ + public static Matcher<InetAddress> to(InetAddress address) + { + return (in, to) -> to == address || to.equals(address); + } + + /** + * Creates a matcher that will indicate if the verb of the outgoing message equals the + * provided value. + */ + public static Matcher<MessagingService.Verb> verb(MessagingService.Verb verb) + { + return (in, to) -> in.verb == verb; + } + + /** + * Creates a matcher based on the result of the provided predicate called with the outgoing message. + */ + public static <T> Matcher<T> message(Predicate<MessageOut<T>> fn) + { + return (msg, to) -> fn.test(msg); + } + + /** + * Creates a matcher based on the result of the provided predicate called with the outgoing message's payload. + */ + public static <T> Matcher<T> payload(Predicate<T> fn) + { + return (msg, to) -> fn.test(msg.payload); + } + + /** + * Inverts boolean result of wrapped matcher. + */ + public static <T> Matcher<T> not(Matcher<T> matcher) + { + return (o, to) -> !matcher.matches(o, to); + } + + /** + * Indicates true in case all provided matchers returned true. + */ + public static <T> Matcher<?> all(Matcher<?>... matchers) + { + return (MessageOut<T> out, InetAddress to) -> { + for (Matcher matcher : matchers) + { + if (!matcher.matches(out, to)) + return false; + } + return true; + }; + } + + /** + * Indicates true in case at least a single provided matcher returned true. + */ + public static <T> Matcher<?> any(Matcher<?>... 
matchers) + { + return (MessageOut<T> out, InetAddress to) -> { + for (Matcher matcher : matchers) + { + if (matcher.matches(out, to)) + return true; + } + return false; + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java new file mode 100644 index 0000000..ce94f33 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.net; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.EchoMessage; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.net.MockMessagingService.all; +import static org.apache.cassandra.net.MockMessagingService.to; +import static org.apache.cassandra.net.MockMessagingService.verb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MockMessagingServiceTest +{ + @BeforeClass + public static void initCluster() throws ConfigurationException + { + SchemaLoader.prepareServer(); + StorageService.instance.initServer(); + } + + @Before + public void cleanup() + { + MockMessagingService.cleanup(); + } + + @Test + public void testRequestResponse() throws InterruptedException, ExecutionException + { + // echo message that we like to mock as incoming reply for outgoing echo message + MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(), + EchoMessage.instance, + Collections.emptyMap(), + MessagingService.Verb.ECHO, + MessagingService.current_version, + MessageIn.createTimestamp() + ); + MockMessagingSpy spy = MockMessagingService + .when( + all( + to(FBUtilities.getBroadcastAddress()), + verb(MessagingService.Verb.ECHO) + ) + ) + .respond(echoMessageIn); + + MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer); + MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback() + { + public void response(MessageIn msg) + { + assertEquals(MessagingService.Verb.ECHO, msg.verb); + 
assertEquals(echoMessageIn.payload, msg.payload); + } + + public boolean isLatencyForSnitch() + { + return false; + } + }); + + // we must have intercepted the outgoing message at this point + MessageOut<?> msg = spy.captureMessageOut().get(); + assertEquals(1, spy.messagesIntercepted); + assertTrue(msg == echoMessageOut); + + // and return a mocked response + assertEquals(1, spy.mockedMessageResponses); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/net/MockMessagingSpy.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java new file mode 100644 index 0000000..80bdb39 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.AssertionFailedError;
+
+/**
+ * Allows inspecting the behavior of mocked messaging by observing {@link MatcherResponse}.
+ * <p>
+ * Intercepted outbound messages and delivered mock responses are recorded in two separate queues.
+ * Every future handed out by this class waits on one of those queues and <em>removes</em> the
+ * messages it observes, so a given message can satisfy at most one future.
+ * <p>
+ * The futures of all spy instances are completed by one shared background thread, in the order in
+ * which they were requested. A future that waits for messages that never arrive therefore also
+ * holds up every future requested after it.
+ */
+public class MockMessagingSpy
+{
+    private static final Logger logger = LoggerFactory.getLogger(MockMessagingSpy.class);
+
+    // Both counters are plain ints that are incremented without any synchronization.
+    // NOTE(review): confirm that matchingMessage/matchingResponse are only invoked from one thread.
+
+    /** Number of outbound messages reported through {@code matchingMessage} so far. */
+    public int messagesIntercepted = 0;
+    /** Number of mocked responses reported through {@code matchingResponse} so far. */
+    public int mockedMessageResponses = 0;
+
+    private final BlockingQueue<MessageOut<?>> interceptedMessages = new LinkedBlockingQueue<>();
+    private final BlockingQueue<MessageIn<?>> deliveredResponses = new LinkedBlockingQueue<>();
+
+    // Shared by all instances; the single worker serves the futures strictly in submission order.
+    private static final Executor executor = Executors.newSingleThreadExecutor();
+
+    /**
+     * Returns a future with the first mocked incoming message that has been created and delivered.
+     */
+    public ListenableFuture<MessageIn<?>> captureMockedMessageIn()
+    {
+        return Futures.transform(captureMockedMessageInN(1), (List<MessageIn<?>> result) -> result.isEmpty() ? null : result.get(0));
+    }
+
+    /**
+     * Returns a future with the specified number of mocked incoming messages that have been created and delivered.
+     *
+     * @param noOfMessages number of delivered responses to wait for and collect
+     * @return future that completes with the collected responses, in the order they were delivered
+     */
+    public ListenableFuture<List<MessageIn<?>>> captureMockedMessageInN(int noOfMessages)
+    {
+        CapturedResultsFuture<MessageIn<?>> ret = new CapturedResultsFuture<>(noOfMessages, deliveredResponses);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate if a mocked incoming message has been created and delivered.
+     */
+    public ListenableFuture<Boolean> expectMockedMessageIn()
+    {
+        return expectMockedMessageIn(1);
+    }
+
+    /**
+     * Returns a future that will indicate if the specified number of mocked incoming messages have been created and delivered.
+     *
+     * @param noOfMessages number of delivered responses to wait for
+     * @return future that completes with {@code true} once that many responses have been observed
+     */
+    public ListenableFuture<Boolean> expectMockedMessageIn(int noOfMessages)
+    {
+        ResultsCompletionFuture<MessageIn<?>> ret = new ResultsCompletionFuture<>(noOfMessages, deliveredResponses);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future with the first intercepted outbound message that would have been sent.
+     */
+    public ListenableFuture<MessageOut<?>> captureMessageOut()
+    {
+        return Futures.transform(captureMessageOut(1), (List<MessageOut<?>> result) -> result.isEmpty() ? null : result.get(0));
+    }
+
+    /**
+     * Returns a future with the specified number of intercepted outbound messages that would have been sent.
+     *
+     * @param noOfMessages number of intercepted messages to wait for and collect
+     * @return future that completes with the collected messages, in the order they were intercepted
+     */
+    public ListenableFuture<List<MessageOut<?>>> captureMessageOut(int noOfMessages)
+    {
+        CapturedResultsFuture<MessageOut<?>> ret = new CapturedResultsFuture<>(noOfMessages, interceptedMessages);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate if an intercepted outbound message would have been sent.
+     */
+    public ListenableFuture<Boolean> interceptMessageOut()
+    {
+        return interceptMessageOut(1);
+    }
+
+    /**
+     * Returns a future that will indicate if the specified number of intercepted outbound messages would have been sent.
+     *
+     * @param noOfMessages number of intercepted messages to wait for
+     * @return future that completes with {@code true} once that many messages have been observed
+     */
+    public ListenableFuture<Boolean> interceptMessageOut(int noOfMessages)
+    {
+        ResultsCompletionFuture<MessageOut<?>> ret = new ResultsCompletionFuture<>(noOfMessages, interceptedMessages);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate the absence of any intercepted outbound messages within the specified period.
+     * <p>
+     * The period starts when the shared background thread picks up this future, not when this
+     * method is called. A message that is already queued at that point counts as unexpected.
+     *
+     * @param time how long to wait for a message
+     * @param unit unit of {@code time}
+     * @return future that completes with {@code true} if no message showed up, or fails with an
+     *         {@link AssertionFailedError} naming the unexpected message
+     */
+    public ListenableFuture<Boolean> interceptNoMsg(long time, TimeUnit unit)
+    {
+        ResultAbsenceFuture<MessageOut<?>> ret = new ResultAbsenceFuture<>(interceptedMessages, time, unit);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Records an outbound message that matched, making it available to the outbound futures.
+     * Presumably invoked by {@link MatcherResponse} -- the caller is not visible from this class.
+     */
+    void matchingMessage(MessageOut<?> message)
+    {
+        messagesIntercepted++;
+        logger.trace("Received matching message: {}", message);
+        interceptedMessages.add(message);
+    }
+
+    /**
+     * Records a mocked response, making it available to the incoming-message futures.
+     * Presumably invoked by {@link MatcherResponse} -- the caller is not visible from this class.
+     */
+    void matchingResponse(MessageIn<?> response)
+    {
+        mockedMessageResponses++;
+        logger.trace("Responding to intercepted message: {}", response);
+        deliveredResponses.add(response);
+    }
+
+
+    /**
+     * Future that takes a fixed number of elements from a queue and completes with them as a list.
+     */
+    private static class CapturedResultsFuture<T> extends AbstractFuture<List<T>> implements Runnable
+    {
+        private final int waitForResults;
+        private final List<T> results;
+        private final BlockingQueue<T> queue;
+
+        CapturedResultsFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            results = new ArrayList<>(waitForResponses);
+            this.queue = queue;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (results.size() < waitForResults)
+                    results.add(queue.take());
+
+                set(results);
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt flag and fail the future with the cause, so that callers
+                // waiting on it are released instead of blocking forever.
+                Thread.currentThread().interrupt();
+                setException(e);
+            }
+        }
+    }
+
+    /**
+     * Future that takes (and discards) a fixed number of elements from a queue and then completes with {@code true}.
+     */
+    private static class ResultsCompletionFuture<T> extends AbstractFuture<Boolean> implements Runnable
+    {
+        private final int waitForResults;
+        private final BlockingQueue<T> queue;
+
+        ResultsCompletionFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                for (int i = 0; i < waitForResults; i++)
+                {
+                    queue.take();
+                }
+                set(true);
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt flag and fail the future with the cause, so that callers
+                // waiting on it are released instead of blocking forever.
+                Thread.currentThread().interrupt();
+                setException(e);
+            }
+        }
+    }
+
+    /**
+     * Future that completes with {@code true} if nothing can be polled from a queue within a given
+     * period, and fails with an {@link AssertionFailedError} if an element shows up.
+     */
+    private static class ResultAbsenceFuture<T> extends AbstractFuture<Boolean> implements Runnable
+    {
+        private final BlockingQueue<T> queue;
+        private final long time;
+        private final TimeUnit unit;
+
+        ResultAbsenceFuture(BlockingQueue<T> queue, long time, TimeUnit unit)
+        {
+            this.queue = queue;
+            this.time = time;
+            this.unit = unit;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                // poll() removes the element it returns, so an unexpected message is consumed here.
+                T result = queue.poll(time, unit);
+                if (result != null)
+                    setException(new AssertionFailedError("Received unexpected message: " + result));
+                else
+                    set(true);
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt flag and fail the future with the cause, so that callers
+                // waiting on it are released instead of blocking forever.
+                Thread.currentThread().interrupt();
+                setException(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7751588f/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
new file mode 100644
index 0000000..83c8db7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A freely adjustable clock that can be used for unit testing. See {@link Clock#instance} how to
+ * enable this class.
+ */
+public class FreeRunningClock extends Clock
+{
+    // Current reading of the clock in nanoseconds; starts at zero and only changes through advance().
+    // NOTE(review): plain field without synchronization -- confirm that readers run on the same
+    // thread that calls advance(), or that stale reads are acceptable for the tests using it.
+    private long elapsedNanos;
+
+    /**
+     * Returns the current reading of this clock in nanoseconds.
+     */
+    @Override
+    public long nanoTime()
+    {
+        return elapsedNanos;
+    }
+
+    /**
+     * Returns the reading of {@link #nanoTime()} converted to milliseconds. The value is derived
+     * from the same adjustable counter, so it is not related to the wall-clock time.
+     */
+    @Override
+    public long currentTimeMillis()
+    {
+        return TimeUnit.MILLISECONDS.convert(nanoTime(), TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Moves the clock by the given amount.
+     *
+     * @param time amount to add to the current reading, expressed in {@code unit}
+     * @param unit unit of {@code time}
+     */
+    public void advance(long time, TimeUnit unit)
+    {
+        long deltaNanos = unit.toNanos(time);
+        elapsedNanos = elapsedNanos + deltaNanos;
+    }
+}