Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 0cd17695f -> 48847b5c0


Improve MeteredFlusher handling of MF-unaffected column families

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58e524e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58e524e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58e524e7

Branch: refs/heads/cassandra-2.1
Commit: 58e524e77f218cc75d01acc8f41c8ee6eef86207
Parents: f8229c3
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Mar 25 21:55:37 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Mar 25 21:55:37 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/MeteredFlusher.java | 164 ++++++++++---------
 .../compaction/AbstractCompactionStrategy.java  |  10 ++
 3 files changed, 96 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 373544c..00b98fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@
  * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
  * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
  * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+   (CASSANDRA-6867)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 5c71fc6..4f06bc6 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,97 +33,102 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
-        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
-
-        // first, find how much memory non-active memtables are using
-        ColumnFamilyStore measuredCfs = Memtable.activelyMeasuring;
-        long flushingBytes = measuredCfs == null ? 0 : measuredCfs.getMemtableThreadSafe().getLiveSize();
-        flushingBytes += countFlushingBytes();
-        if (flushingBytes > 0)
-            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
-
-        // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
-        // of the total size allotted.  Then, flush other CFs in order of size if necessary.
-        long liveBytes = 0;
-        try
-        {
-            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
-            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            {
-                long size = cfs.getTotalMemtableLiveSize();
-                int maxInFlight = (int) Math.ceil((double) (1 // live memtable
-                                                            + 1 // potentially a flushed memtable being counted by jamm
-                                                            + DatabaseDescriptor.getFlushWriters()
-                                                            + DatabaseDescriptor.getFlushQueueSize())
-                                                  / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher() && totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
-                {
-                    logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
-                    cfs.forceFlush();
-                }
-                else
-                {
-                    liveBytes += size;
-                }
-            }
+        long allowedSize = calculateAllowedSize();
 
-            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
-                return;
+        // find how much memory non-active memtables are using
+        long flushingSize = calculateFlushingSize();
+        if (flushingSize > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingSize, allowedSize);
 
-            logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
+        List<ColumnFamilyStore> affectedCFs = affectedColumnFamilies();
+        long liveSize = 0;
 
-            // sort memtables by size
-            List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
-            Iterables.addAll(sorted, ColumnFamilyStore.all());
-            Collections.sort(sorted, new Comparator<ColumnFamilyStore>()
+        // flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
+        // of the total size allotted. Then, flush other CFs in order of size if necessary.
+        for (ColumnFamilyStore cfs : affectedCFs)
+        {
+            int maxInFlight = (int) Math.ceil((double) (1 // live memtable
+                                                        + 1 // potentially a flushed memtable being counted by jamm
+                                                        + DatabaseDescriptor.getFlushWriters()
+                                                        + DatabaseDescriptor.getFlushQueueSize())
+                                              / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
+            long size = cfs.getTotalMemtableLiveSize();
+            if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight)
             {
-                public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2)
-                {
-                    long size1 = o1.getTotalMemtableLiveSize();
-                    long size2 = o2.getTotalMemtableLiveSize();
-                    if (size1 < size2)
-                        return -1;
-                    if (size1 > size2)
-                        return 1;
-                    return 0;
-                }
-            });
-
-            // flush largest first until we get below our threshold.
-            // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
-            // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
-            while (!sorted.isEmpty())
+                logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
+                cfs.forceFlush();
+            }
+            else
             {
-                flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
-                    break;
-
-                ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
-                if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
-                {
-                    long size = cfs.getTotalMemtableLiveSize();
-                    if (size == 0)
-                        break;
-                    logger.info("flushing {} to free up {} bytes", cfs, size);
-                    liveBytes -= size;
-                    cfs.forceFlush();
-                }
+                liveSize += size;
             }
         }
-        finally
+
+        if (liveSize + flushingSize <= allowedSize)
+            return;
+        logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
+
+        Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>()
         {
-            logger.trace("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes);
+            public int compare(ColumnFamilyStore lhs, ColumnFamilyStore rhs)
+            {
+                return Long.compare(lhs.getTotalMemtableLiveSize(), rhs.getTotalMemtableLiveSize());
+            }
+        });
+
+        // flush largest first until we get below our threshold.
+        // although it looks like liveSize + flushingSize will stay a constant, it will not if flushes finish
+        // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
+        while (!affectedCFs.isEmpty())
+        {
+            flushingSize = calculateFlushingSize();
+            if (liveSize + flushingSize <= allowedSize)
+                break;
+
+            ColumnFamilyStore cfs = affectedCFs.remove(affectedCFs.size() - 1);
+            long size = cfs.getTotalMemtableLiveSize();
+            if (size > 0)
+            {
+                logger.info("flushing {} to free up {} bytes", cfs, size);
+                liveSize -= size;
+                cfs.forceFlush();
+            }
         }
+
+        logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
     }
 
-    private long countFlushingBytes()
+    private static List<ColumnFamilyStore> affectedColumnFamilies()
     {
-        long flushingBytes = 0;
+        List<ColumnFamilyStore> affected = new ArrayList<>();
+        // filter out column families that aren't affected by MeteredFlusher
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            for (Memtable memtable : cfs.getMemtablesPendingFlush())
-                flushingBytes += memtable.getLiveSize();
-        }
-        return flushingBytes;
+            if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                affected.add(cfs);
+        return affected;
+    }
+
+    private static long calculateAllowedSize()
+    {
+        long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+        // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we
+        // should not count their limits to the total limit either).
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                allowed -= cfs.getCompactionStrategy().getMemtableReservedSize();
+        return allowed;
+    }
+
+    private static long calculateFlushingSize()
+    {
+        ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
+        long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()
+                      ? measuredCFS.getMemtableThreadSafe().getLiveSize()
+                      : 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                for (Memtable memtable : cfs.getMemtablesPendingFlush())
+                    flushing += memtable.getLiveSize();
+        return flushing;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 164cfda..5425683 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -201,6 +201,16 @@ public abstract class AbstractCompactionStrategy
     }
 
     /**
+     * If not affected by MeteredFlusher (and handling flushing on its own), override to tell MF how much
+     * space to reserve for this CF, i.e., how much space to subtract from `memtable_total_space_in_mb` when deciding
+     * if other memtables should be flushed or not.
+     */
+    public long getMemtableReservedSize()
+    {
+        return 0;
+    }
+
+    /**
      * Handle a flushed memtable.
      *
      * @param memtable the flushed memtable

Reply via email to