Fix spin loop in AtomicSortedColumns

patch by graham sanderson and benedict; reviewed by yukim for
CASSANDRA-7546


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dee15a85
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dee15a85
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dee15a85

Branch: refs/heads/trunk
Commit: dee15a85c8640e58e162798d46f026c47fdd432c
Parents: 11e8dc1
Author: graham sanderson <gra...@vast.com>
Authored: Mon Oct 13 11:52:03 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Oct 13 11:57:54 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java | 139 ++++++++++++++++---
 src/java/org/apache/cassandra/db/Memtable.java  |  10 +-
 .../cassandra/utils/concurrent/Locks.java       |  37 +++++
 4 files changed, 166 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 891848e..0b2dd0c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,7 @@ Merged from 2.0:
  * Fix wrong progress when streaming uncompressed (CASSANDRA-7878)
  * Fix possible infinite loop in creating repair range (CASSANDRA-7983)
  * Fix unit in nodetool for streaming throughput (CASSANDRA-7375)
+ * Fix spin loop in AtomicSortedColumns (CASSANDRA-7546)
 Merged from 1.2:
  * Don't index tombstones (CASSANDRA-7828)
  * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java 
b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 559e759..7b5e8a8 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import com.google.common.base.Function;
@@ -37,10 +38,10 @@ import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.concurrent.Locks;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
 import org.apache.cassandra.utils.memory.NativePool;
 
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
@@ -59,6 +60,31 @@ public class AtomicBTreeColumns extends ColumnFamily
     static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreeColumns(CFMetaData.IndexCf, null))
             + ObjectSizes.measure(new Holder(null, null));
 
+    // Reserved values for wasteTracker field. These values must not be 
consecutive (see avoidReservedValues)
+    private static final int TRACKER_NEVER_WASTED = 0;
+    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
+
+    // The granularity with which we track wasted allocation/work; we round up
+    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
+    // The number of bytes we have to waste in excess of our acceptable 
realtime rate of waste (defined below)
+    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
+    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / 
ALLOCATION_GRANULARITY_BYTES);
+    // Note this is a shift, because dividing a long time and then picking the 
low 32 bits doesn't give correct rollover behavior
+    private static final int CLOCK_SHIFT = 17;
+    // CLOCK_GRANULARITY = 10^9 ns >> CLOCK_SHIFT == 132us == (1/7.63)ms
+
+    /**
+     * (clock + allocation) granularity are combined to give us an acceptable 
(waste) allocation rate that is defined by
+     * the passage of real time of 
ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 
7.45Mb/s
+     *
+     * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the 
current time; whenever we waste bytes
+     * we increment the current value if it is within this window, and set it 
to the min of the window plus our waste
+     * otherwise.
+     */
+    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
+
+    private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
+
     private static final Function<Cell, CellName> NAME = new Function<Cell, 
CellName>()
     {
         public CellName apply(Cell column)
@@ -168,35 +194,108 @@ public class AtomicBTreeColumns extends ColumnFamily
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, 
allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
 
-        while (true)
+        boolean monitorOwned = false;
+        try
         {
-            Holder current = ref;
-            updater.ref = current;
-            updater.reset();
-
-            DeletionInfo deletionInfo;
-            if (cm.deletionInfo().mayModify(current.deletionInfo))
+            if (usePessimisticLocking())
             {
-                if (inputDeletionInfoCopy == null)
-                    inputDeletionInfoCopy = 
cm.deletionInfo().copy(HeapAllocator.instance);
-
-                deletionInfo = 
current.deletionInfo.copy().add(inputDeletionInfoCopy);
-                updater.allocated(deletionInfo.unsharedHeapSize() - 
current.deletionInfo.unsharedHeapSize());
+                Locks.monitorEnterUnsafe(this);
+                monitorOwned = true;
             }
-            else
+            while (true)
             {
-                deletionInfo = current.deletionInfo;
+                Holder current = ref;
+                updater.ref = current;
+                updater.reset();
+
+                DeletionInfo deletionInfo;
+                if (cm.deletionInfo().mayModify(current.deletionInfo))
+                {
+                    if (inputDeletionInfoCopy == null)
+                        inputDeletionInfoCopy = 
cm.deletionInfo().copy(HeapAllocator.instance);
+
+                    deletionInfo = 
current.deletionInfo.copy().add(inputDeletionInfoCopy);
+                    updater.allocated(deletionInfo.unsharedHeapSize() - 
current.deletionInfo.unsharedHeapSize());
+                }
+                else
+                {
+                    deletionInfo = current.deletionInfo;
+                }
+
+                Object[] tree = BTree.update(current.tree, 
metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof 
NativePool), cm, cm.getColumnCount(), true, updater);
+
+                if (tree != null && refUpdater.compareAndSet(this, current, 
new Holder(tree, deletionInfo)))
+                {
+                    indexer.updateRowLevelIndexes();
+                    updater.finish();
+                    return updater.dataSize;
+                }
+                else if (!monitorOwned)
+                {
+                    boolean shouldLock = usePessimisticLocking();
+                    if (!shouldLock)
+                    {
+                        shouldLock = 
updateWastedAllocationTracker(updater.heapSize);
+                    }
+                    if (shouldLock)
+                    {
+                        Locks.monitorEnterUnsafe(this);
+                        monitorOwned = true;
+                    }
+                }
             }
+        }
+        finally
+        {
+            if (monitorOwned)
+                Locks.monitorExitUnsafe(this);
+        }
+    }
 
