Make memory metering use an unbounded queue to avoid blocking the write path. Patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd3bfac6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd3bfac6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd3bfac6 Branch: refs/heads/cassandra-1.1.0 Commit: fd3bfac6cbc487e36ac1c39740c5897e350d0d16 Parents: d49113f Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Apr 10 10:59:35 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Apr 11 13:24:09 2012 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/Memtable.java | 86 ++++++++++-------- 1 files changed, 48 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd3bfac6/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 81dac7c..d9e9570 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SlabAllocator; import org.apache.cassandra.utils.WrappedRunnable; +import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.github.jamm.MemoryMeter; public class Memtable @@ -53,14 +54,17 @@ public class Memtable // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken. 
private static final double MAX_SANE_LIVE_RATIO = 64.0; - // we're careful to only allow one count to run at a time because counting is slow - // (can be minutes, for a large memtable and a busy server), so we could keep memtables - // alive after they're flushed and would otherwise be GC'd. + // we want to limit the amount of concurrently running and/or queued meterings, because counting is slow (can be + // minutes, for a large memtable and a busy server). so we could keep memtables + // alive after they're flushed and would otherwise be GC'd. the approach we take is to bound the number of + // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but + // will implicitly be bounded by the number of CFS:s. + private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>(); private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("MemoryMeter")) { @Override @@ -152,7 +156,7 @@ public class Memtable resolve(key, columnFamily); } - public void updateLiveRatio() + public void updateLiveRatio() throws RuntimeException { if (!MemoryMeter.isInitialized()) { @@ -162,50 +166,56 @@ public class Memtable return; } + if (!meteringInProgress.add(cfs)) + { + logger.debug("Metering already pending or active for {}; skipping liveRatio update", cfs); + return; + } + Runnable runnable = new Runnable() { public void run() { - activelyMeasuring = Memtable.this; - - long start = System.currentTimeMillis(); - // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits. - // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time. 
- long deepSize = meter.measure(columnFamilies); - int objects = 0; - for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet()) - { - deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue()); - objects += entry.getValue().getColumnCount(); - } - double newRatio = (double) deepSize / currentThroughput.get(); - - if (newRatio < MIN_SANE_LIVE_RATIO) + try { - logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio); - newRatio = MIN_SANE_LIVE_RATIO; + activelyMeasuring = Memtable.this; + + long start = System.currentTimeMillis(); + // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits. + // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time. + long deepSize = meter.measure(columnFamilies); + int objects = 0; + for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet()) + { + deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue()); + objects += entry.getValue().getColumnCount(); + } + double newRatio = (double) deepSize / currentThroughput.get(); + + if (newRatio < MIN_SANE_LIVE_RATIO) + { + logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio); + newRatio = MIN_SANE_LIVE_RATIO; + } + if (newRatio > MAX_SANE_LIVE_RATIO) + { + logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio); + newRatio = MAX_SANE_LIVE_RATIO; + } + cfs.liveRatio = Math.max(cfs.liveRatio, newRatio); + + logger.info("{} liveRatio is {} (just-counted was {}). 
calculation took {}ms for {} columns", + new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects }); + activelyMeasuring = null; } - if (newRatio > MAX_SANE_LIVE_RATIO) + finally { - logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio); - newRatio = MAX_SANE_LIVE_RATIO; + meteringInProgress.remove(cfs); } - cfs.liveRatio = Math.max(cfs.liveRatio, newRatio); - - logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns", - new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects }); - activelyMeasuring = null; } }; - try - { - meterExecutor.submit(runnable); - } - catch (RejectedExecutionException e) - { - logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs); - } + meterExecutor.submit(runnable); } private void resolve(DecoratedKey key, ColumnFamily cf)