Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0918ba01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0918ba01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0918ba01 Branch: refs/heads/cassandra-3.11 Commit: 0918ba01176392deadf8655a61dad44979a49ee5 Parents: 6a449b8 3539a07 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Fri Dec 7 12:27:08 2018 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Dec 7 12:27:08 2018 +0000 ---------------------------------------------------------------------- .../apache/cassandra/db/ColumnFamilyStore.java | 8 ++-- .../compaction/AbstractCompactionStrategy.java | 5 ++- .../compaction/CompactionStrategyManager.java | 14 +++--- .../db/lifecycle/LifecycleNewTracker.java | 47 ++++++++++++++++++++ .../db/lifecycle/LifecycleTransaction.java | 7 ++- .../apache/cassandra/db/lifecycle/LogFile.java | 24 ++++------ .../cassandra/db/lifecycle/LogTransaction.java | 2 +- .../io/sstable/SimpleSSTableMultiWriter.java | 16 +++---- .../sstable/format/RangeAwareSSTableWriter.java | 12 ++--- .../io/sstable/format/SSTableWriter.java | 24 +++++----- .../io/sstable/format/big/BigFormat.java | 6 +-- .../io/sstable/format/big/BigTableWriter.java | 6 +-- .../cassandra/streaming/StreamReader.java | 6 +-- .../cassandra/streaming/StreamReceiveTask.java | 35 ++++++++++++++- .../cassandra/streaming/StreamSession.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 5 ++- 16 files changed, 150 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 13f0280,c455c4c..700c1cc --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -507,15 -475,15 +507,15 @@@ public class ColumnFamilyStore implemen return directories; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn); - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, lifecycleNewTracker); ++ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 8454147,9f07691..3d7800d --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@@ -26,7 -26,8 +26,8 @@@ import com.google.common.collect.Iterab import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.index.Index; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; @@@ -571,20 -514,9 +572,20 @@@ public abstract class AbstractCompactio return groupedSSTables; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public CompactionLogger.Strategy strategyLogger() + { + return CompactionLogger.Strategy.none; + } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + MetadataCollector meta, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) { - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn); - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, lifecycleNewTracker); ++ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index a50f428,1d3d18c..86170a1 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -20,30 -20,20 +20,30 @@@ package org.apache.cassandra.db.compact import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; - - import org.apache.cassandra.db.DiskBoundaries; - import org.apache.cassandra.db.Memtable; - import org.apache.cassandra.index.Index; +import com.google.common.primitives.Ints; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; ++import org.apache.cassandra.db.DiskBoundaries; + import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; ++import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; @@@ -1010,43 -490,15 +1010,43 @@@ public class CompactionStrategyManager return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES)); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) { - if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + maybeReloadDiskBoundaries(); + readLock.lock(); + try { - return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); + if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + { - return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); ++ return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker); + } + else + { - return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); ++ return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker); + } } - else + finally + { + readLock.unlock(); + } + } + + public boolean isRepaired(AbstractCompactionStrategy strategy) + { + readLock.lock(); + try + { + return repaired.contains(strategy); + } + finally { - return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); + readLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index 2217ae2,ded070e..76e4dbb --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@@ -25,9 -25,8 +25,9 @@@ import java.util.UUID import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@@ -35,11 -34,9 +35,11 @@@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter { private final SSTableWriter writer; - private final LifecycleTransaction txn; ++ private final LifecycleNewTracker lifecycleNewTracker; - protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn) - protected SimpleSSTableMultiWriter(SSTableWriter writer) ++ protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker) { - this.txn = txn; ++ this.lifecycleNewTracker = lifecycleNewTracker; this.writer = writer; } @@@ -92,7 -89,6 +92,7 @@@ public Throwable abort(Throwable accumulate) { - txn.untrackNew(writer); ++ lifecycleNewTracker.untrackNew(writer); return writer.abort(accumulate); } @@@ -113,10 -109,9 +113,10 @@@ CFMetaData cfm, MetadataCollector metadataCollector, SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn); - return new SimpleSSTableMultiWriter(writer, txn); - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, lifecycleNewTracker); - return new SimpleSSTableMultiWriter(writer); ++ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, lifecycleNewTracker); ++ return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 353aacb,0000000..3358225 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@@ -1,208 -1,0 +1,208 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.DiskBoundaries; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.SerializationHeader; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.utils.FBUtilities; + +public class RangeAwareSSTableWriter implements SSTableMultiWriter +{ + private final List<PartitionPosition> boundaries; + private final List<Directories.DataDirectory> directories; + private final int sstableLevel; + private final long estimatedKeys; + private final long repairedAt; + private final SSTableFormat.Type format; + private final SerializationHeader header; - private final LifecycleTransaction txn; ++ private final LifecycleNewTracker lifecycleNewTracker; + private int currentIndex = -1; + public final ColumnFamilyStore cfs; + private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>(); + private final List<SSTableReader> finishedReaders = new ArrayList<>(); + private SSTableMultiWriter currentWriter = null; + - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException ++ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException + { + DiskBoundaries db = cfs.getDiskBoundaries(); + directories = db.directories; + this.sstableLevel = sstableLevel; + this.cfs = cfs; + this.estimatedKeys = estimatedKeys / directories.size(); + this.repairedAt = repairedAt; + this.format = format; - this.txn = txn; ++ this.lifecycleNewTracker = lifecycleNewTracker; + this.header = header; + boundaries = db.positions; + if (boundaries == null) + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(String.format("Insufficient disk space to store %s", + FBUtilities.prettyPrintMemory(totalSize))); + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); ++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker); + } + } + + private void maybeSwitchWriter(DecoratedKey key) + { + if (boundaries == null) + return; + + boolean switched = false; + while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0) + { + switched = true; + currentIndex++; + } + + if (switched) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); ++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker); + } + } + + public boolean append(UnfilteredRowIterator partition) + { + maybeSwitchWriter(partition.partitionKey()); + return currentWriter.append(partition); + } + + @Override + public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection<SSTableReader> finish(boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection<SSTableReader> finished() + { + return finishedReaders; + } + + @Override + public SSTableMultiWriter setOpenResult(boolean openResult) + { + finishedWriters.forEach((w) -> w.setOpenResult(openResult)); + currentWriter.setOpenResult(openResult); + return this; + } + + public String getFilename() + { + return String.join("/", cfs.keyspace.getName(), cfs.getTableName()); + } + + @Override + public long getFilePointer() + { + return currentWriter.getFilePointer(); + } + + @Override + public UUID getCfId() + { + return currentWriter.getCfId(); + } + + @Override + public Throwable commit(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + accumulate = writer.commit(accumulate); + return accumulate; + } + + @Override + public Throwable abort(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter finishedWriter : finishedWriters) + accumulate = finishedWriter.abort(accumulate); + + return accumulate; + } + + @Override + public void prepareToCommit() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::prepareToCommit); + } + + @Override + public void close() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::close); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 9fb5f7c,fcc23a2..e320f30 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@@ -29,11 -30,8 +29,11 @@@ import org.apache.cassandra.config.Data import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.OperationType; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; @@@ -95,23 -90,16 +95,23 @@@ public abstract class SSTableWriter ext CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker); ++ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker); } - public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public static SSTableWriter create(Descriptor descriptor, + long keyCount, + long repairedAt, + int sstableLevel, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker); ++ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker); } public static SSTableWriter create(CFMetaData metadata, @@@ -120,34 -108,21 +120,34 @@@ long repairedAt, int sstableLevel, SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker); ++ return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker); } - public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public static SSTableWriter create(String filename, + long keyCount, + long repairedAt, + int sstableLevel, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn); - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker); ++ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker); } @VisibleForTesting - public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public static SSTableWriter create(String filename, + long keyCount, + long repairedAt, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker); + Descriptor descriptor = Descriptor.fromFilename(filename); - return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn); ++ return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker); } private static Set<Component> components(CFMetaData metadata) @@@ -344,7 -285,6 +344,7 @@@ CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, - LifecycleTransaction txn); + LifecycleNewTracker lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index b62cb11,360ef8a..9af7dc0 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@@ -23,10 -22,13 +23,10 @@@ import java.util.Set import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.net.MessagingService; @@@ -86,10 -88,9 +86,10 @@@ public class BigFormat implements SSTab CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn); - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker); ++ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index c3139a3,f733619..9083cd3 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@@ -17,31 -17,23 +17,31 @@@ */ package org.apache.cassandra.io.sstable.format.big; -import java.io.*; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Map; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.transform.Transformation; -import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; +import java.util.Optional; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cache.ChunkCache; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; @@@ -60,25 -56,17 +60,25 @@@ public class BigTableWriter extends SST protected final SequentialWriter dataFile; private DecoratedKey lastWrittenKey; private DataPosition dataMark; - - public BigTableWriter(Descriptor descriptor, - Long keyCount, - Long repairedAt, - CFMetaData metadata, + private long lastEarlyOpenLength = 0; + private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance); + + private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(DatabaseDescriptor.getTrickleFsync()) + .trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024) + .build(); + + public BigTableWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header); + super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers); - txn.trackNew(this); // must track before any files are created + lifecycleNewTracker.trackNew(this); // must track before any files are created if (compression) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 6465bf7,07278cb..dbd5a4a --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -34,10 -34,11 +34,11 @@@ import com.ning.compress.lzf.LZFInputSt import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.RewindableDataInputStreamPlus; @@@ -151,11 -152,11 +150,12 @@@ public class StreamReade { Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) - throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata)); - return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), - session.getReceivingTask(cfId).createLifecycleNewTracker()); ++ LifecycleNewTracker lifecycleNewTracker = session.getReceivingTask(cfId).createLifecycleNewTracker(); ++ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata)); + StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); + return writer; } protected long totalSize() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 736d30f,c79a711..a426207 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -223,18 -208,12 +223,18 @@@ public class StreamSession implements I } - public LifecycleTransaction getTransaction(UUID cfId) + StreamReceiveTask getReceivingTask(UUID cfId) { assert receivers.containsKey(cfId); - return receivers.get(cfId).getTransaction(); + return receivers.get(cfId); } + private boolean isKeepAliveSupported() + { + CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer); + return STREAM_KEEP_ALIVE.isSupportedBy(peerVersion); + } + /** * Bind this session to report to specific {@link StreamResultFuture} and * perform pre-streaming initialization. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0918ba01/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index 56ac4ba,757add9..7117df1 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -644,9 -642,9 +645,9 @@@ public class ScrubTes private static class TestMultiWriter extends SimpleSSTableMultiWriter { - TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn) - TestMultiWriter(SSTableWriter writer) ++ TestMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker) { - super(writer, txn); - super(writer); ++ super(writer, lifecycleNewTracker); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org