Author: jbellis
Date: Thu Aug 11 19:29:34 2011
New Revision: 1156757

URL: http://svn.apache.org/viewvc?rev=1156757&view=rev
Log:
make sure pre-truncate CL segments are discarded

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1156757&r1=1156756&r2=1156757&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 11 19:29:34 2011
@@ -232,7 +232,7 @@
  * Disable compaction throttling during bootstrap (CASSANDRA-2612) 
  * fix CQL treatment of > and < operators in range slices (CASSANDRA-2592)
  * fix potential double-application of counter updates on commitlog replay
-   (CASSANDRA-2419)
+   by moving replay position from header to sstable metadata (CASSANDRA-2419)
  * JDBC CQL driver exposes getColumn for access to timestamp
  * JDBC ResultSetMetadata properties added to AbstractType
  * r/m clustertool (CASSANDRA-2607)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1156757&r1=1156756&r2=1156757&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Aug 11 19:29:34 2011
@@ -581,6 +581,7 @@ public class ColumnFamilyStore implement
             assert getMemtableThreadSafe() == oldMemtable;
             oldMemtable.freeze();
             final ReplayPosition ctx = writeCommitLog ? CommitLog.instance.getContext() : ReplayPosition.NONE;
+            logger.debug("flush position is {}", ctx);
 
             // submit the memtable for any indexed sub-cfses, and our own.
             List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>();
@@ -1532,6 +1533,37 @@ public class ColumnFamilyStore implement
     }
 
     /**
+     * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish.
+     * Does NOT guarantee that no flush is active when it returns.
+     */
+    private void waitForActiveFlushes()
+    {
+        Future<?> future;
+        Table.switchLock.writeLock().lock();
+        try
+        {
+            future = postFlushExecutor.submit(new Runnable() { public void run() { } });
+        }
+        finally
+        {
+            Table.switchLock.writeLock().unlock();
+        }
+
+        try
+        {
+            future.get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
      * Truncate practically deletes the entire column family's data
      * @return a Future to the delete operation. Call the future's get() to make
      * sure the column family has been deleted
@@ -1544,14 +1576,33 @@ public class ColumnFamilyStore implement
         // We accomplish this by first flushing manually, then snapshotting, and
         // recording the timestamp IN BETWEEN those actions. Any sstables created
         // with this timestamp or greater time, will not be marked for delete.
-        try
-        {
-            forceBlockingFlush();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        //
+        // Bonus complication: since we store replay position in sstable metadata,
+        // truncating those sstables means we will replay any CL segments from the
+        // beginning if we restart before they are discarded for normal reasons
+        // post-truncate.  So we need to (a) force a new segment so the currently
+        // active one can be discarded, and (b) flush *all* CFs so that unflushed
+        // data in others don't keep any pre-truncate CL segments alive.
+        //
+        // Bonus bonus: simply forceFlush of all the CF is not enough, because if
+        // for a given column family the memtable is clean, forceFlush will return
+        // immediately, even though there could be a memtable being flush at the same
+        // time.  So to guarantee that all segments can be cleaned out, we need
+        // "waitForActiveFlushes" after the new segment has been created.
+        CommitLog.instance.forceNewSegment();
+        waitForActiveFlushes();
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+        ReplayPosition position = CommitLog.instance.getContext();
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            Future<?> f = cfs.forceFlush();
+            if (f != null)
+                futures.add(f);
+        }
+        FBUtilities.waitOnFutures(futures);
+        // if everything was clean, flush won't have called discard
+        CommitLog.instance.discardCompletedSegments(metadata.cfId, position);
+
         // sleep a little to make sure that our truncatedAt comes after any sstable
         // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1156757&r1=1156756&r2=1156757&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Aug 11 19:29:34 2011
@@ -81,9 +81,14 @@ public class SystemTable
         ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(dotSeven);
         if (cf == null)
         {
-            // upgrading from 0.6 to 0.7.
-            logger.info("Upgrading to 0.7. Purging hints if there are any. Old hints will be snapshotted.");
-            new Truncation(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF).apply();
+            // 0.7+ marker not found.  Remove hints and add the marker.
+            ColumnFamilyStore hintsCfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HintedHandOffManager.HINTS_CF);
+            if (hintsCfs.getSSTables().size() > 0)
+            {
+                logger.info("Possible 0.6-format hints found. Snapshotting as 'old-hints' and purging");
+                hintsCfs.snapshot("old-hints");
+                hintsCfs.removeAllSSTables();
+            }
             RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, COOKIE_KEY);
             rm.add(new QueryPath(STATUS_CF, null, hintsPurged6to7), ByteBufferUtil.bytes("oh yes, it they were purged."), System.currentTimeMillis());
             rm.apply();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1156757&r1=1156756&r2=1156757&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Aug 11 19:29:34 2011
@@ -496,7 +496,6 @@ public class CommitLog implements Commit
         }
     }
 
-    
     void sync() throws IOException
     {
         currentSegment().sync();
@@ -532,6 +531,50 @@ public class CommitLog implements Commit
         return getSize();
     }
 
+    public void forceNewSegment()
+    {
+        Callable<?> task = new Callable()
+        {
+            public Object call() throws IOException
+            {
+                createNewSegment();
+                return null;
+            }
+        };
+
+        try
+        {
+            executor.submit(task).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void createNewSegment() throws IOException
+    {
+        sync();
+        segments.add(new CommitLogSegment());
+
+        // Maintain desired CL size cap
+        if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
+        {
+            // Force a flush on all CFs keeping the oldest segment from being removed
+            CommitLogSegment oldestSegment = segments.peek();
+            assert oldestSegment != null; // has to be at least the one we just added
+            for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet())
+            {
+                String keypace = CFMetaData.getCF(dirtyCFId).left;
+                Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush();
+            }
+        }
+    }
+
     // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
     // without breaking the fragile CheaterFutureTask in BatchCLES.
     class LogRecordAdder implements Callable, Runnable
@@ -550,23 +593,7 @@ public class CommitLog implements Commit
                 currentSegment().write(rowMutation);
                 // roll log if necessary
                 if (currentSegment().length() >= SEGMENT_SIZE)
-                {
-                    sync();
-                    segments.add(new CommitLogSegment());
-
-                    // Maintain desired CL size cap
-                    if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
-                    {
-                        // Force a flush on all CFs keeping the oldest segment from being removed
-                        CommitLogSegment oldestSegment = segments.peek();
-                        assert oldestSegment != null; // has to be at least the one we just added
-                        for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet())
-                        {
-                            String keypace = CFMetaData.getCF(dirtyCFId).left;
-                            Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush();
-                        }
-                    }
-                }
+                    createNewSegment();
             }
             catch (IOException e)
             {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1156757&r1=1156756&r2=1156757&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Thu Aug 11 19:29:34 2011
@@ -73,6 +73,7 @@ public class RecoveryManagerTruncateTest
                rm.apply();
                cfs.forceBlockingFlush();
                cfs.truncate().get();
+        CommitLog.instance.resetUnsafe();
                CommitLog.recover();
                assertNull(getFromTable(table, "Standard1", "keymulti", "col1"));
        }


Reply via email to