This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbf4dcb334 Enable EpochAwareDebounce to cancel in flight rpc requests
cbf4dcb334 is described below

commit cbf4dcb3345c7e2f42f6a897c66b6460b7acc2ca
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Fri Apr 12 14:04:06 2024 +0100

    Enable EpochAwareDebounce to cancel in flight rpc requests
    
    Patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson
    for CASSANDRA-19514
---
 .../apache/cassandra/tcm/EpochAwareDebounce.java   |  65 ++++++++--
 .../org/apache/cassandra/tcm/PeerLogFetcher.java   |  19 +--
 .../org/apache/cassandra/tcm/RemoteProcessor.java  |  38 ++++--
 .../cassandra/distributed/impl/Instance.java       |   5 +-
 .../distributed/test/log/CMSCatchupTest.java       |  71 +++++++++++
 .../test/log/FetchLogFromPeers2Test.java           | 134 ++++++++++++++++++++
 .../test/log/FetchLogFromPeersTest.java            | 139 +--------------------
 7 files changed, 306 insertions(+), 165 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java 
b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index 5621845487..f65c03d830 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,13 +18,23 @@
 
 package org.apache.cassandra.tcm;
 
-import java.util.concurrent.Callable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
 
 /**
  * When debouncing from a replica we know exactly which epoch we need, so to 
avoid retries we
@@ -32,12 +42,13 @@ import org.apache.cassandra.utils.concurrent.Future;
  * comes in, we create a new future. If a request for a newer epoch comes in, 
we simply
  * swap out the current future reference for a new one which is requesting the 
newer epoch.
  */
