Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 6fe6d65d5 -> d9f5702d8
Move MeteredFlusher to its own thread patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-8485 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f54285e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f54285e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f54285e Branch: refs/heads/cassandra-2.1 Commit: 5f54285e9e39833b6bc01317fd74b8bd9b408842 Parents: 2acbab6 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Dec 19 21:40:25 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Dec 19 21:40:25 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/MeteredFlusher.java | 31 +++++++++++++++----- .../cassandra/service/CassandraDaemon.java | 2 +- 3 files changed, 26 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd128f5..57ab5b4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.12: + * Move MeteredFlusher to its own thread (CASSANDRA-8485) * Fix non-distinct results in DISTNCT queries on static columns when paging is enabled (CASSANDRA-8087) * Move all hints related tasks to hints internal executor (CASSANDRA-8285) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/db/MeteredFlusher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java index 4f06bc6..30dbb23 100644 --- a/src/java/org/apache/cassandra/db/MeteredFlusher.java +++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java @@ -21,16 +21,33 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; public class MeteredFlusher implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MeteredFlusher.class); + public static final MeteredFlusher instance = new MeteredFlusher(); + + private final ScheduledExecutorService executor; + + private MeteredFlusher() + { + executor = new DebuggableScheduledThreadPoolExecutor("MeteredFlusher"); + } + + public void start() + { + executor.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); + } + public void run() { long allowedSize = calculateAllowedSize(); @@ -55,7 +72,7 @@ public class MeteredFlusher implements Runnable long size = cfs.getTotalMemtableLiveSize(); if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight) { - logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size); + logger.info("Flushing high-traffic column family {} (estimated {} bytes)", cfs, size); cfs.forceFlush(); } else @@ -66,7 +83,7 @@ public class MeteredFlusher implements Runnable if (liveSize + flushingSize <= allowedSize) return; - logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize); + logger.info("Estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize); Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>() { @@ -89,16 +106,16 @@ public class MeteredFlusher implements Runnable long size = cfs.getTotalMemtableLiveSize(); if (size > 0) { - logger.info("flushing {} to free up {} bytes", cfs, size); + logger.info("Flushing {} to free up {} bytes", cfs, size); liveSize -= size; cfs.forceFlush(); } } - logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize); + logger.trace("Memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize); } - private static List<ColumnFamilyStore> affectedColumnFamilies() + private List<ColumnFamilyStore> affectedColumnFamilies() { List<ColumnFamilyStore> affected = new ArrayList<>(); // filter out column families that aren't affected by MeteredFlusher @@ -108,7 +125,7 @@ public class MeteredFlusher implements Runnable return affected; } - private static long calculateAllowedSize() + private long calculateAllowedSize() { long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L; // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we @@ -119,7 +136,7 @@ public class MeteredFlusher implements Runnable return allowed; } - private static long calculateFlushingSize() + private long calculateFlushingSize() { ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring; long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 89d2bb0..cad1658 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -328,7 +328,7 @@ public class CassandraDaemon // MeteredFlusher can block if flush queue fills up, so don't put on scheduledTasks // Start it before commit log, so memtables can flush during commit log replay - StorageService.optionalTasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS); + MeteredFlusher.instance.start(); // replay the log if necessary try