Author: jbellis
Date: Fri Jul 23 04:57:20 2010
New Revision: 966964

URL: http://svn.apache.org/viewvc?rev=966964&view=rev
Log:
flush index CFs before marking parent CF flushed in commitlog header. patch by jbellis; reviewed by Nate McCall for CASSANDRA-1301
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=966964&r1=966963&r2=966964&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Fri Jul 23 04:57:20 2010 @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -78,7 +79,7 @@ public class BinaryMemtable implements I if (!isFrozen) { isFrozen = true; - cfs.submitFlush(this); + cfs.submitFlush(this, new CountDownLatch(1)); cfs.switchBinaryMemtable(key, buffer); } else @@ -134,7 +135,7 @@ public class BinaryMemtable implements I return sstable; } - public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer) + public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer) { sorter.submit(new Runnable() { @@ -146,7 +147,7 @@ public class BinaryMemtable implements I public void runMayThrow() throws IOException { cfs.addSSTable(writeSortedContents(sortedKeys)); - condition.signalAll(); + latch.countDown(); } }); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=966964&r1=966963&r2=966964&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 23 04:57:20 2010 @@ -31,6 +31,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import org.apache.commons.collections.IteratorUtils; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; @@ -394,22 +395,29 @@ public class ColumnFamilyStore implement try { if (oldMemtable.isFrozen()) - { return null; - } - oldMemtable.freeze(); + assert memtable_ == oldMemtable; + memtable_.freeze(); final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext() : null; - logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable at " + ctx); - final Condition condition = submitFlush(oldMemtable); - memtable_ = new Memtable(this, partitioner_); - // a second executor that makes sure the onMemtableFlushes get called in the right order, + logger_.info("switching in a fresh Memtable for " + columnFamily_ + " at " + ctx); + + // submit the memtable for any indexed sub-cfses, and our own + final CountDownLatch latch = new CountDownLatch(1 + indexedColumns_.size()); + for (ColumnFamilyStore cfs : Iterables.concat(indexedColumns_.values(), Collections.singleton(this))) + { + submitFlush(cfs.memtable_, latch); + cfs.memtable_ = new Memtable(cfs, cfs.partitioner_); + } + + // when all the memtables have been written, including for indexes, mark the flush in the commitlog header. 
+ // a second executor makes sure the onMemtableFlushes get called in the right order, // while keeping the wait-for-flush (future.get) out of anything latency-sensitive. return commitLogUpdater_.submit(new WrappedRunnable() { public void runMayThrow() throws InterruptedException, IOException { - condition.await(); + latch.await(); if (writeCommitLog) { // if we're not writing to the commit log, we are replaying the log, so marking @@ -463,7 +471,7 @@ public class ColumnFamilyStore implement if (binaryMemtable_.get().isClean()) return; - submitFlush(binaryMemtable_.get()); + submitFlush(binaryMemtable_.get(), new CountDownLatch(1)); } /** @@ -674,12 +682,10 @@ public class ColumnFamilyStore implement * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper * (since, by definition, it started last). */ - Condition submitFlush(IFlushable flushable) + void submitFlush(IFlushable flushable, CountDownLatch latch) { logger_.info("Enqueuing flush of {}", flushable); - final Condition condition = new SimpleCondition(); - flushable.flushAndSignal(condition, flushSorter_, flushWriter_); - return condition; + flushable.flushAndSignal(latch, flushSorter_, flushWriter_); } public int getMemtableColumnsCount() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=966964&r1=966963&r2=966964&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Fri Jul 23 04:57:20 2010 @@ -21,10 +21,11 @@ package org.apache.cassandra.db; */ +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Condition; public interface IFlushable { - public void 
flushAndSignal(Condition condition, ExecutorService sorter, ExecutorService writer); + public void flushAndSignal(CountDownLatch condition, ExecutorService sorter, ExecutorService writer); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=966964&r1=966963&r2=966964&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Jul 23 04:57:20 2010 @@ -22,9 +22,9 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -37,8 +37,6 @@ import org.apache.cassandra.db.marshal.A import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.WrappedRunnable; public class Memtable implements Comparable<Memtable>, IFlushable @@ -158,7 +156,7 @@ public class Memtable implements Compara return ssTable; } - public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer) + public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer) { cfs.getMemtablesPendingFlush().add(this); // it's ok for the MT to briefly be both active and pendingFlush writer.submit(new WrappedRunnable() @@ -167,7 +165,7 
@@ public class Memtable implements Compara { cfs.addSSTable(writeSortedContents()); cfs.getMemtablesPendingFlush().remove(Memtable.this); - condition.signalAll(); + latch.countDown(); } }); }