Snapshot only the related SSTables during sequential repair. Patch by yukim; reviewed by jmckenzie for CASSANDRA-7024.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de8a479f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de8a479f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de8a479f Branch: refs/heads/trunk Commit: de8a479f2e1a8b536dedf2e6470301709bc3d9dc Parents: b69f5e3 Author: Yuki Morishita <yu...@apache.org> Authored: Tue Apr 15 17:13:45 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Apr 15 17:13:45 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 18 ++++++- .../repair/RepairMessageVerbHandler.java | 33 +++++++++--- .../apache/cassandra/repair/SnapshotTask.java | 8 +-- .../repair/messages/RepairMessage.java | 3 +- .../repair/messages/SnapshotMessage.java | 53 ++++++++++++++++++++ 6 files changed, 100 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 592eef9..9f34023 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -45,6 +45,7 @@ * Add failure handler to async callback (CASSANDRA-6747) * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000) * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924) + * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024) Merged from 2.0: * Put nodes in hibernate when join_ring is false (CASSANDRA-6961) * Allow compaction of system tables during startup (CASSANDRA-6913) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index ffea243..923ea5b 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,6 +30,7 @@ import javax.management.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.Futures; @@ -2153,6 +2154,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void snapshotWithoutFlush(String snapshotName) { + snapshotWithoutFlush(snapshotName, null); + } + + public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate) + { for (ColumnFamilyStore cfs : concatWithIndexes()) { DataTracker.View currentView = cfs.markCurrentViewReferenced(); @@ -2161,6 +2167,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { for (SSTableReader ssTable : currentView.sstables) { + if (predicate != null && !predicate.apply(ssTable)) + { + continue; + } + File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); ssTable.createLinks(snapshotDirectory.getPath()); // hard links if (logger.isDebugEnabled()) @@ -2190,8 +2201,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public void snapshot(String snapshotName) { + snapshot(snapshotName, null); + } + + public void snapshot(String snapshotName, Predicate<SSTableReader> predicate) + { forceBlockingFlush(); - snapshotWithoutFlush(snapshotName); + snapshotWithoutFlush(snapshotName, predicate); } public boolean snapshotExists(String snapshotName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index bb66b69..d710652 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -18,30 +18,32 @@ package org.apache.cassandra.repair; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.Future; +import com.google.common.base.Predicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.AnticompactionRequest; -import org.apache.cassandra.repair.messages.PrepareMessage; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.SyncRequest; -import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Handles all repair related message. 
* @@ -71,6 +73,21 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; + case SNAPSHOT: + ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); + final Range<Token> repairingRange = desc.range; + cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return sstable != null && new Bounds<>(sstable.first.token, sstable.last.token).intersects(Collections.singleton(repairingRange)); + } + }); + + logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); + MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); + break; + case VALIDATION_REQUEST: ValidationRequest validationRequest = (ValidationRequest) message.payload; // trigger read-only compaction http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/SnapshotTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java index cb5003a..6c3afb1 100644 --- a/src/java/org/apache/cassandra/repair/SnapshotTask.java +++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java @@ -18,15 +18,14 @@ package org.apache.cassandra.repair; import java.net.InetAddress; -import java.util.List; import java.util.concurrent.RunnableFuture; import com.google.common.util.concurrent.AbstractFuture; -import org.apache.cassandra.db.SnapshotCommand; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.SnapshotMessage; /** * SnapshotTask is a task that sends snapshot 
request. @@ -44,10 +43,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl public void run() { - MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace, - desc.columnFamily, - desc.sessionId.toString(), - false).createMessage(), + MessagingService.instance().sendRRWithFailure(new SnapshotMessage(desc).createMessage(), endpoint, new SnapshotCallback(this)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 054fb55..d500928 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -44,7 +44,8 @@ public abstract class RepairMessage SYNC_REQUEST(2, SyncRequest.serializer), SYNC_COMPLETE(3, SyncComplete.serializer), ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer), - PREPARE_MESSAGE(5, PrepareMessage.serializer); + PREPARE_MESSAGE(5, PrepareMessage.serializer), + SNAPSHOT(6, SnapshotMessage.serializer); private final byte type; private final MessageSerializer<RepairMessage> serializer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java new file mode 100644 index 0000000..caccc82 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.repair.RepairJobDesc; + +public class SnapshotMessage extends RepairMessage +{ + public final static MessageSerializer serializer = new SnapshotMessageSerializer(); + + public SnapshotMessage(RepairJobDesc desc) + { + super(Type.SNAPSHOT, desc); + } + + public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage> + { + public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException + { + RepairJobDesc.serializer.serialize(message.desc, out, version); + } + + public SnapshotMessage deserialize(DataInput in, int version) throws IOException + { + RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); + return new SnapshotMessage(desc); + } + + public long serializedSize(SnapshotMessage message, int version) + { + return RepairJobDesc.serializer.serializedSize(message.desc, version); + } + } +}