http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index e7c6640..89e1954 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -18,29 +18,31 @@ package org.apache.cassandra.service; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import javax.management.MBeanServer; +import javax.management.ObjectName; + import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -59,18 +61,17 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.AnticompactionTask; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; +import org.apache.cassandra.repair.consistent.CoordinatorSessions; +import org.apache.cassandra.repair.consistent.LocalSessions; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.Refs; /** * ActiveRepairService is the starting point for manual "active" repairs. @@ -86,7 +87,7 @@ import org.apache.cassandra.utils.concurrent.Refs; * The creation of a repair session is done through the submitRepairSession that * returns a future on the completion of that session. */ -public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean { /** * @deprecated this statuses are from the previous JMX notification service, @@ -98,6 +99,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED } + + public class ConsistentSessions + { + public final LocalSessions local = new LocalSessions(); + public final CoordinatorSessions coordinated = new CoordinatorSessions(); + } + + public final ConsistentSessions consistent = new ConsistentSessions(); + private boolean registeredForEndpointChanges = false; public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1"); @@ -107,6 +117,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance); public static final long UNREPAIRED_SSTABLE = 0; + public static final UUID NO_PENDING_REPAIR = null; /** * A map of active coordinator session. @@ -122,6 +133,37 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { this.failureDetector = failureDetector; this.gossiper = gossiper; + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public void start() + { + consistent.local.start(); + ScheduledExecutors.optionalTasks.scheduleAtFixedRate(consistent.local::cleanup, 0, + LocalSessions.CLEANUP_INTERVAL, + TimeUnit.SECONDS); + } + + @Override + public List<Map<String, String>> getSessions(boolean all) + { + return consistent.local.sessionInfo(all); + } + + @Override + public void failSession(String session, boolean force) + { + UUID sessionID = UUID.fromString(session); + consistent.local.cancelSession(sessionID, force); } /** @@ -135,6 +177,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, + boolean isConsistent, boolean pullRepair, ListeningExecutorService executor, String... cfnames) @@ -145,7 +188,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, isConsistent, pullRepair, cfnames); sessions.put(session.getId(), session); // register listeners @@ -283,8 +326,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - long timestamp = Clock.instance.currentTimeMillis(); - registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); + // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables + long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE; + registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@ -316,7 +360,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { if (FailureDetector.instance.isAlive(neighbour)) { - PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); + PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal()); MessageOut<RepairMessage> msg = message.createMessage(); MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); } @@ -346,8 +390,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal) + public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) { + assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE; if (!registeredForEndpointChanges) { Gossiper.instance.register(this); @@ -355,41 +400,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai registeredForEndpointChanges = true; } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal)); - } - - public Set<SSTableReader> currentlyRepairing(TableId tableId, UUID parentRepairSession) - { - Set<SSTableReader> repairing = new HashSet<>(); - for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet()) - { - Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(tableId); - if (sstables != null && !entry.getKey().equals(parentRepairSession)) - repairing.addAll(sstables); - } - return repairing; - } - - /** - * Run final process of repair. - * This removes all resources held by parent repair session, after performing anti compaction if necessary. - * - * @param parentSession Parent session ID - * @param neighbors Repair participants (not including self) - * @param successfulRanges Ranges that repaired successfully - */ - public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) - { - List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1); - for (InetAddress neighbor : neighbors) - { - AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges); - registerOnFdAndGossip(task); - tasks.add(task); - task.run(); // 'run' is just sending message - } - tasks.add(doAntiCompaction(parentSession, successfulRanges)); - return Futures.successfulAsList(tasks); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal)); } public ParentRepairSession getParentRepairSession(UUID parentSessionId) @@ -422,53 +433,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return parentRepairSessions.remove(parentSessionId); } - /** - * Submit anti-compaction jobs to CompactionManager. - * When all jobs are done, parent repair session is removed whether those are suceeded or not. - * - * @param parentRepairSession parent repair session ID - * @return Future result of all anti-compaction jobs. - */ - @SuppressWarnings("resource") - public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges) - { - assert parentRepairSession != null; - ParentRepairSession prs = getParentRepairSession(parentRepairSession); - //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions - //in addition to other scenarios such as repairs not involving all DCs or hosts - if (!prs.isGlobal) - { - logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession); - removeParentRepairSession(parentRepairSession); - return Futures.immediateFuture(Collections.emptyList()); - } - assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; - - List<ListenableFuture<?>> futures = new ArrayList<>(); - // if we don't have successful repair ranges, then just skip anticompaction - if (!successfulRanges.isEmpty()) - { - for (Map.Entry<TableId, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) - { - Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession); - ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession)); - } - } - - ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures); - allAntiCompactionResults.addListener(new Runnable() - { - @Override - public void run() - { - removeParentRepairSession(parentRepairSession); - } - }, MoreExecutors.directExecutor()); - - return allAntiCompactionResults; - } - public void handleMessage(InetAddress endpoint, RepairMessage message) { RepairJobDesc desc = message.desc; @@ -495,27 +459,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster * we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once. - * - * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as - * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables, - * and when it is time for anticompaction we will only anticompact the sstables that are still on disk. - * - * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on - * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair. */ public static class ParentRepairSession { private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); private final Collection<Range<Token>> ranges; - public final Map<TableId, Set<String>> sstableMap = new HashMap<>(); public final boolean isIncremental; public final boolean isGlobal; public final long repairedAt; public final InetAddress coordinator; - /** - * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession - */ - private final Set<TableId> marked = new HashSet<>(); public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) { @@ -523,7 +475,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai for (ColumnFamilyStore cfs : columnFamilyStores) { this.columnFamilyStores.put(cfs.metadata.id, cfs); - sstableMap.put(cfs.metadata.id, new HashSet<>()); } this.ranges = ranges; this.repairedAt = repairedAt; @@ -531,97 +482,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai this.isGlobal = isGlobal; } - /** - * Mark sstables repairing - either all sstables or only the unrepaired ones depending on - * - * whether this is an incremental or full repair - * - * @param tableId the table - * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables - */ - public synchronized void markSSTablesRepairing(TableId tableId, UUID parentSessionId) - { - if (!marked.contains(tableId)) - { - List<SSTableReader> sstables = columnFamilyStores.get(tableId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables; - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId); - if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) - { - logger.error("Cannot start multiple repair sessions over the same sstables"); - throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); - } - addSSTables(tableId, sstables); - marked.add(tableId); - } - } - - /** - * Get the still active sstables we should run anticompaction on - * - * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this - * to know which sstables are still there that were there when we started the repair - * - * @param tableId - * @param parentSessionId for checking if there exists a snapshot for this repair - * @return - */ - @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(TableId tableId, UUID parentSessionId) - { - assert marked.contains(tableId); - if (!columnFamilyStores.containsKey(tableId)) - throw new RuntimeException("Not possible to get sstables for anticompaction for " + tableId); - boolean isSnapshotRepair = columnFamilyStores.get(tableId).snapshotExists(parentSessionId.toString()); - ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(tableId, parentSessionId) : getActiveSSTables(tableId); - // we check this above - if columnFamilyStores contains the tableId sstables will not be null - assert sstables != null; - for (SSTableReader sstable : sstables) - { - Ref<SSTableReader> ref = sstable.tryRef(); - if (ref == null) - sstableMap.get(tableId).remove(sstable.getFilename()); - else - references.put(sstable, ref); - } - return new Refs<>(references.build()); - } - - /** - * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction - * - * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the - * actual filename. - * - * @param tableId - * @param parentSessionId - * @return - */ - private Set<SSTableReader> getSSTablesForSnapshotRepair(TableId tableId, UUID parentSessionId) - { - Set<SSTableReader> activeSSTables = new HashSet<>(); - ColumnFamilyStore cfs = columnFamilyStores.get(tableId); - if (cfs == null) - return null; - - Set<Integer> snapshotGenerations = new HashSet<>(); - try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString())) - { - for (SSTableReader sstable : snapshottedSSTables) - { - snapshotGenerations.add(sstable.descriptor.generation); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) - if (snapshotGenerations.contains(sstable.descriptor.generation)) - activeSSTables.add(sstable); - return activeSSTables; - } - public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId) { String snapshotName = parentSessionId.toString(); @@ -637,75 +497,29 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); } }, true, false); - - if (isAlreadyRepairing(tableId, parentSessionId, snapshottedSSTables)) - { - columnFamilyStores.get(tableId).clearSnapshot(snapshotName); - logger.error("Cannot start multiple repair sessions over the same sstables"); - throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); - } - addSSTables(tableId, snapshottedSSTables); - marked.add(tableId); } } - - /** - * Compares other repairing sstables *generation* to the ones we just snapshotted - * - * we compare generations since the sstables have different paths due to snapshot names - * - * @param tableId id of table store - * @param parentSessionId parent repair session - * @param sstables the newly snapshotted sstables - * @return - */ - private boolean isAlreadyRepairing(TableId tableId, UUID parentSessionId, Collection<SSTableReader> sstables) + public long getRepairedAt() { - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId); - Set<Integer> currentlyRepairingGenerations = new HashSet<>(); - Set<Integer> newRepairingGenerations = new HashSet<>(); - for (SSTableReader sstable : currentlyRepairing) - currentlyRepairingGenerations.add(sstable.descriptor.generation); - for (SSTableReader sstable : sstables) - newRepairingGenerations.add(sstable.descriptor.generation); - - return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty(); + if (isGlobal) + return repairedAt; + return ActiveRepairService.UNREPAIRED_SSTABLE; } - private Set<SSTableReader> getActiveSSTables(TableId tableId) + public Collection<ColumnFamilyStore> getColumnFamilyStores() { - if (!columnFamilyStores.containsKey(tableId)) - return null; - - Set<String> repairedSSTables = sstableMap.get(tableId); - Set<SSTableReader> activeSSTables = new HashSet<>(); - Set<String> activeSSTableNames = new HashSet<>(); - ColumnFamilyStore cfs = columnFamilyStores.get(tableId); - for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) - { - if (repairedSSTables.contains(sstable.getFilename())) - { - activeSSTables.add(sstable); - activeSSTableNames.add(sstable.getFilename()); - } - } - sstableMap.put(tableId, activeSSTableNames); - return activeSSTables; + return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build(); } - private void addSSTables(TableId tableId, Collection<SSTableReader> sstables) + public Set<TableId> getTableIds() { - for (SSTableReader sstable : sstables) - sstableMap.get(tableId).add(sstable.getFilename()); + return ImmutableSet.copyOf(Iterables.transform(getColumnFamilyStores(), cfs -> cfs.metadata.id)); } - - public long getRepairedAt() + public Collection<Range<Token>> getRanges() { - if (isGlobal) - return repairedAt; - return ActiveRepairService.UNREPAIRED_SSTABLE; + return ImmutableSet.copyOf(ranges); } @Override @@ -714,7 +528,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return "ParentRepairSession{" + "columnFamilyStores=" + columnFamilyStores + ", ranges=" + ranges + - ", sstableMap=" + sstableMap + ", repairedAt=" + repairedAt + '}'; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java new file mode 100644 index 0000000..53b0acb --- /dev/null +++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.util.List; +import java.util.Map; + +public interface ActiveRepairServiceMBean +{ + public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService"; + + public List<Map<String, String>> getSessions(boolean all); + public void failSession(String session, boolean force); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index fe84082..03156ae 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -322,6 +322,7 @@ public class CassandraDaemon } SystemKeyspace.finishStartup(); + ActiveRepairService.instance.start(); // Prepared statements QueryProcessor.preloadPreparedStatement(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 556748d..10c5827 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -198,7 +198,8 @@ public class ConnectionHandler session.description(), !isOutgoingHandler, session.keepSSTableLevel(), - session.isIncremental()); + session.isIncremental(), + session.getPendingRepair()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); DataOutputStreamPlus out = getWriteChannel(socket); out.write(messageBuf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 2cb75f7..81d0498 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -49,15 +49,17 @@ public class StreamCoordinator private final boolean keepSSTableLevel; private final boolean isIncremental; private Iterator<StreamSession> sessionsToConnect = null; + private final UUID pendingRepair; public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, - StreamConnectionFactory factory, boolean connectSequentially) + StreamConnectionFactory factory, boolean connectSequentially, UUID pendingRepair) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; this.connectSequentially = connectSequentially; + this.pendingRepair = pendingRepair; } public void setConnectionFactory(StreamConnectionFactory factory) @@ -288,7 +290,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental); + StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental, pendingRepair); streamSessions.put(++lastReturned, session); return session; } @@ -320,7 +322,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental); + session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental, pendingRepair); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index e9d43cb..5526da8 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -48,21 +48,21 @@ public class StreamPlan */ public StreamPlan(String description) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null); } public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null); } public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, - boolean isIncremental, boolean connectSequentially) + boolean isIncremental, boolean connectSequentially, UUID pendingRepair) { this.description = description; this.repairedAt = repairedAt; this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), - connectSequentially); + connectSequentially, pendingRepair); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index baf5ec9..fdc2ae2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -21,6 +21,7 @@ import java.io.*; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.Collection; +import java.util.UUID; import com.google.common.base.Throwables; import com.google.common.collect.UnmodifiableIterator; @@ -95,16 +96,16 @@ public class StreamReader throw new IOException("CF " + tableId + " was dropped during streaming"); } - logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.", + logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), - cfs.getTableName()); + cfs.getTableName(), session.getPendingRepair()); TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel))); StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); SSTableMultiWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt, format); + writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format); while (in.getBytesRead() < totalSize) { writePartition(deserializer, writer); @@ -138,13 +139,13 @@ public class StreamReader return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader } - protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException { Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata())); + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata())); StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); return writer; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 61a1c8c..6d0c03b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -71,10 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental) + private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair) { - this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, - new DefaultConnectionFactory(), false)); + this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair)); } static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, @@ -108,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> boolean isForOutgoing, int version, boolean keepSSTableLevel, - boolean isIncremental) throws IOException + boolean isIncremental, + UUID pendingRepair) throws IOException { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) @@ -116,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental); + future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental, pendingRepair); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, connection, isForOutgoing, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index faa05d1..b7db2b2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -163,6 +163,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private final boolean keepSSTableLevel; private final boolean isIncremental; private ScheduledFuture<?> keepAliveFuture = null; + private final UUID pendingRepair; public static enum State { @@ -184,7 +185,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental) + public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) { this.peer = peer; this.connecting = connecting; @@ -196,6 +197,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; + this.pendingRepair = pendingRepair; } public UUID planId() @@ -223,6 +225,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber return isIncremental; } + public UUID getPendingRepair() + { + return pendingRepair; + } public LifecycleTransaction getTransaction(TableId tableId) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index a15d2ff..d8e329c 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -73,7 +73,7 @@ public class CompressedStreamReader extends StreamReader } logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.", - session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), session.getPendingRepair(), cfs.getTableName()); CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, @@ -84,7 +84,7 @@ public class CompressedStreamReader extends StreamReader SSTableMultiWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt, format); + writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format); String filename = writer.getFilename(); int sectionIdx = 0; for (Pair<Long, Long> section : sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 6d807e9..3b4b512 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -49,8 +49,9 @@ public class StreamInitMessage public final boolean isForOutgoing; public final boolean keepSSTableLevel; public final boolean isIncremental; + public final UUID pendingRepair; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) { this.from = from; this.sessionIndex = sessionIndex; @@ -59,6 +60,7 @@ public class StreamInitMessage this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; + this.pendingRepair = pendingRepair; } /** @@ -114,6 +116,12 @@ public class StreamInitMessage out.writeBoolean(message.isForOutgoing); out.writeBoolean(message.keepSSTableLevel); out.writeBoolean(message.isIncremental); + + out.writeBoolean(message.pendingRepair != null); + if (message.pendingRepair != null) + { + UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version); + } } public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException @@ -124,8 +132,10 @@ public class StreamInitMessage String description = in.readUTF(); boolean sentByInitiator = in.readBoolean(); boolean keepSSTableLevel = in.readBoolean(); + boolean isIncremental = in.readBoolean(); - return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental); + UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; + return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair); } public long serializedSize(StreamInitMessage message, int version) @@ -137,6 +147,11 @@ public class StreamInitMessage size += TypeSizes.sizeof(message.isForOutgoing); size += TypeSizes.sizeof(message.keepSSTableLevel); size += TypeSizes.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.pendingRepair != null); + if (message.pendingRepair != null) + { + size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version); + } return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 5463255..865665c 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -74,6 +74,7 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.ThreadPoolMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceMBean; +import org.apache.cassandra.service.ActiveRepairServiceMBean; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.CacheServiceMBean; import org.apache.cassandra.service.GCInspector; @@ -122,6 +123,7 @@ public class NodeProbe implements AutoCloseable private StorageProxyMBean spProxy; private HintedHandOffManagerMBean hhProxy; private BatchlogManagerMBean bmProxy; + private ActiveRepairServiceMBean arsProxy; private boolean failed; /** @@ -214,6 +216,8 @@ public class NodeProbe implements AutoCloseable gossProxy = JMX.newMBeanProxy(mbeanServerConn, name, GossiperMBean.class); name = new ObjectName(BatchlogManager.MBEAN_NAME); bmProxy = JMX.newMBeanProxy(mbeanServerConn, name, BatchlogManagerMBean.class); + name = new ObjectName(ActiveRepairServiceMBean.MBEAN_NAME); + arsProxy = JMX.newMBeanProxy(mbeanServerConn, name, ActiveRepairServiceMBean.class); } catch (MalformedObjectNameException e) { @@ -1511,6 +1515,11 @@ public class NodeProbe implements AutoCloseable throw new RuntimeException(e); } } + + public ActiveRepairServiceMBean getRepairServiceProxy() + { + return arsProxy; + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 0c55f76..6812a27 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -99,6 +99,7 @@ public class NodeTool RemoveNode.class, Assassinate.class, Repair.class, + RepairAdmin.class, ReplayBatchlog.class, SetCacheCapacity.class, SetHintedHandoffThrottleInKB.class, http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index 019e053..63c7f96 100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@ -135,6 +135,7 @@ public class SSTableMetadataViewer out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - gcgs)); out.printf("SSTable Level: %d%n", stats.sstableLevel); out.printf("Repaired at: %d%n", stats.repairedAt); + out.printf("Pending repair: %s%n", stats.pendingRepair); out.printf("Replay positions covered: %s%n", stats.commitLogIntervals); out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet); out.printf("totalRows: %s%n", stats.totalRows); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java index a130177..8056ff8 100644 --- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java @@ -27,7 +27,6 @@ import java.util.List; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.service.ActiveRepairService; /** * Set repairedAt status on a given set of sstables. @@ -89,11 +88,11 @@ public class SSTableRepairedAtSetter if (setIsRepaired) { FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath()); - descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, f.toMillis()); + descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null); } else { - descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE); + descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0, null); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java new file mode 100644 index 0000000..bb201a2 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import io.airlift.command.Command; +import io.airlift.command.Option; +import org.apache.cassandra.repair.consistent.LocalSessionInfo; +import org.apache.cassandra.service.ActiveRepairServiceMBean; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Supports listing and failing incremental repair sessions + */ +@Command(name = "repair_admin", description = "list and fail incremental repair sessions") +public class RepairAdmin extends NodeTool.NodeToolCmd +{ + @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)") + private boolean list = false; + + @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions") + private boolean all = false; + + @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session") + private String cancel = null; + + @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." + + " Attempting to cancel FINALIZED or FAILED sessions is an error.") + private boolean force = false; + + private static final List<String> header = Lists.newArrayList("id", + "state", + "last activity", + "coordinator", + "participants"); + + + private List<String> sessionValues(Map<String, String> session, int now) + { + int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE)); + return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID), + session.get(LocalSessionInfo.STATE), + Integer.toString(now - updated) + " (s)", + session.get(LocalSessionInfo.COORDINATOR), + session.get(LocalSessionInfo.PARTICIPANTS)); + } + + private void listSessions(ActiveRepairServiceMBean repairServiceProxy) + { + Preconditions.checkArgument(cancel == null); + Preconditions.checkArgument(!force, "-f/--force only valid for session cancel"); + List<Map<String, String>> sessions = repairServiceProxy.getSessions(all); + if (sessions.isEmpty()) + { + System.out.println("no sessions"); + + } + else + { + List<List<String>> rows = new ArrayList<>(); + rows.add(header); + int now = FBUtilities.nowInSeconds(); + for (Map<String, String> session : sessions) + { + rows.add(sessionValues(session, now)); + } + + // get max col widths + int[] widths = new int[header.size()]; + for (List<String> row : rows) + { + assert row.size() == widths.length; + for (int i = 0; i < widths.length; i++) + { + widths[i] = Math.max(widths[i], row.get(i).length()); + } + } + + List<String> fmts = new ArrayList<>(widths.length); + for (int i = 0; i < widths.length; i++) + { + fmts.add("%-" + Integer.toString(widths[i]) + "s"); + } + + + // print + for (List<String> row : rows) + { + List<String> formatted = new ArrayList<>(row.size()); + for (int i = 0; i < widths.length; i++) + { + formatted.add(String.format(fmts.get(i), row.get(i))); + } + System.out.println(Joiner.on(" | ").join(formatted)); + } + } + } + + private void cancelSession(ActiveRepairServiceMBean repairServiceProxy) + { + Preconditions.checkArgument(!list); + Preconditions.checkArgument(!all, "-a/--all only valid for session list"); + repairServiceProxy.failSession(cancel, force); + } + + protected void execute(NodeProbe probe) + { + if (list && cancel != null) + { + throw new RuntimeException("Can either list, or cancel sessions, not both"); + } + else if (cancel != null) + { + cancelSession(probe.getRepairServiceProxy()); + } + else + { + // default + listSessions(probe.getRepairServiceProxy()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java index b814ea6..e01088d 100644 --- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java +++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java @@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException { - sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1); + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null); sstable.reloadSSTableMetadata(); cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index d9f6433..f0873b9 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -642,7 +642,7 @@ public class ScrubTest { SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS); MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0); - return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn); + return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn); } private static class TestMultiWriter extends SimpleSSTableMultiWriter @@ -658,10 +658,10 @@ public class ScrubTest */ private static class TestWriter extends BigTableWriter { - TestWriter(Descriptor descriptor, long keyCount, long repairedAt, TableMetadataRef metadata, + TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn); + super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java new file mode 100644 index 0000000..08be550 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Iterables; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.net.IMessageSink; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.consistent.AbstractConsistentSessionTest; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; + +@Ignore +public class AbstractPendingRepairTest extends AbstractConsistentSessionTest +{ + protected String ks; + protected final String tbl = "tbl"; + protected TableMetadata cfm; + protected ColumnFamilyStore cfs; + protected CompactionStrategyManager csm; + protected static ActiveRepairService ARS; + + private int nextSSTableKey = 0; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + ARS = ActiveRepairService.instance; + LocalSessionAccessor.startup(); + + // cutoff messaging service + MessagingService.instance().addMessageSink(new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + return false; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return false; + } + }); + } + + @Before + public void setup() + { + ks = "ks_" + System.currentTimeMillis(); + cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + csm = cfs.getCompactionStrategyManager(); + nextSSTableKey = 0; + } + + /** + * creates and returns an sstable + * + * @param orphan if true, the sstable will be removed from the unrepaired strategy + */ + SSTableReader makeSSTable(boolean orphan) + { + int pk = nextSSTableKey++; + Set<SSTableReader> pre = cfs.getLiveSSTables(); + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), pk, pk); + cfs.forceBlockingFlush(); + Set<SSTableReader> post = cfs.getLiveSSTables(); + Set<SSTableReader> diff = new HashSet<>(post); + diff.removeAll(pre); + assert diff.size() == 1; + SSTableReader sstable = diff.iterator().next(); + if (orphan) + { + Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable)); + csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable)); + } + return sstable; + } + + protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) + { + try + { + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.reloadSSTableMetadata(); + } + catch (IOException e) + { + throw new AssertionError(e); + } + } + + protected static void mutateRepaired(SSTableReader sstable, long repairedAt) + { + mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); + } + + protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair) + { + mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 5a7bfed..41c090e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.junit.BeforeClass; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.schema.TableMetadata; @@ -47,9 +48,11 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.UpdateBuilder; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -57,6 +60,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; + public class AntiCompactionTest { @@ -80,9 +85,10 @@ public class AntiCompactionTest store.truncateBlocking(); } - @Test - public void antiCompactOne() throws Exception + private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception { + assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null; + ColumnFamilyStore store = prepareColumnFamilyStore(); Collection<SSTableReader> sstables = getUnrepairedSSTables(store); assertEquals(store.getLiveSSTables().size(), sstables.size()); @@ -90,15 +96,15 @@ public class AntiCompactionTest List<Range<Token>> ranges = Arrays.asList(range); int repairedKeys = 0; + int pendingKeys = 0; int nonRepairedKeys = 0; try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { if (txn == null) throw new IllegalStateException(); - long repairedAt = 1000; UUID parentRepairSession = UUID.randomUUID(); - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession); } assertEquals(2, store.getLiveSSTables().size()); @@ -109,10 +115,11 @@ public class AntiCompactionTest while (scanner.hasNext()) { UnfilteredRowIterator row = scanner.next(); - if (sstable.isRepaired()) + if (sstable.isRepaired() || sstable.isPendingRepair()) { assertTrue(range.contains(row.partitionKey().getToken())); - repairedKeys++; + repairedKeys += sstable.isRepaired() ? 1 : 0; + pendingKeys += sstable.isPendingRepair() ? 1 : 0; } else { @@ -128,11 +135,25 @@ public class AntiCompactionTest assertEquals(1, sstable.selfRef().globalCount()); } assertEquals(0, store.getTracker().getCompacting().size()); - assertEquals(repairedKeys, 4); + assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0); + assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0); assertEquals(nonRepairedKeys, 6); } @Test + public void antiCompactOneRepairedAt() throws Exception + { + antiCompactOne(1000, NO_PENDING_REPAIR); + } + + @Test + public void antiCompactOnePendingRepair() throws Exception + { + antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + } + + @Ignore + @Test public void antiCompactionSizeTest() throws InterruptedException, IOException { Keyspace keyspace = Keyspace.open(KEYSPACE1); @@ -147,7 +168,7 @@ public class AntiCompactionTest try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, parentRepairSession); + CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession); } long sum = 0; long rows = 0; @@ -166,7 +187,7 @@ public class AntiCompactionTest File dir = cfs.getDirectories().getDirectoryForNewSSTables(); Descriptor desc = cfs.newSSTableDescriptor(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) { for (int i = 0; i < count; i++) { @@ -230,7 +251,7 @@ public class AntiCompactionTest try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession); } /* Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time @@ -267,8 +288,7 @@ public class AntiCompactionTest assertEquals(nonRepairedKeys, 60); } - @Test - public void shouldMutateRepairedAt() throws InterruptedException, IOException + private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException { ColumnFamilyStore store = prepareColumnFamilyStore(); Collection<SSTableReader> sstables = getUnrepairedSSTables(store); @@ -280,15 +300,27 @@ public class AntiCompactionTest try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession); } assertThat(store.getLiveSSTables().size(), is(1)); - assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true)); + assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE)); + assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR)); assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1)); assertThat(store.getTracker().getCompacting().size(), is(0)); } + @Test + public void shouldMutateRepairedAt() throws InterruptedException, IOException + { + shouldMutate(1, NO_PENDING_REPAIR); + } + + @Test + public void shouldMutatePendingRepair() throws InterruptedException, IOException + { + shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + } @Test public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException @@ -311,7 +343,7 @@ public class AntiCompactionTest try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession); } assertThat(store.getLiveSSTables().size(), is(10)); @@ -348,5 +380,4 @@ public class AntiCompactionTest return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired())); } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java new file mode 100644 index 0000000..0ee85c6 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.Validator; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +/** + * Tests correct sstables are returned from CompactionManager.getSSTablesForValidation + * for consistent, legacy incremental, and full repairs + */ +public class CompactionManagerGetSSTablesForValidationTest +{ + private String ks; + private static final String tbl = "tbl"; + private ColumnFamilyStore cfs; + private static InetAddress coordinator; + + private static Token MT; + + private SSTableReader repaired; + private SSTableReader unrepaired; + private SSTableReader pendingRepair; + + private UUID sessionID; + private RepairJobDesc desc; + + @BeforeClass + public static void setupClass() throws Exception + { + SchemaLoader.prepareServer(); + coordinator = InetAddress.getByName("10.0.0.1"); + MT = DatabaseDescriptor.getPartitioner().getMinimumToken(); + } + + @Before + public void setup() throws Exception + { + ks = "ks_" + System.currentTimeMillis(); + TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + private void makeSSTables() + { + for (int i=0; i<3; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i); + cfs.forceBlockingFlush(); + } + Assert.assertEquals(3, cfs.getLiveSSTables().size()); + + } + + private void registerRepair(boolean incremental) throws Exception + { + sessionID = UUIDGen.getTimeUUID(); + Range<Token> range = new Range<>(MT, MT); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + coordinator, + Lists.newArrayList(cfs), + Sets.newHashSet(range), + incremental, + incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE, + true); + desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range)); + } + + private void modifySSTables() throws Exception + { + Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator(); + + repaired = iter.next(); + repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null); + repaired.reloadSSTableMetadata(); + + pendingRepair = iter.next(); + pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID); + pendingRepair.reloadSSTableMetadata(); + + unrepaired = iter.next(); + + Assert.assertFalse(iter.hasNext()); + } + + @Test + public void consistentRepair() throws Exception + { + makeSSTables(); + registerRepair(true); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true); + Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); + Assert.assertNotNull(sstables); + Assert.assertEquals(1, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + } + + @Test + public void legacyIncrementalRepair() throws Exception + { + makeSSTables(); + registerRepair(true); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false); + Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); + Assert.assertNotNull(sstables); + Assert.assertEquals(2, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + Assert.assertTrue(sstables.contains(unrepaired)); + } + + @Test + public void fullRepair() throws Exception + { + makeSSTables(); + registerRepair(false); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false); + Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); + Assert.assertNotNull(sstables); + Assert.assertEquals(3, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + Assert.assertTrue(sstables.contains(unrepaired)); + Assert.assertTrue(sstables.contains(repaired)); + } +}