http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java index b1c706e,5a3d524..f464e08 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@@ -353,35 -347,13 +349,16 @@@ public class Tracke Throwable fail; fail = updateSizeTracking(emptySet(), sstables, null); + + notifyDiscarded(memtable); + - maybeFail(fail); - } - - /** - * permit compaction of the provided sstable; this translates to notifying compaction - * strategies of its existence, and potentially submitting a background task - */ - public void permitCompactionOfFlushed(Collection<SSTableReader> sstables) - { - if (sstables.isEmpty()) - return; + // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? + fail = notifyAdded(sstables, fail); - apply(View.permitCompactionOfFlushed(sstables)); - - if (isDummy()) - return; - - if (cfstore.isValid()) - { - notifyAdded(sstables); - CompactionManager.instance.submitBackground(cfstore); - } - else - { + if (!isDummy() && !cfstore.isValid()) dropSSTables(); - } + + maybeFail(fail); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java index a5c781d,4b3aae0..b26426d --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@@ -40,7 -39,7 +39,6 @@@ import static com.google.common.collect import static com.google.common.collect.Iterables.all; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; --import static com.google.common.collect.Iterables.transform; import static org.apache.cassandra.db.lifecycle.Helpers.emptySet; import static org.apache.cassandra.db.lifecycle.Helpers.filterOut; import static org.apache.cassandra.db.lifecycle.Helpers.replace; @@@ -336,14 -333,12 +332,12 @@@ public class Vie List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable)))); assert flushingMemtables.size() == view.flushingMemtables.size() - 1; - if (flushed == null || flushed.isEmpty()) + if (flushed == null || Iterables.isEmpty(flushed)) return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, - view.compactingMap, view.premature, view.intervalTree); + view.compactingMap, view.intervalTree); Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed); - Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed); - Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed); - return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature, + return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 505de49,a683513..14e391b --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@@ -24,7 -24,8 +24,8 @@@ import java.util.* import com.google.common.collect.Maps; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.commitlog.CommitLogPosition; + import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.Version; @@@ -35,6 -36,8 +36,8 @@@ import org.apache.cassandra.utils.ByteB import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.StreamingHistogram; -import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer; ++import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.commitLogPositionSetSerializer; + /** * Serializer for SSTable from legacy versions */ @@@ -55,7 -58,7 +58,7 @@@ public class LegacyMetadataSerializer e EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out); - CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out); - ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out); ++ CommitLogPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out); out.writeLong(stats.minTimestamp); out.writeLong(stats.maxTimestamp); out.writeInt(stats.maxLocalDeletionTime); @@@ -72,7 -75,9 +75,9 @@@ for (ByteBuffer value : stats.maxClusteringValues) ByteBufferUtil.writeWithShortLength(value, out); if (version.hasCommitLogLowerBound()) - CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out); - ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out); ++ CommitLogPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out); + if (version.hasCommitLogIntervals()) - replayPositionSetSerializer.serialize(stats.commitLogIntervals, out); ++ commitLogPositionSetSerializer.serialize(stats.commitLogIntervals, out); } /** @@@ -120,7 -125,12 +125,12 @@@ maxColumnNames.add(ByteBufferUtil.readWithShortLength(in)); if (descriptor.version.hasCommitLogLowerBound()) - commitLogLowerBound = ReplayPosition.serializer.deserialize(in); - IntervalSet<ReplayPosition> commitLogIntervals; + commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); ++ IntervalSet<CommitLogPosition> commitLogIntervals; + if (descriptor.version.hasCommitLogIntervals()) - commitLogIntervals = replayPositionSetSerializer.deserialize(in); ++ commitLogIntervals = commitLogPositionSetSerializer.deserialize(in); + else + commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound); if (types.contains(MetadataType.VALIDATION)) components.put(MetadataType.VALIDATION, http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 299bc87,1ff2ca8..196cfbf --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@@ -20,16 -20,18 +20,16 @@@ package org.apache.cassandra.io.sstable import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import com.google.common.collect.Maps; - import com.google.common.collect.Ordering; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.CommitLogPosition; + import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.db.rows.Cell; @@@ -88,8 -89,7 +87,7 @@@ public class MetadataCollector implemen protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); // TODO: cound the number of row per partition (either with the number of cells, or instead) protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); - protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE; - protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE; - protected IntervalSet commitLogIntervals = IntervalSet.empty(); ++ protected IntervalSet<CommitLogPosition> commitLogIntervals = IntervalSet.empty(); protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); @@@ -123,23 -123,13 +121,13 @@@ { this(comparator); - CommitLogPosition min = null, max = null; - IntervalSet.Builder intervals = new IntervalSet.Builder(); ++ IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>(); for (SSTableReader sstable : sstables) { - if (min == null) - { - min = sstable.getSSTableMetadata().commitLogLowerBound; - max = sstable.getSSTableMetadata().commitLogUpperBound; - } - else - { - min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound); - max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound); - } + intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals); } - commitLogLowerBound(min); - commitLogUpperBound(max); + commitLogIntervals(intervals.build()); sstableLevel(level); } @@@ -226,15 -216,9 +214,9 @@@ ttlTracker.update(newTTL); } - public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound) - public MetadataCollector commitLogIntervals(IntervalSet commitLogIntervals) ++ public MetadataCollector commitLogIntervals(IntervalSet<CommitLogPosition> commitLogIntervals) { - this.commitLogLowerBound = commitLogLowerBound; - return this; - } - - public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound) - { - this.commitLogUpperBound = commitLogUpperBound; + this.commitLogIntervals = commitLogIntervals; return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index e765235,9971eaa..c83c2cf --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@@ -26,7 -27,8 +27,8 @@@ import org.apache.cassandra.io.sstable. import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.commitlog.CommitLogPosition; + import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@@ -39,11 -41,11 +41,11 @@@ import org.apache.cassandra.utils.Strea public class StatsMetadata extends MetadataComponent { public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer(); - public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer); ++ public static final ISerializer<IntervalSet<CommitLogPosition>> commitLogPositionSetSerializer = IntervalSet.serializer(CommitLogPosition.serializer); public final EstimatedHistogram estimatedPartitionSize; public final EstimatedHistogram estimatedColumnCount; - public final CommitLogPosition commitLogLowerBound; - public final CommitLogPosition commitLogUpperBound; - public final IntervalSet<ReplayPosition> commitLogIntervals; ++ public final IntervalSet<CommitLogPosition> commitLogIntervals; public final long minTimestamp; public final long maxTimestamp; public final int minLocalDeletionTime; @@@ -62,8 -64,7 +64,7 @@@ public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, - CommitLogPosition commitLogLowerBound, - CommitLogPosition commitLogUpperBound, - IntervalSet<ReplayPosition> commitLogIntervals, ++ IntervalSet<CommitLogPosition> commitLogIntervals, long minTimestamp, long maxTimestamp, int minLocalDeletionTime, @@@ -239,7 -235,7 +235,7 @@@ int size = 0; size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize); size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount); - size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound); - size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE)); ++ size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE)); if (version.storeRows()) size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long) else @@@ -258,7 -254,9 +254,9 @@@ if (version.storeRows()) size += 8 + 8; // totalColumnsSet, totalRows if (version.hasCommitLogLowerBound()) - size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound); - size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE)); ++ size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE)); + if (version.hasCommitLogIntervals()) - size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals); ++ size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals); return size; } @@@ -266,7 -264,7 +264,7 @@@ { EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out); - CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out); - ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out); ++ CommitLogPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out); out.writeLong(component.minTimestamp); out.writeLong(component.maxTimestamp); if (version.storeRows()) @@@ -296,7 -294,9 +294,9 @@@ } if (version.hasCommitLogLowerBound()) - CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out); - ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out); ++ CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out); + if (version.hasCommitLogIntervals()) - replayPositionSetSerializer.serialize(component.commitLogIntervals, out); ++ commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out); } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException @@@ -337,7 -337,12 +337,12 @@@ long totalRows = version.storeRows() ? in.readLong() : -1L; if (version.hasCommitLogLowerBound()) - commitLogLowerBound = ReplayPosition.serializer.deserialize(in); - IntervalSet<ReplayPosition> commitLogIntervals; + commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); ++ IntervalSet<CommitLogPosition> commitLogIntervals; + if (version.hasCommitLogIntervals()) - commitLogIntervals = replayPositionSetSerializer.deserialize(in); ++ commitLogIntervals = commitLogPositionSetSerializer.deserialize(in); + else - commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound); ++ commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound); return new StatsMetadata(partitionSizes, columnCounts, http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index 3c8ba64,5f7513f..6686684 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@@ -112,15 -70,11 +112,14 @@@ public class SSTableMetadataViewe out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000))); out.printf("SSTable Level: %d%n", stats.sstableLevel); out.printf("Repaired at: %d%n", stats.repairedAt); - out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound); - out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound); + out.printf("Replay positions covered: %s\n", stats.commitLogIntervals); + out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet); + out.printf("totalRows: %s%n", stats.totalRows); out.println("Estimated tombstone drop times:"); - for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet()) + + for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet()) { - out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()); + out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]); } printHistograms(stats, out); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 239077e,02b26c7..2858597 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@@ -250,9 -245,9 +250,10 @@@ public class CommitLogStressTes } verifySizes(commitLog); - commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos); + commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, - ReplayPosition.NONE, discardedPos); ++ CommitLogPosition.NONE, discardedPos); threads.clear(); + System.out.format("Discarded at %s\n", discardedPos); verifySizes(commitLog); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/Util.java index cd709d5,d04ca9b..7bcee7a --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@@ -677,9 -648,26 +679,26 @@@ public class Uti } private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs, - ReadOrderGroup orderGroup) + ReadExecutionController controller) { - return queryStorage(cfs, orderGroup); + return queryStorage(cfs, controller); } } + + public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs) + { + try + { + for ( ; ; ) + { + DataDirectory dir = cfs.getDirectories().getWriteableLocation(1); + BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir)); + } + } + catch (IOError e) + { + // Expected -- marked all directories as unwritable + } + return () -> BlacklistedDirectories.clearUnwritableUnsafe(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 23ec58b,9a0ddb8..6ab7d46 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -20,9 -20,13 +20,10 @@@ package org.apache.cassandra.db.commitl import java.io.*; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; + import java.util.function.BiConsumer; import java.util.zip.CRC32; import java.util.zip.Checksum; @@@ -35,34 -40,27 +36,34 @@@ import org.junit.runners.Parameterized. import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; - import org.apache.cassandra.db.ColumnFamilyStore; - import org.apache.cassandra.db.Keyspace; - import org.apache.cassandra.db.Mutation; - import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.config.Config.DiskFailurePolicy; + import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.DeflateCompressor; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.compress.SnappyCompressor; + import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; +import org.apache.cassandra.utils.Hex; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.vint.VIntCoding; +import org.junit.After; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@@ -247,13 -228,13 +248,13 @@@ public class CommitLogTes .build(); CommitLog.instance.add(m2); - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); UUID cfid2 = m2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition()); - CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext()); ++ CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + // Assert we still have both our segments + assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); } @Test @@@ -278,9 -258,9 +279,9 @@@ // "Flush": this won't delete anything UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); CommitLog.instance.sync(true); - CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition()); - CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext()); ++ CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") @@@ -298,10 -279,10 +299,10 @@@ // didn't write anything on cf1 since last flush (and we flush cf2) UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition()); - CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext()); ++ CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); } private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName) @@@ -545,13 -486,13 +546,13 @@@ for (int i = 0 ; i < 5 ; i++) CommitLog.instance.add(m2); - assertEquals(2, CommitLog.instance.activeSegments()); - ReplayPosition position = CommitLog.instance.getContext(); - for (Keyspace ks : Keyspace.system()) - for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) - CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, ReplayPosition.NONE, position); - CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, ReplayPosition.NONE, position); - assertEquals(1, CommitLog.instance.activeSegments()); + assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); + CommitLogPosition position = CommitLog.instance.getCurrentPosition(); + for (Keyspace keyspace : Keyspace.system()) + for (ColumnFamilyStore syscfs : keyspace.getColumnFamilyStores()) - CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); - CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); ++ CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, CommitLogPosition.NONE, position); ++ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, CommitLogPosition.NONE, position); + assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); } finally { @@@ -589,108 -530,136 +590,240 @@@ } @Test + public void replaySimple() throws IOException + { + int cellCount = 0; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + CommitLog.instance.add(rm1); + + final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + CommitLog.instance.add(rm2); + + CommitLog.instance.sync(true); + + SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata); + List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); + replayer.replayFiles(files); + + assertEquals(cellCount, replayer.cells); + } + + @Test + public void replayWithDiscard() throws IOException + { + int cellCount = 0; + int max = 1024; + int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay + CommitLogPosition commitLogPosition = null; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + for (int i = 0; i < max; i++) + { + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1) + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + CommitLogPosition position = CommitLog.instance.add(rm1); + + if (i == discardPosition) + commitLogPosition = position; + if (i > discardPosition) + { + cellCount += 1; + } + } + + CommitLog.instance.sync(true); + + SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata); + List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); + replayer.replayFiles(files); + + assertEquals(cellCount, replayer.cells); + } + + class SimpleCountingReplayer extends CommitLogReplayer + { + private final CommitLogPosition filterPosition; + private final CFMetaData metadata; + int cells; + int skipped; + + SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm) + { + super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create()); + this.filterPosition = filterPosition; + this.metadata = cfm; + } + + @SuppressWarnings("resource") + @Override + public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) + { + if (entryLocation <= filterPosition.position) + { + // Skip over this mutation. + skipped++; + return; + } + for (PartitionUpdate partitionUpdate : m.getPartitionUpdates()) + { + // Only process mutations for the CF's we're testing against, since we can't deterministically predict + // whether or not system keyspaces will be mutated during a test. + if (partitionUpdate.metadata().cfName.equals(metadata.cfName)) + { + for (Row row : partitionUpdate) + cells += Iterables.size(row.cells()); + } + } + } + } ++ + public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException + { + CommitLog.instance.resetUnsafe(true); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); + + for (int i = 0 ; i < 5 ; i++) + { + new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("c" + i).add("val", ByteBuffer.allocate(100)) + .build() + .apply(); + + if (i == 2) + { + try (Closeable c = Util.markDirectoriesUnwriteable(cfs)) + { + cfs.forceBlockingFlush(); + } + catch (Throwable t) + { + // expected. Cause (after some wrappings) should be a write error + while (!(t instanceof FSWriteError)) + t = t.getCause(); + } + } + else + cfs.forceBlockingFlush(); + } + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + + CommitLog.instance.sync(true); + System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); + // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog. + // If retries work subsequent flushes should clear up error and this should change to expect 0. + Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); + } + + public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction) + throws ExecutionException, InterruptedException, IOException + { + CommitLog.instance.resetUnsafe(true); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + for (int i = 0 ; i < 5 ; i++) + { + new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("c" + i).add("val", ByteBuffer.allocate(100)) + .build() + .apply(); + + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); + if (i == 2) + current.makeUnflushable(); + + flushAction.accept(cfs, current); + } + if (performCompaction) + cfs.forceMajorCompaction(); + // Make sure metadata saves and reads fine + for (SSTableReader reader : cfs.getLiveSSTables()) + reader.reloadSSTableMetadata(); + + CommitLog.instance.sync(true); + System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); + // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have + // persisted all data in the commit log. Because we know there was an error, there must be something left to + // replay. + Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); + } + + BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) -> + { + try + { + cfs.forceBlockingFlush(); + } + catch (Throwable t) + { + // expected after makeUnflushable. Cause (after some wrappings) should be a write error + while (!(t instanceof FSWriteError)) + t = t.getCause(); + // Wait for started flushes to complete. + cfs.switchMemtableIfCurrent(current); + } + }; + + BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) -> + { + // Move to new commit log segment and try to flush all data. Also delete segments that no longer contain + // flushed data. + // This does not stop on errors and should retain segments for which flushing failed. + CommitLog.instance.forceRecycleAllSegments(); + + // Wait for started flushes to complete. + cfs.switchMemtableIfCurrent(current); + }; + + @Test + public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(flush, false); + } + + @Test + public void testOutOfOrderLogDiscard() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(recycleSegments, false); + } + + @Test + public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(flush, true); + } + + @Test + public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(recycleSegments, true); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 1668ddc,479e4e2..84e3e05 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@@ -299,14 -298,10 +299,11 @@@ public class TrackerTes Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2)); SSTableReader reader = MockSchema.sstable(0, 10, false, cfs); - tracker.replaceFlushed(prev2, Collections.singleton(reader)); + tracker.replaceFlushed(prev2, singleton(reader)); Assert.assertEquals(1, tracker.getView().sstables.size()); - Assert.assertEquals(1, tracker.getView().premature.size()); - tracker.permitCompactionOfFlushed(singleton(reader)); - Assert.assertEquals(0, tracker.getView().premature.size()); - Assert.assertEquals(1, listener.received.size()); - Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertEquals(2, listener.received.size()); + Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable); + Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added); listener.received.clear(); Assert.assertTrue(reader.isKeyCacheSetup()); Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); @@@ -324,13 -319,10 +321,12 @@@ Assert.assertEquals(0, tracker.getView().sstables.size()); Assert.assertEquals(0, tracker.getView().flushingMemtables.size()); Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); - System.out.println(listener.received); - Assert.assertEquals(4, listener.received.size()); - Assert.assertEquals(3, listener.received.size()); - Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); - Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); - Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size()); ++ Assert.assertEquals(5, listener.received.size()); + Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable); + Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable); - Assert.assertTrue(listener.received.get(2) instanceof SSTableDeletingNotification); - Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(3)).removed.size()); ++ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added); ++ Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification); ++ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size()); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index a3382c4,de12d57..4bd4489 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@@ -32,7 -32,8 +32,8 @@@ import org.apache.cassandra.SchemaLoade import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogPosition; + import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@@ -85,8 -86,7 +86,7 @@@ public class MetadataSerializerTes CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1"); MetadataCollector collector = new MetadataCollector(cfm.comparator) - .commitLogLowerBound(cllb) - .commitLogUpperBound(club); - .commitLogIntervals(new IntervalSet(cllb, club)); ++ .commitLogIntervals(new IntervalSet<>(cllb, club)); String partitioner = RandomPartitioner.class.getCanonicalName(); double bfFpChance = 0.1;