Ignore pre-truncate hints
patch by Alexey Zotov and jbellis; reviewed by vijay for CASSANDRA-4655


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4c9626f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4c9626f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4c9626f

Branch: refs/heads/cassandra-1.2
Commit: c4c9626f8b3ba941aa51860a7d10a6e686362f85
Parents: 49f220c
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu May 23 10:04:04 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu May 23 10:08:17 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    7 +++
 .../apache/cassandra/db/HintedHandOffManager.java  |   25 ++++++++++
 src/java/org/apache/cassandra/db/RowMutation.java  |    9 ++++
 src/java/org/apache/cassandra/db/SystemTable.java  |   35 ++++++++++-----
 .../cassandra/db/commitlog/CommitLogReplayer.java  |   10 ++---
 .../cassandra/db/compaction/CompactionManager.java |    2 +-
 7 files changed, 71 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f1127a..66c5f04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Ignore pre-truncate hints (CASSANDRA-4655)
  * Move System.exit on OOM into a separate thread (CASSANDRA-5273)
  * Write row markers when serializing schema (CASSANDRA-5572)
  * Check only SSTables for the requested range when streaming (CASSANDRA-5569)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 055c415..429859e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -298,6 +298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             valid = false;
             unregisterMBean();
 
+            SystemTable.removeTruncationRecord(metadata.cfId);
             data.unreferenceSSTables();
             indexManager.invalidate();
         }
@@ -2077,4 +2078,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return getDataTracker().getDroppableTombstoneRatio();
     }
+
+    /**
+     * @return the truncation time stored in this column family's truncation record, or
+     *         Long.MIN_VALUE if there is no record. Each call re-reads all records through
+     *         SystemTable.getTruncationRecords().
+     */
+    public long getTruncationTime()
+    {
+        Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId);
+        return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 53411f5..9346fb3 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -30,6 +30,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
@@ -365,6 +366,25 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     throw new AssertionError(e);
                 }
 
+                // Skip column families truncated after this hint was written (CASSANDRA-4655):
+                // delivering their part of the hint would restore pre-truncate data.
+                for (UUID cfId : ImmutableSet.copyOf(rm.getColumnFamilyIds()))
+                {
+                    long truncatedAt = Table.open(rm.getTable()).getColumnFamilyStore(cfId).getTruncationTime();
+                    if (hint.maxTimestamp() < truncatedAt)
+                    {
+                        logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
+                        rm = rm.without(cfId);
+                    }
+                }
+
+                // Every column family in the hint was truncated; drop the hint instead of sending it.
+                if (rm.isEmpty())
+                {
+                    deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
+                    continue;
+                }
+
                 MessageOut<RowMutation> message = rm.createMessage();
                 rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
                 Runnable callback = new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 826f3e0..b85cfcd 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -355,6 +355,20 @@ public class RowMutation implements IMutation
         }
     }
 
+    /**
+     * Returns a new mutation for the same table and key holding every column family
+     * modification of this mutation except the one for {@code cfId}. This mutation is not
+     * modified; the remaining ColumnFamily objects are handed to the new one as-is (not cloned here).
+     */
+    public RowMutation without(UUID cfId)
+    {
+        RowMutation rm = new RowMutation(table, key);
+        for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+            if (!entry.getKey().equals(cfId))
+                rm.add(entry.getValue());
+        return rm;
+    }
+
     public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
     {
         public void serialize(RowMutation rm, DataOutput dos, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 2e36aeb..327f01b 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -47,7 +47,10 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Constants;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 
@@ -178,19 +181,30 @@ public class SystemTable
         }
     }
 
-    public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
-        processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
         forceBlockingFlush(LOCAL_CF);
     }
 
-    private static String positionAsMapEntry(ColumnFamilyStore cfs, ReplayPosition position)
+    /**
+     * Deletes {@code cfId}'s entry from the truncated_at map written by saveTruncationRecord.
+     */
+    public static void removeTruncationRecord(UUID cfId)
+    {
+        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
+        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         DataOutputBuffer out = new DataOutputBuffer();
         try
         {
             ReplayPosition.serializer.serialize(position, out);
+            out.writeLong(truncatedAt);
         }
         catch (IOException e)
         {
@@ -201,7 +215,7 @@ public class SystemTable
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
     }
 
-    public static Map<UUID, ReplayPosition> getTruncationPositions()
+    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
     {
         String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
         UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
@@ -213,19 +227,22 @@ public class SystemTable
         if (rawMap == null)
             return Collections.emptyMap();
 
-        Map<UUID, ReplayPosition> positions = new HashMap<UUID, ReplayPosition>();
+        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
         for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
-        {
-            positions.put(entry.getKey(), positionFromBlob(entry.getValue()));
-        }
+            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
         return positions;
     }
 
-    private static ReplayPosition positionFromBlob(ByteBuffer bytes)
+    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
     {
         try
         {
-            return ReplayPosition.serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)));
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            ReplayPosition position = ReplayPosition.serializer.deserialize(in);
+            // Records saved by the old saveTruncationPosition hold only the replay position;
+            // report Long.MIN_VALUE as their truncation time.
+            long truncatedAt = in.available() > 0 ? in.readLong() : Long.MIN_VALUE;
+            return Pair.create(position, truncatedAt);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 2728970..e1fefa1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.PureJavaCrc32;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -73,7 +74,7 @@ public class CommitLogReplayer
         // compute per-CF and global replay positions
         cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
-        Map<UUID, ReplayPosition> truncationPositions = SystemTable.getTruncationPositions();
+        Map<UUID, Pair<ReplayPosition, Long>> truncationPositions = SystemTable.getTruncationRecords();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
@@ -82,7 +83,9 @@ public class CommitLogReplayer
             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
 
             // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
-            ReplayPosition truncatedAt = truncationPositions.get(cfs.metadata.cfId);
+            // the records map may have no entry for this CF, so guard before taking .left
+            Pair<ReplayPosition, Long> truncationRecord = truncationPositions.get(cfs.metadata.cfId);
+            ReplayPosition truncatedAt = truncationRecord == null ? null : truncationRecord.left;
             if (truncatedAt != null)
                 rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7ebbc7c..c9e1b79 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -866,7 +866,7 @@ public class CompactionManager implements CompactionManagerMBean
                     for (SecondaryIndex index : main.indexManager.getIndexes())
                         index.truncate(truncatedAt);
 
-                    SystemTable.saveTruncationPosition(main, replayAfter);
+                    SystemTable.saveTruncationRecord(main, truncatedAt, replayAfter);
 
                     for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
                     {

Reply via email to