Merge branch 'cassandra-2.1' into trunk

Conflicts:
    src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    src/java/org/apache/cassandra/db/compaction/Upgrader.java
    src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
    test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f59629c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f59629c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f59629c Branch: refs/heads/trunk Commit: 0f59629ce280ba2a74d65a7719dde7cf79923f05 Parents: e60a06c 5160c91 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Nov 3 17:02:10 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Nov 3 17:02:10 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/DataTracker.java | 109 ++-- .../db/compaction/CompactionManager.java | 29 +- .../cassandra/db/compaction/CompactionTask.java | 7 +- .../cassandra/db/compaction/Scrubber.java | 12 +- .../cassandra/db/compaction/Upgrader.java | 31 +- .../io/sstable/IndexSummaryManager.java | 2 +- .../cassandra/io/sstable/SSTableRewriter.java | 160 +++--- .../io/sstable/format/SSTableReader.java | 6 + .../db/compaction/AntiCompactionTest.java | 42 +- .../io/sstable/IndexSummaryManagerTest.java | 2 +- .../cassandra/io/sstable/SSTableReaderTest.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 504 +++++++++++++++++++ 13 files changed, 755 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 3a8ada2,32083cc..9754110 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,37 -1,6 +1,39 @@@ +3.0 + * Mark sstables as repaired after full repair (CASSANDRA-7586) + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443) + * Integrate JMH for microbenchmarks (CASSANDRA-8151) + * Keep sstable levels when bootstrapping (CASSANDRA-7460) + * Add Sigar library and perform basic OS settings check on 
startup (CASSANDRA-7838) + * Support for aggregation functions (CASSANDRA-4914) + * Remove cassandra-cli (CASSANDRA-7920) + * Accept dollar quoted strings in CQL (CASSANDRA-7769) + * Make assassinate a first class command (CASSANDRA-7935) + * Support IN clause on any clustering column (CASSANDRA-4762) + * Improve compaction logging (CASSANDRA-7818) + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) + * Do anticompaction in groups (CASSANDRA-6851) + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Add Thrift get_multi_slice call (CASSANDRA-6757) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) + * improve concurrency of repair (CASSANDRA-6455) + + 2.1.2 + * Refactor how we track live size (CASSANDRA-7852) + * Make sure unfinished compaction files are removed (CASSANDRA-8124) * Fix shutdown when run as Windows service (CASSANDRA-8136) * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031) * Fix race in RecoveryManagerTest (CASSANDRA-8176) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 3ee36cd,84c3cb5..cccb7f9 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1045,76 -987,63 +1046,78 @@@ public class CompactionManager implemen if (!new File(sstable.getFilename()).exists()) { logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable); + i.remove(); continue; } + if (groupMaxDataAge < sstable.maxDataAge) + groupMaxDataAge = sstable.maxDataAge; + } + + + if (anticompactionGroup.size() == 0) + { + logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); + return 0; + } - logger.info("Anticompacting {}", sstable); - Set<SSTableReader> sstableAsSet = new HashSet<>(); - sstableAsSet.add(sstable); + logger.info("Anticompacting {}", anticompactionGroup); + Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup); - File destination = cfs.directories.getDirectoryForNewSSTables(); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false); + File destination = cfs.directories.getDirectoryForNewSSTables(); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false); ++ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false); ++ SSTableRewriter 
unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false); - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable))); - CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) - { - repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable)); + long repairedKeyCount = 0; + long unrepairedKeyCount = 0; + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup); + CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) + { + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup))); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - while(iter.hasNext()) + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); + + CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()); + Iterator<AbstractCompactedRow> iter = ci.iterator(); + while(iter.hasNext()) + { + AbstractCompactedRow row = 
iter.next(); + // if current range from sstable is repaired, save it into the new repaired sstable + if (Range.isInRanges(row.key.getToken(), ranges)) { - AbstractCompactedRow row = iter.next(); - // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) - { - repairedSSTableWriter.append(row); - repairedKeyCount++; - } - // otherwise save into the new 'non-repaired' table - else - { - unRepairedSSTableWriter.append(row); - unrepairedKeyCount++; - } + repairedSSTableWriter.append(row); + repairedKeyCount++; + } + // otherwise save into the new 'non-repaired' table + else + { + unRepairedSSTableWriter.append(row); + unrepairedKeyCount++; } - // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them - // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness - // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt - anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); - } - catch (Throwable e) - { - JVMStabilityInspector.inspectThrowable(e); - logger.error("Error anticompacting " + sstable, e); - repairedSSTableWriter.abort(); - unRepairedSSTableWriter.abort(); } + // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them + // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness - repairedSSTableWriter.finish(false, repairedAt); - unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE); - // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt ++ List<SSTableReader> 
anticompactedSSTables = new ArrayList<>(); ++ anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); ++ anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); ++ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); ++ + logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, + repairedKeyCount + unrepairedKeyCount, + cfs.keyspace.getName(), + cfs.getColumnFamilyName(), + anticompactionGroup); - return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size(); ++ return anticompactedSSTables.size(); } - String format = "Repaired {} keys of {} for {}/{}"; - logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName()); - String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size()); - - return anticompactedSSTables; + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.error("Error anticompacting " + anticompactionGroup, e); + repairedSSTableWriter.abort(); + unRepairedSSTableWriter.abort(); + } + return 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 584ff38,b442482..808626b --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -158,9 -150,9 +158,9 @@@ public class CompactionTask extends Abs try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { - AbstractCompactionIterable ci = new CompactionIterable(compactionType, 
scanners.scanners, controller); + AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat); Iterator<AbstractCompactedRow> iter = ci.iterator(); - + List<SSTableReader> newSStables; // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to // replace the old entries. Track entries to preheat here until then. long minRepairedAt = getMinRepairedAt(actuallyCompact); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java index c9e7034,39f668d..52739de --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@@ -21,8 -21,9 +21,9 @@@ import java.io.File import java.util.*; import com.google.common.base.Throwables; + import com.google.common.collect.Sets; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.*; @@@ -69,29 -67,24 +68,24 @@@ public class Upgrade // Get the max timestamp of the precompacted sstables // and adds generation of live ancestors - // -- note that we always only have one SSTable in toUpgrade here: - for (SSTableReader sstable : toUpgrade) + sstableMetadataCollector.addAncestor(sstable.descriptor.generation); + for (Integer i : sstable.getAncestors()) { - sstableMetadataCollector.addAncestor(sstable.descriptor.generation); - for (Integer i : sstable.getAncestors()) - { - if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) 
- sstableMetadataCollector.addAncestor(i); - } - sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); + if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) + sstableMetadataCollector.addAncestor(i); } - + sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); - return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); } public void upgrade() { outputHandler.output("Upgrading " + sstable); - - SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true); - try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade)) + Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable); + SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true); + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade)) { - Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator(); + Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator(); writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java 
---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 18825cb,4d5a06f..f3d08a6 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@@ -34,11 -35,9 +35,11 @@@ import org.apache.cassandra.db.DataTrac import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; - import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.Pair; /** * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb