Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 33279dd8c -> 2f7077c06


Pick sstables to validate as late as possible with inc repairs

Patch by marcuse; reviewed by yukim for CASSANDRA-8366


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f7077c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f7077c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f7077c0

Branch: refs/heads/cassandra-2.1
Commit: 2f7077c06ccbd5e8e7259c6891fe98d83ec3359d
Parents: 33279dd
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Feb 17 16:20:35 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Mar 3 10:32:46 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 14 +++++++
 .../db/compaction/CompactionManager.java        | 14 ++++++-
 .../cassandra/service/ActiveRepairService.java  | 41 +++++++-------------
 4 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15a5a61..c3c7a19 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Pick sstables for validation as late as possible inc repairs 
(CASSANDRA-8366)
  * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
  * Fix parallelism adjustment in range and secondary index queries
    when the first fetch does not satisfy the limit (CASSANDRA-8856)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 62aadf9..e4531f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2926,4 +2926,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             return new ArrayList<>(view.sstables);
         }
     };
+
+    public static final Function<DataTracker.View, List<SSTableReader>> 
UNREPAIRED_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+    {
+        public List<SSTableReader> apply(DataTracker.View view)
+        {
+            List<SSTableReader> sstables = new ArrayList<>();
+            for (SSTableReader sstable : view.sstables)
+            {
+                if (!sstable.isRepaired())
+                    sstables.add(sstable);
+            }
+            return sstables;
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 68313a3..e54a25f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -956,7 +956,19 @@ public class CompactionManager implements 
CompactionManagerMBean
                 if (validator.desc.parentSessionId == null || 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId)
 == null)
                     sstables = 
cfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs;
                 else
-                    sstables = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
+                {
+                    ColumnFamilyStore.RefViewFragment refView = 
cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES);
+                    sstables = refView.refs;
+                    Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
validator.desc.parentSessionId);
+
+                    if (!Sets.intersection(currentlyRepairing, 
Sets.newHashSet(refView.sstables)).isEmpty())
+                    {
+                        logger.error("Cannot start multiple repair sessions 
over the same sstables");
+                        throw new RuntimeException("Cannot start multiple 
repair sessions over the same sstables");
+                    }
+
+                    
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 refView.sstables);
+                }
 
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index bf1cdd6..f71cb6b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -305,38 +305,16 @@ public class ActiveRepairService
 
     public synchronized void registerParentRepairSession(UUID 
parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges)
     {
-        Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
-        for (ColumnFamilyStore cfs : columnFamilyStores)
-        {
-            Set<SSTableReader> sstables = new HashSet<>();
-            Set<SSTableReader> currentlyRepairing = 
currentlyRepairing(cfs.metadata.cfId);
-            for (SSTableReader sstable : cfs.getSSTables())
-            {
-                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges))
-                {
-                    if (!sstable.isRepaired())
-                    {
-                        if (currentlyRepairing.contains(sstable))
-                        {
-                            logger.error("Already repairing "+sstable+", can 
not continue.");
-                            throw new RuntimeException("Already repairing 
"+sstable+", can not continue.");
-                        }
-                        sstables.add(sstable);
-                    }
-                }
-            }
-            sstablesToRepair.put(cfs.metadata.cfId, sstables);
-        }
-        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, 
System.currentTimeMillis()));
+        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, System.currentTimeMillis()));
     }
 
-    private Set<SSTableReader> currentlyRepairing(UUID cfId)
+    public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID 
parentRepairSession)
     {
         Set<SSTableReader> repairing = new HashSet<>();
         for (Map.Entry<UUID, ParentRepairSession> entry : 
parentRepairSessions.entrySet())
         {
             Collection<SSTableReader> sstables = 
entry.getValue().sstableMap.get(cfId);
-            if (sstables != null)
+            if (sstables != null && 
!entry.getKey().equals(parentRepairSession))
                 repairing.addAll(sstables);
         }
         return repairing;
@@ -419,12 +397,12 @@ public class ActiveRepairService
         public final Map<UUID, Set<SSTableReader>> sstableMap;
         public final long repairedAt;
 
-        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long 
repairedAt)
+        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges, long repairedAt)
         {
             for (ColumnFamilyStore cfs : columnFamilyStores)
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
             this.ranges = ranges;
-            this.sstableMap = sstables;
+            this.sstableMap = new HashMap<>();
             this.repairedAt = repairedAt;
         }
 
@@ -452,6 +430,15 @@ public class ActiveRepairService
             return new Refs<>(references.build());
         }
 
+        public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+        {
+            Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId);
+            if (existingSSTables == null)
+                existingSSTables = new HashSet<>();
+            existingSSTables.addAll(sstables);
+            this.sstableMap.put(cfId, existingSSTables);
+        }
+
         @Override
         public String toString()
         {

Reply via email to