Updated Branches:
  refs/heads/trunk 3e0b21c45 -> efbdee237

Decide on a gcBefore before sending out TreeRequests

patch by marcuse, reviewed by yukim, for CASSANDRA-4932


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/efbdee23
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/efbdee23
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/efbdee23

Branch: refs/heads/trunk
Commit: efbdee23705fed6be49a2fb20a79f1aaaa8c98ff
Parents: 3e0b21c
Author: Marcus Eriksson <marc...@spotify.com>
Authored: Tue Apr 9 08:08:32 2013 +0200
Committer: Marcus Eriksson <marc...@spotify.com>
Committed: Tue Apr 9 08:08:32 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../cassandra/db/compaction/CompactionManager.java |    5 ++-
 .../cassandra/service/ActiveRepairService.java     |   27 ++++++++++----
 .../data/serialization/2.0/service.TreeRequest.bin |  Bin 121 -> 129 bytes
 .../serialization/2.0/service.TreeResponse.bin     |  Bin 930 -> 946 bytes
 .../compaction/LeveledCompactionStrategyTest.java  |    3 +-
 .../service/AntiEntropyServiceTestAbstract.java    |    3 +-
 .../cassandra/service/SerializationsTest.java      |    2 +-
 8 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bb00642..b322425 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,7 +28,8 @@
  * remove row-level bloom filters (CASSANDRA-4885)
  * Change Kernel Page Cache skipping into row preheating (disabled by default)
    (CASSANDRA-4937)
-
+ * Improve repair by deciding on a gcBefore before sending
+   out TreeRequests (CASSANDRA-4932)
 
 1.2.4
  * Ensure that PerRowSecondaryIndex updates see the most recent values

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 64cf253..b5aefee 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -618,7 +618,10 @@ public class CompactionManager implements 
CompactionManagerMBean
             // we don't mark validating sstables as compacting in DataTracker, 
so we have to mark them referenced
             // instead so they won't be cleaned up if they do get compacted 
during the validation
             sstables = cfs.markCurrentSSTablesReferenced();
-            gcBefore = getDefaultGcBefore(cfs);
+            if (validator.request.gcBefore > 0)
+                gcBefore = validator.request.gcBefore;
+            else
+                gcBefore = getDefaultGcBefore(cfs);
         }
 
         CompactionIterable ci = new ValidationCompactionIterable(cfs, 
sstables, validator.request.range, gcBefore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f103771..f7a5fa4 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -442,7 +442,7 @@ public class ActiveRepairService
         public void doVerb(MessageIn<TreeRequest> message, int id)
         {
             TreeRequest remotereq = message.payload;
-            TreeRequest request = new TreeRequest(remotereq.sessionid, 
message.from, remotereq.range, remotereq.cf);
+            TreeRequest request = new TreeRequest(remotereq.sessionid, 
message.from, remotereq.range, remotereq.gcBefore, remotereq.cf);
 
             // trigger read-only compaction
             ColumnFamilyStore store = 
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
@@ -462,7 +462,7 @@ public class ActiveRepairService
         {
             // deserialize the remote tree, and register it
             Validator response = message.payload;
-            TreeRequest request = new TreeRequest(response.request.sessionid, 
message.from, response.request.range, response.request.cf);
+            TreeRequest request = new TreeRequest(response.request.sessionid, 
message.from, response.request.range, response.request.gcBefore, 
response.request.cf);
             ActiveRepairService.instance.rendezvous(request, response.tree);
         }
     }
@@ -489,20 +489,22 @@ public class ActiveRepairService
         public final String sessionid;
         public final InetAddress endpoint;
         public final Range<Token> range;
+        public final int gcBefore;
         public final CFPair cf;
 
-        public TreeRequest(String sessionid, InetAddress endpoint, 
Range<Token> range, CFPair cf)
+        public TreeRequest(String sessionid, InetAddress endpoint, 
Range<Token> range, int gcBefore, CFPair cf)
         {
             this.sessionid = sessionid;
             this.endpoint = endpoint;
             this.cf = cf;
+            this.gcBefore = gcBefore;
             this.range = range;
         }
 
         @Override
         public final int hashCode()
         {
-            return Objects.hashCode(sessionid, endpoint, cf, range);
+            return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range);
         }
 
         @Override
@@ -512,13 +514,13 @@ public class ActiveRepairService
                 return false;
             TreeRequest that = (TreeRequest)o;
             // handles nulls properly
-            return Objects.equal(sessionid, that.sessionid) && 
Objects.equal(endpoint, that.endpoint) && Objects.equal(cf, that.cf) && 
Objects.equal(range, that.range);
+            return Objects.equal(sessionid, that.sessionid) && 
Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && 
Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
         }
 
         @Override
         public String toString()
         {
-            return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf 
+ ", " + range + ">";
+            return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + 
gcBefore + ", " + cf  + ", " + range + ">";
         }
 
         public MessageOut<TreeRequest> createMessage()
