Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9

* commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0':
  Change commitlog and sstables to track dirty and clean intervals.
  Disable passing control to post-flush after flush failure to prevent data loss.
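The heart of this change (CASSANDRA-11828) is replacing the single "flushed up to position X" watermark with sets of commit-log intervals: each sstable and each commit log segment records which (start, end] ranges it actually covers, so a failed or out-of-order flush leaves a visible gap rather than being silently skipped past. The sketch below illustrates the merge-and-membership bookkeeping such an interval set needs. It is a simplified stand-in over any Comparable position type, not the IntervalSet class added by this commit (which also serializes itself into sstable metadata); all names here are illustrative.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Simplified illustration of an immutable set of disjoint intervals. */
final class SimpleIntervalSet<T extends Comparable<T>>
{
    private final NavigableMap<T, T> ranges; // start -> end, disjoint, sorted

    private SimpleIntervalSet(NavigableMap<T, T> ranges)
    {
        this.ranges = ranges;
    }

    /** True iff the position falls inside some covered interval. */
    boolean contains(T position)
    {
        Map.Entry<T, T> candidate = ranges.floorEntry(position);
        return candidate != null && candidate.getValue().compareTo(position) >= 0;
    }

    static final class Builder<T extends Comparable<T>>
    {
        private final NavigableMap<T, T> ranges = new TreeMap<>();

        /** Add [start, end], merging it with any intervals it touches. */
        Builder<T> add(T start, T end)
        {
            // an interval beginning at or before us may reach into [start, end]
            Map.Entry<T, T> prev = ranges.floorEntry(start);
            if (prev != null && prev.getValue().compareTo(start) >= 0)
            {
                start = prev.getKey();
                if (prev.getValue().compareTo(end) > 0)
                    end = prev.getValue();
            }
            // an interval beginning inside us may extend past our end
            Map.Entry<T, T> next = ranges.floorEntry(end);
            if (next != null && next.getValue().compareTo(end) > 0)
                end = next.getValue();
            // drop everything we now cover and record the merged interval
            ranges.subMap(start, true, end, true).clear();
            ranges.put(start, end);
            return this;
        }

        SimpleIntervalSet<T> build()
        {
            return new SimpleIntervalSet<>(new TreeMap<>(ranges));
        }
    }
}

With positions playing the role of (segment id, offset) pairs, a flush that dies partway through simply never adds its range, so contains() keeps answering false for those positions and replay will revisit them.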
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b102173
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b102173
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b102173

Branch: refs/heads/trunk
Commit: 7b1021733b55c8865f80e261697b4c079d086633
Parents: 21c92ca 904cb5d
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Aug 5 15:39:15 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Aug 5 15:39:56 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../cassandra/db/BlacklistedDirectories.java | 13 +
 .../apache/cassandra/db/ColumnFamilyStore.java | 70 +---
 .../org/apache/cassandra/db/Directories.java | 2 +-
 src/java/org/apache/cassandra/db/Memtable.java | 21 +-
 .../AbstractCommitLogSegmentManager.java | 4 +-
 .../cassandra/db/commitlog/CommitLog.java | 11 +-
 .../db/commitlog/CommitLogReplayer.java | 105 ++----
 .../db/commitlog/CommitLogSegment.java | 82 +++--
 .../cassandra/db/commitlog/IntervalSet.java | 192 +++++++++++
 .../compaction/AbstractCompactionStrategy.java | 3 +
 .../compaction/CompactionStrategyManager.java | 3 +
 .../apache/cassandra/db/lifecycle/Tracker.java | 45 +--
 .../org/apache/cassandra/db/lifecycle/View.java | 37 +--
 .../cassandra/io/sstable/format/Version.java | 2 +
 .../io/sstable/format/big/BigFormat.java | 12 +-
 .../metadata/LegacyMetadataSerializer.java | 17 +-
 .../io/sstable/metadata/MetadataCollector.java | 37 +--
 .../io/sstable/metadata/StatsMetadata.java | 44 +--
 .../cassandra/tools/SSTableMetadataViewer.java | 3 +-
 .../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
 .../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mc_clust/mc-1-big-Data.db | Bin 0 -> 5355 bytes
 .../legacy_mc_clust/mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_clust/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_clust/mc-1-big-Index.db | Bin 0 -> 157553 bytes
 .../legacy_mc_clust/mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
 .../legacy_mc_clust/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mc_clust_compact/mc-1-big-Data.db | Bin 0 -> 5382 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_clust_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_clust_compact/mc-1-big-Index.db | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
 .../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_compact/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
 .../legacy_mc_clust_counter/mc-1-big-Data.db | Bin 0 -> 4631 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_clust_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_clust_counter/mc-1-big-Index.db | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
 .../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_counter/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
 .../mc-1-big-Data.db | Bin 0 -> 4625 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
 .../mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
 .../legacy_mc_simple/mc-1-big-Data.db | Bin 0 -> 89 bytes
 .../legacy_mc_simple/mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_simple/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple/mc-1-big-Index.db | Bin 0 -> 26 bytes
 .../legacy_mc_simple/mc-1-big-Statistics.db | Bin 0 -> 4639 bytes
 .../legacy_mc_simple/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_simple/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
 .../legacy_mc_simple_compact/mc-1-big-Data.db | Bin 0 -> 91 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_compact/mc-1-big-Index.db | Bin 0 -> 26 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 4680 bytes
 .../mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_simple_compact/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
 .../legacy_mc_simple_counter/mc-1-big-Data.db | Bin 0 -> 110 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_counter/mc-1-big-Index.db | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 4648 bytes
 .../mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_simple_counter/mc-1-big-TOC.txt | 8 +
 .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
 .../mc-1-big-Data.db | Bin 0 -> 114 bytes
 .../mc-1-big-Digest.crc32 | 1 +
 .../mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db | Bin 0 -> 4689 bytes
 .../mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt | 8 +
 .../db/commitlog/CommitLogStressTest.java | 3 +-
 test/unit/org/apache/cassandra/Util.java | 21 +-
 .../org/apache/cassandra/cql3/CQLTester.java | 12 +-
 .../apache/cassandra/cql3/OutOfSpaceTest.java | 33 +-
 .../cassandra/db/commitlog/CommitLogTest.java | 151 ++++++++-
 .../cassandra/db/compaction/NeverPurgeTest.java | 6 +-
 .../cassandra/db/lifecycle/TrackerTest.java | 12 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
 .../io/sstable/SSTableRewriterTest.java | 4 +-
 .../metadata/MetadataSerializerTest.java | 16 +-
 .../cassandra/utils/IntegerIntervalsTest.java | 326 +++++++++++++++++++
 97 files changed, 1222 insertions(+), 369 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 289f370,b596fc9..43d28f3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
-3.0.9
+3.9
+ * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
+ * Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
+ * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
+ * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
+Merged from 3.0:
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
  * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
  * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
  * Fix upgrade of super columns on thrift (CASSANDRA-12335)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
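The ColumnFamilyStore hunks that follow are where the data-loss fix lands: PostFlush no longer receives a pre-computed commitLogUpperBound, it records any Throwable (not just FSWriteError) as a flush failure, and it only asks the commit log to discard segments when flushFailure is null, using the flushed memtable's own lower and upper bounds. To see why per-table interval tracking makes a mid-stream flush failure safe, consider this toy model of a segment's bookkeeping for one table; names are hypothetical and plain ints stand in for CommitLogPosition (the real code keeps IntegerInterval trackers keyed by cfId inside CommitLogSegment):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of the dirty/clean bookkeeping a commit log segment keeps per table. */
final class ToyDirtyTracker
{
    private int dirtyLow = Integer.MAX_VALUE;   // lowest position written
    private int dirtyHigh = Integer.MIN_VALUE;  // highest position written
    private final NavigableMap<Integer, Integer> clean = new TreeMap<>(); // flushed start -> end

    void markDirty(int position)
    {
        dirtyLow = Math.min(dirtyLow, position);
        dirtyHigh = Math.max(dirtyHigh, position);
    }

    /** A successful flush reports exactly the interval it persisted. */
    void markClean(int lower, int upper)
    {
        clean.put(lower, upper); // simplified: assumes callers pass disjoint intervals
    }

    /**
     * The segment may be discarded for this table only when the clean intervals
     * cover everything written. A flush that failed, and therefore never called
     * markClean, leaves a gap that keeps the segment alive for replay.
     */
    boolean isClean()
    {
        if (dirtyHigh < dirtyLow)
            return true; // nothing written
        int covered = dirtyLow;
        for (Map.Entry<Integer, Integer> interval : clean.entrySet())
        {
            if (interval.getKey() > covered)
                break; // gap before this interval starts: still dirty
            covered = Math.max(covered, interval.getValue());
        }
        return covered >= dirtyHigh;
    }
}

For example, with writes at positions 1 through 9, markClean(0, 4) followed by a failed flush of (4, 9] leaves isClean() false, so the segment survives for replay; under the old single-watermark scheme a later successful flush could advance the watermark past the hole and the segment would be deleted.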
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 9d31b60,82604e2..21becfe --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -957,29 -922,19 +954,20 @@@ public class ColumnFamilyStore implemen final boolean flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); - final CommitLogPosition commitLogUpperBound; - volatile FSWriteError flushFailure = null; + volatile Throwable flushFailure = null; final List<Memtable> memtables; - final List<Collection<SSTableReader>> readers; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, + private PostFlush(boolean flushSecondaryIndexes, + OpOrder.Barrier writeBarrier, - CommitLogPosition commitLogUpperBound, - List<Memtable> memtables, - List<Collection<SSTableReader>> readers) + List<Memtable> memtables) { this.writeBarrier = writeBarrier; this.flushSecondaryIndexes = flushSecondaryIndexes; - this.commitLogUpperBound = commitLogUpperBound; this.memtables = memtables; - this.readers = readers; } - public ReplayPosition call() + public CommitLogPosition call() { - if (discardFlushResults == ColumnFamilyStore.this) - return commitLogUpperBound; - writeBarrier.await(); /** @@@ -1003,19 -958,13 +991,13 @@@ throw new IllegalStateException(); } - // Must check commitLogUpperBound != null because Flush may find that all memtables are clean - // and so not set a commitLogUpperBound - ReplayPosition commitLogUpperBound = ReplayPosition.NONE; ++ CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE; // If a flush errored out but the error was ignored, make sure we don't discard the commit log. - if (flushFailure == null) + if (flushFailure == null && !memtables.isEmpty()) { - CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound); - for (int i = 0 ; i < memtables.size() ; i++) - { - Memtable memtable = memtables.get(i); - Collection<SSTableReader> reader = readers.get(i); - memtable.cfs.data.permitCompactionOfFlushed(reader); - memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader); - } + Memtable memtable = memtables.get(0); + commitLogUpperBound = memtable.getCommitLogUpperBound(); + CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound); } metric.pendingFlushes.dec(); @@@ -1079,9 -1027,9 +1060,9 @@@ // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete; // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier - // replay positions have also completed, i.e. the memtables are done and ready to flush + // commit log segment position have also completed, i.e. 
the memtables are done and ready to flush writeBarrier.issue(); - postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers); + postFlush = new PostFlush(!truncate, writeBarrier, memtables); } public void run() @@@ -1111,110 -1059,23 +1092,108 @@@ try { for (Memtable memtable : memtables) -- { - this.readers.add(flushMemtable(memtable)); - Collection<SSTableReader> readers = memtable.flush(); - memtable.cfs.replaceFlushed(memtable, readers); - reclaim(memtable); -- } ++ flushMemtable(memtable); } - catch (FSWriteError e) + catch (Throwable t) { - JVMStabilityInspector.inspectThrowable(e); - // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. - postFlush.flushFailure = e; + JVMStabilityInspector.inspectThrowable(t); + postFlush.flushFailure = t; } - // signal the post-flush we've done our work postFlush.latch.countDown(); } + public Collection<SSTableReader> flushMemtable(Memtable memtable) + { + List<Future<SSTableMultiWriter>> futures = new ArrayList<>(); + long totalBytesOnDisk = 0; + long maxBytesOnDisk = 0; + long minBytesOnDisk = Long.MAX_VALUE; + List<SSTableReader> sstables = new ArrayList<>(); + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH)) + { + List<Memtable.FlushRunnable> flushRunnables = null; + List<SSTableMultiWriter> flushResults = null; + + try + { + // flush the memtable + flushRunnables = memtable.flushRunnables(txn); + + for (int i = 0; i < flushRunnables.size(); i++) + futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i))); + + flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures)); + } + catch (Throwable t) + { + t = memtable.abortRunnables(flushRunnables, t); + t = txn.abort(t); + throw Throwables.propagate(t); + } + + try + { + Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator(); + while (writerIterator.hasNext()) + { + @SuppressWarnings("resource") + SSTableMultiWriter writer = writerIterator.next(); + if (writer.getFilePointer() > 0) + { + writer.setOpenResult(true).prepareToCommit(); + } + else + { + maybeFail(writer.abort(null)); + writerIterator.remove(); + } + } + } + catch (Throwable t) + { + for (SSTableMultiWriter writer : flushResults) + t = writer.abort(t); + t = txn.abort(t); + Throwables.propagate(t); + } + + txn.prepareToCommit(); + + Throwable accumulate = null; + for (SSTableMultiWriter writer : flushResults) + accumulate = writer.commit(accumulate); + + maybeFail(txn.commit(accumulate)); + + for (SSTableMultiWriter writer : flushResults) + { + Collection<SSTableReader> flushedSSTables = writer.finished(); + for (SSTableReader sstable : flushedSSTables) + { + if (sstable != null) + { + sstables.add(sstable); + long size = sstable.bytesOnDisk(); + totalBytesOnDisk += size; + maxBytesOnDisk = Math.max(maxBytesOnDisk, size); + minBytesOnDisk = Math.min(minBytesOnDisk, size); + } + } + } + } - memtable.cfs.data.replaceFlushed(memtable, sstables); ++ memtable.cfs.replaceFlushed(memtable, sstables); + reclaim(memtable); + memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables); + logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}", + sstables, + sstables.size(), + FBUtilities.prettyPrintMemory(totalBytesOnDisk), + FBUtilities.prettyPrintMemory(maxBytesOnDisk), + FBUtilities.prettyPrintMemory(minBytesOnDisk)); + return sstables; + } + private void reclaim(final Memtable memtable) { // issue a read barrier for reclaiming the memory, and offload the wait to another 
thread @@@ -2268,10 -2085,10 +2222,10 @@@ { Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>() { - public LifecycleTransaction call() throws Exception + public LifecycleTransaction call() { assert data.getCompacting().isEmpty() : data.getCompacting(); - Iterable<SSTableReader> sstables = getPermittedToCompactSSTables(); + Iterable<SSTableReader> sstables = getLiveSSTables(); sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); sstables = ImmutableList.copyOf(sstables); LifecycleTransaction modifier = data.tryModify(sstables, operationType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Memtable.java index 7a46d8a,3c77092..e9cca4a --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -33,7 -33,9 +33,9 @@@ import org.apache.cassandra.config.CFMe import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogPosition; + import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.commitlog.ReplayPosition; + import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@@ -192,6 -194,11 +194,11 @@@ public class Memtable implements Compar return commitLogLowerBound.get(); } - public ReplayPosition getCommitLogUpperBound() ++ public CommitLogPosition getCommitLogUpperBound() + { + return commitLogUpperBound.get(); + } + public boolean isLive() { return allocator.isLive(); @@@ -361,63 -337,39 +368,72 @@@ return minTimestamp; } + /** + * For testing only. Give this memtable too big a size to make it always fail flushing. 
+ */ + @VisibleForTesting + public void makeUnflushable() + { + liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024); + } + - private long estimatedSize() + class FlushRunnable implements Callable<SSTableMultiWriter> { - long keySize = 0; - for (PartitionPosition key : partitions.keySet()) + private final long estimatedSize; + private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush; + + private final boolean isBatchLogTable; + private final SSTableMultiWriter writer; + + // keeping these to be able to log what we are actually flushing + private final PartitionPosition from; + private final PartitionPosition to; + + FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn) { - // make sure we don't write non-sensical keys - assert key instanceof DecoratedKey; - keySize += ((DecoratedKey)key).getKey().remaining(); + this(partitions.subMap(from, to), flushLocation, from, to, txn); } - return (long) ((keySize // index entries - + keySize // keys in data file - + liveDataSize.get()) // data - * 1.2); // bloom filter and row index overhead - } - private Collection<SSTableReader> writeSortedContents(File sstableDirectory) - { - boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); + FlushRunnable(LifecycleTransaction txn) + { + this(partitions, null, null, null, txn); + } + + FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn) + { + this.toFlush = toFlush; + this.from = from; + this.to = to; + long keySize = 0; + for (PartitionPosition key : toFlush.keySet()) + { + // make sure we don't write non-sensical keys + assert key instanceof DecoratedKey; + keySize += ((DecoratedKey) key).getKey().remaining(); + } + estimatedSize = (long) ((keySize // index entries + + keySize // keys in data file + + liveDataSize.get()) // data + * 1.2); // bloom filter and row index overhead + + this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); - logger.debug("Writing {}", Memtable.this.toString()); + if (flushLocation == null) + writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get()); + else + writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get()); + + } - Collection<SSTableReader> ssTables; - try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) + protected Directories getDirectories() { + return cfs.getDirectories(); + } + + private void writeSortedContents() + { + logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to); + boolean trackContention = logger.isTraceEnabled(); int heavilyContendedRowCount = 0; // (we can't clear out the map as-we-go to free up memory, @@@ -444,39 -396,58 +460,38 @@@ } } - if (writer.getFilePointer() > 0) - { - logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", - writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getFilePointer()), - commitLogUpperBound)); - - // sstables should contain non-repaired data. 
- ssTables = writer.finish(true); - } - else - { - logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", - writer.getFilename(), commitLogUpperBound); - writer.abort(); - ssTables = Collections.emptyList(); - } + long bytesFlushed = writer.getFilePointer(); + logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", + writer.getFilename(), + FBUtilities.prettyPrintMemory(bytesFlushed), + commitLogUpperBound)); + // Update the metrics + cfs.metric.bytesFlushed.inc(bytesFlushed); if (heavilyContendedRowCount > 0) - logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); - - return ssTables; + logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString())); } - } - @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter - public SSTableTxnWriter createFlushWriter(String filename, - PartitionColumns columns, - EncodingStats stats) - { - // we operate "offline" here, as we expose the resulting reader consciously when done - // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction) - LifecycleTransaction txn = null; - try + public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn, + String filename, + PartitionColumns columns, + EncodingStats stats) { - txn = LifecycleTransaction.offline(OperationType.FLUSH); MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator) - .commitLogLowerBound(commitLogLowerBound.get()) - .commitLogUpperBound(commitLogUpperBound.get()); - .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get())); - - return new SSTableTxnWriter(txn, - cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), - (long) partitions.size(), - ActiveRepairService.UNREPAIRED_SSTABLE, - sstableMetadataCollector, - new SerializationHeader(true, cfs.metadata, columns, stats), - txn)); ++ .commitLogIntervals(new IntervalSet<>(commitLogLowerBound.get(), commitLogUpperBound.get())); ++ + return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), + (long)toFlush.size(), + ActiveRepairService.UNREPAIRED_SSTABLE, + sstableMetadataCollector, + new SerializationHeader(true, cfs.metadata, columns, stats), txn); - } - catch (Throwable t) + + @Override + public SSTableMultiWriter call() { - if (txn != null) - txn.close(); - throw t; + writeSortedContents(); + return writer; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 7ea7439,0000000..8f3b7e4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@@ -1,582 -1,0 +1,582 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public abstract class AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); + + // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation. + private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>(); + + /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>(); + + /** Active segments, containing unflushed data */ + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); + + /** The segment we are currently allocating commit log records to */ + protected volatile CommitLogSegment allocatingFrom = null; + + private final WaitQueue hasAvailableSegments = new WaitQueue(); + + final String storageDirectory; + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). + */ + private final AtomicLong size = new AtomicLong(); + + /** + * New segment creation is initially disabled because we'll typically get some "free" segments + * recycled after log replay. 
+ */ + volatile boolean createReserveSegments = false; + + private Thread managerThread; + protected volatile boolean run = true; + protected final CommitLog commitLog; + + private static final SimpleCachedBufferPool bufferPool = + new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); + + AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) + { + this.commitLog = commitLog; + this.storageDirectory = storageDirectory; + } + + void start() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (run) + { + try + { + Runnable task = segmentManagementTasks.poll(); + if (task == null) + { + // if we have no more work to do, check if we should create a new segment + if (!atSegmentLimit() && + availableSegments.isEmpty() && + (activeSegments.isEmpty() || createReserveSegments)) + { + logger.trace("No segments in reserve; creating a fresh one"); + // TODO : some error handling in case we fail to create a new segment + availableSegments.add(createSegment()); + hasAvailableSegments.signalAll(); + } + + // flush old Cfs if we're full + long unused = unusedCapacity(); + if (unused < 0) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + long spaceToReclaim = 0; + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + segmentsToRecycle.add(segment); + spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); + if (spaceToReclaim + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + + // Since we're operating on a "null" allocation task, block here for the next task on the + // queue rather than looping, grabbing another null, and repeating the above work. + try + { + task = segmentManagementTasks.take(); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + task.run(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + } + + private boolean atSegmentLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + }; + + run = true; + + managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + } + + + /** + * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything. + */ + public abstract void shutdown(); + + /** + * Allocate a segment within this CLSM. Should either succeed or throw. + */ + public abstract Allocation allocate(Mutation mutation, int size); + + /** + * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM + * decide what to do with those segments on disk after they've been replayed. + */ + abstract void handleReplayedSegment(final File file); + + /** + * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit + * to segment manager so it's performed on segment management thread. + */ + abstract CommitLogSegment createSegment(); + + /** + * Indicates that a segment file has been flushed and is no longer needed. 
Only perform as task submit to segment + * manager so it's performend on segment management thread, or perform while segment management thread is shutdown + * during testing resets. + * + * @param segment segment to be discarded + * @param delete whether or not the segment is safe to be deleted. + */ + abstract void discard(CommitLogSegment segment, boolean delete); + + + /** + * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator + * is working on initial allocation of a CommitLogSegment. + */ + CommitLogSegment allocatingFrom() + { + CommitLogSegment r = allocatingFrom; + if (r == null) + { + advanceAllocatingFrom(null); + r = allocatingFrom; + } + return r; + } + + /** + * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it. + * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled. + * + * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. + */ + protected void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + CommitLogSegment next; + synchronized (this) + { + // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments + // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432 + if (allocatingFrom != old) + return; + next = availableSegments.poll(); + if (next != null) + { + allocatingFrom = next; + activeSegments.add(next); + } + } + + if (next != null) + { + if (old != null) + { + // Now we can run the user defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + commitLog.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); + return; + } + + // no more segments, so register to receive a signal when not empty + WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + + // trigger the management thread; this must occur after registering + // the signal to ensure we are woken by any new segment creation + wakeManager(); + + // check if the queue has already been added to before waiting on the signal, to catch modifications + // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change + if (!availableSegments.isEmpty() || allocatingFrom != old) + { + signal.cancel(); + // if we've been beaten, just stop immediately + if (allocatingFrom != old) + return; + // otherwise try again, as there should be an available segment + continue; + } + + // can only reach here if the queue hasn't been inserted into + // before we registered the signal, as we only remove items from the queue + // after updating allocatingFrom. 
Can safely block until we are signalled + // by the allocator that new segments have been published + signal.awaitUninterruptibly(); + } + } + + protected void wakeManager() + { + // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary) + segmentManagementTasks.add(Runnables.doNothing()); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. + * + * Flushes any dirty CFs for this segment and any older segments, and then recycles + * the segments + */ + void forceRecycleAll(Iterable<UUID> droppedCfs) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // on the relevant keyspaces to complete + Keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future<?> future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) - segment.markClean(cfId, segment.getCurrentCommitLogPosition()); ++ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). + for (CommitLogSegment segment : activeSegments) + { + if (segment.isUnused()) + recycleSegment(segment); + } + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be recycled. + * + * @param segment segment that is no longer in use + */ + void recycleSegment(final CommitLogSegment segment) + { + boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); + if (activeSegments.remove(segment)) + { + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + discardSegment(segment, archiveSuccess); + } + else + { + logger.warn("segment {} not found in activeSegments queue", segment); + } + } + + /** + * Indicates that a segment file should be deleted. + * + * @param segment segment to be discarded + */ + private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) + { + logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script"); + segmentManagementTasks.add(() -> discard(segment, deleteFile)); + } + + /** + * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. + * @param addedSize + */ + void addSize(long addedSize) + { + size.addAndGet(addedSize); + } + + /** + * @return the space (in bytes) used by all segment files. 
+ */ + public long onDiskSize() + { + return size.get(); + } + + private long unusedCapacity() + { + long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; + long currentSize = size.get(); + logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); + return total - currentSize; + } + + /** + * @param name the filename to check + * @return true if file is managed by this manager. + */ + public boolean manages(String name) + { + for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments)) + if (segment.getName().equals(name)) + return true; + return false; + } + + /** + * Throws a flag that enables the behavior of keeping at least one spare segment + * available at all times. + */ + void enableReserveSegmentCreation() + { + createReserveSegments = true; + wakeManager(); + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. + */ + private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish his append after the flush. + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); - segment.markClean(dirtyCFId, segment.getCurrentCommitLogPosition()); ++ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + * Only call this after the AbstractCommitLogService is shut down. + */ + public void stopUnsafe(boolean deleteSegments) + { + logger.trace("CLSM closing and clearing existing commit log segments..."); + createReserveSegments = false; + + awaitManagementTasksCompletion(); + + shutdown(); + try + { + awaitTermination(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + synchronized (this) + { + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); + + for (CommitLogSegment segment : availableSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + availableSegments.clear(); + } + + allocatingFrom = null; + + segmentManagementTasks.clear(); + + size.set(0L); + + logger.trace("CLSM done with closing and clearing existing commit log segments."); + } + + // Used by tests only. 
+ void awaitManagementTasksCompletion() + { + while (!segmentManagementTasks.isEmpty()) + Thread.yield(); + // The last management task is not yet complete. Wait a while for it. + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure + // waiting completes correctly. + } + + /** + * Explicitly for use only during resets in unit testing. + */ + private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) + { + try + { + discard(segment, delete); + } + catch (AssertionError ignored) + { + // segment file does not exist + } + } + + /** + * Returns when the management thread terminates. + */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + + for (CommitLogSegment segment : activeSegments) + segment.close(); + + for (CommitLogSegment segment : availableSegments) + segment.close(); + + bufferPool.shutdown(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } + + /** + * @return the current CommitLogPosition of the active segment we're allocating from + */ + CommitLogPosition getCurrentPosition() + { + return allocatingFrom().getCurrentCommitLogPosition(); + } + + /** + * Forces a disk flush on the commit log files that need it. Blocking. + */ + public void sync(boolean syncAllSegments) throws IOException + { + CommitLogSegment current = allocatingFrom(); + for (CommitLogSegment segment : getActiveSegments()) + { + if (!syncAllSegments && segment.id > current.id) + return; + segment.sync(); + } + } + + /** + * Used by compressed and encrypted segments to share a buffer pool across the CLSM. + */ + SimpleCachedBufferPool getBufferPool() + { + return bufferPool; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java index b66221c,dfe3f91..32f69eb --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@@ -298,20 -290,21 +298,21 @@@ public class CommitLog implements Commi * given. Discards any commit log segments that are no longer used. 
* * @param cfId the column family ID that was flushed - * @param context the commit log segment position of the flush + * @param lowerBound the lowest covered replay position of the flush + * @param lowerBound the highest covered replay position of the flush */ - public void discardCompletedSegments(final UUID cfId, final CommitLogPosition context) - public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound) ++ public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound) { - logger.trace("discard completed log segments for {}, table {}", context, cfId); + logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId); // Go thru the active segment files, which are ordered oldest to newest, marking the - // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed + // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed // in the arguments. Any segments that become unused after they are marked clean will be // recycled or discarded. - for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();) + for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();) { CommitLogSegment segment = iter.next(); - segment.markClean(cfId, context); + segment.markClean(cfId, lowerBound, upperBound); if (segment.isUnused()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index c8e597f,af8efb4..92364c8 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -54,32 -70,31 +54,32 @@@ public class CommitLogReplayer implemen static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors"; private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024); - private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; - private final Set<Keyspace> keyspacesRecovered; - private final List<Future<?>> futures; - private final Map<UUID, AtomicInteger> invalidMutations; + private final Set<Keyspace> keyspacesReplayed; + private final Queue<Future<Integer>> futures; + private final AtomicInteger replayedCount; - private final Map<UUID, ReplayPositionFilter> cfPersisted; - private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted; - private final ReplayPosition globalPosition; - private final CRC32 checksum; - private byte[] buffer; - private byte[] uncompressedBuffer; ++ private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted; + private final CommitLogPosition globalPosition; + + // Used to throttle speed of replay of mutations if we pass the max outstanding count + private long pendingMutationBytes = 0; private final ReplayFilter replayFilter; private final CommitLogArchiver archiver; - CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter) + @VisibleForTesting + protected CommitLogReader commitLogReader; + + 
CommitLogReplayer(CommitLog commitLog, + CommitLogPosition globalPosition, - Map<UUID, ReplayPositionFilter> cfPersisted, ++ Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted, + ReplayFilter replayFilter) { - this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); - this.futures = new ArrayList<Future<?>>(); - this.buffer = new byte[4096]; - this.uncompressedBuffer = new byte[4096]; - this.invalidMutations = new HashMap<UUID, AtomicInteger>(); + this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>(); + this.futures = new ArrayDeque<Future<Integer>>(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); - this.checksum = new CRC32(); this.cfPersisted = cfPersisted; this.globalPosition = globalPosition; this.replayFilter = replayFilter; @@@ -89,10 -103,9 +89,10 @@@ public static CommitLogReplayer construct(CommitLog commitLog) { - // compute per-CF and global commit log segment positions - Map<UUID, ReplayPositionFilter> cfPersisted = new HashMap<>(); + // compute per-CF and global replay intervals - Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>(); ++ Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted = new HashMap<>(); ReplayFilter replayFilter = ReplayFilter.create(); - CommitLogPosition globalPosition = null; ++ for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // but, if we've truncated the cf in question, then we need to need to start replay after the truncation @@@ -117,15 -130,11 +117,11 @@@ } } - ReplayPositionFilter filter = new ReplayPositionFilter(cfs.getSSTables(), truncatedAt); - if (!filter.isEmpty()) - cfPersisted.put(cfs.metadata.cfId, filter); - else - globalPosition = CommitLogPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter - IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt); ++ IntervalSet<CommitLogPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt); + cfPersisted.put(cfs.metadata.cfId, filter); } - if (globalPosition == null) - globalPosition = firstNotCovered(cfPersisted.values()); - logger.trace("Global commit log segment position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted)); - ReplayPosition globalPosition = firstNotCovered(cfPersisted.values()); ++ CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values()); + logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted)); return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter); } @@@ -174,135 -208,38 +170,105 @@@ return replayedCount.get(); } - private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException + /* + * Wrapper around initiating mutations read from the log to make it possible + * to spy on initiated mutations for test + */ + @VisibleForTesting + public static class MutationInitiator { - if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) - { - // There was no room in the segment to write a final header. No data could be present here. 
- return -1; - } - reader.seek(offset); - CRC32 crc = new CRC32(); - updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL)); - updateChecksumInt(crc, (int) (descriptor.id >>> 32)); - updateChecksumInt(crc, (int) reader.getPosition()); - int end = reader.readInt(); - long filecrc = reader.readInt() & 0xffffffffL; - if (crc.getValue() != filecrc) + protected Future<Integer> initiateMutation(final Mutation mutation, + final long segmentId, + final int serializedSize, + final int entryLocation, + final CommitLogReplayer commitLogReplayer) { - if (end != 0 || filecrc != 0) + Runnable runnable = new WrappedRunnable() { - handleReplayError(false, - "Encountered bad header at position %d of commit log %s, with invalid CRC. " + - "The end of segment marker should be zero.", - offset, reader.getPath()); - } - return -1; - } - else if (end < offset || end > reader.length()) - { - handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", - offset, reader.getPath()); - return -1; + public void runMayThrow() + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (commitLogReplayer.pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(update.metadata().cfId) == null) + continue; // dropped + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the commit log segment position + if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation))) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(update); + commitLogReplayer.replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + + try + { + Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); + } + catch (ExecutionException e) + { + throw Throwables.propagate(e.getCause()); + } + + commitLogReplayer.keyspacesReplayed.add(keyspace); + } + } + }; + return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize); } - return end; + } + + /** - * A filter of known safe-to-discard commit log replay positions, based on ++ * A set of known safe-to-discard commit log replay positions, based on + * the range covered by on disk sstables and those prior to the most recent truncation record + */ - public static class ReplayPositionFilter ++ public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt) + { - final NavigableMap<CommitLogPosition, CommitLogPosition> persisted = new TreeMap<>(); - public ReplayPositionFilter(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt) - { - for (SSTableReader reader : onDisk) - { - CommitLogPosition start = reader.getSSTableMetadata().commitLogLowerBound; - CommitLogPosition end = reader.getSSTableMetadata().commitLogUpperBound; - add(persisted, start, end); - } - if 
(truncatedAt != null) - add(persisted, CommitLogPosition.NONE, truncatedAt); - } ++ IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>(); ++ for (SSTableReader reader : onDisk) ++ builder.addAll(reader.getSSTableMetadata().commitLogIntervals); + - private static void add(NavigableMap<CommitLogPosition, CommitLogPosition> ranges, CommitLogPosition start, CommitLogPosition end) - { - // extend ourselves to cover any ranges we overlap - // record directly preceding our end may extend past us, so take the max of our end and its - Map.Entry<CommitLogPosition, CommitLogPosition> extend = ranges.floorEntry(end); - if (extend != null && extend.getValue().compareTo(end) > 0) - end = extend.getValue(); - - // record directly preceding our start may extend into us; if it does, we take it as our start - extend = ranges.lowerEntry(start); - if (extend != null && extend.getValue().compareTo(start) >= 0) - start = extend.getKey(); - - ranges.subMap(start, end).clear(); - ranges.put(start, end); - } - - public boolean shouldReplay(CommitLogPosition position) - { - // replay ranges are start exclusive, end inclusive - Map.Entry<CommitLogPosition, CommitLogPosition> range = persisted.lowerEntry(position); - return range == null || position.compareTo(range.getValue()) > 0; - } - - public boolean isEmpty() - { - return persisted.isEmpty(); - } ++ if (truncatedAt != null) ++ builder.add(CommitLogPosition.NONE, truncatedAt); ++ return builder.build(); + } + - public static CommitLogPosition firstNotCovered(Iterable<ReplayPositionFilter> ranges) ++ /** ++ * Find the earliest commit log position that is not covered by the known flushed ranges for some table. ++ * ++ * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the ++ * given table was constructed* and hence we can start replay from the end of that interval. ++ * ++ * If such an interval is not known, we must replay from the beginning. ++ * ++ * * This is not true only until if the very first flush of a table stalled or failed, while the second or latter ++ * succeeded. The chances of this happening are at most very low, and if the assumption does prove to be ++ * incorrect during replay there is little chance that the affected deployment is in production. ++ */ ++ public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges) + { - CommitLogPosition min = null; - for (ReplayPositionFilter map : ranges) - { - CommitLogPosition first = map.persisted.firstEntry().getValue(); - if (min == null) - min = first; - else - min = Ordering.natural().min(min, first); - } - if (min == null) - return CommitLogPosition.NONE; - return min; ++ return ranges.stream() ++ .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE)) ++ .min(Ordering.natural()) ++ .get(); // iteration is per known-CF, there must be at least one. 
} abstract static class ReplayFilter @@@ -386,12 -323,346 +352,11 @@@ * * @return true iff replay is necessary */ - private boolean shouldReplay(UUID cfId, ReplayPosition position) + private boolean shouldReplay(UUID cfId, CommitLogPosition position) { - ReplayPositionFilter filter = cfPersisted.get(cfId); - return filter == null || filter.shouldReplay(position); + return !cfPersisted.get(cfId).contains(position); } - @SuppressWarnings("resource") - public void recover(File file, boolean tolerateTruncation) throws IOException - { - CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); - try(ChannelProxy channel = new ChannelProxy(file); - RandomAccessReader reader = RandomAccessReader.open(channel)) - { - if (desc.version < CommitLogDescriptor.VERSION_21) - { - if (logAndCheckIfShouldSkip(file, desc)) - return; - if (globalPosition.segment == desc.id) - reader.seek(globalPosition.position); - replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation); - return; - } - - final long segmentId = desc.id; - try - { - desc = CommitLogDescriptor.readHeader(reader); - } - catch (IOException e) - { - desc = null; - } - if (desc == null) { - handleReplayError(false, "Could not read commit log descriptor in file %s", file); - return; - } - if (segmentId != desc.id) - { - handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file); - // continue processing if ignored. - } - - if (logAndCheckIfShouldSkip(file, desc)) - return; - - ICompressor compressor = null; - if (desc.compression != null) - { - try - { - compressor = CompressionParams.createCompressor(desc.compression); - } - catch (ConfigurationException e) - { - handleReplayError(false, "Unknown compression: %s", e.getMessage()); - return; - } - } - - assert reader.length() <= Integer.MAX_VALUE; - int end = (int) reader.getFilePointer(); - int replayEnd = end; - - while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0) - { - int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE; - - if (logger.isTraceEnabled()) - logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end); - if (compressor != null) - { - int uncompressedLength = reader.readInt(); - replayEnd = replayPos + uncompressedLength; - } - else - { - replayEnd = end; - } - - if (segmentId == globalPosition.segment && replayEnd < globalPosition.position) - // Skip over flushed section. - continue; - - FileDataInput sectionReader = reader; - String errorContext = desc.fileName(); - // In the uncompressed case the last non-fully-flushed section can be anywhere in the file. - boolean tolerateErrorsInSection = tolerateTruncation; - if (compressor != null) - { - // In the compressed case we know if this is the last section. 
-                     tolerateErrorsInSection &= end == reader.length() || end < 0;
-
-                     int start = (int) reader.getFilePointer();
-                     try
-                     {
-                         int compressedLength = end - start;
-                         if (logger.isTraceEnabled())
-                             logger.trace("Decompressing {} between replay positions {} and {}",
-                                          file,
-                                          replayPos,
-                                          replayEnd);
-                         if (compressedLength > buffer.length)
-                             buffer = new byte[(int) (1.2 * compressedLength)];
-                         reader.readFully(buffer, 0, compressedLength);
-                         int uncompressedLength = replayEnd - replayPos;
-                         if (uncompressedLength > uncompressedBuffer.length)
-                             uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
-                         compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
-                         sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
-                         errorContext = "compressed section at " + start + " in " + errorContext;
-                     }
-                     catch (IOException | ArrayIndexOutOfBoundsException e)
-                     {
-                         handleReplayError(tolerateErrorsInSection,
-                                           "Unexpected exception decompressing section at %d: %s",
-                                           start, e);
-                         continue;
-                     }
-                 }
-
-                 if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
-                     break;
-             }
-             logger.debug("Finished reading {}", file);
-         }
-     }
-
-     public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
-     {
-         logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
-                      file.getPath(),
-                      desc.version,
-                      desc.getMessagingVersion(),
-                      desc.compression);
-
-         if (globalPosition.segment > desc.id)
-         {
-             logger.trace("skipping replay of fully-flushed {}", file);
-             return true;
-         }
-         return false;
-     }
-
-     /**
-      * Replays a sync section containing a list of mutations.
-      *
-      * @return Whether replay should continue with the next section.
-      */
-     private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
-     {
-         /* read the logs populate Mutation and apply */
-         while (reader.getFilePointer() < end && !reader.isEOF())
-         {
-             long mutationStart = reader.getFilePointer();
-             if (logger.isTraceEnabled())
-                 logger.trace("Reading mutation at {}", mutationStart);
-
-             long claimedCRC32;
-             int serializedSize;
-             try
-             {
-                 // any of the reads may hit EOF
-                 serializedSize = reader.readInt();
-                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
-                 {
-                     logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
-                     return false;
-                 }
-
-                 // Mutation must be at LEAST 10 bytes:
-                 // 3 each for a non-empty Keyspace and Key (including the
-                 // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
-                 // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                 if (serializedSize < 10)
-                 {
-                     handleReplayError(tolerateErrors,
-                                       "Invalid mutation size %d at %d in %s",
-                                       serializedSize, mutationStart, errorContext);
-                     return false;
-                 }
-
-                 long claimedSizeChecksum;
-                 if (desc.version < CommitLogDescriptor.VERSION_21)
-                     claimedSizeChecksum = reader.readLong();
-                 else
-                     claimedSizeChecksum = reader.readInt() & 0xffffffffL;
-                 checksum.reset();
-                 if (desc.version < CommitLogDescriptor.VERSION_20)
-                     checksum.update(serializedSize);
-                 else
-                     updateChecksumInt(checksum, serializedSize);
-
-                 if (checksum.getValue() != claimedSizeChecksum)
-                 {
-                     handleReplayError(tolerateErrors,
-                                       "Mutation size checksum failure at %d in %s",
-                                       mutationStart, errorContext);
-                     return false;
-                 }
-                 // ok.
-
-                 if (serializedSize > buffer.length)
-                     buffer = new byte[(int) (1.2 * serializedSize)];
-                 reader.readFully(buffer, 0, serializedSize);
-                 if (desc.version < CommitLogDescriptor.VERSION_21)
-                     claimedCRC32 = reader.readLong();
-                 else
-                     claimedCRC32 = reader.readInt() & 0xffffffffL;
-             }
-             catch (EOFException eof)
-             {
-                 handleReplayError(tolerateErrors,
-                                   "Unexpected end of segment",
-                                   mutationStart, errorContext);
-                 return false; // last CL entry didn't get completely written. that's ok.
-             }
-
-             checksum.update(buffer, 0, serializedSize);
-             if (claimedCRC32 != checksum.getValue())
-             {
-                 handleReplayError(tolerateErrors,
-                                   "Mutation checksum failure at %d in %s",
-                                   mutationStart, errorContext);
-                 continue;
-             }
-             replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
-         }
-         return true;
-     }
-
-     /**
-      * Deserializes and replays a commit log entry.
-      */
-     void replayMutation(byte[] inputBuffer, int size,
-                         final int entryLocation, final CommitLogDescriptor desc) throws IOException
-     {
-
-         final Mutation mutation;
-         try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
-         {
-             mutation = Mutation.serializer.deserialize(bufIn,
-                                                        desc.getMessagingVersion(),
-                                                        SerializationHelper.Flag.LOCAL);
-             // doublecheck that what we read is [still] valid for the current schema
-             for (PartitionUpdate upd : mutation.getPartitionUpdates())
-                 upd.validate();
-         }
-         catch (UnknownColumnFamilyException ex)
-         {
-             if (ex.cfId == null)
-                 return;
-             AtomicInteger i = invalidMutations.get(ex.cfId);
-             if (i == null)
-             {
-                 i = new AtomicInteger(1);
-                 invalidMutations.put(ex.cfId, i);
-             }
-             else
-                 i.incrementAndGet();
-             return;
-         }
-         catch (Throwable t)
-         {
-             JVMStabilityInspector.inspectThrowable(t);
-             File f = File.createTempFile("mutation", "dat");
-
-             try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
-             {
-                 out.write(inputBuffer, 0, size);
-             }
-
-             // Checksum passed so this error can't be permissible.
-             handleReplayError(false,
-                               "Unexpected error deserializing mutation; saved to %s. " +
-                               "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
-                               "Exception follows: %s",
-                               f.getAbsolutePath(),
-                               t);
-             return;
-         }
-
-         if (logger.isTraceEnabled())
-             logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
-
-         Runnable runnable = new WrappedRunnable()
-         {
-             public void runMayThrow()
-             {
-                 if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                     return;
-                 if (pointInTimeExceeded(mutation))
-                     return;
-
-                 final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
-                 // Rebuild the mutation, omitting column families that
-                 // a) the user has requested that we ignore,
-                 // b) have already been flushed,
-                 // or c) are part of a cf that was dropped.
-                 // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                 Mutation newMutation = null;
-                 for (PartitionUpdate update : replayFilter.filter(mutation))
-                 {
-                     if (Schema.instance.getCF(update.metadata().cfId) == null)
-                         continue; // dropped
-
-                     // replay if current segment is newer than last flushed one or,
-                     // if it is the last known segment, if we are after the replay position
-                     if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
-                     {
-                         if (newMutation == null)
-                             newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                         newMutation.add(update);
-                         replayedCount.incrementAndGet();
-                     }
-                 }
-                 if (newMutation != null)
-                 {
-                     assert !newMutation.isEmpty();
-
-                     try
-                     {
-                         Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                     }
-                     catch (ExecutionException e)
-                     {
-                         throw Throwables.propagate(e.getCause());
-                     }
-
-                     keyspacesRecovered.add(keyspace);
-                 }
-             }
-         };
-         futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-         if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-         {
-             FBUtilities.waitOnFutures(futures);
-             futures.clear();
-         }
-     }
-
      protected boolean pointInTimeExceeded(Mutation fm)
      {
          long restoreTarget = archiver.restorePointInTime;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a1158be,d2f12bf..e32c204
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -32,13 -40,16 +32,14 @@@ import org.cliffc.high_scale_lib.NonBlo
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.utils.CLibrary;
+ import org.apache.cassandra.utils.IntegerInterval;
  import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.apache.cassandra.utils.concurrent.WaitQueue;

@@@ -461,22 -448,18 +475,19 @@@ public abstract class CommitLogSegmen
       * given context argument is contained in this file, it will only mark the CF as
       * clean if no newer writes have taken place.
       *
--     * @param cfId    the column family ID that is now clean
--     * @param context the optional clean offset
++     * @param cfId          the column family ID that is now clean
++     * @param startPosition the start of the range that is clean
++     * @param endPosition   the end of the range that is clean
       */
-     public synchronized void markClean(UUID cfId, CommitLogPosition context)
-     public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition)
++    public synchronized void markClean(UUID cfId, CommitLogPosition startPosition, CommitLogPosition endPosition)
      {
-         if (startPosition.segment > id || endPosition.segment < id)
++        if (startPosition.segmentId > id || endPosition.segmentId < id)
+             return;
          if (!cfDirty.containsKey(cfId))
              return;
-         if (context.segmentId == id)
-             markClean(cfId, context.position);
-         else if (context.segmentId > id)
-             markClean(cfId, Integer.MAX_VALUE);
-     }
-
-     private void markClean(UUID cfId, int position)
-     {
-         ensureAtleast(cfClean, cfId, position);
-         int start = startPosition.segment == id ? startPosition.position : 0;
-         int end = endPosition.segment == id ? endPosition.position : Integer.MAX_VALUE;
++        int start = startPosition.segmentId == id ? startPosition.position : 0;
++        int end = endPosition.segmentId == id ? endPosition.position : Integer.MAX_VALUE;
+         cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
          removeCleanFromDirty();
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
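The interval tracking that the hunks above rely on is compact but easy to misread, so here is a minimal, self-contained sketch of the underlying idea. Everything in it is illustrative: the class name IntervalSetSketch, the method endOfFirstInterval, and the use of plain ints in place of (segmentId, position) pairs are all hypothetical stand-ins. The merge-on-add logic deliberately mirrors the removed ReplayPositionFilter.add shown earlier, which is conceptually what the new org.apache.cassandra.utils.IntegerInterval.Set (per-table clean ranges in CommitLogSegment) and org.apache.cassandra.db.commitlog.IntervalSet (persisted commit log ranges) now provide.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical stand-in for the patch's interval sets: a set of ranges kept
    // non-overlapping by merging on add, answering coverage queries.
    final class IntervalSetSketch
    {
        // interval start -> interval end for each tracked (flushed) range
        private final NavigableMap<Integer, Integer> ranges = new TreeMap<>();

        // Add [start, end], absorbing any intervals it overlaps.
        // Same floorEntry/lowerEntry merge as the removed ReplayPositionFilter.add.
        void add(int start, int end)
        {
            // the interval starting at or before 'end' may extend past us; keep the larger end
            Map.Entry<Integer, Integer> extend = ranges.floorEntry(end);
            if (extend != null && extend.getValue() > end)
                end = extend.getValue();

            // the interval starting before 'start' may reach into us; if so, adopt its start
            extend = ranges.lowerEntry(start);
            if (extend != null && extend.getValue() >= start)
                start = extend.getKey();

            ranges.subMap(start, end).clear();
            ranges.put(start, end);
        }

        // True if 'position' falls inside a tracked interval, i.e. needs no replay.
        boolean contains(int position)
        {
            Map.Entry<Integer, Integer> range = ranges.floorEntry(position);
            return range != null && position <= range.getValue();
        }

        // End of the first tracked interval: the earliest point replay may start
        // from under the "first interval covers table creation" assumption above.
        int endOfFirstInterval()
        {
            return ranges.isEmpty() ? 0 : ranges.firstEntry().getValue();
        }

        public static void main(String[] args)
        {
            IntervalSetSketch flushed = new IntervalSetSketch();
            flushed.add(0, 100);    // first flush covers positions [0, 100]
            flushed.add(250, 300);  // a later flush leaves a dirty gap (100, 250)
            flushed.add(90, 260);   // a bridging flush merges everything into [0, 300]

            assert flushed.contains(150);    // covered; no replay needed
            assert !flushed.contains(301);   // dirty; must be replayed
            assert flushed.endOfFirstInterval() == 300;
        }
    }

Run with java -ea IntervalSetSketch to exercise the assertions. The same shape explains the markClean change above: each flush now reports a clean [start, end] range, clamped per segment to [0, Integer.MAX_VALUE] when the range spans the whole segment, and a segment becomes reclaimable only once the union of clean intervals covers everything it holds dirty, so a stalled or failed flush leaves a visible dirty gap rather than silently marking later writes clean.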