-public class EpochAwareDebounce<T>
+public class EpochAwareDebounce
 {
-    public static final EpochAwareDebounce<ClusterMetadata> instance = new 
EpochAwareDebounce<>();
-
-    private final AtomicReference<EpochAwareAsyncPromise<T>> currentFuture = 
new AtomicReference<>();
+    private static final Logger logger = 
LoggerFactory.getLogger(EpochAwareDebounce.class);
+    public static final EpochAwareDebounce instance = new EpochAwareDebounce();
+    private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new 
AtomicReference<>();
     private final ExecutorPlus executor;
+    private final List<Promise<LogState>> inflightRequests = new 
CopyOnWriteArrayList<>();
 
     private EpochAwareDebounce()
     {
@@ -45,24 +56,50 @@ public class EpochAwareDebounce<T>
         this.executor = 
ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
     }
 
-    public Future<T> getAsync(Callable<T> get, Epoch epoch)
+    /**
+     * Deduplicate requests to catch up log state based on the desired epoch. 
Callers supply a target epoch and
+     * a function obtain the ClusterMetadata that corresponds with it. It is 
expected that this function will make rpc
+     * calls to peers, retrieving a LogState which can be applied locally to 
produce the necessary {@code
+     * ClusterMetadata}. The function takes a {@code Promise<LogState>} as 
input, with the expectation that this
+     * specific instance will be used to provide blocking behaviour when 
making the rpc calls that fetch the {@code
+     * LogState}. These promises are memoized in order to cancel them when 
{@link #shutdownAndWait(long, TimeUnit)} is
+     * called. This causes the fetch function to stop waiting on any in flight 
{@code LogState} requests and prevents
+     * shutdown from being blocked.
+     *
+     * @param  fetchFunction executes the request for LogState. It's expected 
that this popluates fetchResult with the
+     *                       successful result.
+     * @param epoch the desired epoch
+     * @return
+     */
+    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, 
ClusterMetadata> fetchFunction,
+                                            Epoch epoch)
     {
         while (true)
         {
-            EpochAwareAsyncPromise<T> running = currentFuture.get();
+            EpochAwareAsyncPromise running = currentFuture.get();
             if (running != null && !running.isDone() && 
running.epoch.isEqualOrAfter(epoch))
                 return running;
 
-            EpochAwareAsyncPromise<T> promise = new 
EpochAwareAsyncPromise<>(epoch);
+            Promise<LogState> fetchResult = new AsyncPromise<>();
+
+            EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
             if (currentFuture.compareAndSet(running, promise))
             {
+                fetchResult.addCallback((logState, error) -> {
+                    logger.debug("Removing future remotely requesting epoch {} 
from in flight list", epoch);
+                    inflightRequests.remove(fetchResult);
+                });
+                inflightRequests.add(fetchResult);
+
                 executor.submit(() -> {
                     try
                     {
-                        promise.setSuccess(get.call());
+                        promise.setSuccess(fetchFunction.apply(fetchResult));
                     }
                     catch (Throwable t)
                     {
+                        fetchResult.cancel(true);
+                        inflightRequests.remove(fetchResult);
                         promise.setFailure(t);
                     }
                 });
@@ -71,7 +108,7 @@ public class EpochAwareDebounce<T>
         }
     }
 
-    private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T>
+    private static class EpochAwareAsyncPromise extends 
AsyncPromise<ClusterMetadata>
     {
         private final Epoch epoch;
         public EpochAwareAsyncPromise(Epoch epoch)
@@ -79,4 +116,12 @@ public class EpochAwareDebounce<T>
             this.epoch = epoch;
         }
     }
+
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        logger.info("Cancelling {} in flight log fetch requests", 
inflightRequests.size());
+        for (Promise<LogState> toCancel : inflightRequests)
+            toCancel.cancel(true);
+        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java 
b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 29c7e6b1cb..61cbc63eac 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
 
 public class PeerLogFetcher
 {
@@ -72,10 +74,11 @@ public class PeerLogFetcher
 
     public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, 
Epoch awaitAtleast)
     {
-        return EpochAwareDebounce.instance.getAsync(() -> 
fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
+        Function<Promise<LogState>, ClusterMetadata> fn = promise -> 
fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
     }
 
-    private ClusterMetadata fetchLogEntriesAndWaitInternal(InetAddressAndPort 
remote, Epoch awaitAtleast)
+    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> 
remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
     {
         Epoch before = ClusterMetadata.current().epoch;
         if (before.isEqualOrAfter(awaitAtleast))
@@ -85,11 +88,13 @@ public class PeerLogFetcher
 
         try (Timer.Context ctx = 
TCMMetrics.instance.fetchPeerLogLatency.time())
         {
-            LogState logState = 
RemoteProcessor.sendWithCallback(Verb.TCM_FETCH_PEER_LOG_REQ,
-                                                                 new 
FetchPeerLog(before),
-                                                                 new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
-                                                                 
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                                               
       new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
+            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+                                                  Verb.TCM_FETCH_PEER_LOG_REQ,
+                                                  new FetchPeerLog(before),
+                                                  new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+                                                  
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                       new 
Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
+            LogState logState = remoteRequest.awaitUninterruptibly().get();
             log.append(logState);
             ClusterMetadata fetched = log.waitForHighestConsecutive();
             if (fetched.epoch.isEqualOrAfter(awaitAtleast))
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 260d151419..a647d4d8b0 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
 
 import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
@@ -124,13 +126,19 @@ public final class RemoteProcessor implements Processor
     @Override
     public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline 
retryPolicy)
     {
-        // Synchonous, non-debounced call if we are waiting for the highest 
epoch. Should be used sparingly.
+        Function<Promise<LogState>, ClusterMetadata> fetchFunction =
+            promise -> fetchLogAndWaitInternal(promise,
+                                               new 
CandidateIterator(candidates(true), false),
+                                               log);
+        // Synchonous, non-debounced call if we are waiting for the highest 
epoch (without knowing/caring what it is).
+        // Should be used sparingly.
         if (waitFor == null)
-            return fetchLogAndWaitInternal();
+            return fetchFunction.apply(new AsyncPromise<>());
 
         try
         {
-            return 
EpochAwareDebounce.instance.getAsync(this::fetchLogAndWaitInternal, 
waitFor).get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
+            Future<ClusterMetadata> cmFuture = 
EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor);
+            return cmFuture.get(retryPolicy.remainingNanos(), 
TimeUnit.NANOSECONDS);
         }
         catch (InterruptedException e)
         {
@@ -142,29 +150,37 @@ public final class RemoteProcessor implements Processor
         }
     }
 
-    private ClusterMetadata fetchLogAndWaitInternal()
+    public static ClusterMetadata fetchLogAndWait(CandidateIterator 
candidateIterator, LocalLog log)
     {
-        return fetchLogAndWait(new CandidateIterator(candidates(true), false), 
log);
+        Promise<LogState> remoteRequest = new AsyncPromise<>();
+        return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
     }
 
-    public static ClusterMetadata fetchLogAndWait(CandidateIterator 
candidateIterator, LocalLog log)
+    private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> 
remoteRequest,
+                                                           CandidateIterator 
candidates,
+                                                           LocalLog log)
     {
         try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
         {
             Epoch currentEpoch = log.metadata().epoch;
-            LogState replay = sendWithCallback(Verb.TCM_FETCH_CMS_LOG_REQ,
-                                               new FetchCMSLog(currentEpoch, 
ClusterMetadataService.state() == REMOTE),
-                                               candidateIterator,
-                                               new 
Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+            sendWithCallbackAsync(remoteRequest,
+                                  Verb.TCM_FETCH_CMS_LOG_REQ,
+                                  new FetchCMSLog(currentEpoch, 
ClusterMetadataService.state() == REMOTE),
+                                  candidates,
+                                  new 
Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+            LogState replay = remoteRequest.awaitUninterruptibly().get();
             if (!replay.isEmpty())
             {
                 logger.info("Replay request returned replay data: {}", replay);
                 log.append(replay);
                 TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, 
replay.latestEpoch());
             }
-
             return log.waitForHighestConsecutive();
         }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     // todo rename to send with retries or something
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 25b2cb8703..a2b225ca0b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -131,12 +131,14 @@ import org.apache.cassandra.service.paxos.PaxosRepair;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
 import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.snapshot.SnapshotManager;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
 import org.apache.cassandra.streaming.async.NettyStreamingChannel;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.EpochAwareDebounce;
 import org.apache.cassandra.tcm.Startup;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
@@ -925,7 +927,8 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                                 () -> SSTableReader.shutdownBlocking(1L, 
MINUTES),
                                 () -> 
shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
                                 () -> 
ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES),
-                                () -> 
org.apache.cassandra.service.snapshot.SnapshotManager.shutdownAndWait(1L, 
MINUTES)
+                                () -> SnapshotManager.shutdownAndWait(1L, 
MINUTES),
+                                () -> 
EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES)
             );
 
             internodeMessagingStarted = false;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
new file mode 100644
index 0000000000..3e8cb9c297
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+
+import static org.junit.Assert.assertTrue;
+
+public class CMSCatchupTest extends TestBaseImpl
+{
+    @Test
+    public void testCMSCatchup() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(4)
+                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
+            cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
+            cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
+            // isolate node2 from the other CMS members to ensure it's behind
+            cluster.filters().inbound().from(1).to(2).drop();
+            cluster.filters().inbound().from(3).to(2).drop();
+            AtomicInteger fetchedFromPeer = new AtomicInteger();
+            cluster.filters().inbound().from(2).to(4).messagesMatching((from, 
to, msg) -> {
+                if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
+                    fetchedFromPeer.getAndIncrement();
+                return false;
+            }).drop().on();
+
+            long mark = cluster.get(4).logs().mark();
+            cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl 
with comment='test 123'"), ConsistencyLevel.ONE);
+            cluster.get(4).logs().watchFor(mark, "AlterOptions");
+
+            mark = cluster.get(2).logs().mark();
+            cluster.get(1).shutdown().get();
+            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump 
to shutdown");
+            // node2, a CMS member, is now behind and node1 is shut down.
+            // Try reading at QUORUM from node4, node2 should detect it's 
behind and catch up from node4
+            int before = fetchedFromPeer.get();
+            cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl 
where id = 55"), ConsistencyLevel.QUORUM);
+            assertTrue(fetchedFromPeer.get() > before);
+        }
+    }
+
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
new file mode 100644
index 0000000000..d3b549d20f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import 
org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState;
+import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+
+import static 
org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FetchLogFromPeers2Test extends TestBaseImpl
+{
+    @Test
+    public void testSchema() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(3)
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("alter keyspace %s with 
replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int 
primary key)"));
+
+            for (ClusterState clusterState : ClusterState.values())
+                for (Operation operation : Operation.values())
+                {
+                    setupSchemaBehind(cluster);
+                    runQuery(cluster, clusterState, operation);
+                }
+        }
+    }
+
+    public void runQuery(Cluster cluster, ClusterState clusterState, Operation 
operation) throws ExecutionException, InterruptedException
+    {
+        cluster.get(1).shutdown().get();
+
+        // node2 is behind
+        String query;
+        switch (operation)
+        {
+            case READ:
+                query = "select * from %s.tbl where id = 5";
+                break;
+            case WRITE:
+                query = "insert into %s.tbl (id) values (5)";
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        int coordinator = coordinator(clusterState);
+        long mark = cluster.get(2).logs().mark();
+        long metricsBefore = cluster.get(2).callOnInstance(() -> 
TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+        if (clusterState == ClusterState.COORDINATOR_BEHIND)
+        {
+            long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
+            try
+            {
+                for (int i = 1; i <= cluster.size(); i++)
+                    if (!cluster.get(i).isShutdown())
+                        coordinatorBehindMetricsBefore[i - 1] = 
cluster.get(i).callOnInstance(() -> 
TCMMetrics.instance.coordinatorBehindSchema.getCount());
+                cluster.coordinator(coordinator).execute(withKeyspace(query), 
ConsistencyLevel.QUORUM);
+                fail("should fail");
+            }
+            catch (Exception ignored) {}
+
+            boolean metricBumped = false;
+            for (int i = 1; i <= cluster.size(); i++)
+            {
+                if (i == coordinator || cluster.get(i).isShutdown())
+                    continue;
+                long metricAfter = cluster.get(i).callOnInstance(() -> 
TCMMetrics.instance.coordinatorBehindSchema.getCount());
+                if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
+                {
+                    metricBumped = true;
+                    break;
+                }
+            }
+            assertTrue("Metric CoordinatorBehindSchema should have been bumped 
for at least one replica", metricBumped);
+
+        }
+        cluster.coordinator(coordinator).execute(withKeyspace(query), 
ConsistencyLevel.QUORUM);
+        assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from 
/127.0.0.3:7012").getResult().size() > 0);
+        long metricsAfter = cluster.get(2).callOnInstance(() -> 
TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+        assertTrue(metricsAfter > metricsBefore);
+
+        cluster.get(1).startup();
+    }
+
+    public void setupSchemaBehind(Cluster cluster)
+    {
+        cluster.filters().reset();
+        cluster.filters().inbound().from(1).to(2).drop();
+        long epochBefore = cluster.get(3).callOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch());
+        cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with 
comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
+        cluster.get(3).runOnInstance(() -> {
+            try
+            {
+                
ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        });
+        cluster.filters().reset();
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
index fbcd080a98..6996f0c190 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.distributed.test.log;
 
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IMessageFilters;
@@ -38,14 +36,12 @@ import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.tcm.sequences.AddToCMS;
 import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
 
 import static org.junit.Assert.assertTrue;
@@ -53,29 +49,10 @@ import static org.junit.Assert.fail;
 
 public class FetchLogFromPeersTest extends TestBaseImpl
 {
-    enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
-    enum Operation { READ, WRITE }
+    public enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
+    public enum Operation { READ, WRITE }
 
-    @Test
-    public void testSchema() throws Exception
-    {
-        try (Cluster cluster = init(builder().withNodes(3)
-                                             .start()))
-        {
-            cluster.schemaChange(withKeyspace("alter keyspace %s with 
replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int 
primary key)"));
-
-            for (ClusterState clusterState : ClusterState.values())
-                for (Operation operation : Operation.values())
-                {
-                    setupSchemaBehind(cluster);
-                    runQuery(cluster, clusterState, operation);
-                }
-        }
-    }
-
-    public int coordinator(ClusterState clusterState)
+    public static int coordinator(ClusterState clusterState)
     {
         switch (clusterState)
         {
@@ -87,81 +64,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
         throw new IllegalStateException();
     }
 
-    public void runQuery(Cluster cluster, ClusterState clusterState, Operation 
operation) throws ExecutionException, InterruptedException
-    {
-        cluster.get(1).shutdown().get();
-
-        // node2 is behind
-        String query;
-        switch (operation)
-        {
-            case READ:
-                query = "select * from %s.tbl where id = 5";
-                break;
-            case WRITE:
-                query = "insert into %s.tbl (id) values (5)";
-                break;
-            default:
-                throw new IllegalStateException();
-        }
-        int coordinator = coordinator(clusterState);
-        long mark = cluster.get(2).logs().mark();
-        long metricsBefore = cluster.get(2).callOnInstance(() -> 
TCMMetrics.instance.fetchedPeerLogEntries.getCount());
-        if (clusterState == ClusterState.COORDINATOR_BEHIND)
-        {
-            long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
-            try
-            {
-                for (int i = 1; i <= cluster.size(); i++)
-                    if (!cluster.get(i).isShutdown())
-                        coordinatorBehindMetricsBefore[i - 1] = 
cluster.get(i).callOnInstance(() -> 
TCMMetrics.instance.coordinatorBehindSchema.getCount());
-                cluster.coordinator(coordinator).execute(withKeyspace(query), 
ConsistencyLevel.QUORUM);
-                fail("should fail");
-            }
-            catch (Exception ignored) {}
-
-            boolean metricBumped = false;
-            for (int i = 1; i <= cluster.size(); i++)
-            {
-                if (i == coordinator || cluster.get(i).isShutdown())
-                    continue;
-                long metricAfter = cluster.get(i).callOnInstance(() -> 
TCMMetrics.instance.coordinatorBehindSchema.getCount());
-                if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
-                {
-                    metricBumped = true;
-                    break;
-                }
-            }
-            assertTrue("Metric CoordinatorBehindSchema should have been bumped 
for at least one replica", metricBumped);
-
-        }
-        cluster.coordinator(coordinator).execute(withKeyspace(query), 
ConsistencyLevel.QUORUM);
-        assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from 
/127.0.0.3:7012").getResult().size() > 0);
-        long metricsAfter = cluster.get(2).callOnInstance(() -> 
TCMMetrics.instance.fetchedPeerLogEntries.getCount());
-        assertTrue(metricsAfter > metricsBefore);
-
-        cluster.get(1).startup();
-    }
-
-    public void setupSchemaBehind(Cluster cluster)
-    {
-        cluster.filters().reset();
-        cluster.filters().inbound().from(1).to(2).drop();
-        long epochBefore = cluster.get(3).callOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch());
-        cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with 
comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
-        cluster.get(3).runOnInstance(() -> {
-            try
-            {
-                
ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        });
-        cluster.filters().reset();
-    }
-
     @Test
     public void catchupCoordinatorBehindTestPlacements() throws Exception
     {
@@ -263,40 +165,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
         }
     }
 
-    @Test
-    public void testCMSCatchupTest() throws Exception
-    {
-        try (Cluster cluster = init(builder().withNodes(4)
-                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below
-                                             .start()))
-        {
-            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
-            cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
-            cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
-            // isolate node2 from the other CMS members to ensure it's behind
-            cluster.filters().inbound().from(1).to(2).drop();
-            cluster.filters().inbound().from(3).to(2).drop();
-            AtomicInteger fetchedFromPeer = new AtomicInteger();
-            cluster.filters().inbound().from(2).to(4).messagesMatching((from, 
to, msg) -> {
-                if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
-                    fetchedFromPeer.getAndIncrement();
-                return false;
-            }).drop().on();
-
-            long mark = cluster.get(4).logs().mark();
-            cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl 
with comment='test 123'"), ConsistencyLevel.ONE);
-            cluster.get(4).logs().watchFor(mark, "AlterOptions");
-
-            mark = cluster.get(2).logs().mark();
-            cluster.get(1).shutdown().get();
-            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump 
to shutdown");
-            // node2, a CMS member, is now behind and node1 is shut down.
-            // Try reading at QUORUM from node4, node2 should detect it's 
behind and catch up from node4
-            int before = fetchedFromPeer.get();
-            cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl 
where id = 55"), ConsistencyLevel.QUORUM);
-            assertTrue(fetchedFromPeer.get() > before);
-        }
-    }
 
     @Test
     public void catchupWithSnapshot() throws Exception
@@ -370,7 +238,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
         }
     }
 
-
     private static void executeAlters(Cluster cluster)
     {
         for (int i = 0; i < 10; i++)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to