Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 c4e6cd2a1 -> c4982587b
  refs/heads/cassandra-3.11 aed1b5fdf -> c3a66ab1e
  refs/heads/trunk dfddeb44c -> d500100a4


Create MessagingService mocking classes

Patch by Stefan Podkowinski; reviewed by Tyler Hobbs for CASSANDRA-12016


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e478d38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e478d38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e478d38

Branch: refs/heads/cassandra-3.0
Commit: 1e478d380bd497a3fe635b35b6e87e002cabf617
Parents: c4e6cd2
Author: Stefan Podkowinski <s.podkowin...@gmail.com>
Authored: Tue Jul 12 12:04:23 2016 -0500
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Tue Jul 3 14:16:25 2018 +0100

----------------------------------------------------------------------
 .../apache/cassandra/gms/FailureDetector.java   |   9 +-
 .../apache/cassandra/net/MessagingService.java  |   5 +
 .../cassandra/service/ActiveRepairService.java  |   3 +-
 .../org/apache/cassandra/utils/ExpiringMap.java |   4 +-
 test/unit/org/apache/cassandra/net/Matcher.java |  32 +++
 .../apache/cassandra/net/MatcherResponse.java   | 210 +++++++++++++++++
 .../cassandra/net/MockMessagingService.java     | 144 ++++++++++++
 .../cassandra/net/MockMessagingServiceTest.java |  96 ++++++++
 .../apache/cassandra/net/MockMessagingSpy.java  | 234 +++++++++++++++++++
 9 files changed, 730 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 70a354c..cda6469 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -52,7 +53,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger 
than this percentage of the max, log a debug message
     private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 
seconds
     private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause();
-    private long lastInterpret = System.nanoTime();
+    private long lastInterpret = Clock.instance.nanoTime();
     private long lastPause = 0L;
 
     private static long getMaxLocalPause()
