Author: jbellis
Date: Fri Apr  1 22:22:30 2011
New Revision: 1087919

URL: http://svn.apache.org/viewvc?rev=1087919&view=rev
Log:
revert #1954 for now

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Apr  1 22:22:30 2011
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.cache.AutoSavingKeyCache;
 import org.apache.cassandra.cache.AutoSavingRowCache;
-import org.apache.cassandra.cache.JMXInstrumentedCache;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -588,18 +587,33 @@ public class ColumnFamilyStore implement
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
     Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog)
     {
-        // Only one thread will succeed in marking it as pending flush; the 
others can go back to processing writes
-        if (!oldMemtable.markPendingFlush())
+        if (oldMemtable.isFrozen())
         {
-            logger.debug("memtable is already pending flush; another thread 
must be flushing it");
+            logger.debug("memtable is already frozen; another thread must be 
flushing it");
             return null;
         }
-        assert getMemtableThreadSafe() == oldMemtable;
 
-        // global synchronization ensures that we schedule 
discardCompletedSegments calls in the same order as their
-        // contexts (commitlog position) were read, even though the flush 
executor is multithreaded.
-        synchronized (ColumnFamilyStore.class)
+        /*
+         * If we can get the writelock, that means no new updates can come in 
and
+         * all ongoing updates to memtables have completed. We can get the tail
+         * of the log and use it as the starting position for log replay on 
recovery.
+         *
+         * This is why we Table.flusherLock needs to be global instead of 
per-Table:
+         * we need to schedule discardCompletedSegments calls in the same 
order as their
+         * contexts (commitlog position) were read, even though the flush 
executor
+         * is multithreaded.
+         */
+        Table.switchLock.writeLock().lock();
+        try
         {
+            if (oldMemtable.isFrozen())
+            {
+                logger.debug("memtable is already frozen; another thread must 
be flushing it");
+                return null;
+            }
+
+            assert getMemtableThreadSafe() == oldMemtable;
+            oldMemtable.freeze();
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? 
CommitLog.instance.getContext() : null;
 
             // submit the memtable for any indexed sub-cfses, and our own.
@@ -612,9 +626,8 @@ public class ColumnFamilyStore implement
             }
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
-            {
                 submitFlush(cfs.data.switchMemtable(), latch);
-            }
+
             // we marked our memtable as frozen as part of the concurrency 
control,
             // so even if there was nothing to flush we need to switch it out
             if (!icc.contains(this))
@@ -641,6 +654,10 @@ public class ColumnFamilyStore implement
                 }
             });
         }
+        finally
+        {
+            Table.switchLock.writeLock().unlock();
+        }
     }
 
     public boolean isDropped()
@@ -696,6 +713,8 @@ public class ColumnFamilyStore implement
 
     /**
      * Insert/Update the column family for this key.
+     * Caller is responsible for acquiring Table.flusherLock!
+     * param @ lock - lock that needs to be used.
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
@@ -947,11 +966,14 @@ public class ColumnFamilyStore implement
     }
 
     /**
-     * get the current memtable in a threadsafe fashion. 
-     * Returning memtable is ok because memtable is volatile, and thus
-     * introduce a happens-before ordering.
+     * get the current memtable in a threadsafe fashion.  note that simply 
"return memtable_" is
+     * incorrect; you need to lock to introduce a thread safe happens-before 
ordering.
+     *
+     * do NOT use this method to do either a put or get on the memtable 
object, since it could be
+     * flushed in the meantime (and its executor terminated).
      *
-     * do NOT make this method public or it will really get impossible to 
reason about these things.
+     * also do NOT make this method public or it will really get impossible to 
reason about these things.
+     * @return
      */
     private Memtable getMemtableThreadSafe()
     {
@@ -998,10 +1020,10 @@ public class ColumnFamilyStore implement
         return readStats.getTotalLatencyMicros();
     }
 
-    // TODO this actually isn't a good meature of pending tasks
+// TODO this actually isn't a good meature of pending tasks
     public int getPendingTasks()
     {
-        return 0;
+        return Table.switchLock.getQueueLength();
     }
 
     public long getWriteCount()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Apr  1 22:22:30 2011
@@ -52,8 +52,7 @@ public class Memtable implements Compara
 {
     private static final Logger logger = 
LoggerFactory.getLogger(Memtable.class);
 
-    private final AtomicBoolean isPendingFlush = new AtomicBoolean(false);
-    private final AtomicInteger activeWriters = new AtomicInteger(0);
+    private boolean isFrozen;
 
     private final AtomicLong currentThroughput = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -106,30 +105,25 @@ public class Memtable implements Compara
         return currentThroughput.get() >= this.THRESHOLD || 
currentOperations.get() >= this.THRESHOLD_COUNT;
     }
 
