avoid flushing everyone on truncate; save truncation position in system table 
instead
patch by jbellis; reviewed by yukim for CASSANDRA-4906


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38bfc6dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38bfc6dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38bfc6dc

Branch: refs/heads/cassandra-1.2
Commit: 38bfc6dca06bd0192167ae5e9bd51d593542f03e
Parents: 93f8fec
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Sat Oct 27 11:18:31 2012 -0700
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Nov 12 15:05:36 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    3 +-
 .../apache/cassandra/cql3/UntypedResultSet.java    |    9 ++-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   97 ++++-----------
 src/java/org/apache/cassandra/db/SystemTable.java  |   64 ++++++++++
 .../cassandra/db/commitlog/CommitLogReplayer.java  |   21 ++--
 .../cassandra/db/compaction/CompactionManager.java |    5 +-
 7 files changed, 118 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff79b9a..9ac6227 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-rc1
+ * save truncation position in system table (CASSANDRA-4906)
  * Move CompressionMetadata off-heap (CASSANDRA-4937)
  * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
  * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 921242a..5f0e93a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -181,7 +181,8 @@ public final class CFMetaData
                                                          + "thrift_version 
text,"
                                                          + "cql_version text,"
                                                          + "data_center text,"
-                                                         + "rack text"
+                                                         + "rack text,"
+                                                         + "truncated_at 
map<uuid, blob>"
                                                          + ") WITH 
