http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 6f7297b..422dbdb 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,6 +45,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.consistent.CoordinatorSession; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.QueryState; @@ -66,11 +68,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); - private StorageService storageService; + private final StorageService storageService; private final int cmd; private final RepairOption options; private final String keyspace; + private final String tag; + private final AtomicInteger progress = new AtomicInteger(); + private final int totalProgress; + private final List<ProgressListener> listeners = new ArrayList<>(); private static final AtomicInteger threadCounter = new AtomicInteger(1); @@ -81,6 +87,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti this.cmd = cmd; this.options = options; 
this.keyspace = keyspace; + + this.tag = "repair:" + cmd; + // get valid column families, calculate neighbors, validation, prepare for repair + number of ranges to repair + this.totalProgress = 4 + options.getRanges().size(); } @Override @@ -223,72 +233,35 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti return; } - // Set up RepairJob executor for this repair command. - final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), - Integer.MAX_VALUE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("Repair#" + cmd), - "internal")); - - List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges) + if (options.isIncremental()) { - final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - p.right, - keyspace, - options.getParallelism(), - p.left, - repairedAt, - options.isPullRepair(), - executor, - cfnames); - if (session == null) - continue; - // After repair session completes, notify client its result - Futures.addCallback(session, new FutureCallback<RepairSessionResult>() - { - public void onSuccess(RepairSessionResult result) - { - /** - * If the success message below is modified, it must also be updated on - * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} - * for backward-compatibility support. 
- */ - String message = String.format("Repair session %s for range %s finished", session.getId(), - session.getRanges().toString()); - logger.info(message); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, - progress.incrementAndGet(), - totalProgress, - message)); - } - - public void onFailure(Throwable t) - { - /** - * If the failure message below is modified, it must also be updated on - * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} - * for backward-compatibility support. - */ - String message = String.format("Repair session %s for range %s failed with error %s", - session.getId(), session.getRanges().toString(), t.getMessage()); - logger.error(message, t); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, - progress.incrementAndGet(), - totalProgress, - message)); - } - }); - futures.add(session); + consistentRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames); } + else + { + normalRepair(parentSession, startTime, traceState, allNeighbors, commonRanges, cfnames); + } + } + + private void normalRepair(UUID parentSession, + long startTime, + TraceState traceState, + Set<InetAddress> allNeighbors, + List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + String... cfnames) + { + + // Set up RepairJob executor for this repair command. 
+ ListeningExecutorService executor = createExecutor(); + + // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables + final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, ActiveRepairService.UNREPAIRED_SSTABLE, false, executor, commonRanges, cfnames); // After all repair sessions completes(successful or not), // run anticompaction if necessary and send finish notice back to client final Collection<Range<Token>> successfulRanges = new ArrayList<>(); final AtomicBoolean hasFailure = new AtomicBoolean(); - final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); - ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>() + ListenableFuture repairResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>() { @SuppressWarnings("unchecked") public ListenableFuture apply(List<RepairSessionResult> results) @@ -305,57 +278,188 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti hasFailure.compareAndSet(false, true); } } - return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); + return Futures.immediateFuture(null); } }); - Futures.addCallback(anticompactionResult, new FutureCallback<Object>() + Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor)); + } + + private void consistentRepair(UUID parentSession, + long repairedAt, + long startTime, + TraceState traceState, + Set<InetAddress> allNeighbors, + List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + String... 
cfnames) + { + // the local node also needs to be included in the set of + // participants, since coordinator sessions aren't persisted + Set<InetAddress> allParticipants = new HashSet<>(allNeighbors); + allParticipants.add(FBUtilities.getBroadcastAddress()); + + CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants); + ListeningExecutorService executor = createExecutor(); + AtomicBoolean hasFailure = new AtomicBoolean(false); + ListenableFuture repairResult = coordinatorSession.execute(executor, + () -> submitRepairSessions(parentSession, repairedAt, true, executor, commonRanges, cfnames), + hasFailure); + Collection<Range<Token>> ranges = new HashSet<>(); + for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right)) + { + ranges.addAll(range); + } + Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor)); + } + + private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, + long repairedAt, + boolean isConsistent, + ListeningExecutorService executor, + List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + String... cfnames) + { + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); + for (Pair<Set<InetAddress>, ? 
extends Collection<Range<Token>>> p : commonRanges) + { + RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + p.right, + keyspace, + options.getParallelism(), + p.left, + repairedAt, + isConsistent, + options.isPullRepair(), + executor, + cfnames); + if (session == null) + continue; + Futures.addCallback(session, new RepairSessionCallback(session)); + futures.add(session); + } + return Futures.successfulAsList(futures); + } + + private ListeningExecutorService createExecutor() + { + return MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + } + + private class RepairSessionCallback implements FutureCallback<RepairSessionResult> + { + private final RepairSession session; + + public RepairSessionCallback(RepairSession session) + { + this.session = session; + } + + public void onSuccess(RepairSessionResult result) + { + /** + * If the success message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.getRanges().toString()); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + + public void onFailure(Throwable t) + { + /** + * If the failure message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. 
+ */ + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.getRanges().toString(), t.getMessage()); + logger.error(message, t); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, + progress.incrementAndGet(), + totalProgress, + message)); + } + } + + private class RepairCompleteCallback implements FutureCallback<Object> + { + final UUID parentSession; + final Collection<Range<Token>> successfulRanges; + final long startTime; + final TraceState traceState; + final AtomicBoolean hasFailure; + final ExecutorService executor; + + public RepairCompleteCallback(UUID parentSession, + Collection<Range<Token>> successfulRanges, + long startTime, + TraceState traceState, + AtomicBoolean hasFailure, + ExecutorService executor) { - public void onSuccess(Object result) + this.parentSession = parentSession; + this.successfulRanges = successfulRanges; + this.startTime = startTime; + this.traceState = traceState; + this.hasFailure = hasFailure; + this.executor = executor; + } + + public void onSuccess(Object result) + { + SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); + if (hasFailure.get()) { - SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); - if (hasFailure.get()) - { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, - "Some repair failed")); - } - else - { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, - "Repair completed successfully")); - } - repairComplete(); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, + "Some repair failed")); } - - public void onFailure(Throwable t) + else { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); - SystemDistributedKeyspace.failParentRepair(parentSession, t); - repairComplete(); 
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + "Repair completed successfully")); } + repairComplete(); + } - private void repairComplete() + public void onFailure(Throwable t) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + SystemDistributedKeyspace.failParentRepair(parentSession, t); + repairComplete(); + } + + private void repairComplete() + { + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); + String message = String.format("Repair command #%d finished in %s", cmd, duration); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + logger.info(message); + if (options.isTraced() && traceState != null) { - String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, - true, true); - String message = String.format("Repair command #%d finished in %s", cmd, duration); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); - logger.info(message); - if (options.isTraced() && traceState != null) - { - for (ProgressListener listener : listeners) - traceState.removeProgressListener(listener); - // Because DebuggableThreadPoolExecutor#afterExecute and this callback - // run in a nondeterministic order (within the same thread), the - // TraceState may have been nulled out at this point. The TraceState - // should be traceState, so just set it without bothering to check if it - // actually was nulled out. 
- Tracing.instance.set(traceState); - Tracing.traceRepair(message); - Tracing.instance.stopSession(); - } - executor.shutdownNow(); + for (ProgressListener listener : listeners) + traceState.removeProgressListener(listener); + // Because DebuggableThreadPoolExecutor#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. + Tracing.instance.set(traceState); + Tracing.traceRepair(message); + Tracing.instance.stopSession(); } - }); + executor.shutdownNow(); + } } private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 00340a1..43a9bfb 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -91,6 +91,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; public final long repairedAt; + public final boolean isConsistent; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -124,6 +125,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, + boolean isConsistent, boolean pullRepair, String... 
cfnames) { @@ -137,6 +139,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.ranges = ranges; this.endpoints = endpoints; this.repairedAt = repairedAt; + this.isConsistent = isConsistent; this.pullRepair = pullRepair; } @@ -256,7 +259,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname); + RepairJob job = new RepairJob(this, cfname, isConsistent); executor.execute(job); jobs.add(job); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index f5b2b1d..f24a79a 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -43,12 +43,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private final RepairJobDesc desc; private final SyncRequest request; private final long repairedAt; + private final boolean isConsistent; - public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long repairedAt) + public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long repairedAt, boolean isConsistent) { this.desc = desc; this.request = request; this.repairedAt = repairedAt; + this.isConsistent = isConsistent; } public void run() @@ -62,7 +64,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); isIncremental = prs.isIncremental; } - new StreamPlan("Repair", repairedAt, 1, false, isIncremental, 
false).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index fc7aab4..6ebd756 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -173,7 +173,8 @@ public final class SystemDistributedKeyspace PrintWriter pw = new PrintWriter(sw); t.printStackTrace(pw); String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, parent_id.toString()); - processSilent(fmtQuery, t.getMessage(), sw.toString()); + String message = t.getMessage(); + processSilent(fmtQuery, message != null ? 
message : "", sw.toString()); } public static void successfulParentRepair(UUID parent_id, Collection<Range<Token>> successfulRanges) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index e20995e..e8e3621 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -59,6 +59,7 @@ public class Validator implements Runnable public final InetAddress initiator; public final int gcBefore; private final boolean evenTreeDistribution; + public final boolean isConsistent; // null when all rows with the min token have been consumed private long validated; @@ -72,14 +73,20 @@ public class Validator implements Runnable public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) { - this(desc, initiator, gcBefore, false); + this(desc, initiator, gcBefore, false, false); } - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution) + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean isConsistent) + { + this(desc, initiator, gcBefore, false, isConsistent); + } + + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution, boolean isConsistent) { this.desc = desc; this.initiator = initiator; this.gcBefore = gcBefore; + this.isConsistent = isConsistent; validated = 0; range = null; ranges = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java 
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java new file mode 100644 index 0000000..9b1fec9 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizeCommit; +import org.apache.cassandra.repair.messages.FinalizePromise; +import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.PrepareConsistentRequest; +import org.apache.cassandra.repair.messages.PrepareConsistentResponse; +import org.apache.cassandra.repair.messages.PrepareMessage; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.messages.StatusRequest; +import org.apache.cassandra.repair.messages.StatusResponse; +import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.tools.nodetool.RepairAdmin; + +/** + * Base class for consistent Local and Coordinator sessions + * + * <p/> + * There are 4 stages to a consistent incremental repair. + * + * <h1>Repair prepare</h1> + * First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddress, Set, RepairOption, List)} stuff + * happens, which sends out {@link PrepareMessage} and creates a {@link ActiveRepairService.ParentRepairSession} + * on the coordinator and each of the neighbors. + * + * <h1>Consistent prepare</h1> + * The consistent prepare step promotes the parent repair session to a consistent session, and isolates the sstables + * being repaired from other sstables.
First, the coordinator sends a {@link PrepareConsistentRequest} message to each repair + * participant (including itself). When received, the node creates a {@link LocalSession} instance, sets its state to + * {@code PREPARING}, persists it, and begins a {@link PendingAntiCompaction} task. When the pending anti-compaction + * completes, the session state is set to {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the + * coordinator indicating success or failure. If the pending anti-compaction fails, the local session state is set + * to {@code FAILED}. + * <p/> + * (see {@link LocalSessions#handlePrepareMessage(InetAddress, PrepareConsistentRequest)}) + * <p/> + * Once the coordinator receives positive {@code PrepareConsistentResponse} messages from all the participants, the + * coordinator begins the normal repair process. + * <p/> + * (see {@link CoordinatorSession#handlePrepareResponse(InetAddress, boolean)}) + * + * <h1>Repair</h1> + * The coordinator runs the normal data repair process against the sstables segregated in the previous step. When a + * node receives a {@link ValidationRequest}, it sets its local session state to {@code REPAIRING}. + * <p/> + * + * If all of the RepairSessions complete successfully, the coordinator begins the {@code Finalization} process. Otherwise, + * it begins the {@code Failure} process. + * + * <h1>Finalization</h1> + * The finalization step finishes the session and promotes the sstables to repaired. The coordinator begins by sending + * {@link FinalizePropose} messages to each of the participants. Each participant will set its state to {@code FINALIZE_PROMISED} + * and respond with a {@link FinalizePromise} message. Once the coordinator has received promise messages from all participants, + * it will send a {@link FinalizeCommit} message to all of them, ending the coordinator session.
When a node receives the + * {@code FinalizeCommit} message, it will set its session's state to {@code FINALIZED}, completing the {@code LocalSession}. + * <p/> + * + * For the sake of simplicity, finalization does not immediately mark pending repair sstables repaired because of potential + * conflicts with in-progress compactions. The sstables will be marked repaired as part of the normal compaction process. + * <p/> + * + * On the coordinator side, see {@link CoordinatorSession#finalizePropose(Executor)}, {@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)}, + * and {@link CoordinatorSession#finalizeCommit(Executor)} + * <p/> + * + * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)} + * and {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, FinalizeCommit)} + * + * <h1>Failure</h1> + * If there are any failures or problems during the process above, the session will be failed. When a session is failed, + * the coordinator will send {@link FailSession} messages to each of the participants. In some cases (basically those not + * including Validation and Sync) errors are reported back to the coordinator by the local session, at which point, it + * will send {@code FailSession} messages out. + * <p/> + * Just as with finalization, sstables aren't immediately moved back to unrepaired, but will be demoted as part of the + * normal compaction process. + * + * <p/> + * See {@link LocalSessions#failSession(UUID, boolean)} and {@link CoordinatorSession#fail()} + * + * <h1>Failure Recovery &amp; Session Cleanup</h1> + * There are a few scenarios where sessions can get stuck. If a node fails mid-session, or it misses a {@code FailSession} + * or {@code FinalizeCommit} message, it will never finish. To address this, there is a cleanup task that runs every + * 10 minutes that attempts to complete idle sessions.
+ * + * <p/> + * If a session is not completed (not {@code FINALIZED} or {@code FAILED}) and there's been no activity on the session for + * over an hour, the cleanup task will attempt to finish the session by learning the session state of the other participants. + * To do this, it sends a {@link StatusRequest} message to the other session participants. The participants respond with a + * {@link StatusResponse} message, notifying the sender of their state. If the sender receives a {@code FAILED} response + * from any of the participants, it fails the session locally. If it receives a {@code FINALIZED} response from any of the + * participants, it will set its state to {@code FINALIZED} as well. Since the coordinator won't finalize sessions until + * it's received {@code FinalizePromise} messages from <i>all</i> participants, this is safe. + * + * + * <p/> + * If a session is not completed, and hasn't had any activity for over a day, the session is auto-failed. + * + * <p/> + * Once a session has been completed for over 2 days, it's deleted. + * + * <p/> + * Operators can also manually fail sessions with {@code nodetool repair_admin --cancel} + * + * <p/> + * See {@link LocalSessions#cleanup()} and {@link RepairAdmin} + * + */ +public abstract class ConsistentSession +{ + /** + * The possible states of a {@code ConsistentSession}. The typical progression is {@link State#PREPARING}, {@link State#PREPARED}, + * {@link State#REPAIRING}, {@link State#FINALIZE_PROMISED}, and {@link State#FINALIZED}. With the exception of {@code FINALIZED}, + * any state can transition to {@link State#FAILED}.
+ */ + public enum State + { + PREPARING(0), + PREPARED(1), + REPAIRING(2), + FINALIZE_PROMISED(3), + FINALIZED(4), + FAILED(5); + + State(int expectedOrdinal) + { + assert ordinal() == expectedOrdinal; + } + + private static final Map<State, Set<State>> transitions = new EnumMap<State, Set<State>>(State.class) {{ + put(PREPARING, ImmutableSet.of(PREPARED, FAILED)); + put(PREPARED, ImmutableSet.of(REPAIRING, FAILED)); + put(REPAIRING, ImmutableSet.of(FINALIZE_PROMISED, FAILED)); + put(FINALIZE_PROMISED, ImmutableSet.of(FINALIZED, FAILED)); + put(FINALIZED, ImmutableSet.of()); + put(FAILED, ImmutableSet.of(FAILED)); + }}; + + public boolean canTransitionTo(State state) + { + return transitions.get(this).contains(state); + } + + public static State valueOf(int ordinal) + { + return values()[ordinal]; + } + } + + private volatile State state; + public final UUID sessionID; + public final InetAddress coordinator; + public final ImmutableSet<TableId> tableIds; + public final long repairedAt; + public final ImmutableSet<Range<Token>> ranges; + public final ImmutableSet<InetAddress> participants; + + ConsistentSession(AbstractBuilder builder) + { + builder.validate(); + this.state = builder.state; + this.sessionID = builder.sessionID; + this.coordinator = builder.coordinator; + this.tableIds = ImmutableSet.copyOf(builder.ids); + this.repairedAt = builder.repairedAt; + this.ranges = ImmutableSet.copyOf(builder.ranges); + this.participants = ImmutableSet.copyOf(builder.participants); + } + + public State getState() + { + return state; + } + + public void setState(State state) + { + this.state = state; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConsistentSession that = (ConsistentSession) o; + + if (repairedAt != that.repairedAt) return false; + if (state != that.state) return false; + if (!sessionID.equals(that.sessionID)) return false; + if 
(!coordinator.equals(that.coordinator)) return false; + if (!tableIds.equals(that.tableIds)) return false; + if (!ranges.equals(that.ranges)) return false; + return participants.equals(that.participants); + } + + public int hashCode() + { + int result = state.hashCode(); + result = 31 * result + sessionID.hashCode(); + result = 31 * result + coordinator.hashCode(); + result = 31 * result + tableIds.hashCode(); + result = 31 * result + (int) (repairedAt ^ (repairedAt >>> 32)); + result = 31 * result + ranges.hashCode(); + result = 31 * result + participants.hashCode(); + return result; + } + + public String toString() + { + return "ConsistentSession{" + + "state=" + state + + ", sessionID=" + sessionID + + ", coordinator=" + coordinator + + ", tableIds=" + tableIds + + ", repairedAt=" + repairedAt + + ", ranges=" + ranges + + ", participants=" + participants + + '}'; + } + + abstract static class AbstractBuilder + { + private State state; + private UUID sessionID; + private InetAddress coordinator; + private Set<TableId> ids; + private long repairedAt; + private Collection<Range<Token>> ranges; + private Set<InetAddress> participants; + + void withState(State state) + { + this.state = state; + } + + void withSessionID(UUID sessionID) + { + this.sessionID = sessionID; + } + + void withCoordinator(InetAddress coordinator) + { + this.coordinator = coordinator; + } + + void withUUIDTableIds(Iterable<UUID> ids) + { + this.ids = ImmutableSet.copyOf(Iterables.transform(ids, TableId::fromUUID)); + } + + void withTableIds(Set<TableId> ids) + { + this.ids = ids; + } + + void withRepairedAt(long repairedAt) + { + this.repairedAt = repairedAt; + } + + void withRanges(Collection<Range<Token>> ranges) + { + this.ranges = ranges; + } + + void withParticipants(Set<InetAddress> peers) + { + this.participants = peers; + } + + void validate() + { + Preconditions.checkArgument(state != null); + Preconditions.checkArgument(sessionID != null); + Preconditions.checkArgument(coordinator != 
null); + Preconditions.checkArgument(ids != null); + Preconditions.checkArgument(!ids.isEmpty()); + Preconditions.checkArgument(repairedAt > 0); + Preconditions.checkArgument(ranges != null); + Preconditions.checkArgument(!ranges.isEmpty()); + Preconditions.checkArgument(participants != null); + Preconditions.checkArgument(!participants.isEmpty()); + Preconditions.checkArgument(participants.contains(coordinator)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java new file mode 100644 index 0000000..ef3eacd --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.RepairSessionResult; +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizeCommit; +import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.PrepareConsistentRequest; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.service.ActiveRepairService; + +/** + * Coordinator side logic and state of a consistent repair session. Like {@link ActiveRepairService.ParentRepairSession}, + * there is only one {@code CoordinatorSession} per user repair command, regardless of the number of tables and token + * ranges involved. 
+ */ +public class CoordinatorSession extends ConsistentSession +{ + private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class); + + private final Map<InetAddress, State> participantStates = new HashMap<>(); + private final SettableFuture<Boolean> prepareFuture = SettableFuture.create(); + private final SettableFuture<Boolean> finalizeProposeFuture = SettableFuture.create(); + + + public CoordinatorSession(Builder builder) + { + super(builder); + for (InetAddress participant : participants) + { + participantStates.put(participant, State.PREPARING); + } + } + + public static class Builder extends AbstractBuilder + { + public CoordinatorSession build() + { + validate(); + return new CoordinatorSession(this); + } + } + + public static Builder builder() + { + return new Builder(); + } + + public void setState(State state) + { + logger.debug("Setting coordinator state to {} for repair {}", state, sessionID); + super.setState(state); + } + + public synchronized void setParticipantState(InetAddress participant, State state) + { + logger.debug("Setting participant {} to state {} for repair {}", participant, state, sessionID); + Preconditions.checkArgument(participantStates.containsKey(participant), + "Session %s doesn't include %s", + sessionID, participant); + Preconditions.checkArgument(participantStates.get(participant).canTransitionTo(state), + "Invalid state transition %s -> %s", + participantStates.get(participant), state); + participantStates.put(participant, state); + + // update coordinator state if all participants are at the value being set + if (Iterables.all(participantStates.values(), s -> s == state)) + { + setState(state); + } + } + + synchronized void setAll(State state) + { + for (InetAddress participant : participants) + { + setParticipantState(participant, state); + } + } + + synchronized boolean allStates(State state) + { + return getState() == state && Iterables.all(participantStates.values(), v -> v == state); + } + + 
synchronized boolean hasFailed() + { + return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED); + } + + protected void sendMessage(InetAddress destination, RepairMessage message) + { + MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer); + MessagingService.instance().sendOneWay(messageOut, destination); + } + + public ListenableFuture<Boolean> prepare(Executor executor) + { + Preconditions.checkArgument(allStates(State.PREPARING)); + + logger.debug("Sending PrepareConsistentRequest message to {}", participants); + PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants); + for (final InetAddress participant : participants) + { + executor.execute(() -> sendMessage(participant, message)); + } + return prepareFuture; + } + + public synchronized void handlePrepareResponse(InetAddress participant, boolean success) + { + if (getState() == State.FAILED) + { + logger.debug("Consistent repair {} has failed, ignoring prepare response from {}", sessionID, participant); + } + else if (!success) + { + logger.debug("Failed prepare response received from {} for session {}", participant, sessionID); + fail(); + prepareFuture.set(false); + } + else + { + logger.debug("Successful prepare response received from {} for session {}", participant, sessionID); + setParticipantState(participant, State.PREPARED); + if (getState() == State.PREPARED) + { + prepareFuture.set(true); + } + } + } + + public synchronized void setRepairing() + { + setAll(State.REPAIRING); + } + + public synchronized ListenableFuture<Boolean> finalizePropose(Executor executor) + { + Preconditions.checkArgument(allStates(State.REPAIRING)); + logger.debug("Sending FinalizePropose message to {}", participants); + FinalizePropose message = new FinalizePropose(sessionID); + for (final InetAddress participant : participants) + { + 
executor.execute(() -> sendMessage(participant, message)); + } + return finalizeProposeFuture; + } + + public synchronized void handleFinalizePromise(InetAddress participant, boolean success) + { + if (getState() == State.FAILED) + { + logger.debug("Consistent repair {} has failed, ignoring finalize promise from {}", sessionID, participant); + } + else if (!success) + { + logger.debug("Failed finalize promise received from {} for session {}", participant, sessionID); + fail(); + finalizeProposeFuture.set(false); + } + else + { + logger.debug("Successful finalize promise received from {} for session {}", participant, sessionID); + setParticipantState(participant, State.FINALIZE_PROMISED); + if (getState() == State.FINALIZE_PROMISED) + { + finalizeProposeFuture.set(true); + } + } + } + + public synchronized void finalizeCommit(Executor executor) + { + Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED)); + logger.debug("Sending FinalizeCommit message to {}", participants); + FinalizeCommit message = new FinalizeCommit(sessionID); + for (final InetAddress participant : participants) + { + executor.execute(() -> sendMessage(participant, message)); + } + setAll(State.FINALIZED); + } + + public void fail() + { + fail(MoreExecutors.directExecutor()); + } + + public synchronized void fail(Executor executor) + { + logger.debug("Failing session {}", sessionID); + FailSession message = new FailSession(sessionID); + for (final InetAddress participant : participants) + { + if (participantStates.get(participant) != State.FAILED) + { + executor.execute(() -> sendMessage(participant, message)); + } + } + setAll(State.FAILED); + } + + /** + * Runs the asynchronous consistent repair session. 
Actual repair sessions are scheduled via a submitter to make unit testing easier + */ + public ListenableFuture execute(Executor executor, Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure) + { + logger.debug("Executing consistent repair {}", sessionID); + + ListenableFuture<Boolean> prepareResult = prepare(executor); + + // run repair sessions normally + ListenableFuture<List<RepairSessionResult>> repairSessionResults = Futures.transform(prepareResult, new AsyncFunction<Boolean, List<RepairSessionResult>>() + { + public ListenableFuture<List<RepairSessionResult>> apply(Boolean success) throws Exception + { + if (success) + { + setRepairing(); + return sessionSubmitter.get(); + } + else + { + return Futures.immediateFuture(null); + } + + } + }); + + // mark propose finalization + ListenableFuture<Boolean> proposeFuture = Futures.transform(repairSessionResults, new AsyncFunction<List<RepairSessionResult>, Boolean>() + { + public ListenableFuture<Boolean> apply(List<RepairSessionResult> results) throws Exception + { + if (results == null || results.isEmpty() || Iterables.any(results, r -> r == null)) + { + return Futures.immediateFailedFuture(new RuntimeException()); + } + else + { + return finalizePropose(executor); + } + } + }); + + // commit repaired data + Futures.addCallback(proposeFuture, new FutureCallback<Boolean>() + { + public void onSuccess(@Nullable Boolean result) + { + if (result != null && result) + { + finalizeCommit(executor); + } + else + { + hasFailure.set(true); + fail(executor); + } + } + + public void onFailure(Throwable t) + { + hasFailure.set(true); + fail(executor); + } + }); + + return proposeFuture; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java new file mode 100644 index 0000000..211e0c1 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizePromise; +import org.apache.cassandra.repair.messages.PrepareConsistentResponse; +import org.apache.cassandra.service.ActiveRepairService; + +/** + * Container for all consistent repair sessions a node is coordinating + */ +public class CoordinatorSessions +{ + private final Map<UUID, CoordinatorSession> sessions = new HashMap<>(); + + protected CoordinatorSession buildSession(CoordinatorSession.Builder builder) + { + return new CoordinatorSession(builder); + } + + public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddress> participants) + { + Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId); + CoordinatorSession.Builder builder = CoordinatorSession.builder(); + builder.withState(ConsistentSession.State.PREPARING); + builder.withSessionID(sessionId); + builder.withCoordinator(prs.coordinator); + + builder.withTableIds(prs.getTableIds()); + builder.withRepairedAt(prs.repairedAt); + builder.withRanges(prs.getRanges()); + builder.withParticipants(participants); + CoordinatorSession session = buildSession(builder); + sessions.put(session.sessionID, session); + return session; + } + + public synchronized CoordinatorSession getSession(UUID sessionId) + { + return sessions.get(sessionId); + } + + public void handlePrepareResponse(PrepareConsistentResponse msg) + { + CoordinatorSession session = getSession(msg.parentSession); + if (session != null) + { + session.handlePrepareResponse(msg.participant, msg.success); + } + } + + 
public void handleFinalizePromiseMessage(FinalizePromise msg) + { + CoordinatorSession session = getSession(msg.sessionID); + if (session != null) + { + session.handleFinalizePromise(msg.participant, msg.promised); + } + } + + public void handleFailSessionMessage(FailSession msg) + { + CoordinatorSession session = getSession(msg.sessionID); + if (session != null) + { + session.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/LocalSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java new file mode 100644 index 0000000..9d8b4bd --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.utils.FBUtilities; + +/** + * Basically just a record of a local session. 
All of the local session logic is implemented in {@link LocalSessions} + */ +public class LocalSession extends ConsistentSession +{ + public final int startedAt; + private volatile int lastUpdate; + + public LocalSession(Builder builder) + { + super(builder); + this.startedAt = builder.startedAt; + this.lastUpdate = builder.lastUpdate; + } + + public boolean isCompleted() + { + State s = getState(); + return s == State.FINALIZED || s == State.FAILED; + } + + public int getStartedAt() + { + return startedAt; + } + + public int getLastUpdate() + { + return lastUpdate; + } + + public void setLastUpdate() + { + lastUpdate = FBUtilities.nowInSeconds(); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + LocalSession session = (LocalSession) o; + + if (startedAt != session.startedAt) return false; + return lastUpdate == session.lastUpdate; + } + + public int hashCode() + { + int result = super.hashCode(); + result = 31 * result + startedAt; + result = 31 * result + lastUpdate; + return result; + } + + public String toString() + { + return "LocalSession{" + + "state=" + getState() + + ", sessionID=" + sessionID + + ", coordinator=" + coordinator + + ", tableIds=" + tableIds + + ", repairedAt=" + repairedAt + + ", ranges=" + ranges + + ", participants=" + participants + + ", startedAt=" + startedAt + + ", lastUpdate=" + lastUpdate + + '}'; + } + + public static class Builder extends AbstractBuilder + { + private int startedAt; + private int lastUpdate; + + public void withStartedAt(int startedAt) + { + this.startedAt = startedAt; + } + + public void withLastUpdate(int lastUpdate) + { + this.lastUpdate = lastUpdate; + } + + void validate() + { + super.validate(); + Preconditions.checkArgument(startedAt > 0); + Preconditions.checkArgument(lastUpdate > 0); + } + + public LocalSession build() + { + validate(); + return new LocalSession(this); + } + } + + 
public static Builder builder() + { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java new file mode 100644 index 0000000..903aeb5 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; + +/** + * helper for JMX management functions + */ +public class LocalSessionInfo +{ + public static final String SESSION_ID = "SESSION_ID"; + public static final String STATE = "STATE"; + public static final String STARTED = "STARTED"; + public static final String LAST_UPDATE = "LAST_UPDATE"; + public static final String COORDINATOR = "COORDINATOR"; + public static final String PARTICIPANTS = "PARTICIPANTS"; + public static final String TABLES = "TABLES"; + + + private LocalSessionInfo() {} + + private static String tableString(TableId id) + { + TableMetadata meta = Schema.instance.getTableMetadata(id); + return meta != null ? meta.keyspace + '.' + meta.name : "<null>"; + } + + static Map<String, String> sessionToMap(LocalSession session) + { + Map<String, String> m = new HashMap<>(); + m.put(SESSION_ID, session.sessionID.toString()); + m.put(STATE, session.getState().toString()); + m.put(STARTED, Integer.toString(session.getStartedAt())); + m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate())); + m.put(COORDINATOR, session.coordinator.toString()); + m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants, InetAddress::toString))); + m.put(TABLES, Joiner.on(',').join(Iterables.transform(session.tableIds, LocalSessionInfo::tableString))); + + return m; + } +}