@@ -532,6 +534,9 @@ public class ActiveRepairService
             {
                 out.writeUTF(request.sessionid);
                 CompactEndpointSerializationHelper.serialize(request.endpoint, 
out);
+
+                if (version >= MessagingService.VERSION_20)
+                    out.writeInt(request.gcBefore);
                 out.writeUTF(request.cf.left);
                 out.writeUTF(request.cf.right);
                 AbstractBounds.serializer.serialize(request.range, out, 
version);
@@ -541,17 +546,21 @@ public class ActiveRepairService
             {
                 String sessId = in.readUTF();
                 InetAddress endpoint = 
CompactEndpointSerializationHelper.deserialize(in);
+                int gcBefore = -1;
+                if (version >= MessagingService.VERSION_20)
+                    gcBefore = in.readInt();
                 CFPair cfpair = new CFPair(in.readUTF(), in.readUTF());
                 Range<Token> range;
                 range = (Range<Token>) 
AbstractBounds.serializer.deserialize(in, version);
 
-                return new TreeRequest(sessId, endpoint, range, cfpair);
+                return new TreeRequest(sessId, endpoint, range, gcBefore, 
cfpair);
             }
 
             public long serializedSize(TreeRequest request, int version)
             {
                 return TypeSizes.NATIVE.sizeof(request.sessionid)
                      + 
CompactEndpointSerializationHelper.serializedSize(request.endpoint)
+                     + TypeSizes.NATIVE.sizeof(request.gcBefore)
                      + TypeSizes.NATIVE.sizeof(request.cf.left)
                      + TypeSizes.NATIVE.sizeof(request.cf.right)
                      + AbstractBounds.serializer.serializedSize(request.range, 
version);
@@ -833,8 +842,10 @@ public class ActiveRepairService
                 if (isSequential)
                     makeSnapshots(endpoints);
 
+                int gcBefore = (int)(System.currentTimeMillis()/1000) - 
Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+
                 for (InetAddress endpoint : allEndpoints)
-                    treeRequests.add(new TreeRequest(getName(), endpoint, 
range, new CFPair(tablename, cfname)));
+                    treeRequests.add(new TreeRequest(getName(), endpoint, 
range, gcBefore, new CFPair(tablename, cfname)));
 
                 logger.info(String.format("[repair #%s] requesting merkle 
trees for %s (to %s)", getName(), cfname, allEndpoints));
                 treeRequests.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/data/serialization/2.0/service.TreeRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeRequest.bin 
b/test/data/serialization/2.0/service.TreeRequest.bin
index b12a1b8..b336e50 100644
Binary files a/test/data/serialization/2.0/service.TreeRequest.bin and 
b/test/data/serialization/2.0/service.TreeRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/data/serialization/2.0/service.TreeResponse.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeResponse.bin 
b/test/data/serialization/2.0/service.TreeResponse.bin
index 4beb410..b63d8a2 100644
Binary files a/test/data/serialization/2.0/service.TreeResponse.bin and 
b/test/data/serialization/2.0/service.TreeResponse.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 4e46719..f4fd960 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -91,7 +91,8 @@ public class LeveledCompactionStrategyTest extends 
SchemaLoader
 
         ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, 
cfname);
         Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
-        ActiveRepairService.TreeRequest req = new 
ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p);
+        int gcBefore = (int)(System.currentTimeMillis()/1000) - 
table.getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+        ActiveRepairService.TreeRequest req = new 
ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, 
gcBefore, p);
         ActiveRepairService.Validator validator = new 
ActiveRepairService.Validator(req);
         CompactionManager.instance.submitValidation(store, validator).get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java 
b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 3b14b64..50ae5e4 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -105,7 +105,8 @@ public abstract class AntiEntropyServiceTestAbstract 
extends SchemaLoader
         local_range = StorageService.instance.getLocalPrimaryRange();
 
         // (we use REMOTE instead of LOCAL so that the reponses for the 
validator.complete() get lost)
-        request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, 
local_range, new CFPair(tablename, cfname));
+        int gcBefore = (int)(System.currentTimeMillis()/1000) - 
store.metadata.getGcGraceSeconds();
+        request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, 
local_range, gcBefore, new CFPair(tablename, cfname));
         // Set a fake session corresponding to this fake request
         ActiveRepairService.instance.submitArtificialRepairSession(request, 
tablename, cfname);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/efbdee23/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java 
b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index a733059..02c2ff7 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -106,6 +106,6 @@ public class SerializationsTest extends 
AbstractSerializationsTester
     private static class Statics
     {
         private static final ActiveRepairService.CFPair pair = new 
ActiveRepairService.CFPair("Keyspace1", "Standard1");
-        private static final ActiveRepairService.TreeRequest req = new 
ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), 
FULL_RANGE, pair);
+        private static final ActiveRepairService.TreeRequest req = new 
ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), 
FULL_RANGE, 1234, pair);
     }
 }

Reply via email to