COMMENT='information about the local node'");
 
     public static final CFMetaData TraceSessionsCf = compile(14, "CREATE TABLE 
" + Tracing.SESSIONS_CF + " ("

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java 
b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index ca3acf5..b6fcb55 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -133,7 +133,14 @@ public class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
 
         public <T> Set<T> getSet(String column, AbstractType<T> type)
         {
-            return SetType.getInstance(type).compose(data.get(column));
+            ByteBuffer raw = data.get(column);
+            return raw == null ? null : SetType.getInstance(type).compose(raw);
+        }
+
+        public <K, V> Map<K, V> getMap(String column, AbstractType<K> keyType, 
AbstractType<V> valueType)
+        {
+            ByteBuffer raw = data.get(column);
+            return raw == null ? null : MapType.getInstance(keyType, 
valueType).compose(raw);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a91af8c..439ef5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1720,38 +1720,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     }
 
     /**
-     * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish.
-     * Does NOT guarantee that no flush is active when it returns.
-     */
-    private void waitForActiveFlushes()
-    {
-        Future<?> future;
-        Table.switchLock.writeLock().lock();
-        try
-        {
-            future = postFlushExecutor.submit(new Runnable() { public void 
run() { } });
-        }
-        finally
-        {
-            Table.switchLock.writeLock().unlock();
-        }
-
-        try
-        {
-            future.get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    /**
-     * Truncate practically deletes the entire column family's data
+     * Truncate deletes the entire column family's data with no expensive 
tombstone creation
      * @return a Future to the delete operation. Call the future's get() to 
make
      * sure the column family has been deleted
      */
@@ -1767,21 +1736,29 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         // Bonus complication: since we store replay position in sstable 
metadata,
         // truncating those sstables means we will replay any CL segments from 
the
         // beginning if we restart before they are discarded for normal reasons
-        // post-truncate.  So we need to (a) force a new segment so the 
currently
-        // active one can be discarded, and (b) flush *all* CFs so that 
unflushed
-        // data in others don't keep any pre-truncate CL segments alive.
-        //
-        // Bonus bonus: simply forceFlush of all the CF is not enough, because 
if
-        // for a given column family the memtable is clean, forceFlush will 
return
-        // immediately, even though there could be a memtable being flushed at 
the same
-        // time.  So to guarantee that all segments can be cleaned out, we 
need to
-        // "waitForActiveFlushes" after the new segment has been created.
+        // post-truncate.  So we need to create a "dummy" sstable containing
+        // only the replay position.  This is done by 
CompactionManager.submitTruncate.
         logger.debug("truncating {}", columnFamily);
 
         if (DatabaseDescriptor.isAutoSnapshot())
         {
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();
+
+            // sleep a little to make sure that our truncatedAt comes after 
any sstable
+            // that was part of the flush we forced; otherwise on a tie, it 
won't get deleted.
+            try
+            {
+                long starttime = System.currentTimeMillis();
+                while ((System.currentTimeMillis() - starttime) < 1)
+                {
+                    Thread.sleep(1);
+                }
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
         }
         else
         {
@@ -1804,33 +1781,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             }
         }
 
-        KSMetaData ksm = Schema.instance.getKSMetaData(this.table.name);
-        if (ksm.durableWrites)
-        {
-            CommitLog.instance.forceNewSegment();
-            Future<ReplayPosition> position = CommitLog.instance.getContext();
-            // now flush everyone else.  re-flushing ourselves is not 
necessary, but harmless
-            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-                cfs.forceFlush();
-            waitForActiveFlushes();
-            // if everything was clean, flush won't have called discard
-            CommitLog.instance.discardCompletedSegments(metadata.cfId, 
position.get());
-        }
-
-        // sleep a little to make sure that our truncatedAt comes after any 
sstable
-        // that was part of the flushed we forced; otherwise on a tie, it 
won't get deleted.
-        try
-        {
-            long starttime = System.currentTimeMillis();
-            while ((System.currentTimeMillis() - starttime) < 1)
-            {
-                Thread.sleep(1);
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
         long truncatedAt = System.currentTimeMillis();
         if (DatabaseDescriptor.isAutoSnapshot())
             snapshot(Table.getTimestampedSnapshotName(columnFamily));
@@ -2093,8 +2043,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      *
      * @param truncatedAt The timestamp of the truncation
      *                    (all SSTables before that timestamp are going be 
marked as compacted)
+     *
+     * @return the most recent replay position of the truncated data
      */
-    public void discardSSTables(long truncatedAt)
+    public ReplayPosition discardSSTables(long truncatedAt)
     {
         List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
 
@@ -2104,7 +2056,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 truncatedSSTables.add(sstable);
         }
 
-        if (!truncatedSSTables.isEmpty())
-            markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+        if (truncatedSSTables.isEmpty())
+            return ReplayPosition.NONE;
+
+        markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+        return ReplayPosition.getReplayPosition(truncatedSSTables);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java 
b/src/java/org/apache/cassandra/db/SystemTable.java
index a14ba58..34f7096 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -29,8 +32,11 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.avro.ipc.ByteBufferInputStream;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -43,12 +49,15 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Constants;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 
@@ -178,6 +187,61 @@ public class SystemTable
         }
     }
 
+    public static void saveTruncationPosition(ColumnFamilyStore cfs, 
ReplayPosition position)
+    {
+        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s 
WHERE key = '%s'";
+        processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, 
position), LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    private static String positionAsMapEntry(ColumnFamilyStore cfs, 
ReplayPosition position)
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        try
+        {
+            ReplayPosition.serializer.serialize(position, out);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return String.format("{'%s': '%s'}",
+                             cfs.metadata.cfId,
+                             
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+    }
+
+    public static Map<UUID, ReplayPosition> getTruncationPositions()
+    {
+        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
+        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, 
LOCAL_KEY));
+        if (rows.isEmpty())
+            return Collections.emptyMap();
+
+        UntypedResultSet.Row row = rows.one();
+        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", 
UUIDType.instance, BytesType.instance);
+        if (rawMap == null)
+            return Collections.emptyMap();
+
+        Map<UUID, ReplayPosition> positions = new HashMap<UUID, 
ReplayPosition>();
+        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
+        {
+            positions.put(entry.getKey(), positionFromBlob(entry.getValue()));
+        }
+        return positions;
+    }
+
+    private static ReplayPosition positionFromBlob(ByteBuffer bytes)
+    {
+        try
+        {
+            return ReplayPosition.serializer.deserialize(new 
DataInputStream(ByteBufferUtil.inputStream(bytes)));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Record tokens being used by another node
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 9bc0179..9f949d0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -35,11 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileUtils;
@@ -72,18 +68,27 @@ public class CommitLogReplayer
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about 
atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
+        this.checksum = new PureJavaCrc32();
+
         // compute per-CF and global replay positions
-        this.cfPositions = new HashMap<UUID, ReplayPosition>();
+        cfPositions = new HashMap<UUID, ReplayPosition>();
+        Ordering<ReplayPosition> replayPositionOrdering = 
Ordering.from(ReplayPosition.comparator);
+        Map<UUID, ReplayPosition> truncationPositions = 
SystemTable.getTruncationPositions();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // it's important to call RP.gRP per-cf, before aggregating all 
the positions w/ the Ordering.min call
             // below: gRP will return NONE if there are no flushed sstables, 
which is important to have in the
             // list (otherwise we'll just start replay from the first flush 
position that we do have, which is not correct).
             ReplayPosition rp = 
ReplayPosition.getReplayPosition(cfs.getSSTables());
+
+            // but, if we've truncated the cf in question, then we need 
to start replay after the truncation
+            ReplayPosition truncatedAt = 
truncationPositions.get(cfs.metadata.cfId);
+            if (truncatedAt != null)
+                rp = replayPositionOrdering.max(Arrays.asList(rp, 
truncatedAt));
+
             cfPositions.put(cfs.metadata.cfId, rp);
         }
-        this.globalPosition = 
Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
-        this.checksum = new PureJavaCrc32();
+        globalPosition = replayPositionOrdering.min(cfPositions.values());
     }
 
     public void recover(File[] clogs) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2d06036..4399948 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
@@ -861,11 +862,13 @@ public class CompactionManager implements 
CompactionManagerMBean
 
                 try
                 {
-                    main.discardSSTables(truncatedAt);
+                    ReplayPosition replayAfter = 
main.discardSSTables(truncatedAt);
 
                     for (SecondaryIndex index : main.indexManager.getIndexes())
                         index.truncate(truncatedAt);
 
+                    SystemTable.saveTruncationPosition(main, replayAfter);
+
                     for (RowCacheKey key : 
CacheService.instance.rowCache.getKeySet())
                     {
                         if (key.cfId == main.metadata.cfId)

Reply via email to