Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 f8229c3fb -> 58e524e77
Improve MeteredFlusher handling of MF-unaffected column families

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6867

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58e524e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58e524e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58e524e7

Branch: refs/heads/cassandra-2.0
Commit: 58e524e77f218cc75d01acc8f41c8ee6eef86207
Parents: f8229c3
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Mar 25 21:55:37 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Mar 25 21:55:37 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/MeteredFlusher.java | 164 ++++++++++---------
 .../compaction/AbstractCompactionStrategy.java  |  10 ++
 3 files changed, 96 insertions(+), 80 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 373544c..00b98fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@
  * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
  * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
  * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+   (CASSANDRA-6867)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java 
b/src/java/org/apache/cassandra/db/MeteredFlusher.java index 5c71fc6..4f06bc6 100644 --- a/src/java/org/apache/cassandra/db/MeteredFlusher.java +++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,97 +33,102 @@ public class MeteredFlusher implements Runnable public void run() { - long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L; - - // first, find how much memory non-active memtables are using - ColumnFamilyStore measuredCfs = Memtable.activelyMeasuring; - long flushingBytes = measuredCfs == null ? 0 : measuredCfs.getMemtableThreadSafe().getLiveSize(); - flushingBytes += countFlushingBytes(); - if (flushingBytes > 0) - logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed); - - // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline) - // of the total size allotted. Then, flush other CFs in order of size if necessary. 
- long liveBytes = 0; - try - { - long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - long size = cfs.getTotalMemtableLiveSize(); - int maxInFlight = (int) Math.ceil((double) (1 // live memtable - + 1 // potentially a flushed memtable being counted by jamm - + DatabaseDescriptor.getFlushWriters() - + DatabaseDescriptor.getFlushQueueSize()) - / (1 + cfs.indexManager.getIndexesBackedByCfs().size())); - if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher() && totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight) - { - logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size); - cfs.forceFlush(); - } - else - { - liveBytes += size; - } - } + long allowedSize = calculateAllowedSize(); - if (flushingBytes + liveBytes <= totalMemtableBytesAllowed) - return; + // find how much memory non-active memtables are using + long flushingSize = calculateFlushingSize(); + if (flushingSize > 0) + logger.debug("Currently flushing {} bytes of {} max", flushingSize, allowedSize); - logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes); + List<ColumnFamilyStore> affectedCFs = affectedColumnFamilies(); + long liveSize = 0; - // sort memtables by size - List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>(); - Iterables.addAll(sorted, ColumnFamilyStore.all()); - Collections.sort(sorted, new Comparator<ColumnFamilyStore>() + // flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline) + // of the total size allotted. Then, flush other CFs in order of size if necessary. 
+ for (ColumnFamilyStore cfs : affectedCFs) + { + int maxInFlight = (int) Math.ceil((double) (1 // live memtable + + 1 // potentially a flushed memtable being counted by jamm + + DatabaseDescriptor.getFlushWriters() + + DatabaseDescriptor.getFlushQueueSize()) + / (1 + cfs.indexManager.getIndexesBackedByCfs().size())); + long size = cfs.getTotalMemtableLiveSize(); + if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight) { - public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2) - { - long size1 = o1.getTotalMemtableLiveSize(); - long size2 = o2.getTotalMemtableLiveSize(); - if (size1 < size2) - return -1; - if (size1 > size2) - return 1; - return 0; - } - }); - - // flush largest first until we get below our threshold. - // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish - // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block) - while (!sorted.isEmpty()) + logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size); + cfs.forceFlush(); + } + else { - flushingBytes = countFlushingBytes(); - if (liveBytes + flushingBytes <= totalMemtableBytesAllowed) - break; - - ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1); - if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) - { - long size = cfs.getTotalMemtableLiveSize(); - if (size == 0) - break; - logger.info("flushing {} to free up {} bytes", cfs, size); - liveBytes -= size; - cfs.forceFlush(); - } + liveSize += size; } } - finally + + if (liveSize + flushingSize <= allowedSize) + return; + logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize); + + Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>() { - logger.trace("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes); + public int compare(ColumnFamilyStore lhs, ColumnFamilyStore 
rhs) + { + return Long.compare(lhs.getTotalMemtableLiveSize(), rhs.getTotalMemtableLiveSize()); + } + }); + + // flush largest first until we get below our threshold. + // although it looks like liveSize + flushingSize will stay a constant, it will not if flushes finish + // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block) + while (!affectedCFs.isEmpty()) + { + flushingSize = calculateFlushingSize(); + if (liveSize + flushingSize <= allowedSize) + break; + + ColumnFamilyStore cfs = affectedCFs.remove(affectedCFs.size() - 1); + long size = cfs.getTotalMemtableLiveSize(); + if (size > 0) + { + logger.info("flushing {} to free up {} bytes", cfs, size); + liveSize -= size; + cfs.forceFlush(); + } } + + logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize); } - private long countFlushingBytes() + private static List<ColumnFamilyStore> affectedColumnFamilies() { - long flushingBytes = 0; + List<ColumnFamilyStore> affected = new ArrayList<>(); + // filter out column families that aren't affected by MeteredFlusher for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - for (Memtable memtable : cfs.getMemtablesPendingFlush()) - flushingBytes += memtable.getLiveSize(); - } - return flushingBytes; + if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) + affected.add(cfs); + return affected; + } + + private static long calculateAllowedSize() + { + long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L; + // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we + // should not count their limits to the total limit either). 
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) + allowed -= cfs.getCompactionStrategy().getMemtableReservedSize(); + return allowed; + } + + private static long calculateFlushingSize() + { + ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring; + long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher() + ? measuredCFS.getMemtableThreadSafe().getLiveSize() + : 0; + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) + for (Memtable memtable : cfs.getMemtablesPendingFlush()) + flushing += memtable.getLiveSize(); + return flushing; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 164cfda..5425683 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -201,6 +201,16 @@ public abstract class AbstractCompactionStrategy } /** + * If not affected by MeteredFlusher (and handling flushing on its own), override to tell MF how much + * space to reserve for this CF, i.e., how much space to subtract from `memtable_total_space_in_mb` when deciding + * if other memtables should be flushed or not. + */ + public long getMemtableReservedSize() + { + return 0; + } + + /** * Handle a flushed memtable. * * @param memtable the flushed memtable