Author: jbellis Date: Fri Apr 1 22:22:30 2011 New Revision: 1087919 URL: http://svn.apache.org/viewvc?rev=1087919&view=rev Log: revert #1954 for now
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1087919&r1=1087918&r2=1087919&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Apr 1 22:22:30 2011 @@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.cache.AutoSavingKeyCache; import org.apache.cassandra.cache.AutoSavingRowCache; -import org.apache.cassandra.cache.JMXInstrumentedCache; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; @@ -588,18 +587,33 @@ public class ColumnFamilyStore implement /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. 
*/ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) { - // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes - if (!oldMemtable.markPendingFlush()) + if (oldMemtable.isFrozen()) { - logger.debug("memtable is already pending flush; another thread must be flushing it"); + logger.debug("memtable is already frozen; another thread must be flushing it"); return null; } - assert getMemtableThreadSafe() == oldMemtable; - // global synchronization ensures that we schedule discardCompletedSegments calls in the same order as their - // contexts (commitlog position) were read, even though the flush executor is multithreaded. - synchronized (ColumnFamilyStore.class) + /* + * If we can get the writelock, that means no new updates can come in and + * all ongoing updates to memtables have completed. We can get the tail + * of the log and use it as the starting position for log replay on recovery. + * + * This is why Table.switchLock needs to be global instead of per-Table: + * we need to schedule discardCompletedSegments calls in the same order as their + * contexts (commitlog position) were read, even though the flush executor + * is multithreaded. + */ + Table.switchLock.writeLock().lock(); + try { + if (oldMemtable.isFrozen()) + { + logger.debug("memtable is already frozen; another thread must be flushing it"); + return null; + } + + assert getMemtableThreadSafe() == oldMemtable; + oldMemtable.freeze(); final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null; // submit the memtable for any indexed sub-cfses, and our own. 
@@ -612,9 +626,8 @@ public class ColumnFamilyStore implement } final CountDownLatch latch = new CountDownLatch(icc.size()); for (ColumnFamilyStore cfs : icc) - { submitFlush(cfs.data.switchMemtable(), latch); - } + // we marked our memtable as frozen as part of the concurrency control, // so even if there was nothing to flush we need to switch it out if (!icc.contains(this)) @@ -641,6 +654,10 @@ public class ColumnFamilyStore implement } }); } + finally + { + Table.switchLock.writeLock().unlock(); + } } public boolean isDropped() @@ -696,6 +713,8 @@ public class ColumnFamilyStore implement /** * Insert/Update the column family for this key. + * Caller is responsible for acquiring Table.flusherLock! + * param @ lock - lock that needs to be used. * param @ key - key for update/insert * param @ columnFamily - columnFamily changes */ @@ -947,11 +966,14 @@ public class ColumnFamilyStore implement } /** - * get the current memtable in a threadsafe fashion. - * Returning memtable is ok because memtable is volatile, and thus - * introduce a happens-before ordering. + * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is + * incorrect; you need to lock to introduce a thread safe happens-before ordering. + * + * do NOT use this method to do either a put or get on the memtable object, since it could be + * flushed in the meantime (and its executor terminated). * - * do NOT make this method public or it will really get impossible to reason about these things. + * also do NOT make this method public or it will really get impossible to reason about these things. 
+ * @return */ private Memtable getMemtableThreadSafe() { @@ -998,10 +1020,10 @@ public class ColumnFamilyStore implement return readStats.getTotalLatencyMicros(); } - // TODO this actually isn't a good meature of pending tasks +// TODO this actually isn't a good meature of pending tasks public int getPendingTasks() { - return 0; + return Table.switchLock.getQueueLength(); } public long getWriteCount() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1087919&r1=1087918&r2=1087919&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Apr 1 22:22:30 2011 @@ -52,8 +52,7 @@ public class Memtable implements Compara { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - private final AtomicBoolean isPendingFlush = new AtomicBoolean(false); - private final AtomicInteger activeWriters = new AtomicInteger(0); + private boolean isFrozen; private final AtomicLong currentThroughput = new AtomicLong(0); private final AtomicLong currentOperations = new AtomicLong(0); @@ -106,30 +105,25 @@ public class Memtable implements Compara return currentThroughput.get() >= this.THRESHOLD || currentOperations.get() >= this.THRESHOLD_COUNT; } - boolean isPendingFlush() + boolean isFrozen() { - return isPendingFlush.get(); + return isFrozen; } - boolean markPendingFlush() + void freeze() { - return isPendingFlush.compareAndSet(false, true); + isFrozen = true; } /** * Should only be called by ColumnFamilyStore.apply. NOT a public API. + * (CFS handles locking to avoid submitting an op + * to a flushing memtable. Any other way is unsafe.) 
*/ void put(DecoratedKey key, ColumnFamily columnFamily) { - try - { - activeWriters.incrementAndGet(); - resolve(key, columnFamily); - } - finally - { - activeWriters.decrementAndGet(); - } + assert !isFrozen; // not 100% foolproof but hell, it's an assert + resolve(key, columnFamily); } private void resolve(DecoratedKey key, ColumnFamily cf) @@ -178,7 +172,6 @@ public class Memtable implements Compara { public void runMayThrow() throws IOException { - waitForWriters(); cfs.flushLock.lock(); try { @@ -197,25 +190,6 @@ public class Memtable implements Compara }); } - /* - * Wait for all writers to be done with this memtable before flushing. - * A busy-wait is probably alright since we'll new wait long. - */ - private void waitForWriters() - { - while (activeWriters.get() > 0) - { - try - { - Thread.sleep(3); - } - catch (InterruptedException e) - { - logger.error("Interrupted while waiting on writers.", e); - } - } - } - public String toString() { return String.format("Memtable-%s@%s(%s bytes, %s operations)", Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1087919&r1=1087918&r2=1087919&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Apr 1 22:22:30 2011 @@ -24,7 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -57,6 +57,14 @@ public class Table private static final Logger logger = LoggerFactory.getLogger(Table.class); private static final String SNAPSHOT_SUBDIR_NAME = "snapshots"; + /** + * accesses to CFS.memtable 
should acquire this for thread safety. + * ColumnFamilyStore.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation. + * + * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.) + */ + static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock(); + // It is possible to call Table.open without a running daemon, so it makes sense to ensure // proper directories here as well as in CassandraDaemon. static @@ -371,64 +379,72 @@ public class Table logger.debug("applying mutation of row {}", ByteBufferUtil.bytesToHex(mutation.key())); // write the mutation to the commitlog and memtables - if (writeCommitLog) - CommitLog.instance.add(mutation); - - DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key()); - for (ColumnFamily cf : mutation.getColumnFamilies()) + switchLock.readLock().lock(); + try { - ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); - if (cfs == null) + if (writeCommitLog) + CommitLog.instance.add(mutation); + + DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key()); + for (ColumnFamily cf : mutation.getColumnFamilies()) { - logger.error("Attempting to mutate non-existant column family " + cf.id()); - continue; - } + ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); + if (cfs == null) + { + logger.error("Attempting to mutate non-existant column family " + cf.id()); + continue; + } - SortedSet<ByteBuffer> mutatedIndexedColumns = null; - for (ByteBuffer column : cfs.getIndexedColumns()) - { - if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete()) + SortedSet<ByteBuffer> mutatedIndexedColumns = null; + for (ByteBuffer column : cfs.getIndexedColumns()) { - if (mutatedIndexedColumns == null) - mutatedIndexedColumns = new TreeSet<ByteBuffer>(); - mutatedIndexedColumns.add(column); - if (logger.isDebugEnabled()) + if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete()) { - // can't actually use 
validator to print value here, because we overload value - // for deletion timestamp as well (which may not be a well-formed value for the column type) - ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion - logger.debug(String.format("mutating indexed column %s value %s", - cf.getComparator().getString(column), - value == null ? "null" : ByteBufferUtil.bytesToHex(value))); + if (mutatedIndexedColumns == null) + mutatedIndexedColumns = new TreeSet<ByteBuffer>(); + mutatedIndexedColumns.add(column); + if (logger.isDebugEnabled()) + { + // can't actually use validator to print value here, because we overload value + // for deletion timestamp as well (which may not be a well-formed value for the column type) + ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion + logger.debug(String.format("mutating indexed column %s value %s", + cf.getComparator().getString(column), + value == null ? "null" : ByteBufferUtil.bytesToHex(value))); + } } } - } - synchronized (indexLockFor(mutation.key())) - { - ColumnFamily oldIndexedColumns = null; - if (mutatedIndexedColumns != null) + synchronized (indexLockFor(mutation.key())) { - // with the raw data CF, we can just apply every update in any order and let - // read-time resolution throw out obsolete versions, thus avoiding read-before-write. - // but for indexed data we need to make sure that we're not creating index entries - // for obsolete writes. 
- oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); - logger.debug("Pre-mutation index row is {}", oldIndexedColumns); - ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); - } + ColumnFamily oldIndexedColumns = null; + if (mutatedIndexedColumns != null) + { + // with the raw data CF, we can just apply every update in any order and let + // read-time resolution throw out obsolete versions, thus avoiding read-before-write. + // but for indexed data we need to make sure that we're not creating index entries + // for obsolete writes. + oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); + logger.debug("Pre-mutation index row is {}", oldIndexedColumns); + ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); + } - Memtable fullMemtable = cfs.apply(key, cf); - if (fullMemtable != null) - memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); + Memtable fullMemtable = cfs.apply(key, cf); + if (fullMemtable != null) + memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); - if (mutatedIndexedColumns != null) - { - // ignore full index memtables -- we flush those when the "master" one is full - applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns); + if (mutatedIndexedColumns != null) + { + // ignore full index memtables -- we flush those when the "master" one is full + applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns); + } } } } + finally + { + switchLock.readLock().unlock(); + } // flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock). // usually mTF will be empty and this will be a no-op. 
@@ -582,12 +598,19 @@ public class Table DecoratedKey<?> key = iter.next(); logger.debug("Indexing row {} ", key); List<Memtable> memtablesToFlush = Collections.emptyList(); - - synchronized (indexLockFor(key.key)) + switchLock.readLock().lock(); + try + { + synchronized (indexLockFor(key.key)) + { + ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); + if (cf != null) + memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null); + } + } + finally { - ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); - if (cf != null) - memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null); + switchLock.readLock().unlock(); } // during index build, we do flush index memtables separately from master; otherwise we could OOM