Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 79cead093 -> f88ec9357 refs/heads/cassandra-3.11 c4946960a -> 465d86904 refs/heads/trunk 68b81372c -> 2201e364b
Fully utilise specified compaction threads (jobs) Patch by Kurt Greaves; reviewed by marcuse for CASSANDRA-14210 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f88ec935 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f88ec935 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f88ec935 Branch: refs/heads/cassandra-3.0 Commit: f88ec9357de406daad0f795951f17e5f854ade10 Parents: 79cead0 Author: kurt <k...@instaclustr.com> Authored: Mon Feb 12 21:06:34 2018 +0000 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Mar 5 09:09:52 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 11 +++--- .../io/sstable/format/SSTableReader.java | 2 + .../org/apache/cassandra/utils/FBUtilities.java | 36 ++++++++++++++++++ .../apache/cassandra/utils/FBUtilitiesTest.java | 39 ++++++++++++++++++++ 5 files changed, 84 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8cf665e..5599906 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.17 + * Fully utilise specified compaction threads (CASSANDRA-14210) * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763) Merged from 2.1: * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1d54667..a8e6931 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -327,8 +327,8 @@ public class CompactionManager implements CompactionManagerMBean if (jobs > 0 && futures.size() == jobs) { - FBUtilities.waitOnFutures(futures); - futures.clear(); + Future<?> f = FBUtilities.waitOnFirstFuture(futures); + futures.remove(f); } } FBUtilities.waitOnFutures(futures); @@ -416,8 +416,9 @@ public class CompactionManager implements CompactionManagerMBean @Override public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) { - Iterable<SSTableReader> sstables = new ArrayList<>(transaction.originals()); - Iterator<SSTableReader> iter = sstables.iterator(); + List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals()); + Collections.sort(sortedSSTables, SSTableReader.sizeComparator.reversed()); + Iterator<SSTableReader> iter = sortedSSTables.iterator(); while (iter.hasNext()) { SSTableReader sstable = iter.next(); @@ -427,7 +428,7 @@ public class CompactionManager implements CompactionManagerMBean iter.remove(); } } - return sstables; + return sortedSSTables; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 7e1bc1a..2c94e45 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -152,6 +152,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); + public static final Comparator<SSTableReader> sizeComparator = (o1, o2) -> Longs.compare(o1.onDiskLength(), o2.onDiskLength()); + /** * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index f111919..268e54b 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Strings; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -397,6 +398,41 @@ public class FBUtilities result.get(ms, TimeUnit.MILLISECONDS); } + public static <T> Future<? extends T> waitOnFirstFuture(Iterable<? extends Future<? extends T>> futures) + { + return waitOnFirstFuture(futures, 100); + } + /** + * Only wait for the first future to finish from a list of futures. Will block until at least 1 future finishes. + * @param futures The futures to wait on + * @return future that completed. + */ + public static <T> Future<? extends T> waitOnFirstFuture(Iterable<? extends Future<? extends T>> futures, long delay) + { + while (true) + { + for (Future<? extends T> f : futures) + { + if (f.isDone()) + { + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + return f; + } + } + Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); + } + } /** * Create a new instance of a partitioner defined in an SSTable Descriptor * @param desc Descriptor of an sstable http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java index acd68eb..c5126a0 100644 --- a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java +++ b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java @@ -23,6 +23,15 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import com.google.common.primitives.Ints; import org.junit.Test; @@ -127,4 +136,34 @@ public class FBUtilitiesTest FBUtilities.reset(); } + + @Test + public void testWaitFirstFuture() throws ExecutionException, InterruptedException + { + + ExecutorService executor = Executors.newFixedThreadPool(4); + FBUtilities.reset(); + List<Future<?>> futures = new ArrayList<>(); + for (int i = 4; i >= 1; i--) + { + final int sleep = i * 10; + futures.add(executor.submit(() -> { TimeUnit.MILLISECONDS.sleep(sleep); return sleep; })); + } + Future<?> fut = FBUtilities.waitOnFirstFuture(futures, 3); + int futSleep = (Integer) fut.get(); + assertEquals(futSleep, 10); + futures.remove(fut); + fut = FBUtilities.waitOnFirstFuture(futures, 3); + futSleep = (Integer) fut.get(); + assertEquals(futSleep, 20); + futures.remove(fut); + fut = FBUtilities.waitOnFirstFuture(futures, 3); + futSleep = (Integer) fut.get(); + assertEquals(futSleep, 30); + futures.remove(fut); + fut = FBUtilities.waitOnFirstFuture(futures, 3); + futSleep = (Integer) fut.get(); + assertEquals(futSleep, 40); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org