-    boolean isPendingFlush()
+    boolean isFrozen()
     {
-        return isPendingFlush.get();
+        return isFrozen;
     }
 
-    boolean markPendingFlush()
+    void freeze()
     {
-        return isPendingFlush.compareAndSet(false, true);
+        isFrozen = true;
     }
 
     /**
      * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
+     * (CFS handles locking to avoid submitting an op
+     *  to a flushing memtable.  Any other way is unsafe.)
     */
     void put(DecoratedKey key, ColumnFamily columnFamily)
     {
-        try
-        {
-            activeWriters.incrementAndGet();
-            resolve(key, columnFamily);
-        }
-        finally
-        {
-            activeWriters.decrementAndGet();
-        }
+        assert !isFrozen; // not 100% foolproof but hell, it's an assert
+        resolve(key, columnFamily);
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf)
@@ -178,7 +172,6 @@ public class Memtable implements Compara
         {
             public void runMayThrow() throws IOException
             {
-                waitForWriters();
                 cfs.flushLock.lock();
                 try
                 {
@@ -197,25 +190,6 @@ public class Memtable implements Compara
         });
     }
 
-    /*
-     * Wait for all writers to be done with this memtable before flushing.
-     * A busy-wait is probably alright since we'll new wait long.
-     */
-    private void waitForWriters()
-    {
-        while (activeWriters.get() > 0)
-        {
-            try
-            {
-                Thread.sleep(3);
-            }
-            catch (InterruptedException e)
-            {
-                logger.error("Interrupted while waiting on writers.", e);
-            }
-        }
-    }
-
     public String toString()
     {
         return String.format("Memtable-%s@%s(%s bytes, %s operations)",

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Apr  1 22:22:30 2011
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -57,6 +57,14 @@ public class Table
     private static final Logger logger = LoggerFactory.getLogger(Table.class);
     private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
 
+    /**
+     * accesses to CFS.memtable should acquire this for thread safety.
+     * Table.maybeSwitchMemtable should aquire the writeLock; see that method 
for the full explanation.
+     *
+     * (Enabling fairness in the RRWL is observed to decrease throughput, so 
we leave it off.)
+     */
+    static final ReentrantReadWriteLock switchLock = new 
ReentrantReadWriteLock();
+
     // It is possible to call Table.open without a running daemon, so it makes 
sense to ensure
     // proper directories here as well as in CassandraDaemon.
     static 
@@ -371,64 +379,72 @@ public class Table
             logger.debug("applying mutation of row {}", 
ByteBufferUtil.bytesToHex(mutation.key()));
 
         // write the mutation to the commitlog and memtables
-        if (writeCommitLog)
-            CommitLog.instance.add(mutation);
-
-        DecoratedKey<?> key = 
StorageService.getPartitioner().decorateKey(mutation.key());
-        for (ColumnFamily cf : mutation.getColumnFamilies())
+        switchLock.readLock().lock();
+        try
         {
-            ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
-            if (cfs == null)
+            if (writeCommitLog)
+                CommitLog.instance.add(mutation);
+        
+            DecoratedKey<?> key = 
StorageService.getPartitioner().decorateKey(mutation.key());
+            for (ColumnFamily cf : mutation.getColumnFamilies())
             {
-                logger.error("Attempting to mutate non-existant column family 
" + cf.id());
-                continue;
-            }
+                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+                if (cfs == null)
+                {
+                    logger.error("Attempting to mutate non-existant column 
family " + cf.id());
+                    continue;
+                }
 
-            SortedSet<ByteBuffer> mutatedIndexedColumns = null;
-            for (ByteBuffer column : cfs.getIndexedColumns())
-            {
-                if (cf.getColumnNames().contains(column) || 
cf.isMarkedForDelete())
+                SortedSet<ByteBuffer> mutatedIndexedColumns = null;
+                for (ByteBuffer column : cfs.getIndexedColumns())
                 {
-                    if (mutatedIndexedColumns == null)
-                        mutatedIndexedColumns = new TreeSet<ByteBuffer>();
-                    mutatedIndexedColumns.add(column);
-                    if (logger.isDebugEnabled())
+                    if (cf.getColumnNames().contains(column) || 
cf.isMarkedForDelete())
                     {
-                        // can't actually use validator to print value here, 
because we overload value
-                        // for deletion timestamp as well (which may not be a 
well-formed value for the column type)
-                        ByteBuffer value = cf.getColumn(column) == null ? null 
: cf.getColumn(column).value(); // may be null on row-level deletion
-                        logger.debug(String.format("mutating indexed column %s 
value %s",
-                                    cf.getComparator().getString(column),
-                                    value == null ? "null" : 
ByteBufferUtil.bytesToHex(value)));
+                        if (mutatedIndexedColumns == null)
+                            mutatedIndexedColumns = new TreeSet<ByteBuffer>();
+                        mutatedIndexedColumns.add(column);
+                        if (logger.isDebugEnabled())
+                        {
+                            // can't actually use validator to print value 
here, because we overload value
+                            // for deletion timestamp as well (which may not 
be a well-formed value for the column type)
+                            ByteBuffer value = cf.getColumn(column) == null ? 
null : cf.getColumn(column).value(); // may be null on row-level deletion
+                            logger.debug(String.format("mutating indexed 
column %s value %s",
+                                                       
cf.getComparator().getString(column),
+                                                       value == null ? "null" 
: ByteBufferUtil.bytesToHex(value)));
+                        }
                     }
                 }
-            }
 
-            synchronized (indexLockFor(mutation.key()))
-            {
-                ColumnFamily oldIndexedColumns = null;
-                if (mutatedIndexedColumns != null)
+                synchronized (indexLockFor(mutation.key()))
                 {
-                    // with the raw data CF, we can just apply every update in 
any order and let
-                    // read-time resolution throw out obsolete versions, thus 
avoiding read-before-write.
-                    // but for indexed data we need to make sure that we're 
not creating index entries
-                    // for obsolete writes.
-                    oldIndexedColumns = readCurrentIndexedColumns(key, cfs, 
mutatedIndexedColumns);
-                    logger.debug("Pre-mutation index row is {}", 
oldIndexedColumns);
-                    ignoreObsoleteMutations(cf, mutatedIndexedColumns, 
oldIndexedColumns);
-                }
+                    ColumnFamily oldIndexedColumns = null;
+                    if (mutatedIndexedColumns != null)
+                    {
+                        // with the raw data CF, we can just apply every 
update in any order and let
+                        // read-time resolution throw out obsolete versions, 
thus avoiding read-before-write.
+                        // but for indexed data we need to make sure that 
we're not creating index entries
+                        // for obsolete writes.
+                        oldIndexedColumns = readCurrentIndexedColumns(key, 
cfs, mutatedIndexedColumns);
+                        logger.debug("Pre-mutation index row is {}", 
oldIndexedColumns);
+                        ignoreObsoleteMutations(cf, mutatedIndexedColumns, 
oldIndexedColumns);
+                    }
 
-                Memtable fullMemtable = cfs.apply(key, cf);
-                if (fullMemtable != null)
-                    memtablesToFlush = addFullMemtable(memtablesToFlush, 
fullMemtable);
+                    Memtable fullMemtable = cfs.apply(key, cf);
+                    if (fullMemtable != null)
+                        memtablesToFlush = addFullMemtable(memtablesToFlush, 
fullMemtable);
 
-                if (mutatedIndexedColumns != null)
-                {
-                    // ignore full index memtables -- we flush those when the 
"master" one is full
-                    applyIndexUpdates(mutation.key(), cf, cfs, 
mutatedIndexedColumns, oldIndexedColumns);
+                    if (mutatedIndexedColumns != null)
+                    {
+                        // ignore full index memtables -- we flush those when 
the "master" one is full
+                        applyIndexUpdates(mutation.key(), cf, cfs, 
mutatedIndexedColumns, oldIndexedColumns);
+                    }
                 }
             }
         }
+        finally
+        {
+            switchLock.readLock().unlock();
+        }
 
         // flush memtables that got filled up outside the readlock 
(maybeSwitchMemtable acquires writeLock).
         // usually mTF will be empty and this will be a no-op.
@@ -582,12 +598,19 @@ public class Table
                 DecoratedKey<?> key = iter.next();
                 logger.debug("Indexing row {} ", key);
                 List<Memtable> memtablesToFlush = Collections.emptyList();
-
-                synchronized (indexLockFor(key.key))
+                switchLock.readLock().lock();
+                try
+                {
+                    synchronized (indexLockFor(key.key))
+                    {
+                        ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
+                        if (cf != null)
+                            memtablesToFlush = applyIndexUpdates(key.key, cf, 
cfs, cf.getColumnNames(), null);
+                    }
+                }
+                finally
                 {
-                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
-                    if (cf != null)
-                        memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, 
cf.getColumnNames(), null);
+                    switchLock.readLock().unlock();
                 }
 
                 // during index build, we do flush index memtables separately 
from master; otherwise we could OOM


Reply via email to