Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 11e8dc1c1 -> dee15a85c refs/heads/trunk 9333f86cf -> 27fdf4211
Fix spin loop in AtomicSortedColumns patch by graham sanderson and benedict; reviewed by yukim for CASSANDRA-7546 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dee15a85 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dee15a85 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dee15a85 Branch: refs/heads/cassandra-2.1 Commit: dee15a85c8640e58e162798d46f026c47fdd432c Parents: 11e8dc1 Author: graham sanderson <gra...@vast.com> Authored: Mon Oct 13 11:52:03 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Oct 13 11:57:54 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/AtomicBTreeColumns.java | 139 ++++++++++++++++--- src/java/org/apache/cassandra/db/Memtable.java | 10 +- .../cassandra/utils/concurrent/Locks.java | 37 +++++ 4 files changed, 166 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 891848e..0b2dd0c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -125,6 +125,7 @@ Merged from 2.0: * Fix wrong progress when streaming uncompressed (CASSANDRA-7878) * Fix possible infinite loop in creating repair range (CASSANDRA-7983) * Fix unit in nodetool for streaming throughput (CASSANDRA-7375) + * Fix spin loop in AtomicSortedColumns (CASSANDRA-7546) Merged from 1.2: * Don't index tombstones (CASSANDRA-7828) * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java 
b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index 559e759..7b5e8a8 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.google.common.base.Function; @@ -37,10 +38,10 @@ import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; +import org.apache.cassandra.utils.concurrent.Locks; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.HeapAllocator; import org.apache.cassandra.utils.memory.MemtableAllocator; -import org.apache.cassandra.utils.memory.NativeAllocator; import org.apache.cassandra.utils.memory.NativePool; import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; @@ -59,6 +60,31 @@ public class AtomicBTreeColumns extends ColumnFamily static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null)) + ObjectSizes.measure(new Holder(null, null)); + // Reserved values for wasteTracker field. 
These values must not be consecutive (see avoidReservedValues) + private static final int TRACKER_NEVER_WASTED = 0; + private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE; + + // The granularity with which we track wasted allocation/work; we round up + private static final int ALLOCATION_GRANULARITY_BYTES = 1024; + // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below) + private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L; + private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES); + // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior + private static final int CLOCK_SHIFT = 17; + // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 2^17ns ~= 131us == (1/7.63)ms + + /** + * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by + * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s + * + * in wasteTracker we maintain a value within EXCESS_WASTE_OFFSET of (before) the current time; whenever we waste bytes + * we increment the current value if it is within this window, and set it to the min of the window plus our waste + * otherwise. 
+ */ + private volatile int wasteTracker = TRACKER_NEVER_WASTED; + + private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker"); + private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>() { public CellName apply(Cell column) @@ -168,35 +194,108 @@ public class AtomicBTreeColumns extends ColumnFamily ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); DeletionInfo inputDeletionInfoCopy = null; - while (true) + boolean monitorOwned = false; + try { - Holder current = ref; - updater.ref = current; - updater.reset(); - - DeletionInfo deletionInfo; - if (cm.deletionInfo().mayModify(current.deletionInfo)) + if (usePessimisticLocking()) { - if (inputDeletionInfoCopy == null) - inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance); - - deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); - updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); + Locks.monitorEnterUnsafe(this); + monitorOwned = true; } - else + while (true) { - deletionInfo = current.deletionInfo; + Holder current = ref; + updater.ref = current; + updater.reset(); + + DeletionInfo deletionInfo; + if (cm.deletionInfo().mayModify(current.deletionInfo)) + { + if (inputDeletionInfoCopy == null) + inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance); + + deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); + updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); + } + else + { + deletionInfo = current.deletionInfo; + } + + Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater); + + if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo))) + { + 
indexer.updateRowLevelIndexes(); + updater.finish(); + return updater.dataSize; + } + else if (!monitorOwned) + { + boolean shouldLock = usePessimisticLocking(); + if (!shouldLock) + { + shouldLock = updateWastedAllocationTracker(updater.heapSize); + } + if (shouldLock) + { + Locks.monitorEnterUnsafe(this); + monitorOwned = true; + } + } } + } + finally + { + if (monitorOwned) + Locks.monitorExitUnsafe(this); + } + } - Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater); + boolean usePessimisticLocking() + { + return wasteTracker == TRACKER_PESSIMISTIC_LOCKING; + } - if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo))) + /** + * Update the wasted allocation tracker state based on newly wasted allocation information + * + * @param wastedBytes the number of bytes wasted by this thread + * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached + */ + private boolean updateWastedAllocationTracker(long wastedBytes) { + // Early check for huge allocation that exceeds the limit + if (wastedBytes < EXCESS_WASTE_BYTES) + { + // We round up to ensure work < granularity are still accounted for + int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES; + + int oldTrackerValue; + while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker)) { - indexer.updateRowLevelIndexes(); - updater.finish(); - return updater.dataSize; + // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap) + int time = (int) (System.nanoTime() >>> CLOCK_SHIFT); + int delta = oldTrackerValue - time; + if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET) + delta = -EXCESS_WASTE_OFFSET; + delta += wastedAllocation; + if (delta >= 0) + break; + if 
(wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta))) + return false; } } + // We have definitely reached our waste limit so set the state if it isn't already + wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING); + // And tell the caller to proceed with pessimistic locking + return true; + } + + private static int avoidReservedValues(int wasteTracker) + { + if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING) + return wasteTracker + 1; + return wasteTracker; } // no particular reason not to implement these next methods, we just haven't needed them yet http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 1eea915..b0d2a11 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -328,11 +328,13 @@ public class Memtable SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)); try { + boolean trackContention = logger.isDebugEnabled(); + int heavilyContendedRowCount = 0; // (we can't clear out the map as-we-go to free up memory, // since the memtable is being used for queries in the "pending flush" category) for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet()) { - ColumnFamily cf = entry.getValue(); + AtomicBTreeColumns cf = entry.getValue(); if (cf.isMarkedForDelete() && cf.hasColumns()) { @@ -345,6 +347,9 @@ public class Memtable continue; } + if (trackContention && cf.usePessimisticLocking()) + heavilyContendedRowCount++; + if (!cf.isEmpty()) writer.append((DecoratedKey)entry.getKey(), cf); } @@ -366,6 +371,9 @@ public class Memtable context); } + if (heavilyContendedRowCount > 0) + logger.debug(String.format("High update contention in %d/%d 
partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString())); + return ssTable; } catch (Throwable e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/utils/concurrent/Locks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Locks.java b/src/java/org/apache/cassandra/utils/concurrent/Locks.java new file mode 100644 index 0000000..1ed5492 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/Locks.java @@ -0,0 +1,37 @@ +package org.apache.cassandra.utils.concurrent; + +import sun.misc.Unsafe; + +import java.lang.reflect.Field; + +public class Locks +{ + static final Unsafe unsafe; + + static + { + try + { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + unsafe = (sun.misc.Unsafe) field.get(null); + } + catch (Exception e) + { + throw new AssertionError(e); + } + } + + // enters the object's monitor IF UNSAFE IS PRESENT. If it isn't, this is a no-op. + public static void monitorEnterUnsafe(Object object) + { + if (unsafe != null) + unsafe.monitorEnter(object); + } + + public static void monitorExitUnsafe(Object object) + { + if (unsafe != null) + unsafe.monitorExit(object); + } +}