This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1b1f88e38a363b7937e8472b6863f9168b687659 Merge: a9da19c311 7c79d91b6f Author: ci worker <dcapw...@apache.org> AuthorDate: Wed May 1 14:34:24 2024 -0700 Merge branch 'cassandra-4.1' into cassandra-5.0 CHANGES.txt | 1 + .../db/compaction/AbstractStrategyHolder.java | 1 + .../db/compaction/CompactionStrategyHolder.java | 6 ++++ .../db/compaction/CompactionStrategyManager.java | 18 ++++++++++- .../db/compaction/PendingRepairHolder.java | 12 ++++++++ .../db/compaction/PendingRepairManager.java | 11 ++++++- .../org/apache/cassandra/db/lifecycle/Tracker.java | 2 ++ .../apache/cassandra/repair/RepairCoordinator.java | 15 +++++++++ .../cassandra/repair/RepairMessageVerbHandler.java | 3 ++ .../distributed/test/DistributedRepairUtils.java | 34 ++++++++++++++++++++ .../distributed/test/RepairCoordinatorFast.java | 3 ++ .../test/RepairCoordinatorNeighbourDown.java | 3 ++ .../repair/ConcurrentIrWithPreviewFuzzTest.java | 2 +- .../cassandra/repair/FailingRepairFuzzTest.java | 1 + .../org/apache/cassandra/repair/FuzzTestBase.java | 36 ++++++++++++++++++++-- 15 files changed, 143 insertions(+), 5 deletions(-) diff --cc CHANGES.txt index 7e18e40c97,17257d606a..ac41c7c180 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -38,9 -4,8 +38,10 @@@ Merged from 4.1 * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495) * Do not go to disk for reading hints file sizes (CASSANDRA-19477) * Fix system_views.settings to handle array types (CASSANDRA-19475) + * Memoize Cassandra verion and add a backoff interval for failed schema pulls (CASSANDRA-18902) + * Fix StackOverflowError on ALTER after many previous schema changes (CASSANDRA-19166) Merged from 4.0: + * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182) * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736) * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566) * Fix few types 
issues and implement types compatibility tests (CASSANDRA-19479) diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 534bf3ae82,86c40e8958..0c5d53c1d8 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@@ -285,12 -289,8 +292,17 @@@ public class PendingRepairHolder extend return Iterables.any(managers, prm -> prm.containsSSTable(sstable)); } + @Override + public int getEstimatedRemainingTasks() + { + int tasks = 0; + for (PendingRepairManager manager : managers) + tasks += manager.getEstimatedRemainingTasks(); + return tasks; + } ++ + public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable) + { + return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable)); + } } diff --cc src/java/org/apache/cassandra/repair/RepairCoordinator.java index 27dd3a73b5,0000000000..82664f2d3c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java +++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java @@@ -1,635 -1,0 +1,650 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.cassandra.locator.RangesAtEndpoint; ++import org.apache.cassandra.net.Verb; ++import org.apache.cassandra.repair.messages.FailSession; ++import org.apache.cassandra.repair.messages.RepairMessage; ++import org.apache.cassandra.repair.state.ParticipateState; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer; +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RepairException; +import 
org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.state.CoordinatorState; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SystemDistributedKeyspace; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventNotifier; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; + +import static org.apache.cassandra.repair.state.AbstractState.COMPLETE; +import static org.apache.cassandra.repair.state.AbstractState.INIT; +import static org.apache.cassandra.service.QueryState.forInternalCalls; + +public class RepairCoordinator implements Runnable, ProgressEventNotifier, RepairNotifier +{ + private static final Logger logger = LoggerFactory.getLogger(RepairCoordinator.class); + + private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1); + + public final CoordinatorState state; + private final String tag; + private final BiFunction<String, String[], Iterable<ColumnFamilyStore>> validColumnFamilies; + private final Function<String, RangesAtEndpoint> getLocalReplicas; + + private final List<ProgressListener> listeners = new ArrayList<>(); + private final AtomicReference<Throwable> firstError = new 
AtomicReference<>(null); + final SharedContext ctx; + final Scheduler validationScheduler; + + private TraceState traceState; + + public RepairCoordinator(StorageService storageService, int cmd, RepairOption options, String keyspace) + { + this(SharedContext.Global.instance, + (ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables), + storageService::getLocalReplicas, + cmd, options, keyspace); + } + + RepairCoordinator(SharedContext ctx, + BiFunction<String, String[], Iterable<ColumnFamilyStore>> validColumnFamilies, + Function<String, RangesAtEndpoint> getLocalReplicas, + int cmd, RepairOption options, String keyspace) + { + this.ctx = ctx; + this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests()); + this.state = new CoordinatorState(ctx.clock(), cmd, keyspace, options); + this.tag = "repair:" + cmd; + this.validColumnFamilies = validColumnFamilies; + this.getLocalReplicas = getLocalReplicas; + ctx.repair().register(state); + } + + @Override + public void addProgressListener(ProgressListener listener) + { + listeners.add(listener); + } + + @Override + public void removeProgressListener(ProgressListener listener) + { + listeners.remove(listener); + } + + + protected void fireProgressEvent(ProgressEvent event) + { + for (ProgressListener listener : listeners) + { + listener.progress(tag, event); + } + } + + @Override + public void notification(String msg) + { + logger.info(msg); + fireProgressEvent(jmxEvent(ProgressEventType.NOTIFICATION, msg)); + } + + @Override + public void notifyError(Throwable error) + { + // exception should be ignored + if (error instanceof SomeRepairFailedException) + return; + + if (Throwables.anyCauseMatches(error, RepairException::shouldWarn)) + { + logger.warn("Repair {} aborted: {}", state.id, error.getMessage()); + if (logger.isDebugEnabled()) + logger.debug("Repair {} aborted: ", state.id, error); + } + else + { + logger.error("Repair {} failed:", state.id, error); + } 
+ + StorageMetrics.repairExceptions.inc(); + String errorMessage = String.format("Repair command #%d failed with error %s", state.cmd, error.getMessage()); + fireProgressEvent(jmxEvent(ProgressEventType.ERROR, errorMessage)); + firstError.compareAndSet(null, error); + + // since this can fail, update table only after updating in-memory and notification state + maybeStoreParentRepairFailure(error); + } + + @Override + public void notifyProgress(String message) + { + logger.info(message); + fireProgressEvent(jmxEvent(ProgressEventType.PROGRESS, message)); + } + + private void skip(String msg) + { + state.phase.skip(msg); + notification("Repair " + state.id + " skipped: " + msg); + success(msg); + } + + private void success(String msg) + { + state.phase.success(msg); + fireProgressEvent(jmxEvent(ProgressEventType.SUCCESS, msg)); + ctx.repair().recordRepairStatus(state.cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, + ImmutableList.of(msg)); + complete(null); + } + + private void fail(String reason) + { + if (reason == null) + { + Throwable error = firstError.get(); + reason = error != null ? 
error.toString() : "Some repair failed"; + } + state.phase.fail(reason); ++ ParticipateState p = ctx.repair().participate(state.id); ++ if (p != null) ++ p.phase.fail(reason); ++ NeighborsAndRanges neighborsAndRanges = state.getNeighborsAndRanges(); ++ // this is possible if the failure happened during input processing, in which case no particpates have been notified ++ if (neighborsAndRanges != null) ++ { ++ FailSession msg = new FailSession(state.id); ++ for (InetAddressAndPort participate : neighborsAndRanges.participants) ++ RepairMessage.sendMessageWithRetries(ctx, msg, Verb.FAILED_SESSION_MSG, participate); ++ } + String completionMessage = String.format("Repair command #%d finished with error", state.cmd); + + // Note we rely on the first message being the reason for the failure + // when inspecting this state from RepairRunner.queryForCompletedRepair + ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.FAILED, + ImmutableList.of(reason, completionMessage)); + + complete(completionMessage); + } + + private void complete(String msg) + { + long durationMillis = state.getDurationMillis(); + if (msg == null) + { + String duration = DurationFormatUtils.formatDurationWords(durationMillis, true, true); + msg = String.format("Repair command #%d finished in %s", state.cmd, duration); + } + + fireProgressEvent(jmxEvent(ProgressEventType.COMPLETE, msg)); + logger.info(state.options.getPreviewKind().logPrefix(state.id) + msg); + + ctx.repair().removeParentRepairSession(state.id); + TraceState localState = traceState; + if (state.options.isTraced() && localState != null) + { + for (ProgressListener listener : listeners) + localState.removeProgressListener(listener); + // Because ExecutorPlus#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. 
+ Tracing.instance.set(localState); + Tracing.traceRepair(msg); + Tracing.instance.stopSession(); + } + + Keyspace.open(state.keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS); + } + + public void run() + { + try + { + runMayThrow(); + } + catch (SkipRepairException e) + { + skip(e.getMessage()); + } + catch (Throwable e) + { + notifyError(e); + fail(e.getMessage()); + } + } + + private void runMayThrow() throws Throwable + { + state.phase.setup(); + ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of()); + + List<ColumnFamilyStore> columnFamilies = getColumnFamilies(); + String[] cfnames = columnFamilies.stream().map(cfs -> cfs.name).toArray(String[]::new); + + this.traceState = maybeCreateTraceState(columnFamilies); + notifyStarting(); + NeighborsAndRanges neighborsAndRanges = getNeighborsAndRanges(); + // We test to validate the start JMX notification is seen before we compute neighbors and ranges + // but in state (vtable) tracking, we rely on getNeighborsAndRanges to know where we are running repair... 
+ // JMX start != state start, its possible we fail in getNeighborsAndRanges and state start is never reached + state.phase.start(columnFamilies, neighborsAndRanges); + + maybeStoreParentRepairStart(cfnames); + + prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants) + .flatMap(ignore -> repair(cfnames, neighborsAndRanges)) + .addCallback((pair, failure) -> { + if (failure != null) + { + notifyError(failure); + fail(failure.getMessage()); + } + else + { + state.phase.repairCompleted(); + CoordinatedRepairResult result = pair.left; + maybeStoreParentRepairSuccess(result.successfulRanges); + if (result.hasFailed()) + { + fail(null); + } + else + { + success(pair.right.get()); + ctx.repair().cleanUp(state.id, neighborsAndRanges.participants); + } + } + }); + } + + private List<ColumnFamilyStore> getColumnFamilies() + { + String[] columnFamilies = state.options.getColumnFamilies().toArray(new String[state.options.getColumnFamilies().size()]); + Iterable<ColumnFamilyStore> validColumnFamilies = this.validColumnFamilies.apply(state.keyspace, columnFamilies); + + if (Iterables.isEmpty(validColumnFamilies)) + throw new SkipRepairException(String.format("%s Empty keyspace, skipping repair: %s", state.id, state.keyspace)); + return Lists.newArrayList(validColumnFamilies); + } + + private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamilyStores) + { + if (!state.options.isTraced()) + return null; + + StringBuilder cfsb = new StringBuilder(); + for (ColumnFamilyStore cfs : columnFamilyStores) + cfsb.append(", ").append(cfs.getKeyspaceName()).append(".").append(cfs.name); + + TimeUUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); + TraceState traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", state.keyspace, "columnFamilies", + cfsb.substring(2))); + traceState.enableActivityNotification(tag); + for (ProgressListener listener : listeners) + 
traceState.addProgressListener(listener); + Thread queryThread = createQueryThread(sessionId); + queryThread.setName("RepairTracePolling"); + return traceState; + } + + private void notifyStarting() + { + String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", state.cmd, state.id, state.keyspace, + state.options); + logger.info(message); + Tracing.traceRepair(message); + fireProgressEvent(jmxEvent(ProgressEventType.START, message)); + } + + private NeighborsAndRanges getNeighborsAndRanges() throws RepairException + { + Set<InetAddressAndPort> allNeighbors = new HashSet<>(); + List<CommonRange> commonRanges = new ArrayList<>(); + + //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent + //calculation multiple times + Iterable<Range<Token>> keyspaceLocalRanges = getLocalReplicas.apply(state.keyspace).ranges(); + + for (Range<Token> range : state.options.getRanges()) + { + EndpointsForRange neighbors = ctx.repair().getNeighbors(state.keyspace, keyspaceLocalRanges, range, + state.options.getDataCenters(), + state.options.getHosts()); + if (neighbors.isEmpty()) + { + if (state.options.ignoreUnreplicatedKeyspaces()) + { + logger.info("{} Found no neighbors for range {} for {} - ignoring since repairing with --ignore-unreplicated-keyspaces", state.id, range, state.keyspace); + continue; + } + else + { + throw RepairException.warn(String.format("Nothing to repair for %s in %s - aborting", range, state.keyspace)); + } + } + addRangeToNeighbors(commonRanges, range, neighbors); + allNeighbors.addAll(neighbors.endpoints()); + } + + if (state.options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty()) + { + throw new SkipRepairException(String.format("Nothing to repair for %s in %s - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces", + state.options.getRanges(), + state.keyspace)); + } + + boolean shouldExcludeDeadParticipants = 
state.options.isForcedRepair(); + + if (shouldExcludeDeadParticipants) + { + Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, ctx.failureDetector()::isAlive)); + shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors); + allNeighbors = actualNeighbors; + } + return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges); + } + + private void maybeStoreParentRepairStart(String[] cfnames) + { + if (!state.options.isPreview()) + { + SystemDistributedKeyspace.startParentRepair(state.id, state.keyspace, cfnames, state.options); + } + } + + private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges) + { + if (!state.options.isPreview()) + { + SystemDistributedKeyspace.successfulParentRepair(state.id, successfulRanges); + } + } + + private void maybeStoreParentRepairFailure(Throwable error) + { + if (!state.options.isPreview()) + { + SystemDistributedKeyspace.failParentRepair(state.id, error); + } + } + + private Future<?> prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force) + { + state.phase.prepareStart(); + Timer timer = Keyspace.open(state.keyspace).metric.repairPrepareTime; + long startNanos = ctx.clock().nanoTime(); + return ctx.repair().prepareForRepair(state.id, ctx.broadcastAddressAndPort(), allNeighbors, state.options, force, columnFamilies) + .map(ignore -> { + timer.update(ctx.clock().nanoTime() - startNanos, TimeUnit.NANOSECONDS); + state.phase.prepareComplete(); + return null; + }); + } + + private Future<Pair<CoordinatedRepairResult, Supplier<String>>> repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges) + { + RepairTask task; + if (state.options.isPreview()) + { + task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), cfnames); + } + else if (state.options.isIncremental()) + { + task = new IncrementalRepairTask(this, state.id, 
neighborsAndRanges, cfnames); + } + else + { + task = new NormalRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), cfnames); + } + + ExecutorPlus executor = createExecutor(); + state.phase.repairSubmitted(); + return task.perform(executor, validationScheduler) + // after adding the callback java could no longer infer the type... + .<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage)) + .addCallback((s, f) -> executor.shutdown()); + } + + private ExecutorPlus createExecutor() + { + return ctx.executorFactory() + .localAware() + .withJmxInternal() + .pooled("Repair#" + state.cmd, state.options.getJobThreads()); + } + + private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) + { + Set<InetAddressAndPort> endpoints = neighbors.endpoints(); + Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints(); + + for (CommonRange commonRange : neighborRangeList) + { + if (commonRange.matchesEndpoints(endpoints, transEndpoints)) + { + commonRange.ranges.add(range); + return; + } + } + + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(range); + neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges)); + } + + private Thread createQueryThread(final TimeUUID sessionId) + { + return ctx.executorFactory().startThread("Repair-Runnable-" + THREAD_COUNTER.incrementAndGet(), new WrappedRunnable() + { + // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. + // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. 
+ public void runMayThrow() throws Exception + { + TraceState state = Tracing.instance.get(sessionId); + if (state == null) + throw new Exception("no tracestate"); + + String format = "select event_id, source, source_port, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;"; + String query = String.format(format, SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS); + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls()); + + ByteBuffer sessionIdBytes = sessionId.toBytes(); + InetAddressAndPort source = ctx.broadcastAddressAndPort(); + + HashSet<UUID>[] seen = new HashSet[]{ new HashSet<>(), new HashSet<>() }; + int si = 0; + UUID uuid; + + long tlast = ctx.clock().currentTimeMillis(), tcur; + + TraceState.Status status; + long minWaitMillis = 125; + long maxWaitMillis = 1000 * 1024L; + long timeout = minWaitMillis; + boolean shouldDouble = false; + + while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) + { + if (status == TraceState.Status.IDLE) + { + timeout = shouldDouble ? 
Math.min(timeout * 2, maxWaitMillis) : timeout; + shouldDouble = !shouldDouble; + } + else + { + timeout = minWaitMillis; + shouldDouble = false; + } + ByteBuffer tminBytes = TimeUUID.minAtUnixMillis(tlast - 1000).toBytes(); + ByteBuffer tmaxBytes = TimeUUID.maxAtUnixMillis(tcur = ctx.clock().currentTimeMillis()).toBytes(); + QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, + tminBytes, + tmaxBytes)); + ResultMessage.Rows rows = statement.execute(forInternalCalls(), options, ctx.clock().nanoTime()); + UntypedResultSet result = UntypedResultSet.create(rows.result); + + for (UntypedResultSet.Row r : result) + { + int port = DatabaseDescriptor.getStoragePort(); + if (r.has("source_port")) + port = r.getInt("source_port"); + InetAddressAndPort eventNode = InetAddressAndPort.getByAddressOverrideDefaults(r.getInetAddress("source"), port); + if (source.equals(eventNode)) + continue; + if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) + seen[si].add(uuid); + if (seen[si == 0 ? 1 : 0].contains(uuid)) + continue; + String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); + notification(message); + } + tlast = tcur; + + si = si == 0 ? 1 : 0; + seen[si].clear(); + } + } + }); + } + + private ProgressEvent jmxEvent(ProgressEventType type, String msg) + { + int length = CoordinatorState.State.values().length + 1; // +1 to include completed state + int currentState = state.getCurrentState(); + return new ProgressEvent(type, currentState == INIT ? 0 : currentState == COMPLETE ? 
length : currentState, length, msg); + } + + private static final class SkipRepairException extends RuntimeException + { + SkipRepairException(String message) + { + super(message); + } + } + + public static final class NeighborsAndRanges + { + final boolean shouldExcludeDeadParticipants; + public final Set<InetAddressAndPort> participants; + public final List<CommonRange> commonRanges; + + public NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges) + { + this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants; + this.participants = participants; + this.commonRanges = commonRanges; + } + + /** + * When in the force mode, removes dead nodes from common ranges (not contained within `allNeighbors`), + * and exludes ranges left without any participants + * When not in the force mode, no-op. + */ + public List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames) + { + if (!shouldExcludeDeadParticipants) + { + return commonRanges; + } + else + { + logger.debug("force flag set, removing dead endpoints if possible"); + + List<CommonRange> filtered = new ArrayList<>(commonRanges.size()); + + for (CommonRange commonRange : commonRanges) + { + Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, participants::contains)); + Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, participants::contains)); + Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); + + // this node is implicitly a participant in this repair, so a single endpoint is ok here + if (!endpoints.isEmpty()) + { + Set<InetAddressAndPort> skippedReplicas = Sets.difference(commonRange.endpoints, endpoints); + skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges)); + 
filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty())); + } + else + { + logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.", + commonRange.ranges, Arrays.asList(tableNames), keyspace); + } + } + Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); + return filtered; + } + } + } +} diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 34f20cd708,12fa6c341c..610c03dcbf --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -306,9 -258,11 +306,12 @@@ public class RepairMessageVerbHandler i case FAILED_SESSION_MSG: FailSession failure = (FailSession) message.payload; - ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure); - ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure); - ParticipateState p = ActiveRepairService.instance.participate(failure.sessionID); + sendAck(message); ++ ParticipateState p = ctx.repair().participate(failure.sessionID); + if (p != null) + p.phase.fail("Failure message from " + message.from()); + ctx.repair().consistent.coordinated.handleFailSessionMessage(failure); + ctx.repair().consistent.local.handleFailSessionMessage(message.from(), failure); break; case STATUS_REQ: diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java index fce67b52d1,441fcadf9b..c71a611c01 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java @@@ -167,6 -173,34 +173,34 @@@ public final class DistributedRepairUti Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext()); } + public static void assertNoSSTableLeak(ICluster<IInvokableInstance> 
cluster, String ks, String table) + { + cluster.forEach(i -> { + String name = "node" + i.config().num(); + i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking + i.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table); + for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables()) + { + TimeUUID pendingRepair = sstable.getSSTableMetadata().pendingRepair; + if (pendingRepair == null) + continue; - LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair); ++ LocalSession session = ActiveRepairService.instance().consistent.local.getSession(pendingRepair); + // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it + if (session != null && !session.isCompleted()) + continue; + // The session is complete, yet the sstable is not updated... is this still pending in compaction? + if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable)) + continue; + // compaction does not know about the pending repair... race condition since this check started? + if (sstable.getSSTableMetadata().pendingRepair == null) + continue; // yep, race condition... ignore + throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor)); + } + }); + }); + } + public enum RepairType { FULL { public String[] append(String... args) diff --cc test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java index 54ae644e34,0000000000..4eba7f0e36 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java +++ b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java @@@ -1,107 -1,0 +1,107 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import accord.utils.Gen; +import accord.utils.Gens; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.RetrySpec; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.repair.consistent.LocalSessions; +import org.apache.cassandra.repair.state.Completable; +import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.FailingBiConsumer; +import org.assertj.core.api.Assertions; + +import static accord.utils.Property.qt; + +public class ConcurrentIrWithPreviewFuzzTest extends FuzzTestBase +{ + @Test + public void concurrentIrWithPreview() + { + // to avoid unlucky timing issues, retry until success; given enough retries we should eventually become success + DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new RetrySpec.MaxAttempt(Integer.MAX_VALUE); + qt().withPure(false).withExamples(1).check(rs -> { + Cluster cluster = new Cluster(rs); + enableMessageFaults(cluster); + + Gen<Cluster.Node> coordinatorGen = Gens.pick(cluster.nodes.keySet()).map(cluster.nodes::get); + + List<Closeable> closeables = new ArrayList<>(); + for (int 
example = 0; example < 100; example++) + { + Cluster.Node irCoordinator = coordinatorGen.next(rs); + Cluster.Node previewCoordinator = coordinatorGen.next(rs); + RepairCoordinator ir = irCoordinator.repair(KEYSPACE, irOption(rs, irCoordinator, KEYSPACE, ignore -> TABLES)); + ir.run(); + RepairCoordinator preview = previewCoordinator.repair(KEYSPACE, previewOption(rs, previewCoordinator, KEYSPACE, ignore -> TABLES), false); + preview.run(); + + closeables.add(cluster.nodes.get(pickParticipant(rs, previewCoordinator, preview)).doValidation(ignore -> (cfs, validator) -> addMismatch(rs, cfs, validator))); + // cause a delay in validation to have more failing previews + closeables.add(cluster.nodes.get(pickParticipant(rs, previewCoordinator, preview)).doValidation(next -> (cfs, validator) -> { + if (validator.desc.parentSessionId.equals(preview.state.id)) + delayValidation(cluster, ir, next, cfs, validator); + else next.acceptOrFail(cfs, validator); + })); + // make sure listeners don't leak + closeables.add(LocalSessions::unsafeClearListeners); + + cluster.processAll(); + + // IR will always pass, but preview is what may fail (if the coordinator is the same) + Assertions.assertThat(ir.state.getResult()).describedAs("Unexpected state: %s -> %s; example %d", ir.state, ir.state.getResult(), example).isEqualTo(Completable.Result.success(repairSuccessMessage(ir))); + + Assertions.assertThat(preview.state.getResult()).describedAs("Unexpected state: %s; example %d", preview.state, example).isNotNull(); + + if (irCoordinator == previewCoordinator) + { + Assertions.assertThat(preview.state.getResult().message).describedAs("Unexpected state: %s -> %s; example %d", preview.state, preview.state.getResult(), example).contains("failed with error An incremental repair with session id"); + } + else + { - assertSuccess(example, true, preview); ++ assertSuccess(cluster, example, true, preview); + } + closeables.forEach(Closeable::close); + closeables.clear(); + } + }); + } + + private 
void delayValidation(Cluster cluster, RepairCoordinator ir, FailingBiConsumer<ColumnFamilyStore, Validator> next, ColumnFamilyStore cfs, Validator validator) + { + cluster.unorderedScheduled.schedule(() -> { + // make sure to wait for IR to complete... + Completable.Result result = ir.state.getResult(); + if (result == null) + { + delayValidation(cluster, ir, next, cfs, validator); + return; + } + next.accept(cfs, validator); + }, 1, TimeUnit.HOURS); + } +} diff --cc test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java index 39576c95f0,0000000000..2d1438e030 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java +++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java @@@ -1,163 -1,0 +1,164 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; + +import accord.utils.Gen; +import accord.utils.Gens; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.RetrySpec; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.state.Completable; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.Closeable; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; + +import static accord.utils.Property.qt; + +public class FailingRepairFuzzTest extends FuzzTestBase +{ + private enum RepairJobStage { VALIDATION, SYNC } + + @Test + public void failingRepair() + { + // to avoid unlucky timing issues, retry until success; given enough retries we should eventually become success + DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new RetrySpec.MaxAttempt(Integer.MAX_VALUE); + Gen<RepairJobStage> stageGen = Gens.enums().all(RepairJobStage.class); + qt().withPure(false).withExamples(10).check(rs -> { + Cluster cluster = new Cluster(rs); + enableMessageFaults(cluster); + + Gen<Cluster.Node> coordinatorGen = Gens.pick(cluster.nodes.keySet()).map(cluster.nodes::get); + + List<Closeable> closeables = new ArrayList<>(); + for (int example = 0; example < 100; example++) + { + Cluster.Node coordinator = coordinatorGen.next(rs); + + RepairCoordinator repair = coordinator.repair(KEYSPACE, repairOption(rs, coordinator, KEYSPACE, TABLES), false); + repair.run(); + InetAddressAndPort failingAddress = pickParticipant(rs, coordinator, repair); + Cluster.Node failingNode = cluster.nodes.get(failingAddress); + 
RepairJobStage stage = stageGen.next(rs); + // because of local syncs reaching out to the failing address, a different address may actually be what failed + Set<InetAddressAndPort> syncFailedAddresses = new HashSet<>(); + switch (stage) + { + case VALIDATION: + { + closeables.add(failingNode.doValidation((cfs, validator) -> { + long delayNanos = rs.nextLong(TimeUnit.MILLISECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(1)); + cluster.unorderedScheduled.schedule(() -> validator.fail(new SimulatedFault("Validation failed")), delayNanos, TimeUnit.NANOSECONDS); + })); + } + break; + case SYNC: + { + closeables.add(failingNode.doValidation((cfs, validator) -> addMismatch(rs, cfs, validator))); + List<InetAddressAndPort> addresses = ImmutableList.<InetAddressAndPort>builder().add(coordinator.addressAndPort).addAll(repair.state.getNeighborsAndRanges().participants).build(); + for (InetAddressAndPort address : addresses) + { + closeables.add(cluster.nodes.get(address).doSync(plan -> { + long delayNanos = rs.nextLong(TimeUnit.SECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(10)); + cluster.unorderedScheduled.schedule(() -> { + if (address == failingAddress || plan.getCoordinator().getPeers().contains(failingAddress)) + { + syncFailedAddresses.add(address); + SimulatedFault fault = new SimulatedFault("Sync failed"); + for (StreamEventHandler handler : plan.handlers()) + handler.onFailure(fault); + } + else + { + StreamState success = new StreamState(plan.planId(), plan.streamOperation(), Collections.emptySet()); + for (StreamEventHandler handler : plan.handlers()) + handler.onSuccess(success); + } + }, delayNanos, TimeUnit.NANOSECONDS); + return null; + })); + } + } + break; + default: + throw new IllegalArgumentException("Unknown stage: " + stage); + } + + cluster.processAll(); + Assertions.assertThat(repair.state.isComplete()).describedAs("Repair job did not complete, and no work is pending...").isTrue(); + 
Assertions.assertThat(repair.state.getResult().kind).describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example).isEqualTo(Completable.Result.Kind.FAILURE); + switch (stage) + { + case VALIDATION: + { + // Got VALIDATION_REQ failure from Ero2.MJ.N8kkw2.w3iFYdDw.HJiVYC32.mWb.b.xwi3tZ.s5k1l.mb.asTy_7QmQ.Q3.u.kjgh.GKjx.g1aKfkjB.YlyKg9.DyQszn7F.Ox2DMYIph.xlgH.EV.A9yEz2J.l6UHdC.C6FYLXE.J0CNHBH./[4905:e9f:2f00:e418:baac:b8d9:9ff9:6604]:33363: UNKNOWN" + Assertions.assertThat(repair.state.getResult().message) + .describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example) + // ValidationResponse with null tree seen + .containsAnyOf("Validation failed in " + failingAddress, + // ack was dropped and on retry the participate detected dup so rejected as the task failed + "Got VALIDATION_REQ failure from " + failingAddress + ": UNKNOWN"); + } + break; + case SYNC: + AbstractStringAssert<?> a = Assertions.assertThat(repair.state.getResult().message).describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example); + // SymmetricRemoteSyncTask + AsymmetricRemoteSyncTask + // ... Sync failed between /[81fc:714:2c56:a2d3:faf3:eb7c:e4dd:cb9e]:54401 and /220.3.10.72:21402 + // LocalSyncTask + // ... failed with error Sync failed + // Dedup nack, but may be remote or local sync! + // ... 
Got SYNC_REQ failure from ...: UNKNOWN + String failingMsg = repair.state.getResult().message; + if (failingMsg.contains("Sync failed between")) + { + a.contains("Sync failed between").contains(failingAddress.toString()); + } + else if (failingMsg.contains("Got SYNC_REQ failure from")) + { + Assertions.assertThat(syncFailedAddresses).isNotEmpty(); + a.containsAnyOf(syncFailedAddresses.stream().map(s -> "Got SYNC_REQ failure from " + s + ": UNKNOWN").collect(Collectors.toList()).toArray(String[]::new)); + } + else + { + a.contains("failed with error Sync failed"); + } + break; + default: + throw new IllegalArgumentException("Unknown stage: " + stage); + } ++ assertParticipateResult(cluster, repair, Completable.Result.Kind.FAILURE); + closeables.forEach(Closeable::close); + closeables.clear(); + } + }); + } +} diff --cc test/unit/org/apache/cassandra/repair/FuzzTestBase.java index 81b341bed9,0000000000..b389231c68 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java +++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java @@@ -1,1448 -1,0 +1,1480 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import org.junit.Before; ++import com.google.common.collect.Sets; ++ +import org.junit.BeforeClass; + +import accord.utils.DefaultRandom; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.RandomSource; +import org.agrona.collections.LongHashSet; +import org.apache.cassandra.concurrent.ExecutorBuilder; +import org.apache.cassandra.concurrent.ExecutorBuilderFactory; +import org.apache.cassandra.concurrent.ExecutorFactory; +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.concurrent.InfiniteLoopExecutor; +import org.apache.cassandra.concurrent.Interruptible; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; +import org.apache.cassandra.concurrent.SequentialExecutorPlus; +import org.apache.cassandra.concurrent.SimulatedExecutorFactory; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import 
org.apache.cassandra.config.UnitConfigOverride; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Digest; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.ICompactionManager; +import org.apache.cassandra.db.marshal.EmptyType; +import org.apache.cassandra.db.repair.CassandraTableRepairManager; +import org.apache.cassandra.db.repair.PendingAntiCompaction; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.HeartBeatState; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.gms.IGossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.ConnectionType; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageDelivery; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.messages.ValidationResponse; +import org.apache.cassandra.repair.state.Completable; +import org.apache.cassandra.repair.state.CoordinatorState; 
+import org.apache.cassandra.repair.state.JobState; ++import org.apache.cassandra.repair.state.ParticipateState; +import org.apache.cassandra.repair.state.SessionState; +import org.apache.cassandra.repair.state.ValidationState; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SystemDistributedKeyspace; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest; +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse; +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; +import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup; +import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamReceiveException; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.StreamingChannel; +import org.apache.cassandra.streaming.StreamingDataInputPlus; +import org.apache.cassandra.tools.nodetool.Repair; +import org.apache.cassandra.utils.AbstractTypeGenerators; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.FailingBiConsumer; +import org.apache.cassandra.utils.Generators; +import 
org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.NoSpamLogger; ++import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.assertj.core.api.Assertions; +import org.mockito.Mockito; +import org.quicktheories.impl.JavaRandom; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL; +import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION; + +public abstract class FuzzTestBase extends CQLTester.InMemory +{ + private static final int MISMATCH_NUM_PARTITIONS = 1; + private static final Gen<String> IDENTIFIER_GEN = fromQT(Generators.IDENTIFIER_GEN); + private static final Gen<String> KEYSPACE_NAME_GEN = fromQT(CassandraGenerators.KEYSPACE_NAME_GEN); + private static final Gen<TableId> TABLE_ID_GEN = fromQT(CassandraGenerators.TABLE_ID_GEN); + private static final Gen<InetAddressAndPort> ADDRESS_W_PORT = fromQT(CassandraGenerators.INET_ADDRESS_AND_PORT_GEN); + + private static boolean SETUP_SCHEMA = false; + static String KEYSPACE; + static List<String> TABLES; + + @BeforeClass + public static void setUpClass() + { + ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(true); + CLOCK_GLOBAL.setString(ClockAccess.class.getName()); + // when running in CI an external actor will replace the test configs based off the test type (such as trie, cdc, etc.), this could then have failing tests + // that do not repro with the same seed! To fix that, go to UnitConfigOverride and update the config type to match the one that failed in CI, this should then + // use the same config, so the seed should now reproduce. 
+ UnitConfigOverride.maybeOverrideConfig(); + + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); // TODO (coverage): random select + DatabaseDescriptor.setLocalDataCenter("test"); + StreamingChannel.Factory.Global.unsafeSet(new StreamingChannel.Factory() + { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException + { + StreamingChannel mock = Mockito.mock(StreamingChannel.class); + int id = counter.incrementAndGet(); + StreamSession session = Mockito.mock(StreamSession.class); + StreamReceiveException access = new StreamReceiveException(session, "mock access rejected"); + StreamingDataInputPlus input = Mockito.mock(StreamingDataInputPlus.class, invocationOnMock -> { + throw access; + }); + Mockito.doNothing().when(input).close(); + Mockito.when(mock.in()).thenReturn(input); + Mockito.when(mock.id()).thenReturn(id); + Mockito.when(mock.peer()).thenReturn(to); + Mockito.when(mock.connectedTo()).thenReturn(to); + Mockito.when(mock.send(Mockito.any())).thenReturn(ImmediateFuture.success(null)); + Mockito.when(mock.close()).thenReturn(ImmediateFuture.success(null)); + return mock; + } + }); + ExecutorFactory delegate = ExecutorFactory.Global.executorFactory(); + ExecutorFactory.Global.unsafeSet(new ExecutorFactory() + { + @Override + public LocalAwareSubFactory localAware() + { + return delegate.localAware(); + } + + @Override + public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority, SimulatorSemantics simulatorSemantics) + { + return delegate.scheduled(executeOnShutdown, name, priority, simulatorSemantics); + } + + private boolean shouldMock() + { + return StackWalker.getInstance().walk(frame -> { + StackWalker.StackFrame caller = frame.skip(3).findFirst().get(); + return 
caller.getClassName().startsWith("org.apache.cassandra.streaming."); + }); + } + + @Override + public Thread startThread(String name, Runnable runnable, InfiniteLoopExecutor.Daemon daemon) + { + if (shouldMock()) return new Thread(); + return delegate.startThread(name, runnable, daemon); + } + + @Override + public Interruptible infiniteLoop(String name, Interruptible.Task task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe, InfiniteLoopExecutor.Daemon daemon, InfiniteLoopExecutor.Interrupts interrupts) + { + return delegate.infiniteLoop(name, task, simulatorSafe, daemon, interrupts); + } + + @Override + public ThreadGroup newThreadGroup(String name) + { + return delegate.newThreadGroup(name); + } + + @Override + public ExecutorBuilderFactory<ExecutorPlus, SequentialExecutorPlus> withJmx(String jmxPath) + { + return delegate.withJmx(jmxPath); + } + + @Override + public ExecutorBuilder<? extends SequentialExecutorPlus> configureSequential(String name) + { + return delegate.configureSequential(name); + } + + @Override + public ExecutorBuilder<? extends ExecutorPlus> configurePooled(String name, int threads) + { + return delegate.configurePooled(name, threads); + } + }); + + // will both make sure this is loaded and used + if (!(Clock.Global.clock() instanceof ClockAccess)) throw new IllegalStateException("Unable to override clock"); + + // set the repair rpc timeout high so we don't hit it... this class is mostly testing repair reaching success + // so don't want to deal with unlucky histories... 
+ DatabaseDescriptor.setRepairRpcTimeout(TimeUnit.DAYS.toMillis(1)); + + + InMemory.setUpClass(); + } + + @Before + public void setupSchema() + { + if (SETUP_SCHEMA) return; + SETUP_SCHEMA = true; + // StorageService can not be mocked out, nor can ColumnFamilyStores, so make sure that the keyspace is a "local" keyspace to avoid replication as the peers don't actually exist for replication + schemaChange(String.format("CREATE KEYSPACE %s WITH REPLICATION = {'class': '%s'}", SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, HackStrat.class.getName())); + for (TableMetadata table : SystemDistributedKeyspace.metadata().tables) + schemaChange(table.toCqlString(false, false)); + + createSchema(); + } + + protected void cleanupRepairTables() + { + for (String table : Arrays.asList(SystemKeyspace.REPAIRS)) + execute(String.format("TRUNCATE %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, table)); + } + + private void createSchema() + { + // The main reason to use random here with a fixed seed is just to have a set of tables that are not hard coded. + // The tables will have diversity to them that most likely doesn't matter to repair (hence why the tables are shared), but + // is useful just in case some assumptions change. 
+ RandomSource rs = new DefaultRandom(42); + String ks = KEYSPACE_NAME_GEN.next(rs); + List<String> tableNames = Gens.lists(IDENTIFIER_GEN).unique().ofSizeBetween(10, 100).next(rs); + JavaRandom qt = new JavaRandom(rs.asJdkRandom()); + Tables.Builder tableBuilder = Tables.builder(); + List<TableId> ids = Gens.lists(TABLE_ID_GEN).unique().ofSize(tableNames.size()).next(rs); + for (int i = 0; i < tableNames.size(); i++) + { + String name = tableNames.get(i); + TableId id = ids.get(i); + TableMetadata tableMetadata = new CassandraGenerators.TableMetadataBuilder().withKeyspaceName(ks).withTableName(name).withTableId(id).withTableKinds(TableMetadata.Kind.REGULAR) + // shouldn't matter, just wanted to avoid UDT as that needs more setup + .withDefaultTypeGen(AbstractTypeGenerators.builder().withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE).withoutPrimitive(EmptyType.instance).build()).build().generate(qt); + tableBuilder.add(tableMetadata); + } + KeyspaceParams params = KeyspaceParams.simple(3); + KeyspaceMetadata metadata = KeyspaceMetadata.create(ks, params, tableBuilder.build()); + + // create + schemaChange(metadata.toCqlString(false, false)); + KEYSPACE = ks; + for (TableMetadata table : metadata.tables) + schemaChange(table.toCqlString(false, false)); + TABLES = tableNames; + } + + static void enableMessageFaults(Cluster cluster) + { + cluster.allowedMessageFaults(new BiFunction<>() + { + private final LongHashSet noFaults = new LongHashSet(); + private final LongHashSet allowDrop = new LongHashSet(); + + @Override + public Set<Faults> apply(Cluster.Node node, Message<?> message) + { + if (RepairMessage.ALLOWS_RETRY.contains(message.verb())) + { + allowDrop.add(message.id()); + return Faults.DROPPED; + } + switch (message.verb()) + { + // these messages are not resilient to ephemeral issues + case STATUS_REQ: + case STATUS_RSP: + // paxos repair does not support faults and will cause a TIMEOUT error, failing the repair + case PAXOS2_CLEANUP_COMPLETE_REQ: + 
case PAXOS2_CLEANUP_REQ: + case PAXOS2_CLEANUP_RSP2: + case PAXOS2_CLEANUP_START_PREPARE_REQ: + case PAXOS2_CLEANUP_FINISH_PREPARE_REQ: + noFaults.add(message.id()); + return Faults.NONE; + default: + if (noFaults.contains(message.id())) return Faults.NONE; + if (allowDrop.contains(message.id())) return Faults.DROPPED; + // was a new message added and the test not updated? + IllegalStateException e = new IllegalStateException("Verb: " + message.verb()); + cluster.failures.add(e); + throw e; + } + } + }); + } + + static void runAndAssertSuccess(Cluster cluster, int example, boolean shouldSync, RepairCoordinator repair) + { + cluster.processAll(); - assertSuccess(example, shouldSync, repair); ++ assertSuccess(cluster, example, shouldSync, repair); + } + - static void assertSuccess(int example, boolean shouldSync, RepairCoordinator repair) ++ static void assertSuccess(Cluster cluster, int example, boolean shouldSync, RepairCoordinator repair) + { + Completable.Result result = repair.state.getResult(); + Assertions.assertThat(result) + .describedAs("Expected repair to have completed with success, but is still running... 
%s; example %d", repair.state, example).isNotNull() + .describedAs("Unexpected state: %s -> %s; example %d", repair.state, result, example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair))); + Assertions.assertThat(repair.state.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(CoordinatorState.State.class)); + Assertions.assertThat(repair.state.getSessions()).isNotEmpty(); + boolean shouldSnapshot = repair.state.options.getParallelism() != RepairParallelism.PARALLEL + && (!repair.state.options.isIncremental() || repair.state.options.isPreview()); + for (SessionState session : repair.state.getSessions()) + { + Assertions.assertThat(session.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(SessionState.State.class)); + Assertions.assertThat(session.getJobs()).isNotEmpty(); + for (JobState job : session.getJobs()) + { + EnumSet<JobState.State> expected = EnumSet.allOf(JobState.State.class); + if (!shouldSnapshot) + { + expected.remove(JobState.State.SNAPSHOT_START); + expected.remove(JobState.State.SNAPSHOT_COMPLETE); + } + if (!shouldSync) + { + expected.remove(JobState.State.STREAM_START); + } + Set<JobState.State> actual = job.getStateTimesMillis().keySet(); + Assertions.assertThat(actual).isEqualTo(expected); + } + } ++ ++ assertParticipateResult(cluster, repair, Completable.Result.Kind.SUCCESS); ++ } ++ ++ protected static void assertParticipateResult(Cluster cluster, RepairCoordinator repair, Completable.Result.Kind kind) ++ { ++ for (InetAddressAndPort participate : Sets.union(Collections.singleton(repair.ctx.broadcastAddressAndPort()), repair.state.getNeighborsAndRanges().participants)) ++ { ++ assertParticipateResult(cluster, participate, repair.state.id, kind); ++ } ++ } ++ ++ protected static void assertParticipateResult(Cluster cluster, InetAddressAndPort participate, TimeUUID id, Completable.Result.Kind kind) ++ { ++ Cluster.Node node = cluster.nodes.get(participate); ++ ParticipateState state = node.repair().participate(id); 
++ Assertions.assertThat(state).describedAs("Node %s is missing ParticipateState", node).isNotNull(); ++ Completable.Result particpateResult = state.getResult(); ++ Assertions.assertThat(particpateResult).describedAs("Node %s has the ParticipateState as still pending", node).isNotNull(); ++ Assertions.assertThat(particpateResult.kind).isEqualTo(kind); + } + + static String repairSuccessMessage(RepairCoordinator repair) + { + RepairOption options = repair.state.options; + if (options.isPreview()) + { + String suffix; + switch (options.getPreviewKind()) + { + case UNREPAIRED: + case ALL: + suffix = "Previewed data was in sync"; + break; + case REPAIRED: + suffix = "Repaired data is in sync"; + break; + default: + throw new IllegalArgumentException("Unexpected preview repair kind: " + options.getPreviewKind()); + } + return "Repair preview completed successfully; " + suffix; + } + return "Repair completed successfully"; + } + + InetAddressAndPort pickParticipant(RandomSource rs, Cluster.Node coordinator, RepairCoordinator repair) + { + if (repair.state.isComplete()) + throw new IllegalStateException("Repair is completed! 
" + repair.state.getResult()); + List<InetAddressAndPort> participaents = new ArrayList<>(repair.state.getNeighborsAndRanges().participants.size() + 1); + if (rs.nextBoolean()) participaents.add(coordinator.broadcastAddressAndPort()); + participaents.addAll(repair.state.getNeighborsAndRanges().participants); + participaents.sort(Comparator.naturalOrder()); + + InetAddressAndPort selected = rs.pick(participaents); + return selected; + } + + static void addMismatch(RandomSource rs, ColumnFamilyStore cfs, Validator validator) + { + ValidationState state = validator.state; + int maxDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth(); + state.phase.start(MISMATCH_NUM_PARTITIONS, 1024); + + MerkleTrees trees = new MerkleTrees(cfs.getPartitioner()); + for (Range<Token> range : validator.desc.ranges) + { + int depth = (int) Math.min(Math.ceil(Math.log(MISMATCH_NUM_PARTITIONS) / Math.log(2)), maxDepth); + trees.addMerkleTree((int) Math.pow(2, depth), range); + } + Set<Token> allTokens = new HashSet<>(); + for (Range<Token> range : validator.desc.ranges) + { + Gen<Token> gen = fromQT(CassandraGenerators.tokensInRange(range)); + Set<Token> tokens = new LinkedHashSet<>(); + for (int i = 0, size = rs.nextInt(1, 10); i < size; i++) + { + for (int attempt = 0; !tokens.add(gen.next(rs)) && attempt < 5; attempt++) + { + } + } + // tokens may or may not be of the expected size; this depends on how wide the range is + for (Token token : tokens) + trees.split(token); + allTokens.addAll(tokens); + } + for (Token token : allTokens) + { + findCorrectRange(trees, token, range -> { + Digest digest = Digest.forValidator(); + digest.update(ByteBuffer.wrap(token.toString().getBytes(StandardCharsets.UTF_8))); + range.addHash(new MerkleTree.RowHash(token, digest.digest(), 1)); + }); + } + state.partitionsProcessed++; + state.bytesRead = 1024; + state.phase.sendingTrees(); + Stage.ANTI_ENTROPY.execute(() -> { + state.phase.success(); + validator.respond(new 
ValidationResponse(validator.desc, trees)); + }); + } + + private static void findCorrectRange(MerkleTrees trees, Token token, Consumer<MerkleTree.TreeRange> fn) + { + MerkleTrees.TreeRangeIterator it = trees.rangeIterator(); + while (it.hasNext()) + { + MerkleTree.TreeRange next = it.next(); + if (next.contains(token)) + { + fn.accept(next); + return; + } + } + } + + private enum RepairType + {FULL, IR} + + private enum PreviewType + {NONE, REPAIRED, UNREPAIRED} + + static RepairOption repairOption(RandomSource rs, Cluster.Node coordinator, String ks, List<String> tableNames) + { + return repairOption(rs, coordinator, ks, Gens.lists(Gens.pick(tableNames)).ofSizeBetween(1, tableNames.size()), Gens.enums().all(RepairType.class), Gens.enums().all(PreviewType.class), Gens.enums().all(RepairParallelism.class)); + } + + static RepairOption irOption(RandomSource rs, Cluster.Node coordinator, String ks, Gen<List<String>> tablesGen) + { + return repairOption(rs, coordinator, ks, tablesGen, Gens.constant(RepairType.IR), Gens.constant(PreviewType.NONE), Gens.enums().all(RepairParallelism.class)); + } + + static RepairOption previewOption(RandomSource rs, Cluster.Node coordinator, String ks, Gen<List<String>> tablesGen) + { + return repairOption(rs, coordinator, ks, tablesGen, Gens.constant(RepairType.FULL), Gens.constant(PreviewType.REPAIRED), Gens.enums().all(RepairParallelism.class)); + } + + private static RepairOption repairOption(RandomSource rs, Cluster.Node coordinator, String ks, Gen<List<String>> tablesGen, Gen<RepairType> repairTypeGen, Gen<PreviewType> previewTypeGen, Gen<RepairParallelism> repairParallelismGen) + { + List<String> args = new ArrayList<>(); + args.add(ks); + args.addAll(tablesGen.next(rs)); + args.add("-pr"); + RepairType type = repairTypeGen.next(rs); + switch (type) + { + case IR: + // default + break; + case FULL: + args.add("--full"); + break; + default: + throw new AssertionError("Unsupported repair type: " + type); + } + PreviewType 
previewType = previewTypeGen.next(rs); + switch (previewType) + { + case NONE: + break; + case REPAIRED: + args.add("--validate"); + break; + case UNREPAIRED: + args.add("--preview"); + break; + default: + throw new AssertionError("Unsupported preview type: " + previewType); + } + RepairParallelism parallelism = repairParallelismGen.next(rs); + switch (parallelism) + { + case SEQUENTIAL: + args.add("--sequential"); + break; + case PARALLEL: + // default + break; + case DATACENTER_AWARE: + args.add("--dc-parallel"); + break; + default: + throw new AssertionError("Unknown parallelism: " + parallelism); + } + if (rs.nextBoolean()) args.add("--optimise-streams"); + RepairOption options = RepairOption.parse(Repair.parseOptionMap(() -> "test", args), DatabaseDescriptor.getPartitioner()); + if (options.getRanges().isEmpty()) + { + if (options.isPrimaryRange()) + { + // when repairing only primary range, neither dataCenters nor hosts can be set + if (options.getDataCenters().isEmpty() && options.getHosts().isEmpty()) + options.getRanges().addAll(coordinator.getPrimaryRanges(ks)); + // except dataCenters only contain local DC (i.e. 
-local) + else if (options.isInLocalDCOnly()) + options.getRanges().addAll(coordinator.getPrimaryRangesWithinDC(ks)); + else + throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + } + else + { + Iterables.addAll(options.getRanges(), coordinator.getLocalReplicas(ks).onlyFull().ranges()); + } + } + return options; + } + + enum Faults + { + DELAY, DROP; + + public static final Set<Faults> NONE = Collections.emptySet(); + public static final Set<Faults> DROPPED = EnumSet.of(DELAY, DROP); + } + + private static class Connection + { + final InetAddressAndPort from, to; + + private Connection(InetAddressAndPort from, InetAddressAndPort to) + { + this.from = from; + this.to = to; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Connection that = (Connection) o; + return from.equals(that.from) && to.equals(that.to); + } + + @Override + public int hashCode() + { + return Objects.hash(from, to); + } + + @Override + public String toString() + { + return "Connection{" + "from=" + from + ", to=" + to + '}'; + } + } + + interface MessageListener + { + default void preHandle(Cluster.Node node, Message<?> msg) {} + } + + static class Cluster + { + private static final FailingBiConsumer<ColumnFamilyStore, Validator> DEFAULT_VALIDATION = ValidationManager::doValidation; + + final Map<InetAddressAndPort, Node> nodes; + private final IFailureDetector failureDetector = Mockito.mock(IFailureDetector.class); + private final IEndpointSnitch snitch = Mockito.mock(IEndpointSnitch.class); + private final SimulatedExecutorFactory globalExecutor; + final ScheduledExecutorPlus unorderedScheduled; + final ExecutorPlus orderedExecutor; + private final Gossip gossiper = new Gossip(); + private final MBeanWrapper mbean = Mockito.mock(MBeanWrapper.class); + private final List<Throwable> failures = new ArrayList<>(); + private final 
List<MessageListener> listeners = new ArrayList<>(); + private final RandomSource rs; + private BiFunction<Node, Message<?>, Set<Faults>> allowedMessageFaults = (a, b) -> Collections.emptySet(); + + private final Map<Connection, LongSupplier> networkLatencies = new HashMap<>(); + private final Map<Connection, Supplier<Boolean>> networkDrops = new HashMap<>(); + + Cluster(RandomSource rs) + { + ClockAccess.includeThreadAsOwner(); + this.rs = rs; + globalExecutor = new SimulatedExecutorFactory(rs, fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs)); + orderedExecutor = globalExecutor.configureSequential("ignore").build(); + unorderedScheduled = globalExecutor.scheduled("ignored"); + + + + // We run tests in an isolated JVM per class, so not cleaning up is safe... but if that assumption ever changes, will need to clean up + Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor); + Stage.MISC.unsafeSetExecutor(orderedExecutor); + Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled); + Mockito.when(failureDetector.isAlive(Mockito.any())).thenReturn(true); + Thread expectedThread = Thread.currentThread(); + NoSpamLogger.unsafeSetClock(() -> { + if (Thread.currentThread() != expectedThread) + throw new AssertionError("NoSpamLogger.Clock accessed outside of fuzzing..."); + return globalExecutor.nanoTime(); + }); + + int numNodes = rs.nextInt(3, 10); + List<String> dcs = Gens.lists(IDENTIFIER_GEN).unique().ofSizeBetween(1, Math.min(10, numNodes)).next(rs); + Map<InetAddressAndPort, Node> nodes = Maps.newHashMapWithExpectedSize(numNodes); + Gen<Token> tokenGen = fromQT(CassandraGenerators.token(DatabaseDescriptor.getPartitioner())); + Gen<UUID> hostIdGen = fromQT(Generators.UUID_RANDOM_GEN); + Set<Token> tokens = new HashSet<>(); + Set<UUID> hostIds = new HashSet<>(); + for (int i = 0; i < numNodes; i++) + { + InetAddressAndPort addressAndPort = ADDRESS_W_PORT.next(rs); + while (nodes.containsKey(addressAndPort)) 
addressAndPort = ADDRESS_W_PORT.next(rs); + Token token; + while (!tokens.add(token = tokenGen.next(rs))) + { + } + UUID hostId; + while (!hostIds.add(hostId = hostIdGen.next(rs))) + { + } + + String dc = rs.pick(dcs); + String rack = "rack"; + Mockito.when(snitch.getDatacenter(Mockito.eq(addressAndPort))).thenReturn(dc); + Mockito.when(snitch.getRack(Mockito.eq(addressAndPort))).thenReturn(rack); + + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); + EndpointState state = new EndpointState(new HeartBeatState(42, 42)); + state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singleton(token))); + state.addApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(Collections.singleton(token))); + state.addApplicationState(ApplicationState.HOST_ID, valueFactory.hostId(hostId)); + state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(token))); + state.addApplicationState(ApplicationState.DC, valueFactory.datacenter(dc)); + state.addApplicationState(ApplicationState.RACK, valueFactory.rack(rack)); + state.addApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); + + gossiper.endpoints.put(addressAndPort, state); + + Node node = new Node(hostId, addressAndPort, Collections.singletonList(token), new Messaging(addressAndPort)); + nodes.put(addressAndPort, node); + } + this.nodes = nodes; + + TokenMetadata tm = StorageService.instance.getTokenMetadata(); + tm.clearUnsafe(); + for (Node inst : nodes.values()) + { + tm.updateHostId(inst.hostId(), inst.broadcastAddressAndPort()); + for (Token token : inst.tokens()) + tm.updateNormalToken(token, inst.broadcastAddressAndPort()); + } + } + + public Closeable addListener(MessageListener listener) + { + listeners.add(listener); + return () -> removeListener(listener); + } + + public void removeListener(MessageListener listener) + { + 
listeners.remove(listener); + } + + public void allowedMessageFaults(BiFunction<Node, Message<?>, Set<Faults>> fn) + { + this.allowedMessageFaults = fn; + } + + public void checkFailures() + { + if (Thread.interrupted()) + failures.add(new InterruptedException()); + if (failures.isEmpty()) return; + AssertionError error = new AssertionError("Unexpected exceptions found"); + failures.forEach(error::addSuppressed); + failures.clear(); + throw error; + } + + public boolean processOne() + { + boolean result = globalExecutor.processOne(); + checkFailures(); + return result; + } + + public void processAll() + { + while (processOne()) + { + } + } + + private class CallbackContext + { + final RequestCallback callback; + + private CallbackContext(RequestCallback callback) + { + this.callback = Objects.requireNonNull(callback); + } + + public void onResponse(Message msg) + { + callback.onResponse(msg); + } + + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + if (callback.invokeOnFailure()) callback.onFailure(from, failureReason); + } + } + + private static class CallbackKey + { + private final long id; + private final InetAddressAndPort peer; + + private CallbackKey(long id, InetAddressAndPort peer) + { + this.id = id; + this.peer = peer; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CallbackKey that = (CallbackKey) o; + return id == that.id && peer.equals(that.peer); + } + + @Override + public int hashCode() + { + return Objects.hash(id, peer); + } + + @Override + public String toString() + { + return "CallbackKey{" + + "id=" + id + + ", peer=" + peer + + '}'; + } + } + + private class Messaging implements MessageDelivery + { + final InetAddressAndPort broadcastAddressAndPort; + final Map<CallbackKey, CallbackContext> callbacks = new HashMap<>(); + + private Messaging(InetAddressAndPort broadcastAddressAndPort) + { + 
this.broadcastAddressAndPort = broadcastAddressAndPort; + } + + @Override + public <REQ> void send(Message<REQ> message, InetAddressAndPort to) + { + message = message.withFrom(broadcastAddressAndPort); + maybeEnqueue(message, to, null); + } + + @Override + public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb) + { + message = message.withFrom(broadcastAddressAndPort); + maybeEnqueue(message, to, cb); + } + + @Override + public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection) + { + message = message.withFrom(broadcastAddressAndPort); + maybeEnqueue(message, to, cb); + } + + private <REQ, RSP> void maybeEnqueue(Message<REQ> message, InetAddressAndPort to, @Nullable RequestCallback<RSP> callback) + { + CallbackContext cb; + if (callback != null) + { + CallbackKey key = new CallbackKey(message.id(), to); + if (callbacks.containsKey(key)) + throw new AssertionError("Message id " + message.id() + " to " + to + " already has a callback"); + cb = new CallbackContext(callback); + callbacks.put(key, cb); + } + else + { + cb = null; + } + boolean toSelf = this.broadcastAddressAndPort.equals(to); + Node node = nodes.get(to); + Set<Faults> allowedFaults = allowedMessageFaults.apply(node, message); + if (allowedFaults.isEmpty()) + { + // enqueue so stack overflow doesn't happen with the inlining + unorderedScheduled.submit(() -> node.handle(message)); + } + else + { + Runnable enqueue = () -> { + if (!allowedFaults.contains(Faults.DELAY)) + { + unorderedScheduled.submit(() -> node.handle(message)); + } + else + { + if (toSelf) unorderedScheduled.submit(() -> node.handle(message)); + else + unorderedScheduled.schedule(() -> node.handle(message), networkJitterNanos(to), TimeUnit.NANOSECONDS); + } + }; + + if (!allowedFaults.contains(Faults.DROP)) enqueue.run(); + else + { + if (!toSelf && networkDrops(to)) + { +// logger.warn("Dropped 
message {}", message); + // drop + } + else + { + enqueue.run(); + } + } + + if (cb != null) + { + unorderedScheduled.schedule(() -> { + CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to)); + if (ctx != null) + { + assert ctx == cb; + try + { + ctx.onFailure(to, RequestFailureReason.TIMEOUT); + } + catch (Throwable t) + { + failures.add(t); + } + } + }, message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS); + } + } + } + + private long networkJitterNanos(InetAddressAndPort to) + { + return networkLatencies.computeIfAbsent(new Connection(broadcastAddressAndPort, to), ignore -> { + long min = TimeUnit.MICROSECONDS.toNanos(500); + long maxSmall = TimeUnit.MILLISECONDS.toNanos(5); + long max = TimeUnit.SECONDS.toNanos(5); + LongSupplier small = () -> rs.nextLong(min, maxSmall); + LongSupplier large = () -> rs.nextLong(maxSmall, max); + return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : small.getAsLong()).asLongSupplier(rs); + }).getAsLong(); + } + + private boolean networkDrops(InetAddressAndPort to) + { + return networkDrops.computeIfAbsent(new Connection(broadcastAddressAndPort, to), ignore -> Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).asSupplier(rs)).get(); + } + + @Override + public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to) + { + AsyncPromise<Message<RSP>> promise = new AsyncPromise<>(); + sendWithCallback(message, to, new RequestCallback<RSP>() + { + @Override + public void onResponse(Message<RSP> msg) + { + promise.trySuccess(msg); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + promise.tryFailure(new MessagingService.FailureResponseException(from, failureReason)); + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + }); + return promise; + } + + @Override + public <V> void respond(V response, Message<?> message) + { + 
send(message.responseWith(response), message.respondTo()); + } + } + + private class Gossip implements IGossiper + { + private final Map<InetAddressAndPort, EndpointState> endpoints = new HashMap<>(); + + @Override + public void register(IEndpointStateChangeSubscriber subscriber) + { + + } + + @Override + public void unregister(IEndpointStateChangeSubscriber subscriber) + { + + } + + @Nullable + @Override + public EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep) + { + return endpoints.get(ep); + } + + @Override + public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) + { + + } + + @Override + public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap) + { + // If we were testing paxos this would be wrong... + // CASSANDRA-18917 added support for simulating Gossip, but gossip issues were found so couldn't merge that patch... + // For the paxos repair, since we don't care about paxos messages, this is ok to no-op for now, but if paxos cleanup + // ever was to be tested this logic would need to be implemented + } + } + + class Node implements SharedContext + { + private final ICompactionManager compactionManager = Mockito.mock(ICompactionManager.class); + final UUID hostId; + final InetAddressAndPort addressAndPort; + final Collection<Token> tokens; + final ActiveRepairService activeRepairService; + final IVerbHandler verbHandler; + final Messaging messaging; + final IValidationManager validationManager; + private FailingBiConsumer<ColumnFamilyStore, Validator> doValidation = DEFAULT_VALIDATION; + final PaxosRepairState paxosRepairState; + private final StreamExecutor defaultStreamExecutor = plan -> { + long delayNanos = rs.nextLong(TimeUnit.SECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(10)); + unorderedScheduled.schedule(() -> { + StreamState success = new StreamState(plan.planId(), plan.streamOperation(), Collections.emptySet()); + for (StreamEventHandler handler : plan.handlers()) + 
handler.onSuccess(success); + }, delayNanos, TimeUnit.NANOSECONDS); + return null; + }; + private StreamExecutor streamExecutor = defaultStreamExecutor; + + private Node(UUID hostId, InetAddressAndPort addressAndPort, Collection<Token> tokens, Messaging messaging) + { + this.hostId = hostId; + this.addressAndPort = addressAndPort; + this.tokens = tokens; + this.messaging = messaging; + this.activeRepairService = new ActiveRepairService(this); + this.paxosRepairState = new PaxosRepairState(this); + this.validationManager = (cfs, validator) -> unorderedScheduled.submit(() -> { + try + { + doValidation.acceptOrFail(cfs, validator); + } + catch (Throwable e) + { + validator.fail(e); + } + }); + this.verbHandler = new IVerbHandler<>() + { + private final RepairMessageVerbHandler repairVerbHandler = new RepairMessageVerbHandler(Node.this); + private final IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = PaxosStartPrepareCleanup.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupRequest> paxosCleanupRequestIVerbHandler = PaxosCleanupRequest.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupHistory> paxosFinishPrepareCleanup = PaxosFinishPrepareCleanup.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupResponse> paxosCleanupResponse = PaxosCleanupResponse.createVerbHandler(Node.this); + private final IVerbHandler<PaxosCleanupComplete.Request> paxosCleanupComplete = PaxosCleanupComplete.createVerbHandler(Node.this); + @Override + public void doVerb(Message message) throws IOException + { + switch (message.verb()) + { + case PAXOS2_CLEANUP_START_PREPARE_REQ: + paxosStartPrepareCleanup.doVerb(message); + break; + case PAXOS2_CLEANUP_REQ: + paxosCleanupRequestIVerbHandler.doVerb(message); + break; + case PAXOS2_CLEANUP_FINISH_PREPARE_REQ: + paxosFinishPrepareCleanup.doVerb(message); + break; + case PAXOS2_CLEANUP_RSP2: + paxosCleanupResponse.doVerb(message); + break; + case 
PAXOS2_CLEANUP_COMPLETE_REQ: + paxosCleanupComplete.doVerb(message); + break; + default: + repairVerbHandler.doVerb(message); + } + } + }; + + activeRepairService.start(); + } + + public Closeable doValidation(FailingBiConsumer<ColumnFamilyStore, Validator> fn) + { + FailingBiConsumer<ColumnFamilyStore, Validator> previous = this.doValidation; + if (previous != DEFAULT_VALIDATION) + throw new IllegalStateException("Attemptted to override validation, but was already overridden"); + this.doValidation = fn; + return () -> this.doValidation = previous; + } + + public Closeable doValidation(Function<FailingBiConsumer<ColumnFamilyStore, Validator>, FailingBiConsumer<ColumnFamilyStore, Validator>> fn) + { + FailingBiConsumer<ColumnFamilyStore, Validator> previous = this.doValidation; + this.doValidation = fn.apply(previous); + return () -> this.doValidation = previous; + } + + public Closeable doSync(StreamExecutor streamExecutor) + { + StreamExecutor previous = this.streamExecutor; + if (previous != defaultStreamExecutor) + throw new IllegalStateException("Attemptted to override sync, but was already overridden"); + this.streamExecutor = streamExecutor; + return () -> this.streamExecutor = previous; + } + + void handle(Message msg) + { + msg = serde(msg); + if (msg == null) + { + logger.warn("Got a message that failed to serialize/deserialize"); + return; + } + for (MessageListener l : listeners) + l.preHandle(this, msg); + if (msg.verb().isResponse()) + { + // handle callbacks + CallbackKey key = new CallbackKey(msg.id(), msg.from()); + if (messaging.callbacks.containsKey(key)) + { + CallbackContext callback = messaging.callbacks.remove(key); + if (callback == null) + return; + try + { + if (msg.isFailureResponse()) + callback.onFailure(msg.from(), (RequestFailureReason) msg.payload); + else callback.onResponse(msg); + } + catch (Throwable t) + { + failures.add(t); + } + } + } + else + { + try + { + verbHandler.doVerb(msg); + } + catch (Throwable e) + { + 
failures.add(e); + } + } + } + + public UUID hostId() + { + return hostId; + } + + @Override + public InetAddressAndPort broadcastAddressAndPort() + { + return addressAndPort; + } + + public Collection<Token> tokens() + { + return tokens; + } + + public IFailureDetector failureDetector() + { + return failureDetector; + } + + @Override + public IEndpointSnitch snitch() + { + return snitch; + } + + @Override + public IGossiper gossiper() + { + return gossiper; + } + + @Override + public ICompactionManager compactionManager() + { + return compactionManager; + } + + public ExecutorFactory executorFactory() + { + return globalExecutor; + } + + public ScheduledExecutorPlus optionalTasks() + { + return unorderedScheduled; + } + + @Override + public ScheduledExecutorPlus nonPeriodicTasks() + { + return unorderedScheduled; + } + + @Override + public ScheduledExecutorPlus scheduledTasks() + { + return unorderedScheduled; + } + + @Override + public Supplier<Random> random() + { + return () -> rs.fork().asJdkRandom(); + } + + public Clock clock() + { + return globalExecutor; + } + + public MessageDelivery messaging() + { + return messaging; + } + + public MBeanWrapper mbean() + { + return mbean; + } + + public RepairCoordinator repair(String ks, RepairOption options) + { + return repair(ks, options, true); + } + + public RepairCoordinator repair(String ks, RepairOption options, boolean addFailureOnErrorNotification) + { + RepairCoordinator repair = new RepairCoordinator(this, (name, tables) -> StorageService.instance.getValidColumnFamilies(false, false, name, tables), name -> StorageService.instance.getReplicas(name, broadcastAddressAndPort()), 42, options, ks); + if (addFailureOnErrorNotification) + { + repair.addProgressListener((tag, event) -> { + if (event.getType() == ProgressEventType.ERROR) + failures.add(new AssertionError(event.getMessage())); + }); + } + return repair; + } + + public RangesAtEndpoint getLocalReplicas(String ks) + { + return 
StorageService.instance.getReplicas(ks, broadcastAddressAndPort()); + } + + public Collection<? extends Range<Token>> getPrimaryRanges(String ks) + { + return StorageService.instance.getPrimaryRangesForEndpoint(ks, broadcastAddressAndPort()); + } + + public Collection<? extends Range<Token>> getPrimaryRangesWithinDC(String ks) + { + return StorageService.instance.getPrimaryRangeForEndpointWithinDC(ks, broadcastAddressAndPort()); + } + + @Override + public ActiveRepairService repair() + { + return activeRepairService; + } + + @Override + public IValidationManager validationManager() + { + return validationManager; + } + + @Override + public TableRepairManager repairManager(ColumnFamilyStore store) + { + return new CassandraTableRepairManager(store, this) + { + @Override + public void snapshot(String name, Collection<Range<Token>> ranges, boolean force) + { + // no-op + } + }; + } + + @Override + public StreamExecutor streamExecutor() + { + return streamExecutor; + } + + @Override + public PendingRangeCalculatorService pendingRangeCalculator() + { + return PendingRangeCalculatorService.instance; + } + + @Override + public PaxosRepairState paxosRepairState() + { + return paxosRepairState; + } ++ ++ public String toString() ++ { ++ return "Node{" + ++ "hostId=" + hostId + ++ ", addressAndPort=" + addressAndPort.getAddress().getHostAddress() + ":" + addressAndPort.getPort() + ++ '}'; ++ } + } + + private Message serde(Message msg) + { + try (DataOutputBuffer b = DataOutputBuffer.scratchBuffer.get()) + { + int messagingVersion = MessagingService.current_version; + Message.serializer.serialize(msg, b, messagingVersion); + DataInputBuffer in = new DataInputBuffer(b.unsafeGetBufferAndFlip(), false); + return Message.serializer.deserialize(in, msg.from(), messagingVersion); + } + catch (Throwable e) + { + failures.add(e); + return null; + } + } + } + + private static <T> Gen<T> fromQT(org.quicktheories.core.Gen<T> qt) + { + return rs -> { + JavaRandom r = new 
JavaRandom(rs.asJdkRandom()); + return qt.generate(r); + }; + } + + public static class HackStrat extends LocalStrategy + { + public HackStrat(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) + { + super(keyspaceName, tokenMetadata, snitch, configOptions); + } + } + + /** + * Since the clock is accessible via {@link Global#currentTimeMillis()} and {@link Global#nanoTime()}, and only the repair subsystem has the requirement not to touch that, this class + * acts as a safety check that validates that repair does not touch these methods but allows the other subsystems to do so. + */ + public static class ClockAccess implements Clock + { + private static final Set<Thread> OWNERS = Collections.synchronizedSet(new HashSet<>()); + private final Clock delegate = new Default(); + + public static void includeThreadAsOwner() + { + OWNERS.add(Thread.currentThread()); + } + + @Override + public long nanoTime() + { + checkAccess(); + return delegate.nanoTime(); + } + + @Override + public long currentTimeMillis() + { + checkAccess(); + return delegate.currentTimeMillis(); + } + + private enum Access + {MAIN_THREAD_ONLY, REJECT, IGNORE} + + private void checkAccess() + { + Access access = StackWalker.getInstance().walk(frames -> { + Iterator<StackWalker.StackFrame> it = frames.iterator(); + boolean topLevel = false; + while (it.hasNext()) + { + StackWalker.StackFrame next = it.next(); + if (!topLevel) + { + // need to find the top level! + while (!Clock.Global.class.getName().equals(next.getClassName())) + { + assert it.hasNext(); + next = it.next(); + } + topLevel = true; + assert it.hasNext(); + next = it.next(); + } + if (FuzzTestBase.class.getName().equals(next.getClassName())) return Access.MAIN_THREAD_ONLY; + // this is non-deterministic... but since the scope of the work is testing repair and not paxos... this is unblocked for now... 
+ if (("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && "newBallot".equals(next.getMethodName())) + || ("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName()) && "updateLowBound".equals(next.getMethodName()))) + return Access.MAIN_THREAD_ONLY; + if (next.getClassName().startsWith("org.apache.cassandra.db.") || next.getClassName().startsWith("org.apache.cassandra.gms.") || next.getClassName().startsWith("org.apache.cassandra.cql3.") || next.getClassName().startsWith("org.apache.cassandra.metrics.") || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") + || next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this would be good to solve + || next.getClassName().startsWith(PendingAntiCompaction.class.getName())) + return Access.IGNORE; + if (next.getClassName().startsWith("org.apache.cassandra.repair") || ActiveRepairService.class.getName().startsWith(next.getClassName())) + return Access.REJECT; + } + return Access.IGNORE; + }); + Thread current = Thread.currentThread(); + switch (access) + { + case IGNORE: + return; + case REJECT: + throw new IllegalStateException("Rejecting access"); + case MAIN_THREAD_ONLY: + if (!OWNERS.contains(current)) throw new IllegalStateException("Accessed in wrong thread: " + current); + break; + } + } + } + + static class SimulatedFault extends RuntimeException + { + SimulatedFault(String message) + { + super(message); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org