Change commitlog and sstables to track dirty and clean intervals. patch by Branimir Lambov; reviewed by Sylvain Lebresne for CASSANDRA-11828
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/904cb5d1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/904cb5d1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/904cb5d1 Branch: refs/heads/cassandra-3.0 Commit: 904cb5d10e0de1a6ca89249be8c257ed38a80ef0 Parents: cf85f52 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Sat May 14 11:31:16 2016 +0300 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Aug 5 15:38:37 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/BlacklistedDirectories.java | 13 + .../apache/cassandra/db/ColumnFamilyStore.java | 66 +--- .../org/apache/cassandra/db/Directories.java | 2 +- src/java/org/apache/cassandra/db/Memtable.java | 18 +- .../cassandra/db/commitlog/CommitLog.java | 11 +- .../db/commitlog/CommitLogReplayer.java | 59 +++- .../db/commitlog/CommitLogSegment.java | 77 ++--- .../db/commitlog/CommitLogSegmentManager.java | 4 +- .../cassandra/db/commitlog/IntervalSet.java | 192 +++++++++++ .../cassandra/db/commitlog/ReplayPosition.java | 71 ---- .../compaction/AbstractCompactionStrategy.java | 3 + .../compaction/CompactionStrategyManager.java | 3 + .../apache/cassandra/db/lifecycle/Tracker.java | 44 +-- .../org/apache/cassandra/db/lifecycle/View.java | 36 +- .../cassandra/io/sstable/format/Version.java | 2 + .../io/sstable/format/big/BigFormat.java | 14 +- .../metadata/LegacyMetadataSerializer.java | 17 +- .../io/sstable/metadata/MetadataCollector.java | 38 +-- .../io/sstable/metadata/StatsMetadata.java | 44 +-- .../cassandra/tools/SSTableMetadataViewer.java | 3 +- .../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++ .../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes .../legacy_mc_clust/mc-1-big-Data.db | Bin 0 -> 5355 bytes .../legacy_mc_clust/mc-1-big-Digest.crc32 | 1 + .../legacy_mc_clust/mc-1-big-Filter.db 
| Bin 0 -> 24 bytes .../legacy_mc_clust/mc-1-big-Index.db | Bin 0 -> 157553 bytes .../legacy_mc_clust/mc-1-big-Statistics.db | Bin 0 -> 7086 bytes .../legacy_mc_clust/mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_clust/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes .../legacy_mc_clust_compact/mc-1-big-Data.db | Bin 0 -> 5382 bytes .../mc-1-big-Digest.crc32 | 1 + .../legacy_mc_clust_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_mc_clust_compact/mc-1-big-Index.db | Bin 0 -> 157553 bytes .../mc-1-big-Statistics.db | Bin 0 -> 7086 bytes .../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_clust_compact/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes .../legacy_mc_clust_counter/mc-1-big-Data.db | Bin 0 -> 4631 bytes .../mc-1-big-Digest.crc32 | 1 + .../legacy_mc_clust_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_mc_clust_counter/mc-1-big-Index.db | Bin 0 -> 157553 bytes .../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes .../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_clust_counter/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes .../mc-1-big-Data.db | Bin 0 -> 4625 bytes .../mc-1-big-Digest.crc32 | 1 + .../mc-1-big-Filter.db | Bin 0 -> 24 bytes .../mc-1-big-Index.db | Bin 0 -> 157553 bytes .../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes .../mc-1-big-Summary.db | Bin 0 -> 47 bytes .../mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../legacy_mc_simple/mc-1-big-Data.db | Bin 0 -> 89 bytes .../legacy_mc_simple/mc-1-big-Digest.crc32 | 1 + .../legacy_mc_simple/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_mc_simple/mc-1-big-Index.db | Bin 0 -> 26 bytes .../legacy_mc_simple/mc-1-big-Statistics.db | Bin 0 -> 4639 bytes .../legacy_mc_simple/mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_simple/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes 
.../legacy_mc_simple_compact/mc-1-big-Data.db | Bin 0 -> 91 bytes .../mc-1-big-Digest.crc32 | 1 + .../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_mc_simple_compact/mc-1-big-Index.db | Bin 0 -> 26 bytes .../mc-1-big-Statistics.db | Bin 0 -> 4680 bytes .../mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_simple_compact/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../legacy_mc_simple_counter/mc-1-big-Data.db | Bin 0 -> 110 bytes .../mc-1-big-Digest.crc32 | 1 + .../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_mc_simple_counter/mc-1-big-Index.db | Bin 0 -> 27 bytes .../mc-1-big-Statistics.db | Bin 0 -> 4648 bytes .../mc-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_mc_simple_counter/mc-1-big-TOC.txt | 8 + .../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../mc-1-big-Data.db | Bin 0 -> 114 bytes .../mc-1-big-Digest.crc32 | 1 + .../mc-1-big-Filter.db | Bin 0 -> 24 bytes .../mc-1-big-Index.db | Bin 0 -> 27 bytes .../mc-1-big-Statistics.db | Bin 0 -> 4689 bytes .../mc-1-big-Summary.db | Bin 0 -> 47 bytes .../mc-1-big-TOC.txt | 8 + .../db/commitlog/CommitLogStressTest.java | 2 +- test/unit/org/apache/cassandra/Util.java | 21 +- .../org/apache/cassandra/cql3/CQLTester.java | 12 +- .../apache/cassandra/cql3/OutOfSpaceTest.java | 33 +- .../cassandra/db/commitlog/CommitLogTest.java | 159 ++++++++- .../cassandra/db/compaction/NeverPurgeTest.java | 6 +- .../cassandra/db/lifecycle/TrackerTest.java | 12 +- .../apache/cassandra/db/lifecycle/ViewTest.java | 2 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 4 +- .../metadata/MetadataSerializerTest.java | 16 +- .../cassandra/utils/IntegerIntervalsTest.java | 326 +++++++++++++++++++ 98 files changed, 1229 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 046c8b3..b596fc9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.9 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828) * NullPointerException during compaction on table with static columns (CASSANDRA-12336) * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823) * Fix upgrade of super columns on thrift (CASSANDRA-12335) http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/BlacklistedDirectories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java index f47fd57..bc733d7 100644 --- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java +++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java @@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.utils.JVMStabilityInspector; public class BlacklistedDirectories implements BlacklistedDirectoriesMBean @@ -101,6 +103,17 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean } /** + * Testing only! + * Clear the set of unwritable directories. + */ + @VisibleForTesting + public static void clearUnwritableUnsafe() + { + instance.unwritableDirectories.clear(); + } + + + /** * Tells whether or not the directory is blacklisted for reads. * @return whether or not the directory is blacklisted for reads. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 400fd36..82604e2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -179,9 +179,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - @VisibleForTesting - public static volatile ColumnFamilyStore discardFlushResults; - public final Keyspace keyspace; public final String name; public final CFMetaData metadata; @@ -926,25 +923,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); volatile FSWriteError flushFailure = null; - final ReplayPosition commitLogUpperBound; final List<Memtable> memtables; - final List<Collection<SSTableReader>> readers; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound, - List<Memtable> memtables, List<Collection<SSTableReader>> readers) + private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, + List<Memtable> memtables) { this.writeBarrier = writeBarrier; this.flushSecondaryIndexes = flushSecondaryIndexes; - this.commitLogUpperBound = commitLogUpperBound; this.memtables = memtables; - this.readers = readers; } public ReplayPosition call() { - if (discardFlushResults == ColumnFamilyStore.this) - return commitLogUpperBound; - writeBarrier.await(); /** @@ -968,17 +958,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean throw new IllegalStateException(); } + ReplayPosition commitLogUpperBound = ReplayPosition.NONE; // If a flush errored out but the error was ignored, make sure we don't discard the commit log. 
- if (flushFailure == null) + if (flushFailure == null && !memtables.isEmpty()) { - CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound); - for (int i = 0 ; i < memtables.size() ; i++) - { - Memtable memtable = memtables.get(i); - Collection<SSTableReader> reader = readers.get(i); - memtable.cfs.data.permitCompactionOfFlushed(reader); - memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader); - } + Memtable memtable = memtables.get(0); + commitLogUpperBound = memtable.getCommitLogUpperBound(); + CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound); } metric.pendingFlushes.dec(); @@ -1002,7 +988,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { final OpOrder.Barrier writeBarrier; final List<Memtable> memtables = new ArrayList<>(); - final List<Collection<SSTableReader>> readers = new ArrayList<>(); final PostFlush postFlush; final boolean truncate; @@ -1044,7 +1029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier // replay positions have also completed, i.e. 
the memtables are done and ready to flush writeBarrier.issue(); - postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers); + postFlush = new PostFlush(!truncate, writeBarrier, memtables); } public void run() @@ -1063,8 +1048,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean memtable.cfs.data.markFlushing(memtable); if (memtable.isClean() || truncate) { - memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList()); - memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList()); + memtable.cfs.replaceFlushed(memtable, Collections.emptyList()); reclaim(memtable); iter.remove(); } @@ -1077,9 +1061,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (Memtable memtable : memtables) { Collection<SSTableReader> readers = memtable.flush(); - memtable.cfs.data.replaceFlushed(memtable, readers); + memtable.cfs.replaceFlushed(memtable, readers); reclaim(memtable); - this.readers.add(readers); } } catch (FSWriteError e) @@ -1126,21 +1109,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - @VisibleForTesting - // this method should ONLY be used for testing commit log behaviour; it discards the current memtable - // contents without marking the commit log clean, and prevents any proceeding flushes from marking - // the commit log as done, however they *will* terminate (unlike under typical failures) to ensure progress is made - public void simulateFailedFlush() - { - discardFlushResults = this; - data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this))); - } - - public void resumeFlushing() - { - discardFlushResults = null; - } - /** * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately * queues it for flushing. If the memtable selected is flushed before this completes, no work is done. 
@@ -1483,16 +1451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return data; } - public Collection<SSTableReader> getSSTables() - { - return data.getSSTables(); - } - - public Iterable<SSTableReader> getPermittedToCompactSSTables() - { - return data.getPermittedToCompact(); - } - public Set<SSTableReader> getLiveSSTables() { return data.getView().liveSSTables(); @@ -2032,7 +1990,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long now = System.currentTimeMillis(); // make sure none of our sstables are somehow in the future (clock drift, perhaps) for (ColumnFamilyStore cfs : concatWithIndexes()) - for (SSTableReader sstable : cfs.data.getSSTables()) + for (SSTableReader sstable : cfs.getLiveSSTables()) now = Math.max(now, sstable.maxDataAge); truncatedAt = now; @@ -2130,7 +2088,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public LifecycleTransaction call() throws Exception { assert data.getCompacting().isEmpty() : data.getCompacting(); - Iterable<SSTableReader> sstables = getPermittedToCompactSSTables(); + Iterable<SSTableReader> sstables = getLiveSSTables(); sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); sstables = ImmutableList.copyOf(sstables); LifecycleTransaction modifier = data.tryModify(sstables, operationType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 01ffd52..877f984 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -385,7 +385,7 @@ public class Directories if (candidates.isEmpty()) if (tooBig) - throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); + throw new FSWriteError(new 
IOException("Insufficient disk space to write " + writeSize + " bytes"), ""); else throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), ""); http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 93dc5af..3c77092 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -33,6 +33,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ClusteringIndexFilter; @@ -193,6 +194,11 @@ public class Memtable implements Comparable<Memtable> return commitLogLowerBound.get(); } + public ReplayPosition getCommitLogUpperBound() + { + return commitLogUpperBound.get(); + } + public boolean isLive() { return allocator.isLive(); @@ -331,6 +337,15 @@ public class Memtable implements Comparable<Memtable> return minTimestamp; } + /** + * For testing only. Give this memtable too big a size to make it always fail flushing. 
+ */ + @VisibleForTesting + public void makeUnflushable() + { + liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024); + } + private long estimatedSize() { long keySize = 0; @@ -418,8 +433,7 @@ public class Memtable implements Comparable<Memtable> { txn = LifecycleTransaction.offline(OperationType.FLUSH); MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator) - .commitLogLowerBound(commitLogLowerBound.get()) - .commitLogUpperBound(commitLogUpperBound.get()); + .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get())); return new SSTableTxnWriter(txn, cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index dcdd855..dfe3f91 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -290,11 +290,12 @@ public class CommitLog implements CommitLogMBean * given. Discards any commit log segments that are no longer used. 
* * @param cfId the column family ID that was flushed - * @param context the replay position of the flush + * @param lowerBound the lowest covered replay position of the flush + * @param upperBound the highest covered replay position of the flush */ - public void discardCompletedSegments(final UUID cfId, final ReplayPosition context) + public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound) { - logger.trace("discard completed log segments for {}, table {}", context, cfId); + logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId); // Go thru the active segment files, which are ordered oldest to newest, marking the // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed @@ -303,7 +304,7 @@ public class CommitLog implements CommitLogMBean for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();) { CommitLogSegment segment = iter.next(); - segment.markClean(cfId, context); + segment.markClean(cfId, lowerBound, upperBound); if (segment.isUnused()) { @@ -318,7 +319,7 @@ public class CommitLog implements CommitLogMBean // Don't mark or try to delete any newer segments once we've reached the one containing the // position of the flush. 
- if (segment.contains(context)) + if (segment.contains(upperBound)) break; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index f45a47a..af8efb4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -35,6 +35,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; @@ -52,6 +53,7 @@ import org.apache.cassandra.io.util.FileSegmentInputStream; import org.apache.cassandra.io.util.RebufferingInputStream; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; @@ -74,7 +76,7 @@ public class CommitLogReplayer private final List<Future<?>> futures; private final Map<UUID, AtomicInteger> invalidMutations; private final AtomicInteger replayedCount; - private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted; + private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted; private final ReplayPosition globalPosition; private final CRC32 checksum; private byte[] buffer; @@ -83,7 +85,7 @@ public class CommitLogReplayer private final ReplayFilter replayFilter; private final CommitLogArchiver archiver; - CommitLogReplayer(CommitLog commitLog, 
ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter) + CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); this.futures = new ArrayList<Future<?>>(); @@ -101,10 +103,9 @@ public class CommitLogReplayer public static CommitLogReplayer construct(CommitLog commitLog) { - // compute per-CF and global replay positions - Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>(); + // compute per-CF and global replay intervals + Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>(); ReplayFilter replayFilter = ReplayFilter.create(); - ReplayPosition globalPosition = null; for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // but, if we've truncated the cf in question, then we need to need to start replay after the truncation @@ -129,14 +130,10 @@ public class CommitLogReplayer } } - ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt); - if (!filter.isEmpty()) - cfPersisted.put(cfs.metadata.cfId, filter); - else - globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter + IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt); + cfPersisted.put(cfs.metadata.cfId, filter); } - if (globalPosition == null) - globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values()); + ReplayPosition globalPosition = firstNotCovered(cfPersisted.values()); logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted)); return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter); } @@ -148,6 +145,41 @@ public class CommitLogReplayer recover(clogs[i], i + 1 == clogs.length); } + /** + * A set of known safe-to-discard commit 
log replay positions, based on + * the range covered by on disk sstables and those prior to the most recent truncation record + */ + public static IntervalSet<ReplayPosition> persistedIntervals(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt) + { + IntervalSet.Builder<ReplayPosition> builder = new IntervalSet.Builder<>(); + for (SSTableReader reader : onDisk) + builder.addAll(reader.getSSTableMetadata().commitLogIntervals); + + if (truncatedAt != null) + builder.add(ReplayPosition.NONE, truncatedAt); + return builder.build(); + } + + /** + * Find the earliest commit log position that is not covered by the known flushed ranges for some table. + * + * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the + * given table was constructed* and hence we can start replay from the end of that interval. + * + * If such an interval is not known, we must replay from the beginning. + * + * * This is not true only if the very first flush of a table stalled or failed, while the second or a later one + * succeeded. The chances of this happening are at most very low, and if the assumption does prove to be + * incorrect during replay there is little chance that the affected deployment is in production. + */ + public static ReplayPosition firstNotCovered(Collection<IntervalSet<ReplayPosition>> ranges) + { + return ranges.stream() + .map(intervals -> Iterables.getFirst(intervals.ends(), ReplayPosition.NONE)) + .min(Ordering.natural()) + .get(); // iteration is per known-CF, there must be at least one. 
+ } + public int blockForWrites() { for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet()) @@ -293,8 +325,7 @@ public class CommitLogReplayer */ private boolean shouldReplay(UUID cfId, ReplayPosition position) { - ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId); - return filter == null || filter.shouldReplay(position); + return !cfPersisted.get(cfId).contains(position); } @SuppressWarnings("resource") http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 27c05b4..d2f12bf 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -49,6 +49,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CLibrary; +import org.apache.cassandra.utils.IntegerInterval; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -101,11 +102,11 @@ public abstract class CommitLogSegment // a signal for writers to wait on to confirm the log message they provided has been written to disk private final WaitQueue syncComplete = new WaitQueue(); - // a map of Cf->dirty position; this is used to permit marking Cfs clean whilst the log is still in use - private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024); + // a map of Cf->dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data + private final NonBlockingHashMap<UUID, IntegerInterval> cfDirty = new NonBlockingHashMap<>(1024); - // a map of Cf->clean 
position; this is used to permit marking Cfs clean whilst the log is still in use - private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>(); + // a map of Cf->clean intervals; separate map from above to permit marking Cfs clean whilst the log is still in use + private final ConcurrentHashMap<UUID, IntegerInterval.Set> cfClean = new ConcurrentHashMap<>(); public final long id; @@ -423,10 +424,23 @@ public abstract class CommitLogSegment } } + public static<K> void coverInMap(ConcurrentMap<K, IntegerInterval> map, K key, int value) + { + IntegerInterval i = map.get(key); + if (i == null) + { + i = map.putIfAbsent(key, new IntegerInterval(value, value)); + if (i == null) + // success + return; + } + i.expandToCover(value); + } + void markDirty(Mutation mutation, int allocatedPosition) { for (PartitionUpdate update : mutation.getPartitionUpdates()) - ensureAtleast(cfDirty, update.metadata().cfId, allocatedPosition); + coverInMap(cfDirty, update.metadata().cfId, allocatedPosition); } /** @@ -437,55 +451,32 @@ public abstract class CommitLogSegment * @param cfId the column family ID that is now clean * @param context the optional clean offset */ - public synchronized void markClean(UUID cfId, ReplayPosition context) + public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition) { + if (startPosition.segment > id || endPosition.segment < id) + return; if (!cfDirty.containsKey(cfId)) return; - if (context.segment == id) - markClean(cfId, context.position); - else if (context.segment > id) - markClean(cfId, Integer.MAX_VALUE); - } - - private void markClean(UUID cfId, int position) - { - ensureAtleast(cfClean, cfId, position); + int start = startPosition.segment == id ? startPosition.position : 0; + int end = endPosition.segment == id ? 
endPosition.position : Integer.MAX_VALUE; + cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end); removeCleanFromDirty(); } - private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value) - { - AtomicInteger i = map.get(cfId); - if (i == null) - { - AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger()); - if (i2 != null) - i = i2; - } - while (true) - { - int cur = i.get(); - if (cur > value) - break; - if (i.compareAndSet(cur, value)) - break; - } - } - private void removeCleanFromDirty() { // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely if (isStillAllocating()) return; - Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator(); + Iterator<Map.Entry<UUID, IntegerInterval.Set>> iter = cfClean.entrySet().iterator(); while (iter.hasNext()) { - Map.Entry<UUID, AtomicInteger> clean = iter.next(); + Map.Entry<UUID, IntegerInterval.Set> clean = iter.next(); UUID cfId = clean.getKey(); - AtomicInteger cleanPos = clean.getValue(); - AtomicInteger dirtyPos = cfDirty.get(cfId); - if (dirtyPos != null && dirtyPos.intValue() <= cleanPos.intValue()) + IntegerInterval.Set cleanSet = clean.getValue(); + IntegerInterval dirtyInterval = cfDirty.get(cfId); + if (dirtyInterval != null && cleanSet.covers(dirtyInterval)) { cfDirty.remove(cfId); iter.remove(); @@ -502,12 +493,12 @@ public abstract class CommitLogSegment return cfDirty.keySet(); List<UUID> r = new ArrayList<>(cfDirty.size()); - for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet()) + for (Map.Entry<UUID, IntegerInterval> dirty : cfDirty.entrySet()) { UUID cfId = dirty.getKey(); - AtomicInteger dirtyPos = dirty.getValue(); - AtomicInteger cleanPos = cfClean.get(cfId); - if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue()) + IntegerInterval dirtyInterval = dirty.getValue(); + IntegerInterval.Set cleanSet = cfClean.get(cfId); + if (cleanSet == null || 
!cleanSet.covers(dirtyInterval)) r.add(dirty.getKey()); } return r; http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 66ad6a3..82cee50 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -310,7 +310,7 @@ public class CommitLogSegmentManager for (CommitLogSegment segment : activeSegments) for (UUID cfId : droppedCfs) - segment.markClean(cfId, segment.getContext()); + segment.markClean(cfId, ReplayPosition.NONE, segment.getContext()); // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() // if the previous active segment was the only one to recycle (since an active segment isn't @@ -451,7 +451,7 @@ public class CommitLogSegmentManager // even though we remove the schema entry before a final flush when dropping a CF, // it's still possible for a writer to race and finish his append after the flush. 
logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); - segment.markClean(dirtyCFId, segment.getContext()); + segment.markClean(dirtyCFId, ReplayPosition.NONE, segment.getContext()); } else if (!flushes.containsKey(dirtyCFId)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java new file mode 100644 index 0000000..bd0ea22 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java @@ -0,0 +1,192 @@ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.util.*; + +import com.google.common.collect.ImmutableSortedMap; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * An immutable set of closed intervals, stored in normalized form (i.e. where overlapping intervals are converted + * to a single interval covering both). + * + * The set is stored as a sorted map from interval starts to the corresponding end. The map satisfies + * curr().getKey() <= curr().getValue() < next().getKey() + */ +public class IntervalSet<T extends Comparable<T>> +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static final IntervalSet EMPTY = new IntervalSet(ImmutableSortedMap.of()); + + final private NavigableMap<T, T> ranges; + + private IntervalSet(ImmutableSortedMap<T, T> ranges) + { + this.ranges = ranges; + } + + /** + * Construct new set containing the interval with the given start and end position. 
+ */ + public IntervalSet(T start, T end) + { + this(ImmutableSortedMap.of(start, end)); + } + + @SuppressWarnings("unchecked") + public static <T extends Comparable<T>> IntervalSet<T> empty() + { + return (IntervalSet<T>) EMPTY; + } + + public boolean contains(T position) + { + // closed (i.e. inclusive) intervals + Map.Entry<T, T> range = ranges.floorEntry(position); + return range != null && position.compareTo(range.getValue()) <= 0; + } + + public boolean isEmpty() + { + return ranges.isEmpty(); + } + + public Optional<T> lowerBound() + { + return isEmpty() ? Optional.empty() : Optional.of(ranges.firstKey()); + } + + public Optional<T> upperBound() + { + return isEmpty() ? Optional.empty() : Optional.of(ranges.lastEntry().getValue()); + } + + public Collection<T> starts() + { + return ranges.keySet(); + } + + public Collection<T> ends() + { + return ranges.values(); + } + + public String toString() + { + return ranges.toString(); + } + + @Override + public int hashCode() + { + return ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof IntervalSet && ranges.equals(((IntervalSet<?>) obj).ranges); + } + + public static final <T extends Comparable<T>> ISerializer<IntervalSet<T>> serializer(ISerializer<T> pointSerializer) + { + return new ISerializer<IntervalSet<T>>() + { + public void serialize(IntervalSet<T> intervals, DataOutputPlus out) throws IOException + { + out.writeInt(intervals.ranges.size()); + for (Map.Entry<T, T> en : intervals.ranges.entrySet()) + { + pointSerializer.serialize(en.getKey(), out); + pointSerializer.serialize(en.getValue(), out); + } + } + + public IntervalSet<T> deserialize(DataInputPlus in) throws IOException + { + int count = in.readInt(); + NavigableMap<T, T> ranges = new TreeMap<>(); + for (int i = 0; i < count; ++i) + ranges.put(pointSerializer.deserialize(in), pointSerializer.deserialize(in)); + return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges)); + } + + public long 
serializedSize(IntervalSet<T> intervals) + { + long size = TypeSizes.sizeof(intervals.ranges.size()); + for (Map.Entry<T, T> en : intervals.ranges.entrySet()) + { + size += pointSerializer.serializedSize(en.getKey()); + size += pointSerializer.serializedSize(en.getValue()); + } + return size; + } + }; + }; + + /** + * Builder of interval sets, applying the necessary normalization while adding ranges. + * + * Data is stored as above, as a sorted map from interval starts to the corresponding end, which satisfies + * curr().getKey() <= curr().getValue() < next().getKey() + */ + static public class Builder<T extends Comparable<T>> + { + final NavigableMap<T, T> ranges; + + public Builder() + { + this.ranges = new TreeMap<>(); + } + + public Builder(T start, T end) + { + this(); + assert start.compareTo(end) <= 0; + ranges.put(start, end); + } + + /** + * Add an interval to the set and perform normalization. + */ + public void add(T start, T end) + { + assert start.compareTo(end) <= 0; + // extend ourselves to cover any ranges we overlap + // record directly preceding our end may extend past us, so take the max of our end and its + Map.Entry<T, T> extend = ranges.floorEntry(end); + if (extend != null && extend.getValue().compareTo(end) > 0) + end = extend.getValue(); + + // record directly preceding our start may extend into us; if it does, we take it as our start + extend = ranges.lowerEntry(start); + if (extend != null && extend.getValue().compareTo(start) >= 0) + start = extend.getKey(); + + // remove all covered intervals + // since we have adjusted start and end to cover the ones that would be only partially covered, we + // are certain that anything whose start falls within the span is completely covered + ranges.subMap(start, end).clear(); + // add the new interval + ranges.put(start, end); + } + + public void addAll(IntervalSet<T> otherSet) + { + for (Map.Entry<T, T> en : otherSet.ranges.entrySet()) + { + add(en.getKey(), en.getValue()); + } + } + + public 
IntervalSet<T> build() + { + return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java index 0b21763..b0214b8 100644 --- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java +++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java @@ -18,15 +18,9 @@ package org.apache.cassandra.db.commitlog; import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -import com.google.common.collect.Ordering; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -43,71 +37,6 @@ public class ReplayPosition implements Comparable<ReplayPosition> public final long segment; public final int position; - /** - * A filter of known safe-to-discard commit log replay positions, based on - * the range covered by on disk sstables and those prior to the most recent truncation record - */ - public static class ReplayFilter - { - final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>(); - public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt) - { - for (SSTableReader reader : onDisk) - { - ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound; - ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound; - add(persisted, start, end); - } - if (truncatedAt != null) - add(persisted, ReplayPosition.NONE, truncatedAt); - } - - private static void 
add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end) - { - // extend ourselves to cover any ranges we overlap - // record directly preceding our end may extend past us, so take the max of our end and its - Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end); - if (extend != null && extend.getValue().compareTo(end) > 0) - end = extend.getValue(); - - // record directly preceding our start may extend into us; if it does, we take it as our start - extend = ranges.lowerEntry(start); - if (extend != null && extend.getValue().compareTo(start) >= 0) - start = extend.getKey(); - - ranges.subMap(start, end).clear(); - ranges.put(start, end); - } - - public boolean shouldReplay(ReplayPosition position) - { - // replay ranges are start exclusive, end inclusive - Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position); - return range == null || position.compareTo(range.getValue()) > 0; - } - - public boolean isEmpty() - { - return persisted.isEmpty(); - } - } - - public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges) - { - ReplayPosition min = null; - for (ReplayFilter map : ranges) - { - ReplayPosition first = map.persisted.firstEntry().getValue(); - if (min == null) - min = first; - else - min = Ordering.natural().min(min, first); - } - if (min == null) - return NONE; - return min; - } - public ReplayPosition(long segment, int position) { this.segment = segment; http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 0dce52b..a80a6f4 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -241,6 +241,9 @@ public abstract class AbstractCompactionStrategy */ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { + cfs.getTracker().replaceFlushed(memtable, sstables); + if (sstables != null && !sstables.isEmpty()) + CompactionManager.instance.submitBackground(cfs); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 444d43d..a9bfbd2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -189,6 +189,9 @@ public class CompactionStrategyManager implements INotificationConsumer public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { + cfs.getTracker().replaceFlushed(memtable, sstables); + if (sstables != null && !sstables.isEmpty()) + CompactionManager.instance.submitBackground(cfs); } public int getUnleveledSSTables() http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index c94b88f..5a3d524 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.compaction.CompactionManager; import 
org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; @@ -48,8 +47,6 @@ import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.OpOrder; import static com.google.common.base.Predicates.and; -import static com.google.common.base.Predicates.in; -import static com.google.common.base.Predicates.not; import static com.google.common.collect.ImmutableSet.copyOf; import static com.google.common.collect.Iterables.filter; import static java.util.Collections.singleton; @@ -204,7 +201,6 @@ public class Tracker ImmutableList.<Memtable>of(), Collections.<SSTableReader, SSTableReader>emptyMap(), Collections.<SSTableReader, SSTableReader>emptyMap(), - Collections.<SSTableReader>emptySet(), SSTableIntervalTree.empty())); } @@ -351,49 +347,19 @@ public class Tracker Throwable fail; fail = updateSizeTracking(emptySet(), sstables, null); + // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? 
+ fail = notifyAdded(sstables, fail); + + if (!isDummy() && !cfstore.isValid()) + dropSSTables(); maybeFail(fail); } - /** - * permit compaction of the provided sstable; this translates to notifying compaction - * strategies of its existence, and potentially submitting a background task - */ - public void permitCompactionOfFlushed(Collection<SSTableReader> sstables) - { - if (sstables.isEmpty()) - return; - - apply(View.permitCompactionOfFlushed(sstables)); - - if (isDummy()) - return; - - if (cfstore.isValid()) - { - notifyAdded(sstables); - CompactionManager.instance.submitBackground(cfstore); - } - else - { - dropSSTables(); - } - } // MISCELLANEOUS public utility calls - public Set<SSTableReader> getSSTables() - { - return view.get().sstables; - } - - public Iterable<SSTableReader> getPermittedToCompact() - { - View view = this.view.get(); - return filter(view.sstables, not(in(view.premature))); - } - public Set<SSTableReader> getCompacting() { return view.get().compacting; http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index 3fa197f..4b3aae0 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -33,7 +33,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Interval; import static com.google.common.base.Predicates.equalTo; -import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.not; import static com.google.common.collect.ImmutableList.copyOf; import static com.google.common.collect.ImmutableList.of; @@ -70,7 +69,6 @@ public class View public final List<Memtable> flushingMemtables; final Set<SSTableReader> compacting; final Set<SSTableReader> 
sstables; - final Set<SSTableReader> premature; // we use a Map here so that we can easily perform identity checks as well as equality checks. // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), // and we then check that not only are they all present in the live set, but that the exact instance present is @@ -80,7 +78,7 @@ public class View final SSTableIntervalTree intervalTree; - View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree) + View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree) { assert liveMemtables != null; assert flushingMemtables != null; @@ -95,7 +93,6 @@ public class View this.sstables = sstablesMap.keySet(); this.compactingMap = compacting; this.compacting = compactingMap.keySet(); - this.premature = premature; this.intervalTree = intervalTree; } @@ -256,7 +253,7 @@ public class View assert all(mark, Helpers.idIn(view.sstablesMap)); return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, replace(view.compactingMap, unmark, mark), - view.premature, view.intervalTree); + view.intervalTree); } }; } @@ -270,7 +267,7 @@ public class View public boolean apply(View view) { for (SSTableReader reader : readers) - if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader)) + if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted()) return false; return true; } @@ -287,7 +284,7 @@ public class View public View apply(View view) { Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add); - return new 
View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature, + return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } }; @@ -302,7 +299,7 @@ public class View { List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build(); assert newLive.size() == view.liveMemtables.size() + 1; - return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree); + return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree); } }; } @@ -321,7 +318,7 @@ public class View filter(flushing, not(lessThan(toFlush))))); assert newLive.size() == live.size() - 1; assert newFlushing.size() == flushing.size() + 1; - return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree); + return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree); } }; } @@ -338,32 +335,15 @@ public class View if (flushed == null || flushed.isEmpty()) return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, - view.compactingMap, view.premature, view.intervalTree); + view.compactingMap, view.intervalTree); Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed); - Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed); - Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed); - return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature, + return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } }; } - static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers) - { - Set<SSTableReader> expectAndRemove = 
ImmutableSet.copyOf(readers); - return new Function<View, View>() - { - public View apply(View view) - { - Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet()); - Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet()); - return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree); - } - }; - } - - private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan) { return new Predicate<T>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index d9e289c..96c5a6e 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -72,6 +72,8 @@ public abstract class Version public abstract boolean hasCommitLogLowerBound(); + public abstract boolean hasCommitLogIntervals(); + public String getVersion() { return version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index e0fb3b1..16f0beb 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -111,7 +111,7 @@ public class BigFormat implements SSTableFormat // we always incremented the major version. 
static class BigVersion extends Version { - public static final String current_version = "mb"; + public static final String current_version = "mc"; public static final String earliest_supported_version = "jb"; // jb (2.0.1): switch from crc32 to adler32 for compression checksums @@ -124,7 +124,8 @@ public class BigFormat implements SSTableFormat // lb (2.2.7): commit log lower bound included // ma (3.0.0): swap bf hash order // store rows natively - // mb (3.0.6): commit log lower bound included + // mb (3.0.7, 3.7): commit log lower bound included + // mc (3.0.8, 3.9): commit log intervals included // // NOTE: when adding a new version, please add that to LegacySSTableTest, too. @@ -145,6 +146,7 @@ public class BigFormat implements SSTableFormat */ private final boolean hasOldBfHashOrder; private final boolean hasCommitLogLowerBound; + private final boolean hasCommitLogIntervals; /** * CASSANDRA-7066: compaction ancerstors are no longer used and have been removed. @@ -186,6 +188,7 @@ public class BigFormat implements SSTableFormat hasBoundaries = version.compareTo("ma") < 0; hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0) || version.compareTo("mb") >= 0; + hasCommitLogIntervals = version.compareTo("mc") >= 0; } @Override @@ -248,12 +251,19 @@ public class BigFormat implements SSTableFormat return newFileName; } + @Override public boolean hasCommitLogLowerBound() { return hasCommitLogLowerBound; } @Override + public boolean hasCommitLogIntervals() + { + return hasCommitLogIntervals; + } + + @Override public boolean storeRows() { return storeRows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 4561520..a683513 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@ -24,6 +24,7 @@ import java.util.*; import com.google.common.collect.Maps; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -35,6 +36,8 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.StreamingHistogram; +import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer; + /** * Serializer for SSTable from legacy versions */ @@ -55,7 +58,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out); - ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out); + ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out); out.writeLong(stats.minTimestamp); out.writeLong(stats.maxTimestamp); out.writeInt(stats.maxLocalDeletionTime); @@ -72,7 +75,9 @@ public class LegacyMetadataSerializer extends MetadataSerializer for (ByteBuffer value : stats.maxClusteringValues) ByteBufferUtil.writeWithShortLength(value, out); if (version.hasCommitLogLowerBound()) - ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out); + ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out); + if (version.hasCommitLogIntervals()) + replayPositionSetSerializer.serialize(stats.commitLogIntervals, out); } /** @@ 
-121,6 +126,11 @@ public class LegacyMetadataSerializer extends MetadataSerializer if (descriptor.version.hasCommitLogLowerBound()) commitLogLowerBound = ReplayPosition.serializer.deserialize(in); + IntervalSet<ReplayPosition> commitLogIntervals; + if (descriptor.version.hasCommitLogIntervals()) + commitLogIntervals = replayPositionSetSerializer.deserialize(in); + else + commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound); if (types.contains(MetadataType.VALIDATION)) components.put(MetadataType.VALIDATION, @@ -129,8 +139,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer components.put(MetadataType.STATS, new StatsMetadata(partitionSizes, columnCounts, - commitLogLowerBound, - commitLogUpperBound, + commitLogIntervals, minTimestamp, maxTimestamp, Integer.MAX_VALUE, http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 53cf0b0..1ff2ca8 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -27,12 +27,11 @@ import java.util.Map; import java.util.Set; import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.db.rows.Cell; @@ -69,8 +68,7 @@ 
public class MetadataCollector implements PartitionStatisticsCollector { return new StatsMetadata(defaultPartitionSizeHistogram(), defaultCellPerPartitionCountHistogram(), - ReplayPosition.NONE, - ReplayPosition.NONE, + IntervalSet.empty(), Long.MIN_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, @@ -91,8 +89,7 @@ public class MetadataCollector implements PartitionStatisticsCollector protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); // TODO: cound the number of row per partition (either with the number of cells, or instead) protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); - protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE; - protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE; + protected IntervalSet commitLogIntervals = IntervalSet.empty(); protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); @@ -126,23 +123,13 @@ public class MetadataCollector implements PartitionStatisticsCollector { this(comparator); - ReplayPosition min = null, max = null; + IntervalSet.Builder intervals = new IntervalSet.Builder(); for (SSTableReader sstable : sstables) { - if (min == null) - { - min = sstable.getSSTableMetadata().commitLogLowerBound; - max = sstable.getSSTableMetadata().commitLogUpperBound; - } - else - { - min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound); - max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound); - } + intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals); } - commitLogLowerBound(min); - commitLogUpperBound(max); + commitLogIntervals(intervals.build()); sstableLevel(level); } @@ -229,15 +216,9 @@ public class 
MetadataCollector implements PartitionStatisticsCollector ttlTracker.update(newTTL); } - public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound) - { - this.commitLogLowerBound = commitLogLowerBound; - return this; - } - - public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound) + public MetadataCollector commitLogIntervals(IntervalSet commitLogIntervals) { - this.commitLogUpperBound = commitLogUpperBound; + this.commitLogIntervals = commitLogIntervals; return this; } @@ -302,8 +283,7 @@ public class MetadataCollector implements PartitionStatisticsCollector components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize, estimatedCellPerPartitionCount, - commitLogLowerBound, - commitLogUpperBound, + commitLogIntervals, timestampTracker.min(), timestampTracker.max(), localDeletionTimeTracker.min(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 07e35bb..9971eaa 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -22,10 +22,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.sstable.format.Version; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.commitlog.ReplayPosition; import 
org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -39,11 +41,11 @@ import org.apache.cassandra.utils.StreamingHistogram; public class StatsMetadata extends MetadataComponent { public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer(); + public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer); public final EstimatedHistogram estimatedPartitionSize; public final EstimatedHistogram estimatedColumnCount; - public final ReplayPosition commitLogLowerBound; - public final ReplayPosition commitLogUpperBound; + public final IntervalSet<ReplayPosition> commitLogIntervals; public final long minTimestamp; public final long maxTimestamp; public final int minLocalDeletionTime; @@ -62,8 +64,7 @@ public class StatsMetadata extends MetadataComponent public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, - ReplayPosition commitLogLowerBound, - ReplayPosition commitLogUpperBound, + IntervalSet<ReplayPosition> commitLogIntervals, long minTimestamp, long maxTimestamp, int minLocalDeletionTime, @@ -82,8 +83,7 @@ public class StatsMetadata extends MetadataComponent { this.estimatedPartitionSize = estimatedPartitionSize; this.estimatedColumnCount = estimatedColumnCount; - this.commitLogLowerBound = commitLogLowerBound; - this.commitLogUpperBound = commitLogUpperBound; + this.commitLogIntervals = commitLogIntervals; this.minTimestamp = minTimestamp; this.maxTimestamp = maxTimestamp; this.minLocalDeletionTime = minLocalDeletionTime; @@ -134,8 +134,7 @@ public class StatsMetadata extends MetadataComponent { return new StatsMetadata(estimatedPartitionSize, estimatedColumnCount, - commitLogLowerBound, - commitLogUpperBound, + commitLogIntervals, minTimestamp, maxTimestamp, minLocalDeletionTime, @@ -157,8 +156,7 @@ public class StatsMetadata extends MetadataComponent { return new 
StatsMetadata(estimatedPartitionSize, estimatedColumnCount, - commitLogLowerBound, - commitLogUpperBound, + commitLogIntervals, minTimestamp, maxTimestamp, minLocalDeletionTime, @@ -186,8 +184,7 @@ public class StatsMetadata extends MetadataComponent return new EqualsBuilder() .append(estimatedPartitionSize, that.estimatedPartitionSize) .append(estimatedColumnCount, that.estimatedColumnCount) - .append(commitLogLowerBound, that.commitLogLowerBound) - .append(commitLogUpperBound, that.commitLogUpperBound) + .append(commitLogIntervals, that.commitLogIntervals) .append(minTimestamp, that.minTimestamp) .append(maxTimestamp, that.maxTimestamp) .append(minLocalDeletionTime, that.minLocalDeletionTime) @@ -212,8 +209,7 @@ public class StatsMetadata extends MetadataComponent return new HashCodeBuilder() .append(estimatedPartitionSize) .append(estimatedColumnCount) - .append(commitLogLowerBound) - .append(commitLogUpperBound) + .append(commitLogIntervals) .append(minTimestamp) .append(maxTimestamp) .append(minLocalDeletionTime) @@ -239,7 +235,7 @@ public class StatsMetadata extends MetadataComponent int size = 0; size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize); size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount); - size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound); + size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE)); if (version.storeRows()) size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long) else @@ -258,7 +254,9 @@ public class StatsMetadata extends MetadataComponent if (version.storeRows()) size += 8 + 8; // totalColumnsSet, totalRows if (version.hasCommitLogLowerBound()) - size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound); + size += 
ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE)); + if (version.hasCommitLogIntervals()) + size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals); return size; } @@ -266,7 +264,7 @@ public class StatsMetadata extends MetadataComponent { EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out); - ReplayPosition.serializer.serialize(component.commitLogUpperBound, out); + ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out); out.writeLong(component.minTimestamp); out.writeLong(component.maxTimestamp); if (version.storeRows()) @@ -296,7 +294,9 @@ public class StatsMetadata extends MetadataComponent } if (version.hasCommitLogLowerBound()) - ReplayPosition.serializer.serialize(component.commitLogLowerBound, out); + ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out); + if (version.hasCommitLogIntervals()) + replayPositionSetSerializer.serialize(component.commitLogIntervals, out); } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException @@ -338,11 +338,15 @@ public class StatsMetadata extends MetadataComponent if (version.hasCommitLogLowerBound()) commitLogLowerBound = ReplayPosition.serializer.deserialize(in); + IntervalSet<ReplayPosition> commitLogIntervals; + if (version.hasCommitLogIntervals()) + commitLogIntervals = replayPositionSetSerializer.deserialize(in); + else + commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound); return new StatsMetadata(partitionSizes, columnCounts, - commitLogLowerBound, - commitLogUpperBound, + commitLogIntervals, minTimestamp, maxTimestamp, minLocalDeletionTime, 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index 420b802..5f7513f 100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@ -70,8 +70,7 @@ public class SSTableMetadataViewer out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000))); out.printf("SSTable Level: %d%n", stats.sstableLevel); out.printf("Repaired at: %d%n", stats.repairedAt); - out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound); - out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound); + out.printf("Replay positions covered: %s\n", stats.commitLogIntervals); out.println("Estimated tombstone drop times:"); for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet()) {