http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 34ec1dd..5a60ddd 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; @@ -66,13 +67,14 @@ public class Upgrader this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } - private SSTableWriter createCompactionWriter(long repairedAt) + private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair) { MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator()); sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); return SSTableWriter.create(cfs.newSSTableDescriptor(directory), estimatedRows, repairedAt, + parentRepair, cfs.metadata, sstableMetadataCollector, SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)), @@ -88,7 +90,8 @@ public class Upgrader AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { - writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); + StatsMetadata metadata = 
sstable.getSSTableMetadata(); + writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair)); while (iter.hasNext()) writer.append(iter.next());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 467d50d..bca6e79 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -234,7 +234,7 @@ public class Verifier implements Closeable private void markAndThrow() throws IOException { - sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE); + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair); throw new CorruptSSTableException(new Exception(String.format("Invalid SSTable %s, please force repair", sstable.getFilename())), sstable.getFilename()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 205aebe..e8f7d72 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -22,6 +22,7 @@ import java.io.File; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final long estimatedTotalKeys; protected final long maxAge; protected 
final long minRepairedAt; + protected final UUID pendingRepair; protected final SSTableRewriter sstableWriter; protected final LifecycleTransaction txn; @@ -88,6 +90,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge); minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); + pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables); locations = cfs.getDirectories().getWriteableLocations(); diskBoundaries = StorageService.getDiskBoundaries(cfs); locationIndex = -1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 4ffc747..cda7e38 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -71,6 +71,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)), estimatedTotalKeys, minRepairedAt, + pendingRepair, cfs.metadata, new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java 
b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index c2d3a7d..3959b4b 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -107,6 +107,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter sstableWriter.switchWriter(SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)), keysPerSSTable, minRepairedAt, + pendingRepair, cfs.metadata, new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel), SerializationHeader.make(cfs.metadata(), txn.originals()), @@ -114,6 +115,5 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter txn)); partitionsWritten = 0; sstablesWritten = 0; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index eb05a23..c4f84e8 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -110,6 +110,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)), estimatedTotalKeys / estimatedSSTables, minRepairedAt, + pendingRepair, cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata().comparator, level), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 3a3e805..a4af783 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -106,6 +106,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(location)), currentPartitionsToWrite, minRepairedAt, + pendingRepair, cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata().comparator, 0), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 46ca779..4d7c903 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -156,7 +156,7 @@ public class RangeStreamer this.address = address; this.description = description; this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost, - true, false, connectSequentially); + true, false, connectSequentially, null); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index b1e15ed..1fa5d8e 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -69,12 +69,13 @@ abstract class AbstractSSTableSimpleWriter implements Closeable SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS); if (makeRangeAware) - return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header); + return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header); return SSTableTxnWriter.create(metadata, createDescriptor(directory, metadata.keyspace, metadata.name, formatType), 0, ActiveRepairService.UNREPAIRED_SSTABLE, + ActiveRepairService.NO_PENDING_REPAIR, 0, header, Collections.emptySet()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 47b37ef..7e79fa9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load", 0, 
connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 75797a9..60b8962 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.util.Collection; +import java.util.UUID; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -98,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem } @SuppressWarnings("resource") // log and writer closed during doPostCleanup - public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header) { LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); - SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn); + SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn); return new SSTableTxnWriter(txn, 
writer); } @@ -110,6 +111,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem public static SSTableTxnWriter createRangeAware(TableMetadataRef metadata, long keyCount, long repairedAt, + UUID pendingRepair, SSTableFormat.Type type, int sstableLevel, SerializationHeader header) @@ -120,7 +122,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem SSTableMultiWriter writer; try { - writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, type, sstableLevel, 0, txn, header); + writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header); } catch (IOException e) { @@ -137,6 +139,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, int sstableLevel, SerializationHeader header, Collection<Index> indexes) @@ -144,12 +147,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem // if the column family store does not exist, we create a new default SSTableMultiWriter to use: LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel); - SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn); + SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn); return new SSTableTxnWriter(txn, writer); } - public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header) { - return create(cfs, desc, 
keyCount, repairedAt, 0, header); + return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index 2d7d967..a40ec18 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable; import java.util.Collection; import java.util.Collections; +import java.util.UUID; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; @@ -109,13 +110,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter public static SSTableMultiWriter create(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, indexes, txn); + SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn); return new SimpleSSTableMultiWriter(writer, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 766a930..88c60e5 100644 
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.UUID; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -42,6 +43,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter private final int sstableLevel; private final long estimatedKeys; private final long repairedAt; + private final UUID pendingRepair; private final SSTableFormat.Type format; private final SerializationHeader header; private final LifecycleTransaction txn; @@ -51,13 +53,14 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter private final List<SSTableReader> finishedReaders = new ArrayList<>(); private SSTableMultiWriter currentWriter = null; - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException + public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException { directories = cfs.getDirectories().getWriteableLocations(); this.sstableLevel = sstableLevel; this.cfs = cfs; this.estimatedKeys = estimatedKeys / directories.length; this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.format = format; this.txn = txn; this.header = header; @@ -69,7 +72,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); Descriptor desc = 
cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn); } } @@ -91,7 +94,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter finishedWriters.add(currentWriter); Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 87e12eb..716b27d 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1751,6 +1751,17 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return key; } + public boolean isPendingRepair() + { + return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR; + } + + public boolean intersects(Collection<Range<Token>> ranges) + { + Range<Token> range = new Range<>(first.getToken(), last.getToken()); + return Iterables.any(ranges, r -> r.intersects(range)); + } + /** * TODO: Move someplace reusable */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 31354a0..1e183e2 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -54,6 +54,7 @@ import org.apache.cassandra.utils.concurrent.Transactional; public abstract class SSTableWriter extends SSTable implements Transactional { protected long repairedAt; + protected UUID pendingRepair; protected long maxDataAge = -1; protected final long keyCount; protected final MetadataCollector metadataCollector; @@ -75,6 +76,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, @@ -83,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); this.keyCount = keyCount; this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.metadataCollector = metadataCollector; this.header = header; this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header); @@ -92,6 +95,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, @@ -99,43 +103,46 @@ public abstract class SSTableWriter extends SSTable implements Transactional LifecycleTransaction txn) { Factory writerFactory = 
descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); + return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); } public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, int sstableLevel, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn); + return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn); } public static SSTableWriter create(TableMetadataRef metadata, Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, int sstableLevel, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn); + return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn); } @VisibleForTesting public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn); + return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn); } private static Set<Component> components(TableMetadata metadata) @@ -301,6 +308,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional return 
metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(), metadata().params.bloomFilterFpChance, repairedAt, + pendingRepair, header); } @@ -329,6 +337,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index f900fc4..a07e48f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -51,6 +51,8 @@ public abstract class Version public abstract boolean hasCommitLogIntervals(); + public abstract boolean hasPendingRepair(); + public String getVersion() { return version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index d949197..cad192a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.format.big; import java.util.Collection; import java.util.Set; +import java.util.UUID; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; @@ -83,13 +84,14 @@ public class BigFormat implements SSTableFormat public 
SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn); + return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn); } } @@ -110,13 +112,14 @@ public class BigFormat implements SSTableFormat // we always incremented the major version. static class BigVersion extends Version { - public static final String current_version = "mc"; + public static final String current_version = "md"; public static final String earliest_supported_version = "ma"; // ma (3.0.0): swap bf hash order // store rows natively // mb (3.0.7, 3.7): commit log lower bound included // mc (3.0.8, 3.9): commit log intervals included + // md (3.0.9, 3.10): pending repair session included // // NOTE: when adding a new version, please add that to LegacySSTableTest, too. 
@@ -124,6 +127,7 @@ public class BigFormat implements SSTableFormat public final int correspondingMessagingVersion; private final boolean hasCommitLogLowerBound; private final boolean hasCommitLogIntervals; + private final boolean hasPendingRepair; BigVersion(String version) { @@ -134,6 +138,7 @@ public class BigFormat implements SSTableFormat hasCommitLogLowerBound = version.compareTo("mb") >= 0; hasCommitLogIntervals = version.compareTo("mc") >= 0; + hasPendingRepair = version.compareTo("md") >= 0; } @Override @@ -154,6 +159,11 @@ public class BigFormat implements SSTableFormat return hasCommitLogIntervals; } + public boolean hasPendingRepair() + { + return hasPendingRepair; + } + @Override public int correspondingMessagingVersion() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index e134f2d..4ae4331 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -17,13 +17,9 @@ */ package org.apache.cassandra.io.sstable.format.big; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Map; -import java.util.Optional; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,13 +67,14 @@ public class BigTableWriter extends SSTableWriter public BigTableWriter(Descriptor descriptor, long keyCount, long repairedAt, + UUID pendingRepair, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - 
super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers); + super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers); txn.trackNew(this); // must track before any files are created if (compression) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java index 100cfdb..6a40d94 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable.metadata; import java.io.IOException; import java.util.EnumSet; import java.util.Map; +import java.util.UUID; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.Version; @@ -70,7 +71,7 @@ public interface IMetadataSerializer void mutateLevel(Descriptor descriptor, int newLevel) throws IOException; /** - * Mutate repairedAt time + * Mutate the repairedAt time and pendingRepair ID */ - void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException; + void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 3b32ae2..6af93ad 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.UUID; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; @@ -80,7 +81,8 @@ public class MetadataCollector implements PartitionStatisticsCollector true, ActiveRepairService.UNREPAIRED_SSTABLE, -1, - -1); + -1, + null); } protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); @@ -275,7 +277,7 @@ public class MetadataCollector implements PartitionStatisticsCollector this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; } - public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header) + public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header) { Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); @@ -296,7 +298,8 @@ public class MetadataCollector implements PartitionStatisticsCollector hasLegacyCounterShards, repairedAt, totalColumnsSet, - totalRows)); + totalRows, + pendingRepair)); components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality)); components.put(MetadataType.HEADER, header.toComponent()); return components; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index e6e0953..2c1e0ec 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -136,13 +136,14 @@ public class MetadataSerializer implements IMetadataSerializer rewriteSSTableMetadata(descriptor, currentComponents); } - public void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException + public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException { - logger.trace("Mutating {} to repairedAt time {}", descriptor.filenameFor(Component.STATS), newRepairedAt); + logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}", + descriptor.filenameFor(Component.STATS), newRepairedAt, newPendingRepair); Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class)); StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS); - // mutate level - currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt)); + // mutate time & id + currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair)); rewriteSSTableMetadata(descriptor, currentComponents); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 0f6434b..fe5d7bb 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ 
-21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.sstable.format.Version; @@ -34,6 +35,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.StreamingHistogram; +import org.apache.cassandra.utils.UUIDSerializer; /** * SSTable metadata that always stay on heap. @@ -61,6 +63,7 @@ public class StatsMetadata extends MetadataComponent public final long repairedAt; public final long totalColumnsSet; public final long totalRows; + public final UUID pendingRepair; public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, @@ -79,7 +82,8 @@ public class StatsMetadata extends MetadataComponent boolean hasLegacyCounterShards, long repairedAt, long totalColumnsSet, - long totalRows) + long totalRows, + UUID pendingRepair) { this.estimatedPartitionSize = estimatedPartitionSize; this.estimatedColumnCount = estimatedColumnCount; @@ -99,6 +103,7 @@ public class StatsMetadata extends MetadataComponent this.repairedAt = repairedAt; this.totalColumnsSet = totalColumnsSet; this.totalRows = totalRows; + this.pendingRepair = pendingRepair; } public MetadataType getType() @@ -149,7 +154,8 @@ public class StatsMetadata extends MetadataComponent hasLegacyCounterShards, repairedAt, totalColumnsSet, - totalRows); + totalRows, + pendingRepair); } public StatsMetadata mutateRepairedAt(long newRepairedAt) @@ -171,7 +177,31 @@ public class StatsMetadata extends MetadataComponent hasLegacyCounterShards, newRepairedAt, totalColumnsSet, - totalRows); + totalRows, + pendingRepair); + } + + public StatsMetadata mutatePendingRepair(UUID newPendingRepair) + { + return new StatsMetadata(estimatedPartitionSize, + estimatedColumnCount, + 
commitLogIntervals, + minTimestamp, + maxTimestamp, + minLocalDeletionTime, + maxLocalDeletionTime, + minTTL, + maxTTL, + compressionRatio, + estimatedTombstoneDropTime, + sstableLevel, + minClusteringValues, + maxClusteringValues, + hasLegacyCounterShards, + repairedAt, + totalColumnsSet, + totalRows, + newPendingRepair); } @Override @@ -200,6 +230,7 @@ public class StatsMetadata extends MetadataComponent .append(hasLegacyCounterShards, that.hasLegacyCounterShards) .append(totalColumnsSet, that.totalColumnsSet) .append(totalRows, that.totalRows) + .append(pendingRepair, that.pendingRepair) .build(); } @@ -225,6 +256,7 @@ public class StatsMetadata extends MetadataComponent .append(hasLegacyCounterShards) .append(totalColumnsSet) .append(totalRows) + .append(pendingRepair) .build(); } @@ -253,6 +285,13 @@ public class StatsMetadata extends MetadataComponent size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE)); if (version.hasCommitLogIntervals()) size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals); + + if (version.hasPendingRepair()) + { + size += 1; + if (component.pendingRepair != null) + size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0); + } return size; } @@ -286,6 +325,19 @@ public class StatsMetadata extends MetadataComponent CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out); if (version.hasCommitLogIntervals()) commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out); + + if (version.hasPendingRepair()) + { + if (component.pendingRepair != null) + { + out.writeByte(1); + UUIDSerializer.serializer.serialize(component.pendingRepair, out, 0); + } + else + { + out.writeByte(0); + } + } } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException @@ -328,6 +380,12 @@ public class StatsMetadata extends MetadataComponent 
else commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound); + UUID pendingRepair = null; + if (version.hasPendingRepair() && in.readByte() != 0) + { + pendingRepair = UUIDSerializer.serializer.deserialize(in, 0); + } + return new StatsMetadata(partitionSizes, columnCounts, commitLogIntervals, @@ -345,7 +403,8 @@ public class StatsMetadata extends MetadataComponent hasLegacyCounterShards, repairedAt, totalColumnsSet, - totalRows); + totalRows, + pendingRepair); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index b97b836..19bf3d4 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. 
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java deleted file mode 100644 index 6e6bb65..0000000 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.util.concurrent.AbstractFuture; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.RequestFailureReason; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; -import org.apache.cassandra.gms.IFailureDetectionEventListener; -import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.net.IAsyncCallbackWithFailure; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.AnticompactionRequest; -import org.apache.cassandra.utils.CassandraVersion; - -public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber, - IFailureDetectionEventListener -{ - /* - * Version that anticompaction response is not supported up to. - * If Cassandra version is more than this, we need to wait for anticompaction response. 
- */ - private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5"); - private static Logger logger = LoggerFactory.getLogger(AnticompactionTask.class); - - private final UUID parentSession; - private final InetAddress neighbor; - private final Collection<Range<Token>> successfulRanges; - private final AtomicBoolean isFinished = new AtomicBoolean(false); - - public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges) - { - this.parentSession = parentSession; - this.neighbor = neighbor; - this.successfulRanges = successfulRanges; - } - - public void run() - { - if (FailureDetector.instance.isAlive(neighbor)) - { - AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); - CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); - if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) - { - MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); - } - else - { - // immediately return after sending request - MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); - maybeSetResult(neighbor); - } - } - else - { - maybeSetException(new IOException(neighbor + " is down")); - } - } - - private boolean maybeSetException(Throwable t) - { - if (isFinished.compareAndSet(false, true)) - { - setException(t); - return true; - } - return false; - } - - private boolean maybeSetResult(InetAddress o) - { - if (isFinished.compareAndSet(false, true)) - { - set(o); - return true; - } - return false; - } - - /** - * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage. 
- */ - public class AnticompactionCallback implements IAsyncCallbackWithFailure - { - final AnticompactionTask task; - - public AnticompactionCallback(AnticompactionTask task) - { - this.task = task; - } - - public void response(MessageIn msg) - { - maybeSetResult(msg.from); - } - - public boolean isLatencyForSnitch() - { - return false; - } - - public void onFailure(InetAddress from, RequestFailureReason failureReason) - { - maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from)); - } - } - - public void onJoin(InetAddress endpoint, EndpointState epState) {} - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} - - public void onRemove(InetAddress endpoint) - { - convict(endpoint, Double.MAX_VALUE); - } - - public void onRestart(InetAddress endpoint, EndpointState epState) - { - convict(endpoint, Double.MAX_VALUE); - } - - public void convict(InetAddress endpoint, double phi) - { - if (!neighbor.equals(endpoint)) - return; - - // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. 
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) - return; - - Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint)); - if (maybeSetException(exception)) - { - // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice - logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index cfc181e..56411d9 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.List; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +47,15 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); private final long repairedAt; + private final UUID pendingRepair; private final boolean pullRepair; - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair) + public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, UUID pendingRepair, boolean pullRepair) { super(desc, r1, r2); this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.pullRepair = pullRepair; } @@ -76,7 +79,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler isIncremental = prs.isIncremental; } Tracing.traceRepair(message); - StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, 
isIncremental, false).listeners(this) + StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 7fc7816..07bc1e2 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -42,6 +42,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private final RepairParallelism parallelismDegree; private final long repairedAt; private final ListeningExecutorService taskExecutor; + private final boolean isConsistent; /** * Create repair job to run on specific columnfamily @@ -49,13 +50,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public RepairJob(RepairSession session, String columnFamily) + public RepairJob(RepairSession session, String columnFamily, boolean isConsistent) { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); this.repairedAt = session.repairedAt; this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; + this.isConsistent = isConsistent; } /** @@ -73,16 +75,26 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable // Create a snapshot at all nodes unless we're using pure parallel repairs if (parallelismDegree != 
RepairParallelism.PARALLEL) { - // Request snapshot to all replica - List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size()); - for (InetAddress endpoint : allEndpoints) + ListenableFuture<List<InetAddress>> allSnapshotTasks; + if (isConsistent) { - SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint); - snapshotTasks.add(snapshotTask); - taskExecutor.execute(snapshotTask); + // consistent repair does its own "snapshotting" + allSnapshotTasks = Futures.immediateFuture(allEndpoints); } + else + { + // Request snapshot to all replica + List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size()); + for (InetAddress endpoint : allEndpoints) + { + SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint); + snapshotTasks.add(snapshotTask); + taskExecutor.execute(snapshotTask); + } + allSnapshotTasks = Futures.allAsList(snapshotTasks); + } + // When all snapshot complete, send validation requests - ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks); validations = Futures.transform(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>() { public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints) @@ -118,7 +130,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable SyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair); + task = new LocalSyncTask(desc, r1, r2, repairedAt, isConsistent ? 
desc.parentSessionId : null, session.pullRepair); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index d7736f0..4f412f0 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -21,8 +21,6 @@ import java.net.InetAddress; import java.util.*; import com.google.common.base.Predicate; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService; public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class); + + private boolean isConsistent(UUID sessionID) + { + return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID); + } + public void doVerb(final MessageIn<RepairMessage> message, final int id) { // TODO add cancel/interrupt message @@ -122,7 +126,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> return; } - Validator validator = new Validator(desc, message.from, validationRequest.gcBefore); + ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); + Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, isConsistent(desc.parentSessionId)); CompactionManager.instance.submitValidation(store, validator); break; @@ -134,24 +139,10 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt(); - StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt); + StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt, isConsistent(desc.parentSessionId)); task.run(); break; - case ANTICOMPACTION_REQUEST: - AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; - logger.debug("Got anticompaction request {}", anticompactionRequest); - ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); - compactionDone.addListener(new Runnable() - { - @Override - public void run() - { - MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); - } - }, MoreExecutors.directExecutor()); - break; - case CLEANUP: logger.debug("cleaning up repair"); CleanupMessage cleanup = (CleanupMessage) message.payload; @@ -159,6 +150,40 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; + case CONSISTENT_REQUEST: + ActiveRepairService.instance.consistent.local.handlePrepareMessage(message.from, (PrepareConsistentRequest) message.payload); + break; + + case CONSISTENT_RESPONSE: + ActiveRepairService.instance.consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse) message.payload); + break; + + case FINALIZE_PROPOSE: + ActiveRepairService.instance.consistent.local.handleFinalizeProposeMessage(message.from, (FinalizePropose) message.payload); + break; + + case FINALIZE_PROMISE: + ActiveRepairService.instance.consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise) message.payload); + break; + + case 
FINALIZE_COMMIT: + ActiveRepairService.instance.consistent.local.handleFinalizeCommitMessage(message.from, (FinalizeCommit) message.payload); + break; + + case FAILED_SESSION: + FailSession failure = (FailSession) message.payload; + ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure); + ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from, failure); + break; + + case STATUS_REQUEST: + ActiveRepairService.instance.consistent.local.handleStatusRequest(message.from, (StatusRequest) message.payload); + break; + + case STATUS_RESPONSE: + ActiveRepairService.instance.consistent.local.handleStatusResponse(message.from, (StatusResponse) message.payload); + break; + default: ActiveRepairService.instance.handleMessage(message.from, message.payload); break;