Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/587773fa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/587773fa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/587773fa Branch: refs/heads/trunk Commit: 587773fa478ff64aa46cf17760eb31d6f83fc46d Parents: e3716ee e8651b6 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Mar 17 10:42:20 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Mar 17 10:42:20 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 2 + .../org/apache/cassandra/db/Directories.java | 30 + .../cassandra/db/SerializationHeader.java | 5 + .../org/apache/cassandra/db/Serializers.java | 114 ++-- .../columniterator/AbstractSSTableIterator.java | 4 +- .../EncryptedFileSegmentInputStream.java | 4 +- .../cassandra/hints/ChecksummedDataInput.java | 8 +- .../org/apache/cassandra/hints/HintMessage.java | 4 +- .../io/compress/CompressedSequentialWriter.java | 8 +- .../io/sstable/SSTableSimpleIterator.java | 11 +- .../sstable/format/RangeAwareSSTableWriter.java | 8 +- .../io/sstable/format/SSTableReader.java | 2 +- .../io/sstable/format/SSTableWriter.java | 2 +- .../io/sstable/format/big/BigTableWriter.java | 4 +- .../cassandra/io/util/BytesReadTracker.java | 30 + .../apache/cassandra/io/util/DataPosition.java | 21 + .../apache/cassandra/io/util/FileDataInput.java | 8 +- .../org/apache/cassandra/io/util/FileMark.java | 20 - .../io/util/FileSegmentInputStream.java | 12 +- .../cassandra/io/util/RandomAccessReader.java | 8 +- .../cassandra/io/util/RewindableDataInput.java | 30 + .../io/util/RewindableDataInputStreamPlus.java | 569 +++++++++++++++++++ .../cassandra/io/util/SequentialWriter.java | 6 +- .../cassandra/io/util/TrackedDataInputPlus.java | 150 +++++ .../cassandra/io/util/TrackedInputStream.java | 76 +++ .../cassandra/service/StorageService.java | 1 + .../cassandra/streaming/StreamReader.java | 85 ++- .../compress/CompressedStreamReader.java | 18 +- .../streaming/messages/FileMessageHeader.java | 4 +- .../apache/cassandra/tools/nodetool/Repair.java | 2 +- .../cassandra/utils/BytesReadTracker.java | 153 ----- .../cassandra/utils/CloseableIterator.java | 1 - ...acy_jb_clust_compact-jb-1-CompressionInfo.db | Bin 0 -> 83 bytes ..._tables-legacy_jb_clust_compact-jb-1-Data.db | Bin 0 -> 5270 bytes ...ables-legacy_jb_clust_compact-jb-1-Filter.db | Bin 0 -> 24 bytes ...tables-legacy_jb_clust_compact-jb-1-Index.db | Bin 0 -> 157685 bytes ...s-legacy_jb_clust_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes ...bles-legacy_jb_clust_compact-jb-1-Summary.db | Bin 0 -> 71 bytes ..._tables-legacy_jb_clust_compact-jb-1-TOC.txt | 7 + ...lust_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 75 bytes ...legacy_jb_clust_counter_compact-jb-1-Data.db | Bin 0 -> 4228 bytes ...gacy_jb_clust_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes ...egacy_jb_clust_counter_compact-jb-1-Index.db | Bin 0 -> 157685 bytes ..._jb_clust_counter_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes ...acy_jb_clust_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes ...legacy_jb_clust_counter_compact-jb-1-TOC.txt | 7 + ...cy_jb_simple_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes ...tables-legacy_jb_simple_compact-jb-1-Data.db | Bin 0 -> 108 bytes ...bles-legacy_jb_simple_compact-jb-1-Filter.db | Bin 0 -> 24 bytes ...ables-legacy_jb_simple_compact-jb-1-Index.db | Bin 0 -> 75 bytes ...-legacy_jb_simple_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes ...les-legacy_jb_simple_compact-jb-1-Summary.db | Bin 0 -> 71 bytes ...tables-legacy_jb_simple_compact-jb-1-TOC.txt | 7 + ...mple_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes ...egacy_jb_simple_counter_compact-jb-1-Data.db | Bin 0 -> 118 bytes ...acy_jb_simple_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes ...gacy_jb_simple_counter_compact-jb-1-Index.db | Bin 0 -> 75 bytes ...jb_simple_counter_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes ...cy_jb_simple_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes ...egacy_jb_simple_counter_compact-jb-1-TOC.txt | 7 + ...acy_ka_clust_compact-ka-1-CompressionInfo.db | Bin 0 -> 83 bytes ..._tables-legacy_ka_clust_compact-ka-1-Data.db | Bin 0 -> 5277 bytes ...les-legacy_ka_clust_compact-ka-1-Digest.sha1 | 1 + ...ables-legacy_ka_clust_compact-ka-1-Filter.db | Bin 0 -> 24 bytes ...tables-legacy_ka_clust_compact-ka-1-Index.db | Bin 0 -> 157685 bytes ...s-legacy_ka_clust_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes ...bles-legacy_ka_clust_compact-ka-1-Summary.db | Bin 0 -> 83 bytes ..._tables-legacy_ka_clust_compact-ka-1-TOC.txt | 8 + ...lust_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 75 bytes ...legacy_ka_clust_counter_compact-ka-1-Data.db | Bin 0 -> 4527 bytes ...cy_ka_clust_counter_compact-ka-1-Digest.sha1 | 1 + ...gacy_ka_clust_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes ...egacy_ka_clust_counter_compact-ka-1-Index.db | Bin 0 -> 157685 bytes ..._ka_clust_counter_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes ...acy_ka_clust_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes ...legacy_ka_clust_counter_compact-ka-1-TOC.txt | 8 + ...cy_ka_simple_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes ...tables-legacy_ka_simple_compact-ka-1-Data.db | Bin 0 -> 105 bytes ...es-legacy_ka_simple_compact-ka-1-Digest.sha1 | 1 + ...bles-legacy_ka_simple_compact-ka-1-Filter.db | Bin 0 -> 24 bytes ...ables-legacy_ka_simple_compact-ka-1-Index.db | Bin 0 -> 75 bytes ...-legacy_ka_simple_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes ...les-legacy_ka_simple_compact-ka-1-Summary.db | Bin 0 -> 83 bytes ...tables-legacy_ka_simple_compact-ka-1-TOC.txt | 8 + ...mple_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes ...egacy_ka_simple_counter_compact-ka-1-Data.db | Bin 0 -> 124 bytes ...y_ka_simple_counter_compact-ka-1-Digest.sha1 | 1 + ...acy_ka_simple_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes ...gacy_ka_simple_counter_compact-ka-1-Index.db | Bin 0 -> 75 bytes ...ka_simple_counter_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes ...cy_ka_simple_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes ...egacy_ka_simple_counter_compact-ka-1-TOC.txt | 8 + .../la-1-big-CompressionInfo.db | Bin 0 -> 83 bytes .../legacy_la_clust_compact/la-1-big-Data.db | Bin 0 -> 5286 bytes .../la-1-big-Digest.adler32 | 1 + .../legacy_la_clust_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_la_clust_compact/la-1-big-Index.db | Bin 0 -> 157685 bytes .../la-1-big-Statistics.db | Bin 0 -> 6859 bytes .../legacy_la_clust_compact/la-1-big-Summary.db | Bin 0 -> 75 bytes .../legacy_la_clust_compact/la-1-big-TOC.txt | 8 + .../la-1-big-CompressionInfo.db | Bin 0 -> 75 bytes .../la-1-big-Data.db | Bin 0 -> 4527 bytes .../la-1-big-Digest.adler32 | 1 + .../la-1-big-Filter.db | Bin 0 -> 24 bytes .../la-1-big-Index.db | Bin 0 -> 157685 bytes .../la-1-big-Statistics.db | Bin 0 -> 6859 bytes .../la-1-big-Summary.db | Bin 0 -> 75 bytes .../la-1-big-TOC.txt | 8 + .../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../legacy_la_simple_compact/la-1-big-Data.db | Bin 0 -> 106 bytes .../la-1-big-Digest.adler32 | 1 + .../legacy_la_simple_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_la_simple_compact/la-1-big-Index.db | Bin 0 -> 75 bytes .../la-1-big-Statistics.db | Bin 0 -> 4453 bytes .../la-1-big-Summary.db | Bin 0 -> 75 bytes .../legacy_la_simple_compact/la-1-big-TOC.txt | 8 + .../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../la-1-big-Data.db | Bin 0 -> 124 bytes .../la-1-big-Digest.adler32 | 1 + .../la-1-big-Filter.db | Bin 0 -> 24 bytes .../la-1-big-Index.db | Bin 0 -> 75 bytes .../la-1-big-Statistics.db | Bin 0 -> 4453 bytes .../la-1-big-Summary.db | Bin 0 -> 75 bytes .../la-1-big-TOC.txt | 8 + .../ma-1-big-CompressionInfo.db | Bin 0 -> 83 bytes .../legacy_ma_clust_compact/ma-1-big-Data.db | Bin 0 -> 5393 bytes .../ma-1-big-Digest.crc32 | 1 + .../legacy_ma_clust_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_ma_clust_compact/ma-1-big-Index.db | Bin 0 -> 157553 bytes .../ma-1-big-Statistics.db | Bin 0 -> 7046 bytes .../legacy_ma_clust_compact/ma-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_ma_clust_compact/ma-1-big-TOC.txt | 8 + .../ma-1-big-CompressionInfo.db | Bin 0 -> 75 bytes .../ma-1-big-Data.db | Bin 0 -> 4606 bytes .../ma-1-big-Digest.crc32 | 1 + .../ma-1-big-Filter.db | Bin 0 -> 24 bytes .../ma-1-big-Index.db | Bin 0 -> 157553 bytes .../ma-1-big-Statistics.db | Bin 0 -> 7055 bytes .../ma-1-big-Summary.db | Bin 0 -> 47 bytes .../ma-1-big-TOC.txt | 8 + .../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../legacy_ma_simple_compact/ma-1-big-Data.db | Bin 0 -> 91 bytes .../ma-1-big-Digest.crc32 | 1 + .../legacy_ma_simple_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes .../legacy_ma_simple_compact/ma-1-big-Index.db | Bin 0 -> 26 bytes .../ma-1-big-Statistics.db | Bin 0 -> 4640 bytes .../ma-1-big-Summary.db | Bin 0 -> 47 bytes .../legacy_ma_simple_compact/ma-1-big-TOC.txt | 8 + .../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../ma-1-big-Data.db | Bin 0 -> 114 bytes .../ma-1-big-Digest.crc32 | 1 + .../ma-1-big-Filter.db | Bin 0 -> 24 bytes .../ma-1-big-Index.db | Bin 0 -> 27 bytes .../ma-1-big-Statistics.db | Bin 0 -> 4649 bytes .../ma-1-big-Summary.db | Bin 0 -> 47 bytes .../ma-1-big-TOC.txt | 8 + .../cassandra/AbstractSerializationsTester.java | 1 - .../apache/cassandra/db/DirectoriesTest.java | 98 ++-- .../cassandra/gms/SerializationsTest.java | 1 - .../CompressedRandomAccessReaderTest.java | 6 +- .../CompressedSequentialWriterTest.java | 4 +- .../cassandra/io/sstable/LegacySSTableTest.java | 369 ++++++------ .../io/util/BufferedRandomAccessFileTest.java | 4 +- .../io/util/RandomAccessReaderTest.java | 2 +- .../util/RewindableDataInputStreamPlusTest.java | 539 ++++++++++++++++++ .../cassandra/utils/BytesReadTrackerTest.java | 104 +++- 167 files changed, 2115 insertions(+), 550 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index a01e511,51cfc16..53dd292 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,6 -1,6 +1,7 @@@ -3.0.5 +3.5 +Merged from 3.0: + * Support streaming pre-3.0 sstables (CASSANDRA-10990) - * Add backpressure to compressed commit log (CASSANDRA-10971) + * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971) * SSTableExport supports secondary index tables (CASSANDRA-11330) * Fix sstabledump to include missing info in debug output (CASSANDRA-11321) * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331) http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Serializers.java index 17f1de0,348fda3..cef06a3 --- a/src/java/org/apache/cassandra/db/Serializers.java +++ b/src/java/org/apache/cassandra/db/Serializers.java @@@ -46,62 -46,77 +46,77 @@@ public class Serializer // unecessary (since IndexInfo.Serializer won't depend on the metadata either). public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header) { - if (!version.storeRows()) + if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables { - return new ISerializer<ClusteringPrefix>() + return oldFormatSerializer(version); + } + + return newFormatSerializer(version, header); + } + + private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version) + { + return new ISerializer<ClusteringPrefix>() + { + SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata); + + public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException { - public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException - { - // We should only use this for reading old sstable, never write new ones. - throw new UnsupportedOperationException(); - } + //we deserialize in the old format and serialize in the new format + ClusteringPrefix.serializer.serialize(clustering, out, + version.correspondingMessagingVersion(), + newHeader.clusteringTypes()); + } + + public ClusteringPrefix deserialize(DataInputPlus in) throws IOException + { + // We're reading the old cellname/composite + ByteBuffer bb = ByteBufferUtil.readWithShortLength(in); + assert bb.hasRemaining(); // empty cellnames were invalid + + int clusteringSize = metadata.clusteringColumns().size(); + // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here. + if (clusteringSize == 0) + return Clustering.EMPTY; + + if (!metadata.isCompound()) - return new Clustering(bb); ++ return Clustering.make(bb); - public ClusteringPrefix deserialize(DataInputPlus in) throws IOException + List<ByteBuffer> components = CompositeType.splitName(bb); + byte eoc = CompositeType.lastEOC(bb); + + if (eoc == 0 || components.size() >= clusteringSize) { - // We're reading the old cellname/composite - ByteBuffer bb = ByteBufferUtil.readWithShortLength(in); - assert bb.hasRemaining(); // empty cellnames were invalid - - int clusteringSize = metadata.clusteringColumns().size(); - // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here. - if (clusteringSize == 0) - return Clustering.EMPTY; - - if (!metadata.isCompound()) - return Clustering.make(bb); - - List<ByteBuffer> components = CompositeType.splitName(bb); - byte eoc = CompositeType.lastEOC(bb); - - if (eoc == 0 || components.size() >= clusteringSize) - { - // That's a clustering. - if (components.size() > clusteringSize) - components = components.subList(0, clusteringSize); - - return Clustering.make(components.toArray(new ByteBuffer[clusteringSize])); - } - else - { - // It's a range tombstone bound. It is a start since that's the only part we've ever included - // in the index entries. - Slice.Bound.Kind boundKind = eoc > 0 - ? Slice.Bound.Kind.EXCL_START_BOUND - : Slice.Bound.Kind.INCL_START_BOUND; - - return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()])); - } - } + // That's a clustering. + if (components.size() > clusteringSize) + components = components.subList(0, clusteringSize); - public long serializedSize(ClusteringPrefix clustering) - return new Clustering(components.toArray(new ByteBuffer[clusteringSize])); ++ return Clustering.make(components.toArray(new ByteBuffer[clusteringSize])); + } + else { - // We should only use this for reading old sstable, never write new ones. - throw new UnsupportedOperationException(); + // It's a range tombstone bound. It is a start since that's the only part we've ever included + // in the index entries. + Slice.Bound.Kind boundKind = eoc > 0 + ? Slice.Bound.Kind.EXCL_START_BOUND + : Slice.Bound.Kind.INCL_START_BOUND; + + return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()])); } - }; - } + } - return new ISerializer<ClusteringPrefix>() + public long serializedSize(ClusteringPrefix clustering) + { + return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), + newHeader.clusteringTypes()); + } + }; + } + + + private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header) + { + return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format { public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException { @@@ -119,4 -134,5 +134,5 @@@ } }; } - } + -} ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index d55161b,0e2012e..7f2e3bb --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@@ -29,10 -30,10 +29,10 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.util.FileDataInput; - import org.apache.cassandra.io.util.FileMark; + import org.apache.cassandra.io.util.DataPosition; import org.apache.cassandra.utils.ByteBufferUtil; -abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator +abstract class AbstractSSTableIterator implements UnfilteredRowIterator { protected final SSTableReader sstable; protected final DecoratedKey key; http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java index 6915196,0000000..56bb7d6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java @@@ -1,73 -1,0 +1,73 @@@ +package org.apache.cassandra.db.commitlog; + +import java.io.DataInput; +import java.nio.ByteBuffer; + ++import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.FileDataInput; - import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.FileSegmentInputStream; + +/** + * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted + * to reconstruct the full segment. + */ +public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput +{ + private final long segmentOffset; + private final int expectedLength; + private final ChunkProvider chunkProvider; + + /** + * offset the decrypted chunks already processed in this segment. + */ + private int totalChunkOffset; + + public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider) + { + super(chunkProvider.nextChunk(), filePath, position); + this.segmentOffset = segmentOffset; + this.expectedLength = expectedLength; + this.chunkProvider = chunkProvider; + } + + public interface ChunkProvider + { + /** + * Get the next chunk from the backing provider, if any chunks remain. + * @return Next chunk, else null if no more chunks remain. + */ + ByteBuffer nextChunk(); + } + + public long getFilePointer() + { + return segmentOffset + totalChunkOffset + buffer.position(); + } + + public boolean isEOF() + { + return totalChunkOffset + buffer.position() >= expectedLength; + } + + public long bytesRemaining() + { + return expectedLength - (totalChunkOffset + buffer.position()); + } + + public void seek(long position) + { + // implement this when we actually need it + throw new UnsupportedOperationException(); + } + - public long bytesPastMark(FileMark mark) ++ public long bytesPastMark(DataPosition mark) + { + throw new UnsupportedOperationException(); + } + + public void reBuffer() + { + totalChunkOffset += buffer.position(); + buffer = chunkProvider.nextChunk(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 674ed7f,0000000..9fcdfa4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@@ -1,205 -1,0 +1,205 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.service.StorageService; + +public class RangeAwareSSTableWriter implements SSTableMultiWriter +{ + private final List<PartitionPosition> boundaries; + private final Directories.DataDirectory[] directories; + private final int sstableLevel; + private final long estimatedKeys; + private final long repairedAt; + private final SSTableFormat.Type format; - private final SerializationHeader.Component header; ++ private final SerializationHeader header; + private final LifecycleTransaction txn; + private int currentIndex = -1; + public final ColumnFamilyStore cfs; + private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>(); + private final List<SSTableReader> finishedReaders = new ArrayList<>(); + private SSTableMultiWriter currentWriter = null; + - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException ++ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException + { + directories = cfs.getDirectories().getWriteableLocations(); + this.sstableLevel = sstableLevel; + this.cfs = cfs; + this.estimatedKeys = estimatedKeys / directories.length; + this.repairedAt = repairedAt; + this.format = format; + this.txn = txn; + this.header = header; + boundaries = StorageService.getDiskBoundaries(cfs, directories); + if (boundaries == null) + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn); ++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); + } + } + + private void maybeSwitchWriter(DecoratedKey key) + { + if (boundaries == null) + return; + + boolean switched = false; + while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0) + { + switched = true; + currentIndex++; + } + + if (switched) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn); ++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); + } + } + + public boolean append(UnfilteredRowIterator partition) + { + maybeSwitchWriter(partition.partitionKey()); + return currentWriter.append(partition); + } + + @Override + public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection<SSTableReader> finish(boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection<SSTableReader> finished() + { + return finishedReaders; + } + + @Override + public SSTableMultiWriter setOpenResult(boolean openResult) + { + finishedWriters.forEach((w) -> w.setOpenResult(openResult)); + currentWriter.setOpenResult(openResult); + return this; + } + + public String getFilename() + { + return String.join("/", cfs.keyspace.getName(), cfs.getTableName()); + } + + @Override + public long getFilePointer() + { + return currentWriter.getFilePointer(); + } + + @Override + public UUID getCfId() + { + return currentWriter.getCfId(); + } + + @Override + public Throwable commit(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + accumulate = writer.commit(accumulate); + return accumulate; + } + + @Override + public Throwable abort(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter finishedWriter : finishedWriters) + accumulate = finishedWriter.abort(accumulate); + + return accumulate; + } + + @Override + public void prepareToCommit() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::prepareToCommit); + } + + @Override + public void close() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::close); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index ab38ba9,5f35029..6aaf776 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@@ -83,9 -80,8 +83,9 @@@ public abstract class SSTableWriter ext this.keyCount = keyCount; this.repairedAt = repairedAt; this.metadataCollector = metadataCollector; - this.header = header; + this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header); + this.observers = observers == null ? Collections.emptySet() : observers; } public static SSTableWriter create(Descriptor descriptor, http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 7348027,f8db26b..7d7cf8a --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -35,15 -35,18 +35,18 @@@ import org.apache.cassandra.config.CFMe import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; + import org.apache.cassandra.io.util.RewindableDataInputStreamPlus; import org.apache.cassandra.io.util.DataInputPlus; + import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; - import org.apache.cassandra.utils.BytesReadTracker; + import org.apache.cassandra.io.util.TrackedInputStream; + import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@@ -124,7 -129,7 +127,7 @@@ public class StreamReade { if (deserializer != null) logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", -- session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName()); ++ session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName()); if (writer != null) { writer.abort(e); @@@ -142,10 -157,9 +155,10 @@@ Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header); - return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId)); ++ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata)); + StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); + return writer; } protected void drain(InputStream dis, long bytesRead) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 5a47787,9719587..318484f --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -24,9 -24,7 +24,8 @@@ import java.nio.channels.ReadableByteCh import com.google.common.base.Throwables; - import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;