This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 219d209 Include finalized pending sstables in preview repair 219d209 is described below commit 219d209651759cf702519a100c4f43595f7be8d7 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Feb 5 12:51:47 2020 +0100 Include finalized pending sstables in preview repair Patch by marcuse; reviewed by Blake Eggleston and David Capwell for CASSANDRA-15553 --- CHANGES.txt | 1 + .../db/compaction/PendingRepairManager.java | 2 +- .../db/repair/CassandraValidationIterator.java | 24 +- .../db/streaming/CassandraStreamManager.java | 17 +- .../org/apache/cassandra/repair/RepairSession.java | 43 +++- .../cassandra/repair/consistent/LocalSessions.java | 23 +- .../cassandra/service/ActiveRepairService.java | 5 +- .../apache/cassandra/streaming/PreviewKind.java | 49 +++- .../cassandra/distributed/impl/Instance.java | 2 + .../distributed/test/PreviewRepairTest.java | 281 +++++++++++++++++++++ 10 files changed, 396 insertions(+), 51 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 19906d3..9cd6040 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Include finalized pending sstables in preview repair (CASSANDRA-15553) * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271) * Correct inaccurate logging message (CASSANDRA-15549) * Add documentation of dynamo (CASSANDRA-15486) diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index b2d70f7..78d4483 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -449,7 +449,7 @@ class PendingRepairManager } else { - logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID); + logger.info("Moving {} from pending to repaired with repaired at = {} and session id = {}", transaction.originals(), repairedAt, sessionID); cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false); } completed = true; diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java index d653f6c..4eea678 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java @@ -30,7 +30,6 @@ import java.util.function.LongPredicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.compaction.ActiveCompactions; import org.apache.cassandra.db.compaction.ActiveCompactionsTracker; import org.apache.cassandra.db.compaction.CompactionController; -import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionIterator; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; @@ -54,14 +51,10 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.ValidationPartitionIterator; -import org.apache.cassandra.repair.Validator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; @@ -115,21 +108,6 @@ public class CassandraValidationIterator extends ValidationPartitionIterator } } - private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind previewKind) - { - switch (previewKind) - { - case ALL: - return (s) -> true; - case REPAIRED: - return (s) -> s.isRepaired(); - case UNREPAIRED: - return (s) -> !s.isRepaired(); - default: - throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind); - } - } - @VisibleForTesting static synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID parentId, boolean isIncremental) { @@ -147,7 +125,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator com.google.common.base.Predicate<SSTableReader> predicate; if (prs.isPreview()) { - predicate = getPreviewPredicate(prs.previewKind); + predicate = prs.previewKind.predicate(); } else if (isIncremental) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java index b88a5d6..a84fd27 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java @@ -81,21 +81,6 @@ public class CassandraStreamManager implements TableStreamManager return new CassandraStreamReceiver(cfs, session, totalStreams); } - private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind) - { - switch (kind) - { - case ALL: - return Predicates.alwaysTrue(); - case UNREPAIRED: - return Predicates.not(SSTableReader::isRepaired); - case REPAIRED: - return SSTableReader::isRepaired; - default: - throw new IllegalArgumentException("Unsupported kind: " + kind); - } - } - @Override public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind) { @@ -111,7 +96,7 @@ public class CassandraStreamManager implements TableStreamManager Predicate<SSTableReader> predicate; if (previewKind.isPreview()) { - predicate = getPreviewPredicate(previewKind); + predicate = previewKind.predicate(); } else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) { diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 3483e59..95a6e57 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -31,10 +31,16 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.consistent.ConsistentSession; +import org.apache.cassandra.repair.consistent.LocalSession; +import org.apache.cassandra.repair.consistent.LocalSessions; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; @@ -78,7 +84,8 @@ import org.apache.cassandra.utils.Pair; * all of them in parallel otherwise. */ public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber, - IFailureDetectionEventListener + IFailureDetectionEventListener, + LocalSessions.Listener { private static Logger logger = LoggerFactory.getLogger(RepairSession.class); @@ -401,4 +408,38 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // If a node failed, we stop everything (though there could still be some activity in the background) forceShutdown(exception); } + + public void onIRStateChange(LocalSession session) + { + // we should only be registered as listeners for PreviewKind.REPAIRED, but double check here + if (previewKind == PreviewKind.REPAIRED && + session.getState() == ConsistentSession.State.FINALIZED && + includesTables(session.tableIds)) + { + for (Range<Token> range : session.ranges) + { + if (range.intersects(ranges())) + { + logger.error("{} An intersecting incremental repair with session id = {} finished, preview repair might not be accurate", previewKind.logPrefix(getId()), session.sessionID); + forceShutdown(new Exception("An incremental repair with session id "+session.sessionID+" finished during this preview repair runtime")); + return; + } + } + } + } + + private boolean includesTables(Set<TableId> tableIds) + { + Keyspace ks = Keyspace.open(keyspace); + if (ks != null) + { + for (String table : cfnames) + { + ColumnFamilyStore cfs = ks.getColumnFamilyStore(table); + if (tableIds.contains(cfs.metadata.id)) + return true; + } + } + return false; + } } diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index 6475794..fa224d1 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -106,6 +107,7 @@ import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; public class LocalSessions { private static final Logger logger = LoggerFactory.getLogger(LocalSessions.class); + private static final Set<Listener> listeners = new CopyOnWriteArraySet<>(); /** * Amount of time a session can go without any activity before we start checking the status of other @@ -441,7 +443,7 @@ public class LocalSessions return new LocalSession(builder); } - protected LocalSession getSession(UUID sessionID) + public LocalSession getSession(UUID sessionID) { return sessions.get(sessionID); } @@ -520,6 +522,8 @@ public class LocalSessions { sessionCompleted(session); } + for (Listener listener : listeners) + listener.onIRStateChange(session); } } @@ -777,7 +781,7 @@ public class LocalSessions LocalSession session = getSession(sessionID); if (session == null) { - logger.warn("Received status response message for unknown session {}", sessionID); + logger.warn("Received status request message for unknown session {}", sessionID); sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, FAILED))); } else @@ -868,4 +872,19 @@ public class LocalSessions throw new IllegalStateException("Cannot get final repaired at value for in progress session: " + session); } } + + public static void registerListener(Listener listener) + { + listeners.add(listener); + } + + public static void unregisterListener(Listener listener) + { + listeners.remove(listener); + } + + public interface Listener + { + void onIRStateChange(LocalSession session); + } } diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 6f4c474..7499c36 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -70,7 +70,6 @@ import org.apache.cassandra.repair.consistent.CoordinatorSessions; import org.apache.cassandra.repair.consistent.LocalSessions; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; @@ -230,6 +229,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai // register listeners registerOnFdAndGossip(session); + if (session.previewKind == PreviewKind.REPAIRED) + LocalSessions.registerListener(session); + // remove session at completion session.addListener(new Runnable() { @@ -239,6 +241,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public void run() { sessions.remove(session.getId()); + LocalSessions.unregisterListener(session); } }, MoreExecutors.directExecutor()); session.start(executor); diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java index 51c5746..b5467de 100644 --- a/src/java/org/apache/cassandra/streaming/PreviewKind.java +++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java @@ -18,22 +18,34 @@ package org.apache.cassandra.streaming; - import java.util.UUID; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.repair.consistent.ConsistentSession; +import org.apache.cassandra.repair.consistent.LocalSession; +import org.apache.cassandra.service.ActiveRepairService; + public enum PreviewKind { - NONE(0), - ALL(1), - UNREPAIRED(2), - REPAIRED(3); + NONE(0, (sstable) -> { + throw new RuntimeException("Can't get preview predicate for preview kind NONE"); + }), + ALL(1, Predicates.alwaysTrue()), + UNREPAIRED(2, sstable -> !sstable.isRepaired()), + REPAIRED(3, new PreviewRepairedSSTablePredicate()); private final int serializationVal; + private final Predicate<SSTableReader> predicate; - PreviewKind(int serializationVal) + PreviewKind(int serializationVal, Predicate<SSTableReader> predicate) { assert ordinal() == serializationVal; this.serializationVal = serializationVal; + this.predicate = predicate; } public int getSerializationVal() @@ -46,7 +58,6 @@ public enum PreviewKind return values()[serializationVal]; } - public boolean isPreview() { return this != NONE; @@ -62,4 +73,28 @@ public enum PreviewKind return '[' + logPrefix() + " #" + sessionId.toString() + ']'; } + public Predicate<SSTableReader> predicate() + { + return predicate; + } + + private static class PreviewRepairedSSTablePredicate implements Predicate<SSTableReader> + { + public boolean apply(SSTableReader sstable) + { + // grab the metadata before checking pendingRepair since this can be nulled out at any time + StatsMetadata sstableMetadata = sstable.getSSTableMetadata(); + if (sstableMetadata.pendingRepair != null) + { + LocalSession session = ActiveRepairService.instance.consistent.local.getSession(sstableMetadata.pendingRepair); + if (session == null) + return false; + else if (session.getState() == ConsistentSession.State.FINALIZED) + return true; + else if (session.getState() != ConsistentSession.State.FAILED) + throw new IllegalStateException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair)); + } + return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE; + } + } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index b8ef25d..1beb708 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -401,6 +401,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance if (!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort())) throw new IllegalStateException(); + + ActiveRepairService.instance.start(); } catch (Throwable t) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java new file mode 100644 index 0000000..ed29a30 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.progress.ProgressEventType; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PreviewRepairTest extends DistributedTestBase +{ + /** + * makes sure that the repaired sstables are not matching on the two + * nodes by disabling autocompaction on node2 and then running an + * incremental repair + */ + @Test + public void testWithMismatchingPending() throws Throwable + { + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + cluster.get(1).callOnInstance(repair(options(false))); + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + // make sure that all sstables have moved to repaired by triggering a compaction + // also disables autocompaction on the nodes + cluster.forEach((node) -> node.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + cfs.disableAutoCompaction(); + })); + cluster.get(1).callOnInstance(repair(options(false))); + // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired + cluster.get(1).runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + cfs.enableAutoCompaction(); + FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + }); + Pair<Boolean, Boolean> rs = cluster.get(1).callOnInstance(repair(options(true))); + assertTrue(rs.left); // preview repair should succeed + assertFalse(rs.right); // and we should see no mismatches + } + } + + /** + * another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview + * repair is starting up. + * + * This tests this case: + * 1. we start a preview repair + * 2. pause the validation requests from node1 -> node2 + * 3. node1 starts its validation + * 4. run an incremental repair which completes fine + * 5. node2 resumes its validation + * + * Now we will include sstables from the second incremental repair on node2 but not on node1 + * This should fail since we fail any preview repair which is ongoing when an incremental repair finishes (step 4 above) + */ + @Test + public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException + { + ExecutorService es = Executors.newSingleThreadExecutor(); + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + cluster.get(1).callOnInstance(repair(options(false))); + + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + SimpleCondition continuePreviewRepair = new SimpleCondition(); + DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair); + // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below + cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop(); + + Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true)))); + Thread.sleep(1000); + // this needs to finish before the preview repair is unpaused on node2 + cluster.get(1).callOnInstance(repair(options(false))); + continuePreviewRepair.signalAll(); + Pair<Boolean, Boolean> rs = rsFuture.get(); + assertFalse(rs.left); // preview repair should have failed + assertFalse(rs.right); // and no mismatches should have been reported + } + finally + { + es.shutdown(); + } + } + + /** + * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair + * so both preview and incremental repair should finish fine (without any mismatches) + */ + @Test + public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException + { + ExecutorService es = Executors.newSingleThreadExecutor(); + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + assertTrue(cluster.get(1).callOnInstance(repair(options(false))).left); + + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + // pause preview repair validation messages on node2 until node1 has finished + SimpleCondition continuePreviewRepair = new SimpleCondition(); + DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair); + cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop(); + + // get local ranges to repair two separate ranges: + List<String> localRanges = cluster.get(1).callOnInstance(() -> { + List<String> res = new ArrayList<>(); + for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges()) + res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue()); + return res; + }); + + assertEquals(2, localRanges.size()); + Future<Pair<Boolean, Boolean>> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0))))); + Thread.sleep(1000); // wait for node1 to start validation compaction + // this needs to finish before the preview repair is unpaused on node2 + assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).left); + + continuePreviewRepair.signalAll(); + Pair<Boolean, Boolean> rs = repairStatusFuture.get(); + assertTrue(rs.left); // repair should succeed + assertFalse(rs.right); // and no mismatches + } + finally + { + es.shutdown(); + } + } + + private static class DelayMessageFilter implements IMessageFilters.Matcher + { + private final SimpleCondition condition; + private final AtomicBoolean waitForRepair = new AtomicBoolean(true); + + public DelayMessageFilter(SimpleCondition condition) + { + this.condition = condition; + } + public boolean matches(int from, int to, IMessage message) + { + try + { + // only the first validation req should be delayed: + if (waitForRepair.compareAndSet(true, false)) + condition.await(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + return false; // don't drop the message + } + } + + private static void insert(ICoordinator coordinator, int start, int count) + { + for (int i = start; i < start + count; i++) + coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i); + } + + /** + * returns a pair with [repair success, was inconsistent] + */ + private static IIsolatedExecutor.SerializableCallable<Pair<Boolean, Boolean>> repair(Map<String, String> options) + { + return () -> { + SimpleCondition await = new SimpleCondition(); + AtomicBoolean success = new AtomicBoolean(true); + AtomicBoolean wasInconsistent = new AtomicBoolean(false); + StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> { + if (event.getType() == ProgressEventType.ERROR) + { + success.set(false); + await.signalAll(); + } + else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent")) + { + wasInconsistent.set(true); + } + else if (event.getType() == ProgressEventType.COMPLETE) + await.signalAll(); + })); + try + { + await.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + return Pair.create(success.get(), wasInconsistent.get()); + }; + } + + private static Map<String, String> options(boolean preview) + { + Map<String, String> config = new HashMap<>(); + config.put(RepairOption.INCREMENTAL_KEY, "true"); + config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString()); + if (preview) + config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString()); + return config; + } + + private static Map<String, String> options(boolean preview, String range) + { + Map<String, String> options = options(preview); + options.put(RepairOption.RANGES_KEY, range); + return options; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org