This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new cbf4dcb334 Enable EpochAwareDebounce to cancel in flight rpc requests cbf4dcb334 is described below commit cbf4dcb3345c7e2f42f6a897c66b6460b7acc2ca Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Fri Apr 12 14:04:06 2024 +0100 Enable EpochAwareDebounce to cancel in flight rpc requests Patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson for CASSANDRA-19514 --- .../apache/cassandra/tcm/EpochAwareDebounce.java | 65 ++++++++-- .../org/apache/cassandra/tcm/PeerLogFetcher.java | 19 +-- .../org/apache/cassandra/tcm/RemoteProcessor.java | 38 ++++-- .../cassandra/distributed/impl/Instance.java | 5 +- .../distributed/test/log/CMSCatchupTest.java | 71 +++++++++++ .../test/log/FetchLogFromPeers2Test.java | 134 ++++++++++++++++++++ .../test/log/FetchLogFromPeersTest.java | 139 +-------------------- 7 files changed, 306 insertions(+), 165 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java index 5621845487..f65c03d830 100644 --- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java +++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java @@ -18,13 +18,23 @@ package org.apache.cassandra.tcm; -import java.util.concurrent.Callable; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; +import 
org.apache.cassandra.utils.concurrent.Promise; /** * When debouncing from a replica we know exactly which epoch we need, so to avoid retries we @@ -32,12 +42,13 @@ import org.apache.cassandra.utils.concurrent.Future; * comes in, we create a new future. If a request for a newer epoch comes in, we simply * swap out the current future reference for a new one which is requesting the newer epoch. */ -public class EpochAwareDebounce<T> +public class EpochAwareDebounce { - public static final EpochAwareDebounce<ClusterMetadata> instance = new EpochAwareDebounce<>(); - - private final AtomicReference<EpochAwareAsyncPromise<T>> currentFuture = new AtomicReference<>(); + private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class); + public static final EpochAwareDebounce instance = new EpochAwareDebounce(); + private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>(); private final ExecutorPlus executor; + private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>(); private EpochAwareDebounce() { @@ -45,24 +56,50 @@ public class EpochAwareDebounce<T> this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2); } - public Future<T> getAsync(Callable<T> get, Epoch epoch) + /** + * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and + * a function to obtain the ClusterMetadata that corresponds with it. It is expected that this function will make rpc + * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code + * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this + * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code + * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is + * called. 
This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents + * shutdown from being blocked. + * + * @param fetchFunction executes the request for LogState. It's expected that this populates fetchResult with the + * successful result. + * @param epoch the desired epoch + * @return + */ + public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction, + Epoch epoch) { while (true) { - EpochAwareAsyncPromise<T> running = currentFuture.get(); + EpochAwareAsyncPromise running = currentFuture.get(); if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch)) return running; - EpochAwareAsyncPromise<T> promise = new EpochAwareAsyncPromise<>(epoch); + Promise<LogState> fetchResult = new AsyncPromise<>(); + + EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch); if (currentFuture.compareAndSet(running, promise)) { + fetchResult.addCallback((logState, error) -> { + logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch); + inflightRequests.remove(fetchResult); + }); + inflightRequests.add(fetchResult); + executor.submit(() -> { try { - promise.setSuccess(get.call()); + promise.setSuccess(fetchFunction.apply(fetchResult)); } catch (Throwable t) { + fetchResult.cancel(true); + inflightRequests.remove(fetchResult); promise.setFailure(t); } }); @@ -71,7 +108,7 @@ public class EpochAwareDebounce<T> } } - private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T> + private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata> { private final Epoch epoch; public EpochAwareAsyncPromise(Epoch epoch) @@ -79,4 +116,12 @@ public class EpochAwareDebounce<T> this.epoch = epoch; } } + + public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size()); + for (Promise<LogState> toCancel : 
inflightRequests) + toCancel.cancel(true); + ExecutorUtils.shutdownAndWait(timeout, unit, executor); + } } diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java index 29c7e6b1cb..61cbc63eac 100644 --- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java +++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.Promise; public class PeerLogFetcher { @@ -72,10 +74,11 @@ public class PeerLogFetcher public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast) { - return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast); + Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast); + return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast); } - private ClusterMetadata fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast) + private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast) { Epoch before = ClusterMetadata.current().epoch; if (before.isEqualOrAfter(awaitAtleast)) @@ -85,11 +88,13 @@ public class PeerLogFetcher try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time()) { - LogState logState = RemoteProcessor.sendWithCallback(Verb.TCM_FETCH_PEER_LOG_REQ, - new FetchPeerLog(before), - new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote)), - Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), - new Retry.Jitter(TCMMetrics.instance.fetchLogRetries))); + RemoteProcessor.sendWithCallbackAsync(remoteRequest, + Verb.TCM_FETCH_PEER_LOG_REQ, + new FetchPeerLog(before), + new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)), + Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + new Retry.Jitter(TCMMetrics.instance.fetchLogRetries))); + LogState logState = remoteRequest.awaitUninterruptibly().get(); log.append(logState); ClusterMetadata fetched = log.waitForHighestConsecutive(); if (fetched.epoch.isEqualOrAfter(awaitAtleast)) diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 260d151419..a647d4d8b0 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.function.Supplier; import org.slf4j.Logger; @@ -49,6 +50,7 @@ import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.Promise; import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR; @@ -124,13 +126,19 @@ public final class RemoteProcessor implements Processor @Override public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy) { - // Synchonous, non-debounced call if we are waiting for the highest epoch. 
Should be used sparingly. + Function<Promise<LogState>, ClusterMetadata> fetchFunction = + promise -> fetchLogAndWaitInternal(promise, + new CandidateIterator(candidates(true), false), + log); + // Synchronous, non-debounced call if we are waiting for the highest epoch (without knowing/caring what it is). + // Should be used sparingly. if (waitFor == null) - return fetchLogAndWaitInternal(); + return fetchFunction.apply(new AsyncPromise<>()); try { - return EpochAwareDebounce.instance.getAsync(this::fetchLogAndWaitInternal, waitFor).get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS); + Future<ClusterMetadata> cmFuture = EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor); + return cmFuture.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) { @@ -142,29 +150,37 @@ public final class RemoteProcessor implements Processor } } - private ClusterMetadata fetchLogAndWaitInternal() + public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log) { - return fetchLogAndWait(new CandidateIterator(candidates(true), false), log); + Promise<LogState> remoteRequest = new AsyncPromise<>(); + return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log); } - public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log) + private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> remoteRequest, + CandidateIterator candidates, + LocalLog log) { try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time()) { Epoch currentEpoch = log.metadata().epoch; - LogState replay = sendWithCallback(Verb.TCM_FETCH_CMS_LOG_REQ, - new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE), - candidateIterator, - new Retry.Backoff(TCMMetrics.instance.fetchLogRetries)); + sendWithCallbackAsync(remoteRequest, + Verb.TCM_FETCH_CMS_LOG_REQ, + new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE), + candidates, + new 
Retry.Backoff(TCMMetrics.instance.fetchLogRetries)); + LogState replay = remoteRequest.awaitUninterruptibly().get(); if (!replay.isEmpty()) { logger.info("Replay request returned replay data: {}", replay); log.append(replay); TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch()); } - return log.waitForHighestConsecutive(); } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } } // todo rename to send with retries or something diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 25b2cb8703..a2b225ca0b 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -131,12 +131,14 @@ import org.apache.cassandra.service.paxos.PaxosRepair; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData; import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings; +import org.apache.cassandra.service.snapshot.SnapshotManager; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamReceiveTask; import org.apache.cassandra.streaming.StreamTransferTask; import org.apache.cassandra.streaming.async.NettyStreamingChannel; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.EpochAwareDebounce; import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; @@ -925,7 +927,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance () -> SSTableReader.shutdownBlocking(1L, MINUTES), () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())), () -> 
ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES), - () -> org.apache.cassandra.service.snapshot.SnapshotManager.shutdownAndWait(1L, MINUTES) + () -> SnapshotManager.shutdownAndWait(1L, MINUTES), + () -> EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES) ); internodeMessagingStarted = false; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java new file mode 100644 index 0000000000..3e8cb9c297 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.log; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.tcm.sequences.AddToCMS; + +import static org.junit.Assert.assertTrue; + +public class CMSCatchupTest extends TestBaseImpl +{ + @Test + public void testCMSCatchup() throws Exception + { + try (Cluster cluster = init(builder().withNodes(4) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below + .start())) + { + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + cluster.get(2).runOnInstance(() -> AddToCMS.initiate()); + cluster.get(3).runOnInstance(() -> AddToCMS.initiate()); + // isolate node2 from the other CMS members to ensure it's behind + cluster.filters().inbound().from(1).to(2).drop(); + cluster.filters().inbound().from(3).to(2).drop(); + AtomicInteger fetchedFromPeer = new AtomicInteger(); + cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> { + if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id) + fetchedFromPeer.getAndIncrement(); + return false; + }).drop().on(); + + long mark = cluster.get(4).logs().mark(); + cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE); + cluster.get(4).logs().watchFor(mark, "AlterOptions"); + + mark = cluster.get(2).logs().mark(); + cluster.get(1).shutdown().get(); + cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown"); + // node2, a CMS member, is now behind and node1 is shut down. 
+ // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4 + int before = fetchedFromPeer.get(); + cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM); + assertTrue(fetchedFromPeer.get() > before); + } + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java new file mode 100644 index 0000000000..d3b549d20f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.log; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState; +import org.apache.cassandra.metrics.TCMMetrics; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; + +import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FetchLogFromPeers2Test extends TestBaseImpl +{ + @Test + public void testSchema() throws Exception + { + try (Cluster cluster = init(builder().withNodes(3) + .start())) + { + cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}")); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); + + for (ClusterState clusterState : ClusterState.values()) + for (Operation operation : Operation.values()) + { + setupSchemaBehind(cluster); + runQuery(cluster, clusterState, operation); + } + } + } + + public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException + { + cluster.get(1).shutdown().get(); + + // node2 is behind + String query; + switch (operation) + { + case READ: + query = "select * from %s.tbl where id = 5"; + break; + case WRITE: + query = "insert into %s.tbl (id) values (5)"; + break; + default: + throw new IllegalStateException(); + } + int coordinator = coordinator(clusterState); + long mark = cluster.get(2).logs().mark(); + long 
metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); + if (clusterState == ClusterState.COORDINATOR_BEHIND) + { + long [] coordinatorBehindMetricsBefore = new long[cluster.size()]; + try + { + for (int i = 1; i <= cluster.size(); i++) + if (!cluster.get(i).isShutdown()) + coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount()); + cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); + fail("should fail"); + } + catch (Exception ignored) {} + + boolean metricBumped = false; + for (int i = 1; i <= cluster.size(); i++) + { + if (i == coordinator || cluster.get(i).isShutdown()) + continue; + long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount()); + if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0) + { + metricBumped = true; + break; + } + } + assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped); + + } + cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); + assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0); + long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); + assertTrue(metricsAfter > metricsBefore); + + cluster.get(1).startup(); + } + + public void setupSchemaBehind(Cluster cluster) + { + cluster.filters().reset(); + cluster.filters().inbound().from(1).to(2).drop(); + long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()); + cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE); + cluster.get(3).runOnInstance(() -> { + try + { + ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch()); + 
} + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + cluster.filters().reset(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java index fbcd080a98..6996f0c190 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.distributed.test.log; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -28,7 +27,6 @@ import org.junit.Test; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IMessageFilters; @@ -38,14 +36,12 @@ import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.Verb; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.log.LogState; -import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; import static org.junit.Assert.assertTrue; @@ -53,29 +49,10 @@ import static org.junit.Assert.fail; public class FetchLogFromPeersTest extends TestBaseImpl { - enum 
ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND } - enum Operation { READ, WRITE } + public enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND } + public enum Operation { READ, WRITE } - @Test - public void testSchema() throws Exception - { - try (Cluster cluster = init(builder().withNodes(3) - .start())) - { - cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}")); - cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); - cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); - - for (ClusterState clusterState : ClusterState.values()) - for (Operation operation : Operation.values()) - { - setupSchemaBehind(cluster); - runQuery(cluster, clusterState, operation); - } - } - } - - public int coordinator(ClusterState clusterState) + public static int coordinator(ClusterState clusterState) { switch (clusterState) { @@ -87,81 +64,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl throw new IllegalStateException(); } - public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException - { - cluster.get(1).shutdown().get(); - - // node2 is behind - String query; - switch (operation) - { - case READ: - query = "select * from %s.tbl where id = 5"; - break; - case WRITE: - query = "insert into %s.tbl (id) values (5)"; - break; - default: - throw new IllegalStateException(); - } - int coordinator = coordinator(clusterState); - long mark = cluster.get(2).logs().mark(); - long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); - if (clusterState == ClusterState.COORDINATOR_BEHIND) - { - long [] coordinatorBehindMetricsBefore = new long[cluster.size()]; - try - { - for (int i = 1; i <= cluster.size(); i++) - if (!cluster.get(i).isShutdown()) - coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> 
TCMMetrics.instance.coordinatorBehindSchema.getCount()); - cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); - fail("should fail"); - } - catch (Exception ignored) {} - - boolean metricBumped = false; - for (int i = 1; i <= cluster.size(); i++) - { - if (i == coordinator || cluster.get(i).isShutdown()) - continue; - long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount()); - if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0) - { - metricBumped = true; - break; - } - } - assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped); - - } - cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); - assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0); - long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); - assertTrue(metricsAfter > metricsBefore); - - cluster.get(1).startup(); - } - - public void setupSchemaBehind(Cluster cluster) - { - cluster.filters().reset(); - cluster.filters().inbound().from(1).to(2).drop(); - long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()); - cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE); - cluster.get(3).runOnInstance(() -> { - try - { - ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - }); - cluster.filters().reset(); - } - @Test public void catchupCoordinatorBehindTestPlacements() throws Exception { @@ -263,40 +165,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl } } - @Test - public void testCMSCatchupTest() throws Exception - { - try (Cluster cluster = init(builder().withNodes(4) - .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below - .start())) - { - cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); - cluster.get(2).runOnInstance(() -> AddToCMS.initiate()); - cluster.get(3).runOnInstance(() -> AddToCMS.initiate()); - // isolate node2 from the other CMS members to ensure it's behind - cluster.filters().inbound().from(1).to(2).drop(); - cluster.filters().inbound().from(3).to(2).drop(); - AtomicInteger fetchedFromPeer = new AtomicInteger(); - cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> { - if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id) - fetchedFromPeer.getAndIncrement(); - return false; - }).drop().on(); - - long mark = cluster.get(4).logs().mark(); - cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE); - cluster.get(4).logs().watchFor(mark, "AlterOptions"); - - mark = cluster.get(2).logs().mark(); - cluster.get(1).shutdown().get(); - cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown"); - // node2, a CMS member, is now behind and node1 is shut down. - // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4 - int before = fetchedFromPeer.get(); - cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM); - assertTrue(fetchedFromPeer.get() > before); - } - } @Test public void catchupWithSnapshot() throws Exception @@ -370,7 +238,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl } } - private static void executeAlters(Cluster cluster) { for (int i = 0; i < 10; i++) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org