Repository: cassandra
Updated Branches:
  refs/heads/trunk 2aeed037e -> bdb52801c


Abort compactions quicker

Patch by marcuse; reviewed by Alex Petrov for CASSANDRA-14397


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdb52801
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdb52801
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdb52801

Branch: refs/heads/trunk
Commit: bdb52801c7384ef07f7fc0b4f3b965bdf35d821d
Parents: 2aeed03
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Apr 13 15:15:03 2018 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Jun 13 13:06:59 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionIterator.java       | 38 +++++++++++-
 .../db/compaction/CompactionManager.java        |  3 -
 .../cassandra/db/compaction/CompactionTask.java |  3 -
 .../db/repair/CassandraValidationIterator.java  |  8 ---
 .../db/compaction/CompactionIteratorTest.java   | 61 ++++++++++++++++++++
 6 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 629df0c..49738cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Abort compactions quicker (CASSANDRA-14397)
  * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
  * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
  * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index dfbb6cc..c9d7e52 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -104,7 +104,8 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                                            ? 
EmptyIterators.unfilteredPartition(controller.cfs.metadata())
                                            : 
UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
         merged = Transformation.apply(merged, new GarbageSkipper(controller, 
nowInSec));
-        this.compacted = Transformation.apply(merged, new Purger(controller, 
nowInSec));
+        merged = Transformation.apply(merged, new Purger(controller, 
nowInSec));
+        compacted = Transformation.apply(merged, new 
AbortableUnfilteredPartitionTransformation(this));
     }
 
     public TableMetadata metadata()
@@ -542,4 +543,39 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             return new GarbageSkippingUnfilteredRowIterator(partition, 
UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC);
         }
     }
+
+    private static class AbortableUnfilteredPartitionTransformation extends 
Transformation<UnfilteredRowIterator>
+    {
+        private final AbortableUnfilteredRowTransformation abortableIter;
+
+        private AbortableUnfilteredPartitionTransformation(CompactionIterator 
iter)
+        {
+            this.abortableIter = new 
AbortableUnfilteredRowTransformation(iter);
+        }
+
+        @Override
+        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
+        {
+            if (abortableIter.iter.isStopRequested())
+                throw new 
CompactionInterruptedException(abortableIter.iter.getCompactionInfo());
+            return Transformation.apply(partition, abortableIter);
+        }
+    }
+
+    private static class AbortableUnfilteredRowTransformation extends 
Transformation
+    {
+        private final CompactionIterator iter;
+
+        private AbortableUnfilteredRowTransformation(CompactionIterator iter)
+        {
+            this.iter = iter;
+        }
+
+        public Row applyToRow(Row row)
+        {
+            if (iter.isStopRequested())
+                throw new 
CompactionInterruptedException(iter.getCompactionInfo());
+            return row;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5c61982..a872fea 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1155,9 +1155,6 @@ public class CompactionManager implements 
CompactionManagerMBean
 
             while (ci.hasNext())
             {
-                if (ci.isStopRequested())
-                    throw new 
CompactionInterruptedException(ci.getCompactionInfo());
-
                 try (UnfilteredRowIterator partition = ci.next();
                      UnfilteredRowIterator notCleaned = 
cleanupStrategy.cleanup(partition))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 5697df2..662384c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -197,9 +197,6 @@ public class CompactionTask extends AbstractCompactionTask
                     estimatedKeys = writer.estimatedKeys();
                     while (ci.hasNext())
                     {
-                        if (ci.isStopRequested())
-                            throw new 
CompactionInterruptedException(ci.getCompactionInfo());
-
                         if (writer.append(ci.next()))
                             totalKeysWritten++;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java 
b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 6fa0be2..caf1b8e 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -281,23 +281,15 @@ public class CassandraValidationIterator extends 
ValidationPartitionIterator
         return cfs.metadata.get();
     }
 
-    private void throwIfStopRequested()
-    {
-        if (ci.isStopRequested())
-            throw new CompactionInterruptedException(ci.getCompactionInfo());
-    }
-
     @Override
     public boolean hasNext()
     {
-        throwIfStopRequested();
         return ci.hasNext();
     }
 
     @Override
     public UnfilteredRowIterator next()
     {
-        throwIfStopRequested();
         return ci.next();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdb52801/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index 99df52f..d5ea56c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -310,6 +310,67 @@ public class CompactionIteratorTest
         }
     }
 
+    @Test
+    public void transformTest()
+    {
+        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 
11[100] 12[100]"}, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, 
generator);
+        List<Iterable<UnfilteredRowIterator>> content = 
ImmutableList.copyOf(Iterables.transform(inputLists, list -> 
ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources 
= new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> 
listToIterator(list, kk)));
+        try (CompactionController controller = new 
Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new 
CompactionIterator(OperationType.COMPACTION,
+                                                              
Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, 
null))
+        {
+            assertTrue(iter.hasNext());
+            UnfilteredRowIterator rows = iter.next();
+            assertTrue(rows.hasNext());
+            assertNotNull(rows.next());
+
+            iter.stop();
+            try
+            {
+                // Will call Transformation#applyToRow
+                rows.hasNext();
+                fail("Should have thrown CompactionInterruptedException");
+            }
+            catch (CompactionInterruptedException e)
+            {
+                // ignore
+            }
+        }
+    }
+
+    @Test
+    public void transformPartitionTest()
+    {
+        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 
11[100] 12[100]"}, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, 
generator);
+        List<Iterable<UnfilteredRowIterator>> content = 
ImmutableList.copyOf(Iterables.transform(inputLists, list -> 
ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources 
= new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> 
listToIterator(list, kk)));
+        try (CompactionController controller = new 
Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new 
CompactionIterator(OperationType.COMPACTION,
+                                                              
Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, 
null))
+        {
+            iter.stop();
+            try
+            {
+                // Will call Transformation#applyToPartition
+                iter.hasNext();
+                fail("Should have thrown CompactionInterruptedException");
+            }
+            catch (CompactionInterruptedException e)
+            {
+                // ignore
+            }
+        }
+    }
+
     class Controller extends CompactionController
     {
         private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> 
tombstoneSources;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to