Give compaction strategies more control over sstable creation. Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-8671.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed27277 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed27277 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed27277 Branch: refs/heads/trunk Commit: 9ed2727739c73d64086d09a86a407a77390f081a Parents: 0d86645 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Thu Aug 6 10:19:55 2015 -0700 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Aug 20 20:47:40 2015 +0200 ---------------------------------------------------------------------- .../apache/cassandra/db/ColumnFamilyStore.java | 73 +++++++++--- .../org/apache/cassandra/db/Directories.java | 42 +++++-- src/java/org/apache/cassandra/db/Keyspace.java | 5 + src/java/org/apache/cassandra/db/Memtable.java | 32 +++-- .../compaction/AbstractCompactionStrategy.java | 24 +++- .../db/compaction/AbstractCompactionTask.java | 3 +- .../db/compaction/CompactionManager.java | 6 +- .../compaction/CompactionStrategyManager.java | 40 +++++-- .../cassandra/db/compaction/CompactionTask.java | 22 ++-- .../db/compaction/LeveledCompactionTask.java | 6 +- .../db/compaction/SSTableSplitter.java | 3 +- .../cassandra/db/compaction/Scrubber.java | 3 +- .../SizeTieredCompactionStrategy.java | 4 +- .../writers/CompactionAwareWriter.java | 53 ++++++--- .../writers/DefaultCompactionWriter.java | 32 ++--- .../writers/MajorLeveledCompactionWriter.java | 46 ++++---- .../writers/MaxSSTableSizeWriter.java | 45 ++++--- .../SplittingSizeTieredCompactionWriter.java | 52 ++++----- .../db/lifecycle/LifecycleTransaction.java | 9 ++ .../apache/cassandra/db/lifecycle/Tracker.java | 34 +++--- .../org/apache/cassandra/db/lifecycle/View.java | 4 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 11 +- .../io/sstable/SSTableMultiWriter.java | 54 +++++++++ .../cassandra/io/sstable/SSTableTxnWriter.java | 43 +++++-- .../io/sstable/SimpleSSTableMultiWriter.java | 116 +++++++++++++++++++ 
.../notifications/SSTableAddedNotification.java | 4 +- .../cassandra/streaming/StreamReader.java | 22 ++-- .../cassandra/streaming/StreamReceiveTask.java | 22 ++-- .../compress/CompressedStreamReader.java | 8 +- .../streaming/messages/IncomingFileMessage.java | 7 +- .../cassandra/tools/SSTableExpiredBlockers.java | 3 +- .../cassandra/tools/SSTableLevelResetter.java | 2 +- .../cassandra/tools/SSTableOfflineRelevel.java | 5 +- .../cassandra/tools/StandaloneScrubber.java | 2 +- .../cassandra/tools/StandaloneUpgrader.java | 2 +- .../cassandra/tools/StandaloneVerifier.java | 7 +- .../db/compaction/LongCompactionsTest.java | 6 +- test/unit/org/apache/cassandra/MockSchema.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 12 +- .../db/compaction/AntiCompactionTest.java | 10 +- .../compaction/CompactionAwareWriterTest.java | 8 +- .../LeveledCompactionStrategyTest.java | 2 +- .../db/lifecycle/RealTransactionsTest.java | 8 +- .../cassandra/db/lifecycle/TrackerTest.java | 19 +-- .../apache/cassandra/db/lifecycle/ViewTest.java | 2 +- .../io/sstable/BigTableWriterTest.java | 4 +- .../io/sstable/CQLSSTableWriterClientTest.java | 2 + .../io/sstable/SSTableRewriterTest.java | 10 +- .../cassandra/io/sstable/SSTableUtils.java | 25 ++-- .../org/apache/cassandra/schema/DefsTest.java | 6 +- 51 files changed, 651 insertions(+), 315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index a12de0a..b199c77 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -58,6 +58,7 @@ import 
org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.*; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.metrics.TableMetrics; @@ -75,6 +76,33 @@ import static org.apache.cassandra.utils.Throwables.maybeFail; public class ColumnFamilyStore implements ColumnFamilyStoreMBean { + // the directories used to load sstables on cfs instantiation + private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories; + + /** + * a hook to add additional directories to initialDirectories. + * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup + */ + public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories) + { + assert newDirectories != null; + + Set<Directories.DataDirectory> existing = Sets.newHashSet(initialDirectories); + + List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories); + for (Directories.DataDirectory directory: newDirectories) + { + if (!existing.contains(directory)) + { + replacementList.add(directory); + } + } + + Directories.DataDirectory[] replacementArray = new Directories.DataDirectory[replacementList.size()]; + replacementList.toArray(replacementArray); + initialDirectories = replacementArray; + } + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), @@ -164,7 +192,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private volatile DefaultInteger maxCompactionThreshold; private final CompactionStrategyManager compactionStrategyManager; - public final 
Directories directories; + private volatile Directories directories; public final TableMetrics metric; public volatile long sampleLatencyNanos; @@ -189,6 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold()); compactionStrategyManager.maybeReload(metadata); + directories = compactionStrategyManager.getDirectories(); scheduleFlush(); @@ -330,6 +359,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean boolean loadSSTables, boolean registerBookkeeping) { + assert directories != null; assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; this.keyspace = keyspace; @@ -363,6 +393,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // compaction strategy should be created after the CFS has been prepared this.compactionStrategyManager = new CompactionStrategyManager(this); + this.directories = this.compactionStrategyManager.getDirectories(); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) { @@ -426,6 +457,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + public Directories getDirectories() + { + return directories; + } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + { + MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) + { + return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn); + } 
+ /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */ public void invalidate() { @@ -499,7 +546,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean boolean loadSSTables) { // get the max generation number, to prevent generation conflicts - Directories directories = new Directories(metadata); + Directories directories = new Directories(metadata, initialDirectories); Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true); List<Integer> generations = new ArrayList<Integer>(); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) @@ -633,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean currentDescriptors.add(sstable.descriptor); Set<SSTableReader> newSSTables = new HashSet<>(); - Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); + Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) { Descriptor descriptor = entry.getKey(); @@ -1378,9 +1425,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null)); } - void replaceFlushed(Memtable memtable, SSTableReader sstable) + void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { - compactionStrategyManager.replaceFlushed(memtable, sstable); + compactionStrategyManager.replaceFlushed(memtable, sstables); } public boolean isValid() @@ -1580,7 +1627,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName) { - final File manifestFile = directories.getSnapshotManifestFile(snapshotName); + final File manifestFile = 
getDirectories().getSnapshotManifestFile(snapshotName); try { @@ -1602,7 +1649,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private void createEphemeralSnapshotMarkerFile(final String snapshot) { - final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot); + final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot); try { @@ -1635,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Map<Integer, SSTableReader> active = new HashMap<>(); for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) active.put(sstable.descriptor.generation, sstable); - Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list(); + Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list(); Refs<SSTableReader> refs = new Refs<>(); try { @@ -1692,12 +1739,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public boolean snapshotExists(String snapshotName) { - return directories.snapshotExists(snapshotName); + return getDirectories().snapshotExists(snapshotName); } public long getSnapshotCreationTime(String snapshotName) { - return directories.snapshotCreationTime(snapshotName); + return getDirectories().snapshotCreationTime(snapshotName); } /** @@ -1708,7 +1755,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public void clearSnapshot(String snapshotName) { - List<File> snapshotDirs = directories.getCFDirectories(); + List<File> snapshotDirs = getDirectories().getCFDirectories(); Directories.clearSnapshot(snapshotName, snapshotDirs); } /** @@ -1718,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public Map<String, Pair<Long,Long>> getSnapshotDetails() { - return directories.getSnapshotDetails(); + return getDirectories().getSnapshotDetails(); } /** @@ -2251,7 
+2298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public long trueSnapshotsSize() { - return directories.trueSnapshotsSize(); + return getDirectories().trueSnapshotsSize(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index fa01269..90d2085 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -178,30 +178,36 @@ public class Directories } private final CFMetaData metadata; + private final DataDirectory[] paths; private final File[] dataPaths; + public Directories(final CFMetaData metadata) + { + this(metadata, dataDirectories); + } /** * Create Directories of given ColumnFamily. * SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time. * * @param metadata metadata of ColumnFamily */ - public Directories(final CFMetaData metadata) + public Directories(final CFMetaData metadata, DataDirectory[] paths) { this.metadata = metadata; + this.paths = paths; String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId)); int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName; String indexNameWithDot = idx >= 0 ? 
metadata.cfName.substring(idx) : null; - this.dataPaths = new File[dataDirectories.length]; + this.dataPaths = new File[paths.length]; // If upgraded from version less than 2.1, use existing directories String oldSSTableRelativePath = join(metadata.ksName, cfName); - for (int i = 0; i < dataDirectories.length; ++i) + for (int i = 0; i < paths.length; ++i) { // check if old SSTable directory exists - dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath); + dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath); } boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>() { @@ -214,13 +220,13 @@ public class Directories { // use 2.1+ style String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId); - for (int i = 0; i < dataDirectories.length; ++i) - dataPaths[i] = new File(dataDirectories[i].location, newSSTableRelativePath); + for (int i = 0; i < paths.length; ++i) + dataPaths[i] = new File(paths[i].location, newSSTableRelativePath); } // if index, then move to its own directory if (indexNameWithDot != null) { - for (int i = 0; i < dataDirectories.length; ++i) + for (int i = 0; i < paths.length; ++i) dataPaths[i] = new File(dataPaths[i], indexNameWithDot); } @@ -327,7 +333,7 @@ public class Directories // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes. 
boolean tooBig = false; - for (DataDirectory dataDir : dataDirectories) + for (DataDirectory dataDir : paths) { if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir))) { @@ -393,7 +399,7 @@ public class Directories long writeSize = expectedTotalWriteSize / estimatedSSTables; long totalAvailable = 0L; - for (DataDirectory dataDir : dataDirectories) + for (DataDirectory dataDir : paths) { if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir))) continue; @@ -481,6 +487,24 @@ public class Directories { return location.getUsableSpace(); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DataDirectory that = (DataDirectory) o; + + return location.equals(that.location); + + } + + @Override + public int hashCode() + { + return location.hashCode(); + } } static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate> http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 78b593b..c2613fe 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -210,6 +210,11 @@ public class Keyspace return cfs; } + public boolean hasColumnFamilyStore(UUID id) + { + return columnFamilyStores.containsKey(id); + } + /** * Take a snapshot of the specific column family, or the entire set of column families * if columnFamily is null with a given timestamp http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 
1b30fc7..4a54666 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -47,7 +47,6 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DiskAwareRunnable; import org.apache.cassandra.service.ActiveRepairService; @@ -345,22 +344,22 @@ public class Memtable implements Comparable<Memtable> { long writeSize = getExpectedWriteSize(); Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize); - File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory); + File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory); assert sstableDirectory != null : "Flush task is not bound to any disk"; - SSTableReader sstable = writeSortedContents(context, sstableDirectory); - cfs.replaceFlushed(Memtable.this, sstable); + Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory); + cfs.replaceFlushed(Memtable.this, sstables); } protected Directories getDirectories() { - return cfs.directories; + return cfs.getDirectories(); } - private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory) + private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory) { logger.info("Writing {}", Memtable.this.toString()); - SSTableReader ssTable; + Collection<SSTableReader> ssTables; try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) { boolean trackContention = logger.isDebugEnabled(); @@ -397,20 +396,20 @@ public class Memtable implements Comparable<Memtable> context)); // sstables should contain non-repaired data. 
- ssTable = writer.finish(true); + ssTables = writer.finish(true); } else { logger.info("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", writer.getFilename(), context); writer.abort(); - ssTable = null; + ssTables = null; } if (heavilyContendedRowCount > 0) logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); - return ssTable; + return ssTables; } } @@ -423,13 +422,12 @@ public class Memtable implements Comparable<Memtable> LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata); MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context); return new SSTableTxnWriter(txn, - SSTableWriter.create(Descriptor.fromFilename(filename), - (long)partitions.size(), - ActiveRepairService.UNREPAIRED_SSTABLE, - cfs.metadata, - sstableMetadataCollector, - new SerializationHeader(cfs.metadata, columns, stats), - txn)); + cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), + (long)partitions.size(), + ActiveRepairService.UNREPAIRED_SSTABLE, + sstableMetadataCollector, + new SerializationHeader(cfs.metadata, columns, stats), + txn)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index d9c9ea3..721fd70 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -25,7 +25,12 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import 
com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +43,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.JVMStabilityInspector; /** @@ -113,6 +119,11 @@ public abstract class AbstractCompactionStrategy } } + public Directories getDirectories() + { + return cfs.getDirectories(); + } + /** * For internal, temporary suspension of background compactions so that we can do exceptional * things like truncate or major compaction @@ -222,12 +233,12 @@ public abstract class AbstractCompactionStrategy * Handle a flushed memtable. * * @param memtable the flushed memtable - * @param sstable the written sstable. can be null if the memtable was clean. + * @param sstables the written sstables. can be null or empty if the memtable was clean. 
*/ - public void replaceFlushed(Memtable memtable, SSTableReader sstable) + public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { - cfs.getTracker().replaceFlushed(memtable, sstable); - if (sstable != null) + cfs.getTracker().replaceFlushed(memtable, sstables); + if (sstables != null && !sstables.isEmpty()) CompactionManager.instance.submitBackground(cfs); } @@ -493,4 +504,9 @@ public abstract class AbstractCompactionStrategy groupedSSTables.add(currGroup); return groupedSSTables; } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn) + { + return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 3bf224e..155bf2f 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable transaction.close(); } } - public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, 
LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables); + public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables); protected abstract int executeInternal(CompactionExecutorStatsCollector collector); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 8aa16d5..66f9ed5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -594,7 +594,7 @@ public class CompactionManager implements CompactionManagerMBean } // group by keyspace/columnfamily ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname); - descriptors.put(cfs, cfs.directories.find(new File(filename.trim()).getName())); + descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName())); } List<Future<?>> futures = new ArrayList<>(); @@ -817,7 +817,7 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Cleaning up {}", sstable); - File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP)); + File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP)); if (compactionFileLocation == null) throw new IOException("disk full"); @@ -1192,7 +1192,7 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Anticompacting {}", anticompactionGroup); Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); - File 
destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); + File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); long repairedKeyCount = 0; long unrepairedKeyCount = 0; int nowInSec = FBUtilities.nowInSeconds(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index f5097af..47c8de8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.notifications.*; import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.service.ActiveRepairService; /** * Manages the compaction strategies. 
@@ -181,10 +187,10 @@ public class CompactionStrategyManager implements INotificationConsumer startup(); } - public void replaceFlushed(Memtable memtable, SSTableReader sstable) + public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { - cfs.getTracker().replaceFlushed(memtable, sstable); - if (sstable != null) + cfs.getTracker().replaceFlushed(memtable, sstables); + if (sstables != null && !sstables.isEmpty()) CompactionManager.instance.submitBackground(cfs); } @@ -235,16 +241,24 @@ public class CompactionStrategyManager implements INotificationConsumer return repaired.shouldDefragment(); } + public Directories getDirectories() + { + assert repaired.getClass().equals(unrepaired.getClass()); + return repaired.getDirectories(); + } public synchronized void handleNotification(INotification notification, Object sender) { if (notification instanceof SSTableAddedNotification) { SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; - if (flushedNotification.added.isRepaired()) - repaired.addSSTable(flushedNotification.added); - else - unrepaired.addSSTable(flushedNotification.added); + for (SSTableReader sstable : flushedNotification.added) + { + if (sstable.isRepaired()) + repaired.addSSTable(sstable); + else + unrepaired.addSSTable(sstable); + } } else if (notification instanceof SSTableListChangedNotification) { @@ -484,4 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer { return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES)); } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) + { + if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + { + return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + } + else + { + return 
repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 0bd6aae..1d96324 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -29,9 +28,9 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; -import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -43,7 +42,6 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; @@ -51,8 +49,8 @@ public class CompactionTask extends AbstractCompactionTask { protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); protected final int gcBefore; - private final boolean 
offline; - private final boolean keepOriginals; + protected final boolean offline; + protected final boolean keepOriginals; protected static long totalBytesCompacted = 0; private CompactionExecutorStatsCollector collector; @@ -154,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask { Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - List<SSTableReader> newSStables; + Collection<SSTableReader> newSStables; long[] mergedRowCounts; @@ -173,7 +171,7 @@ public class CompactionTask extends AbstractCompactionTask if (!controller.cfs.getCompactionStrategyManager().isActive) throw new CompactionInterruptedException(ci.getCompactionInfo()); - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact)) { estimatedKeys = writer.estimatedKeys(); while (ci.hasNext()) @@ -228,10 +226,11 @@ public class CompactionTask extends AbstractCompactionTask @Override public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables) { - return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals); + return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals); } public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize) @@ -252,6 +251,11 @@ public class CompactionTask extends AbstractCompactionTask return mergeSummary.toString(); } + protected Directories getDirectories() + { + return cfs.getDirectories(); + } + public static long getMinRepairedAt(Set<SSTableReader> actuallyCompact) { long minRepairedAt= Long.MAX_VALUE; @@ -264,7 +268,7 @@ public class CompactionTask extends 
AbstractCompactionTask protected void checkAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize) { - while (!cfs.directories.hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize)) + while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize)) { if (!reduceScopeForLimitedSpace()) throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index 11d113d..eeb3615 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter; import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; @@ -42,12 +43,13 @@ public class LeveledCompactionTask extends CompactionTask @Override public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { if (majorCompaction) - return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false); - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false); + return new MajorLeveledCompactionWriter(cfs, directories, txn, 
nonExpiredSSTables, maxSSTableBytes, false, false); + return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index 1944364..3655a37 100644 --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@ -75,10 +75,11 @@ public class SSTableSplitter { @Override public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false); + return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 747b956..c437832 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; 
import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; @@ -106,7 +105,7 @@ public class Scrubber implements Closeable List<SSTableReader> toScrub = Collections.singletonList(sstable); // Calculate the expected compacted filesize - this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB)); + this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB)); if (destination == null) throw new IOException("disk full"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 2353aa3..05f446c 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -342,10 +343,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy @Override public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables); + return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, 
nonExpiredSSTables); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 50e5a96..abc4107 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -18,14 +18,17 @@ package org.apache.cassandra.db.compaction.writers; +import java.util.Collection; import java.util.List; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.concurrent.Transactional; @@ -38,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Transactional; public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional { protected final ColumnFamilyStore cfs; + protected final Directories directories; protected final Set<SSTableReader> nonExpiredSSTables; protected final long estimatedTotalKeys; protected final long maxAge; @@ -45,35 +49,25 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final LifecycleTransaction txn; protected final SSTableRewriter sstableWriter; + private boolean isInitialized = false; public CompactionAwareWriter(ColumnFamilyStore cfs, 
- LifecycleTransaction txn, - Set<SSTableReader> nonExpiredSSTables) - { - this(cfs, txn, nonExpiredSSTables, false, false); - } - - public CompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals) { this.cfs = cfs; + this.directories = directories; this.nonExpiredSSTables = nonExpiredSSTables; this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); this.txn = txn; this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals); - } - /** - * Writes a partition in an implementation specific way - * @param partition the partition to append - * @return true if the partition was written, false otherwise - */ - public abstract boolean append(UnfilteredRowIterator partition); + } @Override protected Throwable doAbort(Throwable accumulate) @@ -98,7 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa * @return all the written sstables sstables */ @Override - public List<SSTableReader> finish() + public Collection<SSTableReader> finish() { super.finish(); return sstableWriter.finished(); @@ -112,12 +106,39 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa return estimatedTotalKeys; } + public final boolean append(UnfilteredRowIterator partition) + { + maybeSwitchWriter(partition.partitionKey()); + return realAppend(partition); + } + + protected abstract boolean realAppend(UnfilteredRowIterator partition); + + /** + * Guaranteed to be called before the first call to realAppend. 
+ * @param key + */ + protected void maybeSwitchWriter(DecoratedKey key) + { + if (!isInitialized) + switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()))); + isInitialized = true; + } + + /** + * Implementations of this method should finish the current sstable writer and start writing to this directory. + * + * Called once before starting to append and then whenever we see a need to start writing to another directory. + * @param directory + */ + protected abstract void switchCompactionLocation(Directories.DataDirectory directory); + /** * The directories we can write to */ public Directories getDirectories() { - return cfs.directories; + return directories; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index eb55d20..8b90224 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -18,13 +18,13 @@ package org.apache.cassandra.db.compaction.writers; -import java.io.File; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -40,20 +40,28 @@ public class DefaultCompactionWriter extends CompactionAwareWriter { protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class); - public 
DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - this(cfs, txn, nonExpiredSSTables, false, false); + this(cfs, directories, txn, nonExpiredSSTables, false, false); } @SuppressWarnings("resource") - public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals) + public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals) + { + super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals); + } + + @Override + public boolean realAppend(UnfilteredRowIterator partition) + { + return sstableWriter.append(partition) != null; + } + + @Override + protected void switchCompactionLocation(Directories.DataDirectory directory) { - super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); - logger.debug("Expected bloom filter size : {}", estimatedTotalKeys); - long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))), estimatedTotalKeys, minRepairedAt, cfs.metadata, @@ -64,12 +72,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter } @Override - public boolean append(UnfilteredRowIterator partition) - { - return sstableWriter.append(partition) != null; - } - - @Override public long estimatedKeys() { return 
estimatedTotalKeys; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 73ce216..6d191f8 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -47,43 +48,32 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter private int sstablesWritten = 0; public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize) { - this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false); + this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false); } @SuppressWarnings("resource") public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, boolean keepOriginals) { - super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); + super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals); this.maxSSTableSize = maxSSTableSize; this.allSSTables = txn.originals(); expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())); - long 
estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize); - long keysPerSSTable = estimatedTotalKeys / estimatedSSTables; - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); - - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - keysPerSSTable, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); - sstableWriter.switchWriter(writer); } @Override @SuppressWarnings("resource") - public boolean append(UnfilteredRowIterator partition) + public boolean realAppend(UnfilteredRowIterator partition) { long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer(); RowIndexEntry rie = sstableWriter.append(partition); @@ -98,19 +88,25 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter } averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - averageEstimatedKeysPerSSTable, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); - sstableWriter.switchWriter(writer); + switchCompactionLocation(getWriteDirectory(expectedWriteSize)); partitionsWritten = 0; sstablesWritten++; } return rie != null; } + + public void switchCompactionLocation(Directories.DataDirectory directory) + { + File sstableDirectory = getDirectories().getLocationForDisk(directory); + @SuppressWarnings("resource") + SSTableWriter writer = 
SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), + averageEstimatedKeysPerSSTable, + minRepairedAt, + cfs.metadata, + new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); + sstableWriter.switchWriter(writer); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 241af0d..142fe87 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db.compaction.writers; -import java.io.File; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -40,16 +40,18 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter private final Set<SSTableReader> allSSTables; public MaxSSTableSizeWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level) { - this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false); + this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false); } @SuppressWarnings("resource") public MaxSSTableSizeWriter(ColumnFamilyStore cfs, + Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, @@ -57,7 +59,7 @@ public class 
MaxSSTableSizeWriter extends CompactionAwareWriter boolean offline, boolean keepOriginals) { - super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); + super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals); this.allSSTables = txn.originals(); this.level = level; this.maxSSTableSize = maxSSTableSize; @@ -65,37 +67,30 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter expectedWriteSize = Math.min(maxSSTableSize, totalSize); estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); + } + + @Override + public boolean realAppend(UnfilteredRowIterator partition) + { + RowIndexEntry rie = sstableWriter.append(partition); + if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) + switchCompactionLocation(getWriteDirectory(expectedWriteSize)); + return rie != null; + } + + public void switchCompactionLocation(Directories.DataDirectory location) + { @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); - sstableWriter.switchWriter(writer); - } - @Override - public boolean append(UnfilteredRowIterator partition) - { - RowIndexEntry rie = sstableWriter.append(partition); - if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) - { - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); - @SuppressWarnings("resource") - SSTableWriter writer = 
SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - estimatedTotalKeys / estimatedSSTables, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, level), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); + sstableWriter.switchWriter(writer); - sstableWriter.switchWriter(writer); - } - return rie != null; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 65924fa..07ca3d0 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -51,15 +52,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter private long currentBytesToWrite; private int currentRatioIndex = 0; - public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES); + this(cfs, directories, txn, nonExpiredSSTables, 
DEFAULT_SMALLEST_SSTABLE_BYTES); } @SuppressWarnings("resource") - public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable) + public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable) { - super(cfs, txn, nonExpiredSSTables, false, false); + super(cfs, directories, txn, nonExpiredSSTables, false, false); this.allSSTables = txn.originals(); totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); double[] potentialRatios = new double[20]; @@ -81,43 +82,38 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter } } ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]); currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - currentPartitionsToWrite, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); - - sstableWriter.switchWriter(writer); + switchCompactionLocation(getWriteDirectory(currentBytesToWrite)); logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite); } @Override - public boolean append(UnfilteredRowIterator partition) + public boolean realAppend(UnfilteredRowIterator partition) { RowIndexEntry rie = sstableWriter.append(partition); if 
(sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect { currentRatioIndex++; currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - currentPartitionsToWrite, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - txn); - sstableWriter.switchWriter(writer); - logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); + switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); } return rie != null; } + + public void switchCompactionLocation(Directories.DataDirectory location) + { + long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys); + @SuppressWarnings("resource") + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), + currentPartitionsToWrite, + minRepairedAt, + cfs.metadata, + new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); + logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); + sstableWriter.switchWriter(writer); + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index c6cb979..520b229 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -338,6 +338,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional return accumulate; } + /** * update a reader: if !original, this is a reader that is being introduced by this transaction; * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction @@ -355,6 +356,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional reader.setupOnline(); } + public void update(Collection<SSTableReader> readers, boolean original) + { + for(SSTableReader reader: readers) + { + update(reader, original); + } + } + /** * mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 6f6aca9..d028493 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -186,11 +186,8 @@ public class Tracker public void addSSTables(Iterable<SSTableReader> sstables) { addInitialSSTables(sstables); - for (SSTableReader sstable : sstables) - { - maybeIncrementallyBackup(sstable); - notifyAdded(sstable); - } + maybeIncrementallyBackup(sstables); + notifyAdded(sstables); } /** (Re)initializes the tracker, purging all references. 
*/ @@ -330,10 +327,10 @@ public class Tracker apply(View.markFlushing(memtable)); } - public void replaceFlushed(Memtable memtable, SSTableReader sstable) + public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables) { assert !isDummy(); - if (sstable == null) + if (sstables == null || sstables.isEmpty()) { // sstable may be null if we flushed batchlog and nothing needed to be retained // if it's null, we don't care what state the cfstore is in, we just replace it and continue @@ -341,16 +338,16 @@ public class Tracker return; } - sstable.setupOnline(); + sstables.forEach(SSTableReader::setupOnline); // back up before creating a new Snapshot (which makes the new one eligible for compaction) - maybeIncrementallyBackup(sstable); + maybeIncrementallyBackup(sstables); - apply(View.replaceFlushed(memtable, sstable)); + apply(View.replaceFlushed(memtable, sstables)); Throwable fail; - fail = updateSizeTracking(emptySet(), singleton(sstable), null); + fail = updateSizeTracking(emptySet(), sstables, null); // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? 
- fail = notifyAdded(sstable, fail); + fail = notifyAdded(sstables, fail); if (!isDummy() && !cfstore.isValid()) dropSSTables(); @@ -377,13 +374,16 @@ public class Tracker return view.get().getUncompacting(candidates); } - public void maybeIncrementallyBackup(final SSTableReader sstable) + public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables) { if (!DatabaseDescriptor.isIncrementalBackupsEnabled()) return; - File backupsDir = Directories.getBackupsDirectory(sstable.descriptor); - sstable.createLinks(FileUtils.getCanonicalPath(backupsDir)); + for (SSTableReader sstable : sstables) + { + File backupsDir = Directories.getBackupsDirectory(sstable.descriptor); + sstable.createLinks(FileUtils.getCanonicalPath(backupsDir)); + } } // NOTIFICATION @@ -405,7 +405,7 @@ public class Tracker return accumulate; } - Throwable notifyAdded(SSTableReader added, Throwable accumulate) + Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate) { INotification notification = new SSTableAddedNotification(added); for (INotificationConsumer subscriber : subscribers) @@ -422,7 +422,7 @@ public class Tracker return accumulate; } - public void notifyAdded(SSTableReader added) + public void notifyAdded(Iterable<SSTableReader> added) { maybeFail(notifyAdded(added, null)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index 7ee0fdf..b62c7e3 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -310,7 +310,7 @@ public class View } // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set - static Function<View, View> replaceFlushed(final Memtable memtable, final 
SSTableReader flushed) + static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed) { return new Function<View, View>() { @@ -323,7 +323,7 @@ public class View return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree); - Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed)); + Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed); return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index f4b4da8..d94b219 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -60,11 +60,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable protected SSTableTxnWriter createWriter() { - return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType), - 0, - ActiveRepairService.UNREPAIRED_SSTABLE, - 0, - new SerializationHeader(metadata, columns, EncodingStats.NO_STATS)); + return SSTableTxnWriter.create(metadata, + createDescriptor(directory, metadata.ksName, metadata.cfName, formatType), + 0, + ActiveRepairService.UNREPAIRED_SSTABLE, + 0, + new SerializationHeader(metadata, columns, EncodingStats.NO_STATS)); } private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java new file mode 100644 index 0000000..0bb3721 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable; + +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Transactional; + +public interface SSTableMultiWriter extends Transactional +{ + + /** + * Writes a partition in an implementation specific way + * @param partition the partition to append + * @return true if the partition was written, false otherwise + */ + boolean append(UnfilteredRowIterator partition); + + Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult); + Collection<SSTableReader> finish(boolean openResult); + Collection<SSTableReader> finished(); + + SSTableMultiWriter setOpenResult(boolean openResult); + + String getFilename(); + long getFilePointer(); + UUID getCfId(); + + static void abortOrDie(SSTableMultiWriter writer) + { + Throwables.maybeFail(writer.abort(null)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 42bffb1..6e1ac38 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -18,13 +18,17 @@ package org.apache.cassandra.io.sstable; -import org.apache.cassandra.db.RowIndexEntry; +import java.util.Collection; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.OperationType; import 
org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.concurrent.Transactional; /** @@ -35,15 +39,15 @@ import org.apache.cassandra.utils.concurrent.Transactional; public class SSTableTxnWriter extends Transactional.AbstractTransactional implements Transactional { private final LifecycleTransaction txn; - private final SSTableWriter writer; + private final SSTableMultiWriter writer; - public SSTableTxnWriter(LifecycleTransaction txn, SSTableWriter writer) + public SSTableTxnWriter(LifecycleTransaction txn, SSTableMultiWriter writer) { this.txn = txn; this.writer = writer; } - public RowIndexEntry append(UnfilteredRowIterator iterator) + public boolean append(UnfilteredRowIterator iterator) { return writer.append(iterator); } @@ -74,28 +78,43 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem writer.prepareToCommit(); } - public SSTableReader finish(boolean openResult) + public Collection<SSTableReader> finish(boolean openResult) { writer.setOpenResult(openResult); finish(); return writer.finished(); } - public static SSTableTxnWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory); - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, sstableLevel, header, txn); + SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn); + return new 
SSTableTxnWriter(txn, writer); + } + + public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + { + if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId)) + { + ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId); + return create(cfs, descriptor, keyCount, repairedAt, sstableLevel, header); + } + + // if the column family store does not exist, we create a new default SSTableMultiWriter to use: + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory); + MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel); + SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn); return new SSTableTxnWriter(txn, writer); } - public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { Descriptor desc = Descriptor.fromFilename(filename); - return create(desc, keyCount, repairedAt, sstableLevel, header); + return create(cfs, desc, keyCount, repairedAt, sstableLevel, header); } - public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header) { - return create(filename, keyCount, repairedAt, 0, header); + return create(cfs, filename, keyCount, repairedAt, 0, header); } }