Author: jbellis
Date: Fri Jul 23 04:57:20 2010
New Revision: 966964

URL: http://svn.apache.org/viewvc?rev=966964&view=rev
Log:
flush index CFs before marking parent CF flushed in commitlog header.  patch by 
jbellis; reviewed by Nate McCall for CASSANDRA-1301

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Fri 
Jul 23 04:57:20 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -78,7 +79,7 @@ public class BinaryMemtable implements I
                 if (!isFrozen)
                 {
                     isFrozen = true;
-                    cfs.submitFlush(this);
+                    cfs.submitFlush(this, new CountDownLatch(1));
                     cfs.switchBinaryMemtable(key, buffer);
                 }
                 else
@@ -134,7 +135,7 @@ public class BinaryMemtable implements I
         return sstable;
     }
 
-    public void flushAndSignal(final Condition condition, ExecutorService 
sorter, final ExecutorService writer)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService 
sorter, final ExecutorService writer)
     {
         sorter.submit(new Runnable()
         {
@@ -146,7 +147,7 @@ public class BinaryMemtable implements I
                     public void runMayThrow() throws IOException
                     {
                         cfs.addSSTable(writeSortedContents(sortedKeys));
-                        condition.signalAll();
+                        latch.countDown();
                     }
                 });
             }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri 
Jul 23 04:57:20 2010
@@ -31,6 +31,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -394,22 +395,29 @@ public class ColumnFamilyStore implement
         try
         {
             if (oldMemtable.isFrozen())
-            {
                 return null;
-            }
-            oldMemtable.freeze();
 
+            assert memtable_ == oldMemtable;
+            memtable_.freeze();
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? 
CommitLog.instance().getContext() : null;
-            logger_.info(columnFamily_ + " has reached its threshold; 
switching in a fresh Memtable at " + ctx);
-            final Condition condition = submitFlush(oldMemtable);
-            memtable_ = new Memtable(this, partitioner_);
-            // a second executor that makes sure the onMemtableFlushes get 
called in the right order,
+            logger_.info("switching in a fresh Memtable for " + columnFamily_ 
+ " at " + ctx);
+
+            // submit the memtable for any indexed sub-cfses, and our own
+            final CountDownLatch latch = new CountDownLatch(1 + 
indexedColumns_.size());
+            for (ColumnFamilyStore cfs : 
Iterables.concat(indexedColumns_.values(), Collections.singleton(this)))
+            {
+                submitFlush(cfs.memtable_, latch);
+                cfs.memtable_ = new Memtable(cfs, cfs.partitioner_);
+            }
+
+            // when all the memtables have been written, including for 
indexes, mark the flush in the commitlog header.
+            // a second executor makes sure the onMemtableFlushes get called 
in the right order,
             // while keeping the wait-for-flush (future.get) out of anything 
latency-sensitive.
             return commitLogUpdater_.submit(new WrappedRunnable()
             {
                 public void runMayThrow() throws InterruptedException, 
IOException
                 {
-                    condition.await();
+                    latch.await();
                     if (writeCommitLog)
                     {
                         // if we're not writing to the commit log, we are 
replaying the log, so marking
@@ -463,7 +471,7 @@ public class ColumnFamilyStore implement
         if (binaryMemtable_.get().isClean())
             return;
 
-        submitFlush(binaryMemtable_.get());
+        submitFlush(binaryMemtable_.get(), new CountDownLatch(1));
     }
 
     /**
@@ -674,12 +682,10 @@ public class ColumnFamilyStore implement
      * flushing thread finishes sorting, which will almost always be longer 
than any of the flushSorter threads proper
      * (since, by definition, it started last).
      */
-    Condition submitFlush(IFlushable flushable)
+    void submitFlush(IFlushable flushable, CountDownLatch latch)
     {
         logger_.info("Enqueuing flush of {}", flushable);
-        final Condition condition = new SimpleCondition();
-        flushable.flushAndSignal(condition, flushSorter_, flushWriter_);
-        return condition;
+        flushable.flushAndSignal(latch, flushSorter_, flushWriter_);
     }
 
     public int getMemtableColumnsCount()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Fri Jul 23 
04:57:20 2010
@@ -21,10 +21,11 @@ package org.apache.cassandra.db;
  */
 
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 
 public interface IFlushable
 {
-    public void flushAndSignal(Condition condition, ExecutorService sorter, 
ExecutorService writer);
+    public void flushAndSignal(CountDownLatch condition, ExecutorService 
sorter, ExecutorService writer);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Jul 23 
04:57:20 2010
@@ -22,9 +22,9 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -37,8 +37,6 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class Memtable implements Comparable<Memtable>, IFlushable
@@ -158,7 +156,7 @@ public class Memtable implements Compara
         return ssTable;
     }
 
-    public void flushAndSignal(final Condition condition, ExecutorService 
sorter, final ExecutorService writer)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService 
sorter, final ExecutorService writer)
     {
         cfs.getMemtablesPendingFlush().add(this); // it's ok for the MT to 
briefly be both active and pendingFlush
         writer.submit(new WrappedRunnable()
@@ -167,7 +165,7 @@ public class Memtable implements Compara
             {
                 cfs.addSSTable(writeSortedContents());
                 cfs.getMemtablesPendingFlush().remove(Memtable.this);
-                condition.signalAll();
+                latch.countDown();
             }
         });
     }


Reply via email to