@@ -252,7 +253,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
 
     public void report(InetAddress ep)
     {
-        long now = System.nanoTime();
+        long now = Clock.instance.nanoTime();
         ArrivalWindow heartbeatWindow = arrivalSamples.get(ep);
         if (heartbeatWindow == null)
         {
@@ -279,7 +280,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         {
             return;
         }
-        long now = System.nanoTime();
+        long now = Clock.instance.nanoTime();
         long diff = now - lastInterpret;
         lastInterpret = now;
         if (diff > MAX_LOCAL_PAUSE_IN_NANOS)
@@ -288,7 +289,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
             lastPause = now;
             return;
         }
-        if (System.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
+        if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
         {
             logger.debug("Still not marking nodes down due to local pause");
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 047f51f..67c3ba9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -389,6 +389,11 @@ public final class MessagingService implements 
MessagingServiceMBean
         messageSinks.add(sink);
     }
 
+    public void removeMessageSink(IMessageSink sink)
+    {
+        messageSinks.remove(sink);
+    }
+
     public void clearMessageSinks()
     {
         messageSinks.clear();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index adb6fab..f63cb86 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -282,7 +283,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
 
     public UUID prepareForRepair(UUID parentRepairSession, InetAddress 
coordinator, Set<InetAddress> endpoints, RepairOption options, 
List<ColumnFamilyStore> columnFamilyStores)
     {
-        long timestamp = System.currentTimeMillis();
+        long timestamp = Clock.instance.currentTimeMillis();
         registerParentRepairSession(parentRepairSession, coordinator, 
columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, 
options.isGlobal());
         final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java 
b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 8359918..ef013f5 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -48,7 +48,7 @@ public class ExpiringMap<K, V>
             assert value != null;
             this.value = value;
             this.timeout = timeout;
-            this.createdAt = System.nanoTime();
+            this.createdAt = Clock.instance.nanoTime();
         }
 
         private boolean isReadyToDieAt(long atNano)
@@ -85,7 +85,7 @@ public class ExpiringMap<K, V>
         {
             public void run()
             {
-                long start = System.nanoTime();
+                long start = Clock.instance.nanoTime();
                 int n = 0;
                 for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/Matcher.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/Matcher.java 
b/test/unit/org/apache/cassandra/net/Matcher.java
new file mode 100644
index 0000000..cd1b667
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/Matcher.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+/**
+ * Predicate based on intercepted, outgoing messange and the message's 
destination address.
+ */
+public interface Matcher<T>
+{
+    /**
+     * @param obj intercepted outgoing message
+     * @param to  destination address
+     */
+    public boolean matches(MessageOut<T> obj, InetAddress to);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java 
b/test/unit/org/apache/cassandra/net/MatcherResponse.java
new file mode 100644
index 0000000..12a8d1b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.apache.cassandra.utils.Clock;
+
+/**
+ * Sends a response for an incoming message with a matching {@link Matcher}.
+ * The actual behavior by any instance of this class can be inspected by
+ * interacting with the returned {@link MockMessagingSpy}.
+ */
+public class MatcherResponse
+{
+    private final Matcher<?> matcher;
+    private final Set<Integer> sendResponses = new HashSet<>();
+    private final MockMessagingSpy spy = new MockMessagingSpy();
+    private final AtomicInteger limitCounter = new 
AtomicInteger(Integer.MAX_VALUE);
+    private IMessageSink sink;
+
+    MatcherResponse(Matcher<?> matcher)
+    {
+        this.matcher = matcher;
+    }
+
+    /**
+     * Do not create any responses for intercepted outbound messages.
+     */
+    public MockMessagingSpy dontReply()
+    {
+        return respond((MessageIn<?>)null);
+    }
+
+    /**
+     * Respond with provided message in reply to each intercepted outbound 
message.
+     * @param message   the message to use as mock reply from the cluster
+     */
+    public MockMessagingSpy respond(MessageIn<?> message)
+    {
+        return respondN(message, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Respond a limited number of times with the provided message in reply to 
each intercepted outbound message.
+     * @param response  the message to use as mock reply from the cluster
+     * @param limit     number of times to respond with message
+     */
+    public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
+    {
+        return respondN((in, to) -> response, limit);
+    }
+
+    /**
+     * Respond with the message created by the provided function that will be 
called with each intercepted outbound message.
+     * @param fnResponse    function to call for creating reply based on 
intercepted message and target address
+     */
+    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, 
InetAddress, MessageIn<S>> fnResponse)
+    {
+        return respondN(fnResponse, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Respond with message wrapping the payload object created by provided 
function called for each intercepted outbound message.
+     * The target address from the intercepted message will automatically be 
used as the created message's sender address.
+     * @param fnResponse    function to call for creating payload object based 
on intercepted message and target address
+     * @param verb          verb to use for reply message
+     */
+    public <T, S> MockMessagingSpy 
respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, 
MessagingService.Verb verb)
+    {
+        return respondNWithPayloadForEachReceiver(fnResponse, verb, 
Integer.MAX_VALUE);
+    }
+
+    /**
+     * Respond a limited number of times with message wrapping the payload 
object created by provided function called for
+     * each intercepted outbound message. The target address from the 
intercepted message will automatically be used as the
+     * created message's sender address.
+     * @param fnResponse    function to call for creating payload object based 
on intercepted message and target address
+     * @param verb          verb to use for reply message
+     */
+    public <T, S> MockMessagingSpy 
respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, 
MessagingService.Verb verb, int limit)
+    {
+        return respondN((MessageOut<T> msg, InetAddress to) -> {
+                    S payload = fnResponse.apply(msg);
+                    if (payload == null)
+                        return null;
+                    else
+                        return MessageIn.create(to, payload, 
Collections.emptyMap(), verb, MessagingService.current_version);
+                },
+                limit);
+    }
+
+    /**
+     * Responds to each intercepted outbound message by creating a response 
message wrapping the next element consumed
+     * from the provided queue. No reply will be send when the queue has been 
exhausted.
+     * @param cannedResponses   prepared payload messages to use for responses
+     * @param verb              verb to use for reply message
+     */
+    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> 
cannedResponses, MessagingService.Verb verb)
+    {
+        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> 
cannedResponses.poll(), verb);
+    }
+
+    /**
+     * Responds to each intercepted outbound message by creating a response 
message wrapping the next element consumed
+     * from the provided queue. This method will block until queue elements 
are available.
+     * @param cannedResponses   prepared payload messages to use for responses
+     * @param verb              verb to use for reply message
+     */
+    public <T, S> MockMessagingSpy 
respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, 
MessagingService.Verb verb)
+    {
+        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
+            try
+            {
+                return cannedResponses.take();
+            }
+            catch (InterruptedException e)
+            {
+                return null;
+            }
+        }, verb);
+    }
+
+    /**
+     * Respond a limited number of times with the message created by the 
provided function that will be called with
+     * each intercepted outbound message.
+     * @param fnResponse    function to call for creating reply based on 
intercepted message and target address
+     */
+    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, 
InetAddress, MessageIn<S>> fnResponse, int limit)
+    {
+        limitCounter.set(limit);
+
+        assert sink == null: "destroy() must be called first to register new 
response";
+
+        sink = new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress to)
+            {
+                // prevent outgoing message from being send in case matcher 
indicates a match
+                // and instead send the mocked response
+                if (matcher.matches(message, to))
+                {
+                    spy.matchingMessage(message);
+
+                    if (limitCounter.decrementAndGet() < 0)
+                        return false;
+
+                    synchronized (sendResponses)
+                    {
+                        // I'm not sure about retry semantics regarding 
message/ID relationships, but I assume
+                        // sending a message multiple times using the same ID 
shouldn't happen..
+                        assert !sendResponses.contains(id) : "ID re-use for 
outgoing message";
+                        sendResponses.add(id);
+                    }
+                    MessageIn<?> response = fnResponse.apply(message, to);
+                    if (response != null)
+                    {
+                        CallbackInfo cb = 
MessagingService.instance().getRegisteredCallback(id);
+                        if (cb != null)
+                            cb.callback.response(response);
+                        else
+                            MessagingService.instance().receive(response, id, 
Clock.instance.currentTimeMillis(), false);
+                        spy.matchingResponse(response);
+                    }
+                    return false;
+                }
+                return true;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return true;
+            }
+        };
+        MessagingService.instance().addMessageSink(sink);
+
+        return spy;
+    }
+
+    /**
+     * Stops currently registered response from being send.
+     */
+    public void destroy()
+    {
+        MessagingService.instance().removeMessageSink(sink);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingService.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java 
b/test/unit/org/apache/cassandra/net/MockMessagingService.java
new file mode 100644
index 0000000..0412759
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.function.Predicate;
+
+/**
+ * Starting point for mocking {@link MessagingService} interactions. Outgoing 
messages can be
+ * intercepted by first creating a {@link MatcherResponse} by calling {@link 
MockMessagingService#when(Matcher)}.
+ * Alternatively {@link Matcher}s can be created by using helper methods such 
as {@link #to(InetAddress)},
+ * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may 
also be
+ * nested using {@link MockMessagingService#all(Matcher[])} or {@link 
MockMessagingService#any(Matcher[])}.
+ * After each test, {@link MockMessagingService#cleanup()} must be called for 
free listeners registered
+ * in {@link MessagingService}.
+ */
+public class MockMessagingService
+{
+
+    private MockMessagingService()
+    {
+    }
+
+    /**
+     * Creates a MatcherResponse based on specified matcher.
+     */
+    public static MatcherResponse when(Matcher matcher)
+    {
+        return new MatcherResponse(matcher);
+    }
+
+    /**
+     * Unsubscribes any handlers added by calling {@link 
MessagingService#addMessageSink(IMessageSink)}.
+     * This should be called after each test.
+     */
+    public static void cleanup()
+    {
+        MessagingService.instance().clearMessageSinks();
+    }
+
+    /**
+     * Creates a matcher that will indicate if the target address of the 
outgoing message equals the
+     * provided address.
+     */
+    public static Matcher<InetAddress> to(String address)
+    {
+        try
+        {
+            return to(InetAddress.getByName(address));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a matcher that will indicate if the target address of the 
outgoing message equals the
+     * provided address.
+     */
+    public static Matcher<InetAddress> to(InetAddress address)
+    {
+        return (in, to) -> to == address || to.equals(address);
+    }
+
+    /**
+     * Creates a matcher that will indicate if the verb of the outgoing 
message equals the
+     * provided value.
+     */
+    public static Matcher<MessagingService.Verb> verb(MessagingService.Verb 
verb)
+    {
+        return (in, to) -> in.verb == verb;
+    }
+
+    /**
+     * Creates a matcher based on the result of the provided predicate called 
with the outgoing message.
+     */
+    public static <T> Matcher<T> message(Predicate<MessageOut<T>> fn)
+    {
+        return (msg, to) -> fn.test(msg);
+    }
+
+    /**
+     * Creates a matcher based on the result of the provided predicate called 
with the outgoing message's payload.
+     */
+    public static <T> Matcher<T> payload(Predicate<T> fn)
+    {
+        return (msg, to) -> fn.test(msg.payload);
+    }
+
+    /**
+     * Inverts boolean result of wrapped matcher.
+     */
+    public static <T> Matcher<T> not(Matcher<T> matcher)
+    {
+        return (o, to) -> !matcher.matches(o, to);
+    }
+
+    /**
+     * Indicates true in case all provided matchers returned true.
+     */
+    public static <T> Matcher<?> all(Matcher<?>... matchers)
+    {
+        return (MessageOut<T> out, InetAddress to) -> {
+            for (Matcher matcher : matchers)
+            {
+                if (!matcher.matches(out, to))
+                    return false;
+            }
+            return true;
+        };
+    }
+
+    /**
+     * Indicates true in case at least a single provided matcher returned true.
+     */
+    public static <T> Matcher<?> any(Matcher<?>... matchers)
+    {
+        return (MessageOut<T> out, InetAddress to) -> {
+            for (Matcher matcher : matchers)
+            {
+                if (matcher.matches(out, to))
+                    return true;
+            }
+            return false;
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
new file mode 100644
index 0000000..ed4cce8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.EchoMessage;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.MockMessagingService.all;
+import static org.apache.cassandra.net.MockMessagingService.to;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MockMessagingServiceTest
+{
+    @BeforeClass
+    public static void initCluster() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Before
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    @Test
+    public void testRequestResponse() throws InterruptedException, 
ExecutionException
+    {
+        // echo message that we like to mock as incoming reply for outgoing 
echo message
+        MessageIn<EchoMessage> echoMessageIn = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+                EchoMessage.instance,
+                Collections.emptyMap(),
+                MessagingService.Verb.ECHO,
+                MessagingService.current_version
+        );
+        MockMessagingSpy spy = MockMessagingService
+                .when(
+                        all(
+                                to(FBUtilities.getBroadcastAddress()),
+                                verb(MessagingService.Verb.ECHO)
+                        )
+                )
+                .respond(echoMessageIn);
+
+        MessageOut<EchoMessage> echoMessageOut = new 
MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, 
EchoMessage.serializer);
+        MessagingService.instance().sendRR(echoMessageOut, 
FBUtilities.getBroadcastAddress(), new IAsyncCallback()
+        {
+            public void response(MessageIn msg)
+            {
+                assertEquals(MessagingService.Verb.ECHO, msg.verb);
+                assertEquals(echoMessageIn.payload, msg.payload);
+            }
+
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+        });
+
+        // we must have intercepted the outgoing message at this point
+        MessageOut<?> msg = spy.captureMessageOut().get();
+        assertEquals(1, spy.messagesIntercepted);
+        assertTrue(msg == echoMessageOut);
+
+        // and return a mocked response
+        assertEquals(1, spy.mockedMessageResponses);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java 
b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
new file mode 100644
index 0000000..80bdb39
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.AssertionFailedError;
+
+/**
+ * Allows inspecting the behavior of mocked messaging by observing {@link 
MatcherResponse}.
+ */
+public class MockMessagingSpy
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MockMessagingSpy.class);
+
+    public int messagesIntercepted = 0;
+    public int mockedMessageResponses = 0;
+
+    private final BlockingQueue<MessageOut<?>> interceptedMessages = new 
LinkedBlockingQueue<>();
+    private final BlockingQueue<MessageIn<?>> deliveredResponses = new 
LinkedBlockingQueue<>();
+
+    private static final Executor executor = 
Executors.newSingleThreadExecutor();
+
+    /**
+     * Returns a future with the first mocked incoming message that has been 
created and delivered.
+     */
+    public ListenableFuture<MessageIn<?>> captureMockedMessageIn()
+    {
+        return Futures.transform(captureMockedMessageInN(1), 
(List<MessageIn<?>> result) -> result.isEmpty() ? null : result.get(0));
+    }
+
+    /**
+     * Returns a future with the specified number mocked incoming messages 
that have been created and delivered.
+     */
+    public ListenableFuture<List<MessageIn<?>>> captureMockedMessageInN(int 
noOfMessages)
+    {
+        CapturedResultsFuture<MessageIn<?>> ret = new 
CapturedResultsFuture<>(noOfMessages, deliveredResponses);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate if a mocked incoming message has 
been created and delivered.
+     */
+    public ListenableFuture<Boolean> expectMockedMessageIn()
+    {
+        return expectMockedMessageIn(1);
+    }
+
+    /**
+     * Returns a future that will indicate if the specified number of mocked 
incoming message have been created and delivered.
+     */
+    public ListenableFuture<Boolean> expectMockedMessageIn(int noOfMessages)
+    {
+        ResultsCompletionFuture<MessageIn<?>> ret = new 
ResultsCompletionFuture<>(noOfMessages, deliveredResponses);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future with the first intercepted outbound message that would 
have been send.
+     */
+    public ListenableFuture<MessageOut<?>> captureMessageOut()
+    {
+        return Futures.transform(captureMessageOut(1), (List<MessageOut<?>> 
result) -> result.isEmpty() ? null : result.get(0));
+    }
+
+    /**
+     * Returns a future with the specified number of intercepted outbound 
messages that would have been send.
+     */
+    public ListenableFuture<List<MessageOut<?>>> captureMessageOut(int 
noOfMessages)
+    {
+        CapturedResultsFuture<MessageOut<?>> ret = new 
CapturedResultsFuture<>(noOfMessages, interceptedMessages);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate if an intercepted outbound messages 
would have been send.
+     */
+    public ListenableFuture<Boolean> interceptMessageOut()
+    {
+        return interceptMessageOut(1);
+    }
+
+    /**
+     * Returns a future that will indicate if the specified number of 
intercepted outbound messages would have been send.
+     */
+    public ListenableFuture<Boolean> interceptMessageOut(int noOfMessages)
+    {
+        ResultsCompletionFuture<MessageOut<?>> ret = new 
ResultsCompletionFuture<>(noOfMessages, interceptedMessages);
+        executor.execute(ret);
+        return ret;
+    }
+
+    /**
+     * Returns a future that will indicate the absence of any intercepted 
outbound messages with the specifed period.
+     */
+    public ListenableFuture<Boolean> interceptNoMsg(long time, TimeUnit unit)
+    {
+        ResultAbsenceFuture<MessageOut<?>> ret = new 
ResultAbsenceFuture<>(interceptedMessages, time, unit);
+        executor.execute(ret);
+        return ret;
+    }
+
+    void matchingMessage(MessageOut<?> message)
+    {
+        messagesIntercepted++;
+        logger.trace("Received matching message: {}", message);
+        interceptedMessages.add(message);
+    }
+
+    void matchingResponse(MessageIn<?> response)
+    {
+        mockedMessageResponses++;
+        logger.trace("Responding to intercepted message: {}", response);
+        deliveredResponses.add(response);
+    }
+
+
+    private static class CapturedResultsFuture<T> extends 
AbstractFuture<List<T>> implements Runnable
+    {
+        private final int waitForResults;
+        private final List<T> results;
+        private final BlockingQueue<T> queue;
+
+        CapturedResultsFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            results = new ArrayList<T>(waitForResponses);
+            this.queue = queue;
+        }
+
+        public void run()
+        {
+            try
+            {
+                while (results.size() < waitForResults)
+                    results.add(queue.take());
+
+                set(results);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+        }
+    }
+
+    private static class ResultsCompletionFuture<T> extends 
AbstractFuture<Boolean> implements Runnable
+    {
+        private final int waitForResults;
+        private final BlockingQueue<T> queue;
+
+        ResultsCompletionFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            this.queue = queue;
+        }
+
+        public void run()
+        {
+            try
+            {
+                for (int i = 0; i < waitForResults; i++)
+                {
+                    queue.take();
+                }
+                set(true);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+        }
+    }
+
+    private static class ResultAbsenceFuture<T> extends 
AbstractFuture<Boolean> implements Runnable
+    {
+        private final BlockingQueue<T> queue;
+        private final long time;
+        private final TimeUnit unit;
+
+        ResultAbsenceFuture(BlockingQueue<T> queue, long time, TimeUnit unit)
+        {
+            this.queue = queue;
+            this.time = time;
+            this.unit = unit;
+        }
+
+        public void run()
+        {
+            try
+            {
+                T result = queue.poll(time, unit);
+                if (result != null)
+                    setException(new AssertionFailedError("Received unexpected 
message: " + result));
+                else
+                    set(true);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to