Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 8afc76ae6 -> f490ccec6
Avoid anticompaction after non-global repairs Patch by marcuse; reviewed by sankalp kohli for CASSANDRA-9142 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/842f1509 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/842f1509 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/842f1509 Branch: refs/heads/cassandra-3.0 Commit: 842f1509d46bf068abca1e064f23454892347e60 Parents: 134bcda Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Jun 1 15:36:17 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Aug 25 19:00:12 2015 +0200 ---------------------------------------------------------------------- .../repair/RepairMessageVerbHandler.java | 14 +++++- .../apache/cassandra/repair/RepairRunnable.java | 2 +- .../repair/messages/PrepareMessage.java | 7 +-- .../repair/messages/RepairMessage.java | 1 + .../cassandra/repair/messages/RepairOption.java | 4 ++ .../cassandra/service/ActiveRepairService.java | 52 ++++++++++++++------ .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 2 +- 8 files changed, 60 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index c0855c4..796f135 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.LocalPartitioner; @@ -41,6 +42,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Pair; /** @@ -59,6 +61,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { switch (message.payload.messageType) { + case PREPARE_GLOBAL_MESSAGE: case PREPARE_MESSAGE: PrepareMessage prepareMessage = (PrepareMessage) message.payload; logger.debug("Preparing, {}", prepareMessage); @@ -69,10 +72,17 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); columnFamilyStores.add(columnFamilyStore); } + CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from); + // note that we default isGlobal to true since old version always default to true: + boolean isGlobal = peerVersion == null || + peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 || + message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE); + logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion); ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, columnFamilyStores, prepareMessage.ranges, - prepareMessage.isIncremental); + prepareMessage.isIncremental, + isGlobal); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; @@ -117,7 +127,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> logger.debug("Syncing {}", request); long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) - repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; + repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt(); StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt); task.run(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 28511db..91ac82a 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -191,7 +191,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti try { ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores); - repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt(); progress.incrementAndGet(); } catch (Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 37dc07c..a57c27e 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -41,9 +41,9 @@ public class PrepareMessage extends RepairMessage public final UUID parentRepairSession; public final boolean isIncremental; - public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental) + public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal) { - super(Type.PREPARE_MESSAGE, null); + super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, null); this.parentRepairSession = parentRepairSession; this.cfIds = cfIds; this.ranges = ranges; @@ -79,7 +79,8 @@ public class PrepareMessage extends RepairMessage for (int i = 0; i < rangeCount; i++) ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); boolean isIncremental = in.readBoolean(); - return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental); + + return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, false); } public long serializedSize(PrepareMessage message, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 6b5226d..d78c2fd 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -45,6 +45,7 @@ public abstract class RepairMessage SYNC_COMPLETE(3, SyncComplete.serializer), ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer), PREPARE_MESSAGE(5, PrepareMessage.serializer), + PREPARE_GLOBAL_MESSAGE(8, PrepareMessage.serializer), SNAPSHOT(6, SnapshotMessage.serializer), CLEANUP(7, CleanupMessage.serializer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 7b9a9af..f3e452c 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -287,6 +287,10 @@ public class RepairOption return hosts; } + public boolean isGlobal() + { + return dataCenters.isEmpty() && hosts.isEmpty(); + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 213edeb..a6389ea 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -54,6 +55,7 @@ import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.messages.*; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; @@ -75,6 +77,8 @@ import org.apache.cassandra.utils.concurrent.Refs; */ public class ActiveRepairService { + public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1"); + private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); // singleton enforcement public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance); @@ -233,7 +237,7 @@ public class ActiveRepairService public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental()); + registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@ -263,7 +267,10 @@ public class ActiveRepairService for (InetAddress neighbour : endpoints) { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental()); + CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour); + boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0; + logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion); + PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal); MessageOut<RepairMessage> msg = message.createMessage(); MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); } @@ -286,9 +293,9 @@ public class ActiveRepairService return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental) + public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis())); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis())); } public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) @@ -313,15 +320,15 @@ public class ActiveRepairService */ public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) { - List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1); - for (InetAddress neighbor : neighbors) - { - AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges); - tasks.add(task); - task.run(); // 'run' is just sending message - } - tasks.add(doAntiCompaction(parentSession, successfulRanges)); - return Futures.successfulAsList(tasks); + List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1); + for (InetAddress neighbor : neighbors) + { + AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges); + tasks.add(task); + task.run(); // 'run' is just sending message + } + tasks.add(doAntiCompaction(parentSession, successfulRanges)); + return Futures.successfulAsList(tasks); } public ParentRepairSession getParentRepairSession(UUID parentSessionId) @@ -346,6 +353,12 @@ public class ActiveRepairService { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + if (!prs.isGlobal) + { + logger.info("Not a global repair, will not do anticompaction"); + removeParentRepairSession(parentRepairSession); + return Futures.immediateFuture(Collections.emptyList()); + } assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<ListenableFuture<?>> futures = new ArrayList<>(); @@ -400,15 +413,17 @@ public class ActiveRepairService private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); private final Collection<Range<Token>> ranges; private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>(); - public final long repairedAt; + private final long repairedAt; public final boolean isIncremental; + private final boolean isGlobal; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt) + public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) { for (ColumnFamilyStore cfs : columnFamilyStores) this.columnFamilyStores.put(cfs.metadata.cfId, cfs); this.ranges = ranges; this.repairedAt = repairedAt; + this.isGlobal = isGlobal; this.isIncremental = isIncremental; } @@ -445,7 +460,12 @@ public class ActiveRepairService } return new Refs<>(references.build()); } - + public long getRepairedAt() + { + if (isGlobal) + return repairedAt; + return ActiveRepairService.UNREPAIRED_SSTABLE; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 03aaf03..63fd0e7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -197,7 +197,7 @@ public class LeveledCompactionStrategyTest Range<Token> range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 3a16262..e5c03b9 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);