Updated Branches: refs/heads/trunk 3e0b21c45 -> efbdee237
Decide on a gcBefore before sending out TreeRequests patch by marcuse, reviewed by yukim, for CASSANDRA-4932 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/efbdee23 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/efbdee23 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/efbdee23 Branch: refs/heads/trunk Commit: efbdee23705fed6be49a2fb20a79f1aaaa8c98ff Parents: 3e0b21c Author: Marcus Eriksson <marc...@spotify.com> Authored: Tue Apr 9 08:08:32 2013 +0200 Committer: Marcus Eriksson <marc...@spotify.com> Committed: Tue Apr 9 08:08:32 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../cassandra/db/compaction/CompactionManager.java | 5 ++- .../cassandra/service/ActiveRepairService.java | 27 ++++++++++---- .../data/serialization/2.0/service.TreeRequest.bin | Bin 121 -> 129 bytes .../serialization/2.0/service.TreeResponse.bin | Bin 930 -> 946 bytes .../compaction/LeveledCompactionStrategyTest.java | 3 +- .../service/AntiEntropyServiceTestAbstract.java | 3 +- .../cassandra/service/SerializationsTest.java | 2 +- 8 files changed, 30 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bb00642..b322425 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -28,7 +28,8 @@ * remove row-level bloom filters (CASSANDRA-4885) * Change Kernel Page Cache skipping into row preheating (disabled by default) (CASSANDRA-4937) - + * Improve repair by deciding on a gcBefore before sending + out TreeRequests (CASSANDRA-4932) 1.2.4 * Ensure that PerRowSecondaryIndex updates see the most recent values http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 64cf253..b5aefee 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -618,7 +618,10 @@ public class CompactionManager implements CompactionManagerMBean // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced // instead so they won't be cleaned up if they do get compacted during the validation sstables = cfs.markCurrentSSTablesReferenced(); - gcBefore = getDefaultGcBefore(cfs); + if (validator.request.gcBefore > 0) + gcBefore = validator.request.gcBefore; + else + gcBefore = getDefaultGcBefore(cfs); } CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range, gcBefore); http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index f103771..f7a5fa4 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -442,7 +442,7 @@ public class ActiveRepairService public void doVerb(MessageIn<TreeRequest> message, int id) { TreeRequest remotereq = message.payload; - TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.cf); + TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.gcBefore, remotereq.cf); // trigger read-only compaction ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right); @@ -462,7 +462,7 @@ public class ActiveRepairService { // deserialize the remote tree, and register it Validator response = message.payload; - TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.cf); + TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.gcBefore, response.request.cf); ActiveRepairService.instance.rendezvous(request, response.tree); } } @@ -489,20 +489,22 @@ public class ActiveRepairService public final String sessionid; public final InetAddress endpoint; public final Range<Token> range; + public final int gcBefore; public final CFPair cf; - public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, CFPair cf) + public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, int gcBefore, CFPair cf) { this.sessionid = sessionid; this.endpoint = endpoint; this.cf = cf; + this.gcBefore = gcBefore; this.range = range; } @Override public final int hashCode() { - return Objects.hashCode(sessionid, endpoint, cf, range); + return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range); } @Override @@ -512,13 +514,13 @@ public class ActiveRepairService return false; TreeRequest that = (TreeRequest)o; // handles nulls properly - return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && Objects.equal(cf, that.cf) && Objects.equal(range, that.range); + return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && Objects.equal(cf, that.cf) && Objects.equal(range, that.range); } @Override public String toString() { - return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf + ", " + range + ">"; + return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + gcBefore + ", " + cf + ", " + range + ">"; } public MessageOut<TreeRequest> createMessage() @@ -532,6 +534,9 @@ public class ActiveRepairService { out.writeUTF(request.sessionid); CompactEndpointSerializationHelper.serialize(request.endpoint, out); + + if (version >= MessagingService.VERSION_20) + out.writeInt(request.gcBefore); out.writeUTF(request.cf.left); out.writeUTF(request.cf.right); AbstractBounds.serializer.serialize(request.range, out, version); @@ -541,17 +546,21 @@ public class ActiveRepairService { String sessId = in.readUTF(); InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in); + int gcBefore = -1; + if (version >= MessagingService.VERSION_20) + gcBefore = in.readInt(); CFPair cfpair = new CFPair(in.readUTF(), in.readUTF()); Range<Token> range; range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version); - return new TreeRequest(sessId, endpoint, range, cfpair); + return new TreeRequest(sessId, endpoint, range, gcBefore, cfpair); } public long serializedSize(TreeRequest request, int version) { return TypeSizes.NATIVE.sizeof(request.sessionid) + CompactEndpointSerializationHelper.serializedSize(request.endpoint) + + TypeSizes.NATIVE.sizeof(request.gcBefore) + TypeSizes.NATIVE.sizeof(request.cf.left) + TypeSizes.NATIVE.sizeof(request.cf.right) + AbstractBounds.serializer.serializedSize(request.range, version); @@ -833,8 +842,10 @@ public class ActiveRepairService if (isSequential) makeSnapshots(endpoints); + int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds(); + for (InetAddress endpoint : allEndpoints) - treeRequests.add(new TreeRequest(getName(), endpoint, range, new CFPair(tablename, cfname))); + treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname))); logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints)); treeRequests.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/data/serialization/2.0/service.TreeRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.0/service.TreeRequest.bin b/test/data/serialization/2.0/service.TreeRequest.bin index b12a1b8..b336e50 100644 Binary files a/test/data/serialization/2.0/service.TreeRequest.bin and b/test/data/serialization/2.0/service.TreeRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/data/serialization/2.0/service.TreeResponse.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.0/service.TreeResponse.bin b/test/data/serialization/2.0/service.TreeResponse.bin index 4beb410..b63d8a2 100644 Binary files a/test/data/serialization/2.0/service.TreeResponse.bin and b/test/data/serialization/2.0/service.TreeResponse.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 4e46719..f4fd960 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -91,7 +91,8 @@ public class LeveledCompactionStrategyTest extends SchemaLoader ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname); Range<Token> range = new Range<Token>(Util.token(""), Util.token("")); - ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p); + int gcBefore = (int)(System.currentTimeMillis()/1000) - table.getColumnFamilyStore(cfname).metadata.getGcGraceSeconds(); + ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p); ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req); CompactionManager.instance.submitValidation(store, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index 3b14b64..50ae5e4 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -105,7 +105,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader local_range = StorageService.instance.getLocalPrimaryRange(); // (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost) - request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, new CFPair(tablename, cfname)); + int gcBefore = (int)(System.currentTimeMillis()/1000) - store.metadata.getGcGraceSeconds(); + request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, gcBefore, new CFPair(tablename, cfname)); // Set a fake session corresponding to this fake request ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index a733059..02c2ff7 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -106,6 +106,6 @@ public class SerializationsTest extends AbstractSerializationsTester private static class Statics { private static final ActiveRepairService.CFPair pair = new ActiveRepairService.CFPair("Keyspace1", "Standard1"); - private static final ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), FULL_RANGE, pair); + private static final ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), FULL_RANGE, 1234, pair); } }