avoid flushing everyone on truncate; save truncation position in system table instead

patch by jbellis; reviewed by yukim for CASSANDRA-4906
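
The heart of the change: instead of forcing a new commitlog segment and
flushing every column family so that pre-truncate segments can be retired,
truncate now records the truncated CF's replay position in the system table,
and commitlog replay starts that CF past the recorded position. Below is a
minimal standalone sketch of the replay-start computation, for illustration
only: SketchReplayPosition is a hypothetical stand-in for
org.apache.cassandra.db.commitlog.ReplayPosition, and its (segment, position)
ordering is an assumption based on the comparator usage in the diff.

import java.util.*;

// Standalone sketch, not Cassandra's actual class. Assumes replay positions
// order by (segment, position), mirroring ReplayPosition.comparator.
final class SketchReplayPosition implements Comparable<SketchReplayPosition>
{
    static final SketchReplayPosition NONE = new SketchReplayPosition(-1, 0);

    final long segment;
    final int position;

    SketchReplayPosition(long segment, int position)
    {
        this.segment = segment;
        this.position = position;
    }

    public int compareTo(SketchReplayPosition other)
    {
        return segment != other.segment
             ? Long.compare(segment, other.segment)
             : Integer.compare(position, other.position);
    }

    public String toString()
    {
        return "(" + segment + ", " + position + ")";
    }

    public static void main(String[] args)
    {
        // per-CF positions recovered from sstable metadata (NONE if never flushed)
        UUID cf1 = UUID.randomUUID();
        UUID cf2 = UUID.randomUUID();
        Map<UUID, SketchReplayPosition> flushed = new HashMap<UUID, SketchReplayPosition>();
        flushed.put(cf1, new SketchReplayPosition(10, 100));
        flushed.put(cf2, NONE);

        // positions saved at truncate time (what SystemTable.getTruncationPositions returns)
        Map<UUID, SketchReplayPosition> truncated = new HashMap<UUID, SketchReplayPosition>();
        truncated.put(cf2, new SketchReplayPosition(12, 0));

        // per-CF start = max(flushed, truncated); global start = min over all CFs
        SketchReplayPosition global = null;
        for (Map.Entry<UUID, SketchReplayPosition> e : flushed.entrySet())
        {
            SketchReplayPosition rp = e.getValue();
            SketchReplayPosition t = truncated.get(e.getKey());
            if (t != null && t.compareTo(rp) > 0)
                rp = t; // skip everything logged before the truncation
            if (global == null || rp.compareTo(global) < 0)
                global = rp;
        }
        System.out.println("global replay start: " + global); // (10, 100) here
    }
}
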
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38bfc6dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38bfc6dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38bfc6dc

Branch: refs/heads/cassandra-1.2
Commit: 38bfc6dca06bd0192167ae5e9bd51d593542f03e
Parents: 93f8fec
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Sat Oct 27 11:18:31 2012 -0700
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Nov 12 15:05:36 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                         |    1 +
 .../org/apache/cassandra/config/CFMetaData.java     |    3 +-
 .../apache/cassandra/cql3/UntypedResultSet.java     |    9 ++-
 .../org/apache/cassandra/db/ColumnFamilyStore.java  |   97 ++++-----
 src/java/org/apache/cassandra/db/SystemTable.java   |   64 ++++++++++
 .../cassandra/db/commitlog/CommitLogReplayer.java   |   21 ++--
 .../cassandra/db/compaction/CompactionManager.java  |    5 +-
 7 files changed, 118 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff79b9a..9ac6227 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-rc1
+ * save truncation position in system table (CASSANDRA-4906)
  * Move CompressionMetadata off-heap (CASSANDRA-4937)
  * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
  * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 921242a..5f0e93a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -181,7 +181,8 @@ public final class CFMetaData
                                                + "thrift_version text,"
                                                + "cql_version text,"
                                                + "data_center text,"
-                                               + "rack text"
+                                               + "rack text,"
+                                               + "truncated_at map<uuid, blob>"
                                                + ") WITH COMMENT='information about the local node'");
 
     public static final CFMetaData TraceSessionsCf = compile(14, "CREATE TABLE " + Tracing.SESSIONS_CF + " ("


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index ca3acf5..b6fcb55 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -133,7 +133,14 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
 
         public <T> Set<T> getSet(String column, AbstractType<T> type)
         {
-            return SetType.getInstance(type).compose(data.get(column));
+            ByteBuffer raw = data.get(column);
+            return raw == null ? null : SetType.getInstance(type).compose(raw);
+        }
+
+        public <K, V> Map<K, V> getMap(String column, AbstractType<K> keyType, AbstractType<V> valueType)
+        {
+            ByteBuffer raw = data.get(column);
+            return raw == null ? null : MapType.getInstance(keyType, valueType).compose(raw);
         }
 
         @Override


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a91af8c..439ef5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1720,38 +1720,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish.
-     * Does NOT guarantee that no flush is active when it returns.
-     */
-    private void waitForActiveFlushes()
-    {
-        Future<?> future;
-        Table.switchLock.writeLock().lock();
-        try
-        {
-            future = postFlushExecutor.submit(new Runnable() { public void run() { } });
-        }
-        finally
-        {
-            Table.switchLock.writeLock().unlock();
-        }
-
-        try
-        {
-            future.get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    /**
-     * Truncate practically deletes the entire column family's data
+     * Truncate deletes the entire column family's data with no expensive tombstone creation
      * @return a Future to the delete operation. Call the future's get() to make
      * sure the column family has been deleted
      */
@@ -1767,21 +1736,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // Bonus complication: since we store replay position in sstable metadata,
         // truncating those sstables means we will replay any CL segments from the
         // beginning if we restart before they are discarded for normal reasons
-        // post-truncate.  So we need to (a) force a new segment so the currently
-        // active one can be discarded, and (b) flush *all* CFs so that unflushed
-        // data in others don't keep any pre-truncate CL segments alive.
-        //
-        // Bonus bonus: simply forceFlush of all the CF is not enough, because if
-        // for a given column family the memtable is clean, forceFlush will return
-        // immediately, even though there could be a memtable being flushed at the same
-        // time.  So to guarantee that all segments can be cleaned out, we need to
-        // "waitForActiveFlushes" after the new segment has been created.
+        // post-truncate.  To prevent this, we save the truncation's replay position
+        // in the system table; this is done by CompactionManager.submitTruncate.
        logger.debug("truncating {}", columnFamily);
 
         if (DatabaseDescriptor.isAutoSnapshot())
         {
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();
+
+            // sleep a little to make sure that our truncatedAt comes after any sstable
+            // that was part of the flush we forced; otherwise on a tie, it won't get deleted.
+            try
+            {
+                long starttime = System.currentTimeMillis();
+                while ((System.currentTimeMillis() - starttime) < 1)
+                {
+                    Thread.sleep(1);
+                }
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
         }
         else
         {
@@ -1804,33 +1781,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
         }
 
-        KSMetaData ksm = Schema.instance.getKSMetaData(this.table.name);
-        if (ksm.durableWrites)
-        {
-            CommitLog.instance.forceNewSegment();
-            Future<ReplayPosition> position = CommitLog.instance.getContext();
-            // now flush everyone else.  re-flushing ourselves is not necessary, but harmless
-            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-                cfs.forceFlush();
-            waitForActiveFlushes();
-            // if everything was clean, flush won't have called discard
-            CommitLog.instance.discardCompletedSegments(metadata.cfId, position.get());
-        }
-
-        // sleep a little to make sure that our truncatedAt comes after any sstable
-        // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
-        try
-        {
-            long starttime = System.currentTimeMillis();
-            while ((System.currentTimeMillis() - starttime) < 1)
-            {
-                Thread.sleep(1);
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
         long truncatedAt = System.currentTimeMillis();
         if (DatabaseDescriptor.isAutoSnapshot())
             snapshot(Table.getTimestampedSnapshotName(columnFamily));
@@ -2093,8 +2043,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param truncatedAt The timestamp of the truncation
      * (all SSTables before that timestamp are going be marked as compacted)
+     *
+     * @return the most recent replay position of the truncated data
      */
-    public void discardSSTables(long truncatedAt)
+    public ReplayPosition discardSSTables(long truncatedAt)
     {
         List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
 
@@ -2104,7 +2056,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 truncatedSSTables.add(sstable);
         }
 
-        if (!truncatedSSTables.isEmpty())
-            markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+        if (truncatedSSTables.isEmpty())
+            return ReplayPosition.NONE;
+
+        markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+        return ReplayPosition.getReplayPosition(truncatedSSTables);
     }
 }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index a14ba58..34f7096 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -29,8 +32,11 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.avro.ipc.ByteBufferInputStream;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -43,12 +49,15 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Constants;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 
@@ -178,6 +187,61 @@ public class SystemTable
         }
     }
 
+    public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+    {
+        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
+        processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    private static String positionAsMapEntry(ColumnFamilyStore cfs, ReplayPosition position)
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        try
+        {
+            ReplayPosition.serializer.serialize(position, out);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return String.format("{'%s': '%s'}",
+                             cfs.metadata.cfId,
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+    }
+
+    public static Map<UUID, ReplayPosition> getTruncationPositions()
+    {
+        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
+        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        if (rows.isEmpty())
+            return Collections.emptyMap();
+
+        UntypedResultSet.Row row = rows.one();
+        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
+        if (rawMap == null)
+            return Collections.emptyMap();
+
+        Map<UUID, ReplayPosition> positions = new HashMap<UUID, ReplayPosition>();
+        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
+        {
+            positions.put(entry.getKey(), positionFromBlob(entry.getValue()));
+        }
+        return positions;
+    }
+
+    private static ReplayPosition positionFromBlob(ByteBuffer bytes)
+    {
+        try
+        {
+            return ReplayPosition.serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Record tokens being used by another node
      */


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 9bc0179..9f949d0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -35,11 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileUtils;
@@ -72,18 +68,27 @@ public class CommitLogReplayer
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
+        this.checksum = new PureJavaCrc32();
+
         // compute per-CF and global replay positions
-        this.cfPositions = new HashMap<UUID, ReplayPosition>();
+        cfPositions = new HashMap<UUID, ReplayPosition>();
+        Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+        Map<UUID, ReplayPosition> truncationPositions = SystemTable.getTruncationPositions();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
             // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
             // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+
+            // but if we've truncated the cf in question, then we need to start replay after the truncation
+            ReplayPosition truncatedAt = truncationPositions.get(cfs.metadata.cfId);
+            if (truncatedAt != null)
+                rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+
             cfPositions.put(cfs.metadata.cfId, rp);
         }
-        this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
-        this.checksum = new PureJavaCrc32();
+        globalPosition = replayPositionOrdering.min(cfPositions.values());
     }
 
     public void recover(File[] clogs) throws IOException


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2d06036..4399948 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
@@ -861,11 +862,13 @@ public class CompactionManager implements CompactionManagerMBean
 
                 try
                 {
-                    main.discardSSTables(truncatedAt);
+                    ReplayPosition replayAfter = main.discardSSTables(truncatedAt);
 
                     for (SecondaryIndex index : main.indexManager.getIndexes())
                         index.truncate(truncatedAt);
 
+                    SystemTable.saveTruncationPosition(main, replayAfter);
+
                     for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
                     {
                         if (key.cfId == main.metadata.cfId)
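
----------------------------------------------------------------------

As a companion to SystemTable.saveTruncationPosition / getTruncationPositions
above, a minimal standalone sketch of the blob round-trip. The long-segment
plus int-offset layout is an assumption for illustration (the real bytes are
whatever ReplayPosition.serializer writes), and TruncationBlobSketch is a
hypothetical class, but the CQL shape matches the UPDATE built in
positionAsMapEntry.

import java.io.*;
import java.util.UUID;

public class TruncationBlobSketch
{
    // hex-encode, like ByteBufferUtil.bytesToHex in the patch
    static String toHex(byte[] bytes)
    {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes)
            sb.append(String.format("%02x", b & 0xff));
        return sb.toString();
    }

    public static void main(String[] args) throws IOException
    {
        UUID cfId = UUID.randomUUID();
        long segment = 1352757936L; // commitlog segment id (assumed field)
        int offset = 4096;          // offset within the segment (assumed field)

        // serialize: the map value that saveTruncationPosition stores
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(segment);
        out.writeInt(offset);
        byte[] blob = buf.toByteArray();

        // the fragment spliced into "SET truncated_at = truncated_at + %s"
        String mapEntry = String.format("{'%s': '%s'}", cfId, toHex(blob));
        System.out.println("UPDATE system.local SET truncated_at = truncated_at + "
                           + mapEntry + " WHERE key = 'local'");

        // deserialize: what getTruncationPositions recovers for each cf id
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
        System.out.println("replay after segment=" + in.readLong()
                           + " position=" + in.readInt());
    }
}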
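
Two details in the patch are easy to miss. First, saveTruncationPosition ends
with forceBlockingFlush(LOCAL_CF), presumably so that the truncation record is
durable in an sstable and does not itself depend on the commitlog replay it is
meant to clamp. Second, truncate() still sleeps across a millisecond boundary
after the pre-snapshot flush, so that truncatedAt strictly exceeds the
timestamps of the just-flushed sstables; as the comment notes, discardSSTables
keeps an sstable on a tie.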