Updated Branches: refs/heads/cassandra-1.2 c4c9626f8 -> 44c462cb7 refs/heads/trunk 66aadda2e -> 19047b894
Ignore pre-truncate hints patch by Alexey Zotov and jbellis; reviewed by vijay for CASSANDRA-4655 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4c9626f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4c9626f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4c9626f Branch: refs/heads/trunk Commit: c4c9626f8b3ba941aa51860a7d10a6e686362f85 Parents: 49f220c Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu May 23 10:04:04 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu May 23 10:08:17 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 7 +++ .../apache/cassandra/db/HintedHandOffManager.java | 25 ++++++++++ src/java/org/apache/cassandra/db/RowMutation.java | 9 ++++ src/java/org/apache/cassandra/db/SystemTable.java | 35 ++++++++++----- .../cassandra/db/commitlog/CommitLogReplayer.java | 10 ++--- .../cassandra/db/compaction/CompactionManager.java | 2 +- 7 files changed, 71 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6f1127a..66c5f04 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.6 + * Ignore pre-truncate hints (CASSANDRA-4655) * Move System.exit on OOM into a separate thread (CASSANDRA-5273) * Write row markers when serializing schema (CASSANDRA-5572) * Check only SSTables for the requested range when streaming (CASSANDRA-5569) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 055c415..429859e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -298,6 +298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean valid = false; unregisterMBean(); + SystemTable.removeTruncationRecord(metadata.cfId); data.unreferenceSSTables(); indexManager.invalidate(); } @@ -2077,4 +2078,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { return getDataTracker().getDroppableTombstoneRatio(); } + + public long getTruncationTime() + { + Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId); + return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 53411f5..9346fb3 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -30,6 +30,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; @@ -365,6 +366,30 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean throw new AssertionError(e); } + Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>(); + for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds()))) + { + Long truncatedAt = truncationTimesCache.get(cfId); + if (truncatedAt == null) + { + ColumnFamilyStore cfs = Table.open(rm.getTable()).getColumnFamilyStore(cfId); + truncatedAt = cfs.getTruncationTime(); + truncationTimesCache.put(cfId, truncatedAt); + } + + if (hint.maxTimestamp() < truncatedAt) + { + logger.debug("Skipping delivery of hint for truncated columnfamily {}" + cfId); + rm = rm.without(cfId); + } + } + + if (rm.isEmpty()) + { + deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); + continue; + } + MessageOut<RowMutation> message = rm.createMessage(); rateLimiter.acquire(message.serializedSize(MessagingService.current_version)); Runnable callback = new Runnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index 826f3e0..b85cfcd 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -355,6 +355,15 @@ public class RowMutation implements IMutation } } + public RowMutation without(UUID cfId) + { + RowMutation rm = new RowMutation(table, key); + for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet()) + if (!entry.getKey().equals(cfId)) + rm.add(entry.getValue()); + return rm; + } + public static class RowMutationSerializer implements IVersionedSerializer<RowMutation> { public void serialize(RowMutation rm, DataOutput dos, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index 2e36aeb..327f01b 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -47,7 +47,10 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.Constants; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.cql3.QueryProcessor.processInternal; @@ -178,19 +181,30 @@ public class SystemTable } } - public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position) + public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; - processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY)); + processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY)); forceBlockingFlush(LOCAL_CF); } - private static String positionAsMapEntry(ColumnFamilyStore cfs, ReplayPosition position) + /** + * This method is used to remove information about truncation time for specified column family + */ + public static void removeTruncationRecord(UUID cfId) + { + String req = "DELETE truncation_time['%s'] from system.%s WHERE key = '%s'"; + processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY)); + forceBlockingFlush(LOCAL_CF); + } + + private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { DataOutputBuffer out = new DataOutputBuffer(); try { ReplayPosition.serializer.serialize(position, out); + out.writeLong(truncatedAt); } catch (IOException e) { @@ -201,7 +215,7 @@ public class SystemTable ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength()))); } - public static Map<UUID, ReplayPosition> getTruncationPositions() + public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords() { String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'"; UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY)); @@ -213,19 +227,18 @@ public class SystemTable if (rawMap == null) return Collections.emptyMap(); - Map<UUID, ReplayPosition> positions = new HashMap<UUID, ReplayPosition>(); + Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>(); for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet()) - { - positions.put(entry.getKey(), positionFromBlob(entry.getValue())); - } + positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); return positions; } - private static ReplayPosition positionFromBlob(ByteBuffer bytes) + private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes) { try { - return ReplayPosition.serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes))); + DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); + return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 2728970..e1fefa1 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -40,10 +40,8 @@ import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.PureJavaCrc32; -import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.*; + import org.cliffc.high_scale_lib.NonBlockingHashSet; public class CommitLogReplayer @@ -73,7 +71,7 @@ public class CommitLogReplayer // compute per-CF and global replay positions cfPositions = new HashMap<UUID, ReplayPosition>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); - Map<UUID, ReplayPosition> truncationPositions = SystemTable.getTruncationPositions(); + Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemTable.getTruncationRecords(); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call @@ -82,7 +80,7 @@ public class CommitLogReplayer ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); // but, if we've truncted the cf in question, then we need to need to start replay after the truncation - ReplayPosition truncatedAt = truncationPositions.get(cfs.metadata.cfId); + ReplayPosition truncatedAt = truncationPositions.get(cfs.metadata.cfId).left; if (truncatedAt != null) rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9626f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7ebbc7c..c9e1b79 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -866,7 +866,7 @@ public class CompactionManager implements CompactionManagerMBean for (SecondaryIndex index : main.indexManager.getIndexes()) index.truncate(truncatedAt); - SystemTable.saveTruncationPosition(main, replayAfter); + SystemTable.saveTruncationRecord(main, truncatedAt, replayAfter); for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) {