Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 c5f03a988 -> b757db148 refs/heads/cassandra-2.2 92e2e4e46 -> 99f7ce9bf refs/heads/trunk 03f72acd5 -> dea6ab1b7
Ensure memtable book keeping is not corrupted in the event we shrink usage patch by benedict; reviewed by tjake for CASSANDRA-9681 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b757db14 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b757db14 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b757db14 Branch: refs/heads/cassandra-2.1 Commit: b757db1484473b264bf25ca5541f080d54a579a2 Parents: c5f03a9 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Jul 2 10:27:07 2015 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 2 10:27:07 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/AtomicBTreeColumns.java | 2 +- .../apache/cassandra/db/ColumnFamilyStore.java | 22 +++++- src/java/org/apache/cassandra/db/Memtable.java | 15 ++-- .../org/apache/cassandra/utils/FBUtilities.java | 10 +++ .../apache/cassandra/utils/memory/HeapPool.java | 4 +- .../utils/memory/MemtableAllocator.java | 39 +++++++---- .../cassandra/utils/memory/MemtablePool.java | 73 ++++++++++++-------- .../utils/memory/NativeAllocatorTest.java | 18 ++++- 9 files changed, 132 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 762b88b..25f7c1d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.8 + * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681) * Update internal python driver for cqlsh (CASSANDRA-9064) * Fix IndexOutOfBoundsException when inserting tuple with too many elements using the string literal notation (CASSANDRA-9559) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index 47f0b85..d9eb29c 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -505,7 +505,7 @@ public class AtomicBTreeColumns extends ColumnFamily protected void finish() { - allocator.onHeap().allocate(heapSize, writeOp); + allocator.onHeap().adjust(heapSize, writeOp); reclaimer.commit(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index fa527c7..8e67cdc 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -35,6 +35,7 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.utils.memory.MemtablePool; import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1157,6 +1158,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { float largestRatio = 0f; Memtable largest = null; + float liveOnHeap = 0, liveOffHeap = 0; for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios @@ -1181,19 +1183,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } float ratio = Math.max(onHeap, offHeap); - if (ratio > largestRatio) { largest = current; largestRatio = ratio; } + + 
liveOnHeap += onHeap; + liveOffHeap += offHeap; } if (largest != null) + { + float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); + float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); + float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); + float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); + float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); + float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); + logger.info("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}", + largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), + ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); largest.cfs.switchMemtableIfCurrent(largest); + } } } + private static String ratio(float onHeap, float offHeap) + { + return String.format("%.2f/%.2f", onHeap, offHeap); + } + public void maybeUpdateRowCache(DecoratedKey key) { if (!isRowCacheEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index a50a614..9f6cf9b 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -239,7 +239,8 @@ public class Memtable public String toString() { return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)", - cfs.name, hashCode(), liveDataSize, currentOperations, 100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio()); + cfs.name, hashCode(), FBUtilities.prettyPrintMemory(liveDataSize.get()), currentOperations, + 100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio()); } /** @@ -378,19 +379,21 @@ public class Memtable if 
(writer.getFilePointer() > 0) { - writer.isolateReferences(); + logger.info(String.format("Completed flushing %s (%s) for commitlog position %s", + writer.getFilename(), + FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()), + context)); + writer.isolateReferences(); // temp sstables should contain non-repaired data. ssTable = writer.closeAndOpenReader(); - logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s", - ssTable.getFilename(), new File(ssTable.getFilename()).length(), context)); } else { + logger.info("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", + writer.getFilename(), context); writer.abort(); ssTable = null; - logger.info("Completed flushing; nothing needed to be retained. Commitlog position was {}", - context); } if (heavilyContendedRowCount > 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 4c81b2a..68eb864 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -28,6 +28,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.text.NumberFormat; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -570,6 +571,15 @@ public class FBUtilities } } + public static String prettyPrintMemory(long size) + { + if (size >= 1 << 30) + return String.format("%.3fGiB", size / (double) (1 << 30)); + if (size >= 1 << 20) + return String.format("%.3fMiB", size / (double) (1 << 20)); + return String.format("%.3fKiB", size / (double) (1 << 10)); + } + /** * Starts and waits for the given @param pb 
to finish. * @throws java.io.IOException on non-zero exit code http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/HeapPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java index a04947c..2a19d9c 100644 --- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -75,13 +75,13 @@ public class HeapPool extends MemtablePool public Reclaimer reclaimImmediately(Cell cell) { - onHeap().release(cell.name().dataSize() + cell.value().remaining()); + onHeap().released(cell.name().dataSize() + cell.value().remaining()); return this; } public Reclaimer reclaimImmediately(DecoratedKey key) { - onHeap().release(key.getKey().remaining()); + onHeap().released(key.getKey().remaining()); return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java index e814b4d..f5e743c 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java @@ -160,13 +160,24 @@ public abstract class MemtableAllocator // currently no corroboration/enforcement of this is performed. 
void releaseAll() { - parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false); - parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0)); + parent.released(ownsUpdater.getAndSet(this, 0)); + parent.reclaimed(reclaimingUpdater.getAndSet(this, 0)); + } + + // like allocate, but permits allocations to be negative + public void adjust(long size, OpOrder.Group opGroup) + { + if (size <= 0) + released(-size); + else + allocate(size, opGroup); } // allocate memory in the tracker, and mark ourselves as owning it public void allocate(long size, OpOrder.Group opGroup) { + assert size >= 0; + while (true) { if (parent.tryAllocate(size)) @@ -190,23 +201,23 @@ public abstract class MemtableAllocator } } - // retroactively mark an amount allocated amd acquired in the tracker, and owned by us - void allocated(long size) + // retroactively mark an amount allocated and acquired in the tracker, and owned by us + private void allocated(long size) { - parent.adjustAcquired(size, true); + parent.allocated(size); ownsUpdater.addAndGet(this, size); } // retroactively mark an amount acquired in the tracker, and owned by us - void acquired(long size) + private void acquired(long size) { - parent.adjustAcquired(size, false); + parent.acquired(size); ownsUpdater.addAndGet(this, size); } - void release(long size) + void released(long size) { - parent.adjustAcquired(-size, false); + parent.released(size); ownsUpdater.addAndGet(this, -size); } @@ -217,11 +228,11 @@ public abstract class MemtableAllocator { long cur = owns; long prev = reclaiming; - if (reclaimingUpdater.compareAndSet(this, prev, cur)) - { - parent.adjustReclaiming(cur - prev); - return; - } + if (!reclaimingUpdater.compareAndSet(this, prev, cur)) + continue; + + parent.reclaiming(cur - prev); + return; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/MemtablePool.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index 1d219bb..bb85884 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -130,10 +130,8 @@ public abstract class MemtablePool * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the * allocated total, we will signal waiters */ - void adjustAllocated(long size) + private void adjustAllocated(long size) { - if (size == 0) - return; while (true) { long cur = allocated; @@ -142,38 +140,43 @@ public abstract class MemtablePool } } - // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden - // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire - // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0) - // is always processed and accounted for in allocated. 
- void adjustAcquired(long size, boolean alsoAllocated) + void allocated(long size) { - if (size > 0 || alsoAllocated) - { - if (alsoAllocated) - adjustAllocated(size); - maybeClean(); - } - else if (size < 0) - { - adjustAllocated(size); - hasRoom.signalAll(); - } + assert size >= 0; + if (size == 0) + return; + + adjustAllocated(size); + maybeClean(); + } + + void acquired(long size) + { + maybeClean(); + } + + void released(long size) + { + assert size >= 0; + adjustAllocated(-size); + hasRoom.signalAll(); } - // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans - void adjustReclaiming(long reclaiming) + void reclaiming(long size) { - if (reclaiming == 0) + if (size == 0) return; - reclaimingUpdater.addAndGet(this, reclaiming); - if (reclaiming < 0 && updateNextClean() && cleaner != null) - cleaner.trigger(); + reclaimingUpdater.addAndGet(this, size); } - public long allocated() + void reclaimed(long size) { - return allocated; + if (size == 0) + return; + + reclaimingUpdater.addAndGet(this, -size); + if (updateNextClean() && cleaner != null) + cleaner.trigger(); } public long used() @@ -181,6 +184,22 @@ public abstract class MemtablePool return allocated; } + public float reclaimingRatio() + { + float r = reclaiming / (float) limit; + if (Float.isNaN(r)) + return 0; + return r; + } + + public float usedRatio() + { + float r = allocated / (float) limit; + if (Float.isNaN(r)) + return 0; + return r; + } + public MemtableAllocator.SubAllocator newAllocator() { return new MemtableAllocator.SubAllocator(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java index 83d6c0c..7f87fcd 100644 --- 
a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java +++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java @@ -56,7 +56,7 @@ public class NativeAllocatorTest } if (isClean.getCount() > 0) { - allocatorRef.get().offHeap().release(80); + allocatorRef.get().offHeap().released(80); isClean.countDown(); } } @@ -79,6 +79,22 @@ public class NativeAllocatorTest // allocate normal, check accounted and not cleaned allocator.allocate(10, group); Assert.assertEquals(10, allocator.offHeap().owns()); + // confirm adjustment works + allocator.offHeap().adjust(-10, group); + Assert.assertEquals(0, allocator.offHeap().owns()); + allocator.offHeap().adjust(10, group); + Assert.assertEquals(10, allocator.offHeap().owns()); + // confirm we cannot allocate negative + boolean success = false; + try + { + allocator.offHeap().allocate(-10, group); + } + catch (AssertionError e) + { + success = true; + } + Assert.assertTrue(success); Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS); Assert.assertEquals(1, isClean.getCount());