Stop SSTables being lost from compaction strategy after full repairs

patch by Kurt Greaves; reviewed by Stefan Podkowinski and Marcus Eriksson for
CASSANDRA-14423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9

Branch: refs/heads/trunk
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <k...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <m...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  70 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 120 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs 
(CASSANDRA-14423)
  * Incorrect counting of pending messages in OutboundTcpConnection 
(CASSANDRA-11551)
  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements 
CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    /**
+     * Submit anti-compactions for a collection of SSTables over a set of 
repaired ranges and marks corresponding SSTables
+     * as repaired.
+     *
+     * @param cfs Column family for anti-compaction
+     * @param ranges Repaired ranges to be anti-compacted into separate 
SSTables.
+     * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+     * @param repairedAt Unix timestamp of when repair was completed.
+     * @return Futures executing anti-compaction.
+     */
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore 
cfs,
                                           final Collection<Range<Token>> 
ranges,
                                           final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements 
CompactionManagerMBean
                 {
                     for (SSTableReader compactingSSTable : 
cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
+                    // We don't anti-compact any SSTable that has been 
compacted during repair as it may have been compacted
+                    // with unrepaired data.
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements 
CompactionManagerMBean
      *
      * Caller must reference the validatedForRepair sstables (via 
ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
+     * NOTE: Repairs can take place on both unrepaired (incremental + full) 
and repaired (full) data.
+     * Although anti-compaction could work on repaired sstables as well and 
would result in having more accurate
+     * repairedAt values for these, we avoid anti-compacting already repaired 
sstables, as we currently don't
+     * make use of any actual repairedAt value and splitting up sstables just 
for that is not worth it. However, we will
+     * still update repairedAt if the SSTable is fully contained within the 
repaired ranges, as this does not require
+     * anticompaction.
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. 
Should be referenced before passing them.
+     * @param txn Transaction across all SSTables that were repaired.
      * @throws InterruptedException
      * @throws IOException
      */
@@ -519,13 +539,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", 
cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), 
cfs.getSSTables().size());
         logger.trace("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // 
SSTables that were completely repaired only
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements 
CompactionManagerMBean
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
+                List<String> anticompactRanges = new ArrayList<>();
+                // We don't anti-compact SSTables already marked repaired. See 
CASSANDRA-13153
+                // and CASSANDRA-14423.
+                if (sstable.isRepaired()) // We never anti-compact already 
repaired SSTables
+                    nonAnticompacting.add(sstable);
 
                 Bounds<Token> sstableBounds = new 
Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
@@ -548,28 +567,30 @@ public class CompactionManager implements 
CompactionManagerMBean
                         logger.info("SSTable {} fully contained in range {}, 
mutating repairedAt instead of anticompacting", sstable, r);
                         
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 
repairedAt);
                         sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        if (!nonAnticompacting.contains(sstable)) // don't 
notify if the SSTable was already repaired
+                            mutatedRepairStatuses.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
                     }
-                    else if (r.intersects(sstableBounds))
+                    else if (r.intersects(sstableBounds) && 
!nonAnticompacting.contains(sstable))
                     {
-                        logger.info("SSTable {} ({}) will be anticompacted on 
range {}", sstable, sstableBounds, r);
+                        anticompactRanges.add(r.toString());
                         shouldAnticompact = true;
                     }
                 }
 
+                if (!anticompactRanges.isEmpty())
+                    logger.info("SSTable {} ({}) will be anticompacted on 
ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
                 if (!shouldAnticompact)
                 {
-                    logger.info("SSTable {} ({}) does not intersect repaired 
ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+                    logger.info("SSTable {} ({}) not subject to anticompaction 
of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, 
normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
             }
-            
cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+            
cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, 
mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements 
CompactionManagerMBean
      */
     private void doAntiCompaction(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", 
repaired.originals().size());
+        int numAnticompact = repaired.originals().size();
+        logger.info("Performing anticompaction on {} sstables", 
numAnticompact);
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
-
-        // Repairs can take place on both unrepaired (incremental + full) and 
repaired (full) data.
-        // Although anti-compaction could work on repaired sstables as well 
and would result in having more accurate
-        // repairedAt values for these, we still avoid anti-compacting already 
repaired sstables, as we currently don't
-        // make use of any actual repairedAt value and splitting up sstables 
just for that is not worth it at this point.
-        Set<SSTableReader> unrepairedSSTables = 
ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader input)
-            {
-                return !input.isRepaired();
-            }
-        }));
-
-        Collection<Collection<SSTableReader>> groupedSSTables = 
cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = 
cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges 
intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted 
from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), 
antiCompactedSSTableCount);
+        logger.info(format, numAnticompact, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java 
b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import com.google.common.collect.Iterables;
-
 public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
             CompactionManager.instance.performAnticompaction(store, ranges, 
refs, txn, 1);
         }
 
+        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
         assertThat(store.getSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), 
is(true));
-        assertThat(Iterables.get(store.getSSTables(), 
0).selfRef().globalCount(), is(1));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+    }
+
+    @Test
+    public void shouldAntiCompactSSTable() throws IOException, 
InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) 
of the SSTable. Should result in
+        // one repaired and one unrepaired SSTable
+        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), 
new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, 
refs, txn, 1);
+        }
+
+        Comparator<SSTableReader> generationReverseComparator = new 
Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Integer.compare(o1.descriptor.generation, 
o2.descriptor.generation);
+            }
+        };
+
+        SortedSet<SSTableReader> sstablesSorted = new 
TreeSet<>(generationReverseComparator);
+        sstablesSorted.addAll(store.getSSTables());
+
+        SSTableReader sstable = sstablesSorted.first();
+        assertThat(store.getSSTables().size(), is(2));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test we don't anti-compact already repaired SSTables. repairedAt 
shouldn't change for the already repaired SSTable (first)
+        sstables = store.getSSTables();
+        // Range that's a subset of the repaired SSTable's ranges, so would 
cause an anti-compaction (if it wasn't repaired)
+        range = new Range<Token>(new BytesToken("/".getBytes()), new 
BytesToken("2".getBytes()));
+        ranges = Arrays.asList(range);
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // use different repairedAt to ensure it doesn't change
+            ListenableFuture fut = 
CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, 
is(1L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, 
is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test repairing all the ranges of the repaired SSTable. Should 
mutate repairedAt without anticompacting,
+        // but leave the unrepaired SSTable as is.
+        range = new Range<Token>(new BytesToken("/".getBytes()), new 
BytesToken("4".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Same repaired at, but should be changed on the repaired SSTable 
now
+            ListenableFuture fut = 
CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, 
is(200L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, 
is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Repair whole range. Should mutate repairedAt on repaired SSTable 
(again) and
+        // mark unrepaired SSTable as repaired
+        range = new Range<Token>(new BytesToken("/".getBytes()), new 
BytesToken("999".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Both SSTables should have repairedAt of 400
+            ListenableFuture fut = 
CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(true));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, 
is(400L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, 
is(400L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to