-            Object[] tree = BTree.update(current.tree, 
metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof 
NativePool), cm, cm.getColumnCount(), true, updater);
+    boolean usePessimisticLocking()
+    {
+        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
+    }
 
-            if (tree != null && refUpdater.compareAndSet(this, current, new 
Holder(tree, deletionInfo)))
+    /**
+     * Update the wasted allocation tracker state based on newly wasted 
allocation information
+     *
+     * @param wastedBytes the number of bytes wasted by this thread
+     * @return true if the caller should now proceed with pessimistic locking 
because the waste limit has been reached
+     */
+    private boolean updateWastedAllocationTracker(long wastedBytes) {
+        // Early check for huge allocation that exceeds the limit
+        if (wastedBytes < EXCESS_WASTE_BYTES)
+        {
+            // We round up to ensure work < granularity are still accounted for
+            int wastedAllocation = ((int) (wastedBytes + 
ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
+
+            int oldTrackerValue;
+            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = 
wasteTracker))
             {
-                indexer.updateRowLevelIndexes();
-                updater.finish();
-                return updater.dataSize;
+                // Note this time value has an arbitrary offset, but is a 
constant rate 32 bit counter (that may wrap)
+                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
+                int delta = oldTrackerValue - time;
+                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || 
delta < -EXCESS_WASTE_OFFSET)
+                    delta = -EXCESS_WASTE_OFFSET;
+                delta += wastedAllocation;
+                if (delta >= 0)
+                    break;
+                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, 
avoidReservedValues(time + delta)))
+                    return false;
             }
         }
+        // We have definitely reached our waste limit so set the state if it 
isn't already
+        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
+        // And tell the caller to proceed with pessimistic locking
+        return true;
+    }
+
+    private static int avoidReservedValues(int wasteTracker)
+    {
+        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == 
TRACKER_PESSIMISTIC_LOCKING)
+            return wasteTracker + 1;
+        return wasteTracker;
     }
 
     // no particular reason not to implement these next methods, we just 
haven't needed them yet

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index 1eea915..b0d2a11 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -328,11 +328,13 @@ public class Memtable
             SSTableWriter writer = 
createFlushWriter(cfs.getTempSSTablePath(sstableDirectory));
             try
             {
+                boolean trackContention = logger.isDebugEnabled();
+                int heavilyContendedRowCount = 0;
                 // (we can't clear out the map as-we-go to free up memory,
                 //  since the memtable is being used for queries in the 
"pending flush" category)
                 for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : 
rows.entrySet())
                 {
-                    ColumnFamily cf = entry.getValue();
+                    AtomicBTreeColumns cf = entry.getValue();
 
                     if (cf.isMarkedForDelete() && cf.hasColumns())
                     {
@@ -345,6 +347,9 @@ public class Memtable
                             continue;
                     }
 
+                    if (trackContention && cf.usePessimisticLocking())
+                        heavilyContendedRowCount++;
+
                     if (!cf.isEmpty())
                         writer.append((DecoratedKey)entry.getKey(), cf);
                 }
@@ -366,6 +371,9 @@ public class Memtable
                                 context);
                 }
 
+                if (heavilyContendedRowCount > 0)
+                    logger.debug(String.format("High update contention in 
%d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), 
Memtable.this.toString()));
+
                 return ssTable;
             }
             catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dee15a85/src/java/org/apache/cassandra/utils/concurrent/Locks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Locks.java 
b/src/java/org/apache/cassandra/utils/concurrent/Locks.java
new file mode 100644
index 0000000..1ed5492
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Locks.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.utils.concurrent;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+public class Locks
+{
+    static final Unsafe unsafe;
+
+    static
+    {
+        try
+        {
+            Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+            field.setAccessible(true);
+            unsafe = (sun.misc.Unsafe) field.get(null);
+        }
+        catch (Exception e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    // enters the object's monitor IF UNSAFE IS PRESENT. If it isn't, this is 
a no-op.
+    public static void monitorEnterUnsafe(Object object)
+    {
+        if (unsafe != null)
+            unsafe.monitorEnter(object);
+    }
+
+    public static void monitorExitUnsafe(Object object)
+    {
+        if (unsafe != null)
+            unsafe.monitorExit(object);
+    }
+}

Reply via email to