http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java index ce42126..ad0f3c9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java @@ -54,18 +54,12 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> public static SSTableSimpleIterator create(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion) { - if (helper.version < MessagingService.VERSION_30) - return new OldFormatIterator(metadata, in, helper, partitionDeletion); - else - return new CurrentFormatIterator(metadata, in, header, helper); + return new CurrentFormatIterator(metadata, in, header, helper); } public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion) { - if (helper.version < MessagingService.VERSION_30) - return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion); - else - return new CurrentFormatTombstoneIterator(metadata, in, header, helper); + return new CurrentFormatTombstoneIterator(metadata, in, header, helper); } public abstract Row readStaticRow() throws IOException; @@ -136,106 +130,4 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> } } } - - private static class OldFormatIterator extends SSTableSimpleIterator - { - private final UnfilteredDeserializer deserializer; - - private OldFormatIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion) - { - super(metadata, in, helper); - // We use an UnfilteredDeserializer because even though we don't need all it's fanciness, it happens to handle all - // the details we need for reading the old format. - this.deserializer = UnfilteredDeserializer.create(metadata, in, null, helper, partitionDeletion, false); - } - - public Row readStaticRow() throws IOException - { - if (metadata.isCompactTable()) - { - // For static compact tables, in the old format, static columns are intermingled with the other columns, so we - // need to extract them. Which imply 2 passes (one to extract the static, then one for other value). - if (metadata.isStaticCompactTable()) - { - assert in instanceof RewindableDataInput; - RewindableDataInput file = (RewindableDataInput)in; - DataPosition mark = file.mark(); - Row staticRow = LegacyLayout.extractStaticColumns(metadata, file, metadata.partitionColumns().statics); - file.reset(mark); - - // We've extracted the static columns, so we must ignore them on the 2nd pass - ((UnfilteredDeserializer.OldFormatDeserializer)deserializer).setSkipStatic(); - return staticRow; - } - else - { - return Rows.EMPTY_STATIC_ROW; - } - } - - return deserializer.hasNext() && deserializer.nextIsStatic() - ? (Row)deserializer.readNext() - : Rows.EMPTY_STATIC_ROW; - - } - - protected Unfiltered computeNext() - { - while (true) - { - try - { - if (!deserializer.hasNext()) - return endOfData(); - - Unfiltered unfiltered = deserializer.readNext(); - if (metadata.isStaticCompactTable() && unfiltered.kind() == Unfiltered.Kind.ROW) - { - Row row = (Row) unfiltered; - ColumnDefinition def = metadata.getColumnDefinition(LegacyLayout.encodeClustering(metadata, row.clustering())); - if (def != null && def.isStatic()) - continue; - } - return unfiltered; - } - catch (IOException e) - { - throw new IOError(e); - } - } - } - - } - - private static class OldFormatTombstoneIterator extends OldFormatIterator - { - private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion) - { - super(metadata, in, helper, partitionDeletion); - } - - public Row readStaticRow() throws IOException - { - Row row = super.readStaticRow(); - if (!row.deletion().isLive()) - return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion()); - return Rows.EMPTY_STATIC_ROW; - } - - protected Unfiltered computeNext() - { - while (true) - { - Unfiltered unfiltered = super.computeNext(); - if (unfiltered == null || unfiltered.isRangeTombstoneMarker()) - return unfiltered; - - Row row = (Row) unfiltered; - if (!row.deletion().isLive()) - return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion()); - // Otherwise read next. - } - } - - } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 015c5bb..323b1bd 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -148,14 +148,8 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem return new SSTableTxnWriter(txn, writer); } - public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, SerializationHeader header) { - Descriptor desc = Descriptor.fromFilename(filename); - return create(cfs, desc, keyCount, repairedAt, sstableLevel, header); - } - - public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header) - { - return create(cfs, filename, keyCount, repairedAt, 0, header); + return create(cfs, desc, keyCount, repairedAt, 0, header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 3665da7..89c064b 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@ -68,7 +68,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter if (localDir == null) throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); + Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format); currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); } } @@ -90,7 +90,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter if (currentWriter != null) finishedWriters.add(currentWriter); - Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format); + Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format); currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java index 4391946..29e29ef 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -41,10 +41,6 @@ public interface SSTableFormat public static enum Type { - //Used internally to refer to files with no - //format flag in the filename - LEGACY("big", BigFormat.instance), - //The original sstable format BIG("big", BigFormat.instance); @@ -70,10 +66,6 @@ public interface SSTableFormat { for (Type valid : Type.values()) { - //This is used internally for old sstables - if (valid == LEGACY) - continue; - if (valid.name.equalsIgnoreCase(name)) return valid; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 1a2e1b0..add8ddc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -253,58 +253,48 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { long count = -1; - // check if cardinality estimator is available for all SSTables - boolean cardinalityAvailable = !Iterables.isEmpty(sstables) && Iterables.all(sstables, new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) - { - return sstable.descriptor.version.hasNewStatsFile(); - } - }); + if (Iterables.isEmpty(sstables)) + return count; - // if it is, load them to estimate key count - if (cardinalityAvailable) + boolean failed = false; + ICardinality cardinality = null; + for (SSTableReader sstable : sstables) { - boolean failed = false; - ICardinality cardinality = null; - for (SSTableReader sstable : sstables) - { - if (sstable.openReason == OpenReason.EARLY) - continue; - - try - { - CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION); - // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index - // summary. (CASSANDRA-10676) - if (metadata == null) - { - logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename()); - failed = true; - break; - } + if (sstable.openReason == OpenReason.EARLY) + continue; - if (cardinality == null) - cardinality = metadata.cardinalityEstimator; - else - cardinality = cardinality.merge(metadata.cardinalityEstimator); - } - catch (IOException e) - { - logger.warn("Reading cardinality from Statistics.db failed.", e); - failed = true; - break; - } - catch (CardinalityMergeException e) + try + { + CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION); + // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index + // summary. (CASSANDRA-10676) + if (metadata == null) { - logger.warn("Cardinality merge failed.", e); + logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename()); failed = true; break; } + + if (cardinality == null) + cardinality = metadata.cardinalityEstimator; + else + cardinality = cardinality.merge(metadata.cardinalityEstimator); + } + catch (IOException e) + { + logger.warn("Reading cardinality from Statistics.db failed.", e); + failed = true; + break; + } + catch (CardinalityMergeException e) + { + logger.warn("Cardinality merge failed.", e); + failed = true; + break; } - if (cardinality != null && !failed) - count = cardinality.cardinality(); } + if (cardinality != null && !failed) + count = cardinality.cardinality(); // if something went wrong above or cardinality is not available, calculate using index summary if (count < 0) @@ -481,14 +471,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; // For the 3.0+ sstable format, the (misnomed) stats component hold the serialization header which we need to deserialize the sstable content - assert !descriptor.version.storeRows() || components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor; + assert components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor; EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER); Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types); ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER); - assert !descriptor.version.storeRows() || header != null; + assert header != null; // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. @@ -730,7 +720,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { // bf is enabled and fp chance matches the currently configured value. load(false, true); - loadBloomFilter(descriptor.version.hasOldBfHashOrder()); + loadBloomFilter(); } } @@ -739,11 +729,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * * @throws IOException */ - private void loadBloomFilter(boolean oldBfHashOrder) throws IOException + private void loadBloomFilter() throws IOException { try (DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))))) { - bf = FilterFactory.deserialize(stream, true, oldBfHashOrder); + bf = FilterFactory.deserialize(stream, true); } } @@ -829,7 +819,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional if (recreateBloomFilter) - bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder()); + bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance, true); try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, samplingLevel)) { @@ -883,7 +873,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { iStream = new DataInputStream(new FileInputStream(summariesFile)); indexSummary = IndexSummary.serializer.deserialize( - iStream, getPartitioner(), descriptor.version.hasSamplingLevel(), + iStream, getPartitioner(), metadata.params.minIndexInterval, metadata.params.maxIndexInterval); first = decorateKey(ByteBufferUtil.readWithLength(iStream)); last = decorateKey(ByteBufferUtil.readWithLength(iStream)); @@ -932,7 +922,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));) { - IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel()); + IndexSummary.serializer.serialize(summary, oStream); ByteBufferUtil.writeWithLength(first.getKey(), oStream); ByteBufferUtil.writeWithLength(last.getKey(), oStream); } @@ -1106,8 +1096,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS @SuppressWarnings("resource") public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException { - assert descriptor.version.hasSamplingLevel(); - synchronized (tidy.global) { assert openReason != OpenReason.EARLY; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 9fb5f7c..874c679 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -127,26 +127,14 @@ public abstract class SSTableWriter extends SSTable implements Transactional return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn); } - public static SSTableWriter create(String filename, - long keyCount, - long repairedAt, - int sstableLevel, - SerializationHeader header, - Collection<Index> indexes, - LifecycleTransaction txn) - { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn); - } - @VisibleForTesting - public static SSTableWriter create(String filename, + public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - Descriptor descriptor = Descriptor.fromFilename(filename); return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn); } @@ -157,7 +145,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional Component.STATS, Component.SUMMARY, Component.TOC, - Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()))); + Component.DIGEST)); if (metadata.params.bloomFilterFpChance < 1.0) components.add(Component.FILTER); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index 96c5a6e..b78e434 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -46,30 +46,8 @@ public abstract class Version public abstract boolean isLatestVersion(); - public abstract boolean hasSamplingLevel(); - - public abstract boolean hasNewStatsFile(); - - public abstract ChecksumType compressedChecksumType(); - - public abstract ChecksumType uncompressedChecksumType(); - - public abstract boolean hasRepairedAt(); - - public abstract boolean tracksLegacyCounterShards(); - - public abstract boolean hasNewFileName(); - - public abstract boolean storeRows(); - public abstract int correspondingMessagingVersion(); // Only use by storage that 'storeRows' so far - public abstract boolean hasOldBfHashOrder(); - - public abstract boolean hasCompactionAncestors(); - - public abstract boolean hasBoundaries(); - public abstract boolean hasCommitLogLowerBound(); public abstract boolean hasCommitLogIntervals(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index 3846194..980eed0 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -111,16 +111,8 @@ public class BigFormat implements SSTableFormat static class BigVersion extends Version { public static final String current_version = "mc"; - public static final String earliest_supported_version = "jb"; + public static final String earliest_supported_version = "ma"; - // jb (2.0.1): switch from crc32 to adler32 for compression checksums - // checksum the compressed data - // ka (2.1.0): new Statistics.db file format - // index summaries can be downsampled and the sampling level is persisted - // switch uncompressed checksums to adler32 - // tracks presense of legacy (local and remote) counter shards - // la (2.2.0): new file name format - // lb (2.2.7): commit log lower bound included // ma (3.0.0): swap bf hash order // store rows natively // mb (3.0.7, 3.7): commit log lower bound included @@ -129,62 +121,17 @@ public class BigFormat implements SSTableFormat // NOTE: when adding a new version, please add that to LegacySSTableTest, too. private final boolean isLatestVersion; - private final boolean hasSamplingLevel; - private final boolean newStatsFile; - private final ChecksumType compressedChecksumType; - private final ChecksumType uncompressedChecksumType; - private final boolean hasRepairedAt; - private final boolean tracksLegacyCounterShards; - private final boolean newFileName; - public final boolean storeRows; - public final int correspondingMessagingVersion; // Only use by storage that 'storeRows' so far - public final boolean hasBoundaries; - /** - * CASSANDRA-8413: 3.0 bloom filter representation changed (two longs just swapped) - * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution. - */ - private final boolean hasOldBfHashOrder; + public final int correspondingMessagingVersion; private final boolean hasCommitLogLowerBound; private final boolean hasCommitLogIntervals; - /** - * CASSANDRA-7066: compaction ancerstors are no longer used and have been removed. - */ - private final boolean hasCompactionAncestors; - BigVersion(String version) { super(instance, version); isLatestVersion = version.compareTo(current_version) == 0; - hasSamplingLevel = version.compareTo("ka") >= 0; - newStatsFile = version.compareTo("ka") >= 0; - - //For a while Adler32 was in use, now the CRC32 instrinsic is very good especially after Haswell - //PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684 - ChecksumType checksumType = ChecksumType.CRC32; - if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0) - checksumType = ChecksumType.Adler32; - this.uncompressedChecksumType = checksumType; - - checksumType = ChecksumType.CRC32; - if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0) - checksumType = ChecksumType.Adler32; - this.compressedChecksumType = checksumType; - - hasRepairedAt = version.compareTo("ka") >= 0; - tracksLegacyCounterShards = version.compareTo("ka") >= 0; + correspondingMessagingVersion = MessagingService.VERSION_30; - newFileName = version.compareTo("la") >= 0; - - hasOldBfHashOrder = version.compareTo("ma") < 0; - hasCompactionAncestors = version.compareTo("ma") < 0; - storeRows = version.compareTo("ma") >= 0; - correspondingMessagingVersion = storeRows - ? MessagingService.VERSION_30 - : MessagingService.VERSION_21; - - hasBoundaries = version.compareTo("ma") < 0; hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0) || version.compareTo("mb") >= 0; hasCommitLogIntervals = version.compareTo("mc") >= 0; @@ -197,60 +144,6 @@ public class BigFormat implements SSTableFormat } @Override - public boolean hasSamplingLevel() - { - return hasSamplingLevel; - } - - @Override - public boolean hasNewStatsFile() - { - return newStatsFile; - } - - @Override - public ChecksumType compressedChecksumType() - { - return compressedChecksumType; - } - - @Override - public ChecksumType uncompressedChecksumType() - { - return uncompressedChecksumType; - } - - @Override - public boolean hasRepairedAt() - { - return hasRepairedAt; - } - - @Override - public boolean tracksLegacyCounterShards() - { - return tracksLegacyCounterShards; - } - - @Override - public boolean hasOldBfHashOrder() - { - return hasOldBfHashOrder; - } - - @Override - public boolean hasCompactionAncestors() - { - return hasCompactionAncestors; - } - - @Override - public boolean hasNewFileName() - { - return newFileName; - } - - @Override public boolean hasCommitLogLowerBound() { return hasCommitLogLowerBound; @@ -263,24 +156,12 @@ public class BigFormat implements SSTableFormat } @Override - public boolean storeRows() - { - return storeRows; - } - - @Override public int correspondingMessagingVersion() { return correspondingMessagingVersion; } @Override - public boolean hasBoundaries() - { - return hasBoundaries; - } - - @Override public boolean isCompatible() { return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index c3139a3..018edac 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -84,7 +84,7 @@ public class BigTableWriter extends SSTableWriter { dataFile = new CompressedSequentialWriter(new File(getFilename()), descriptor.filenameFor(Component.COMPRESSION_INFO), - new File(descriptor.filenameFor(descriptor.digestComponent)), + new File(descriptor.filenameFor(Component.DIGEST)), writerOption, metadata.params.compression, metadataCollector); @@ -93,7 +93,7 @@ public class BigTableWriter extends SSTableWriter { dataFile = new ChecksummedSequentialWriter(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)), - new File(descriptor.filenameFor(descriptor.digestComponent)), + new File(descriptor.filenameFor(Component.DIGEST)), writerOption); } dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression) @@ -442,7 +442,7 @@ public class BigTableWriter extends SSTableWriter builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap); chunkCache.ifPresent(builder::withChunkCache); summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL); - bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder()); + bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true); // register listeners to be alerted when the data files are flushed indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset())); dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java index ef3453a..c9dfe39 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java @@ -75,30 +75,17 @@ public class CompactionMetadata extends MetadataComponent public int serializedSize(Version version, CompactionMetadata component) throws IOException { int sz = 0; - if (version.hasCompactionAncestors()) - { // write empty ancestor marker - sz = 4; - } byte[] serializedCardinality = component.cardinalityEstimator.getBytes(); return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz; } public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException { - if (version.hasCompactionAncestors()) - { // write empty ancestor marker - out.writeInt(0); - } ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out); } public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException { - if (version.hasCompactionAncestors()) - { // skip ancestors - int nbAncestors = in.readInt(); - in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors)); - } ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt())); return new CompactionMetadata(cardinality); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java deleted file mode 100644 index 6cc33f5..0000000 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable.metadata; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.commitlog.CommitLogPosition; -import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.EstimatedHistogram; -import org.apache.cassandra.utils.StreamingHistogram; - -import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.commitLogPositionSetSerializer; - -/** - * Serializer for SSTable from legacy versions - */ -@Deprecated -public class LegacyMetadataSerializer extends MetadataSerializer -{ - /** - * Legacy serialization is only used for SSTable level reset. - */ - @Override - public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out, Version version) throws IOException - { - ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION); - StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS); - CompactionMetadata compaction = (CompactionMetadata) components.get(MetadataType.COMPACTION); - - assert validation != null && stats != null && compaction != null && validation.partitioner != null; - - EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out); - EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out); - CommitLogPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out); - out.writeLong(stats.minTimestamp); - out.writeLong(stats.maxTimestamp); - out.writeInt(stats.maxLocalDeletionTime); - out.writeDouble(validation.bloomFilterFPChance); - out.writeDouble(stats.compressionRatio); - out.writeUTF(validation.partitioner); - out.writeInt(0); // compaction ancestors - StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out); - out.writeInt(stats.sstableLevel); - out.writeInt(stats.minClusteringValues.size()); - for (ByteBuffer value : stats.minClusteringValues) - ByteBufferUtil.writeWithShortLength(value, out); - out.writeInt(stats.maxClusteringValues.size()); - for (ByteBuffer value : stats.maxClusteringValues) - ByteBufferUtil.writeWithShortLength(value, out); - if (version.hasCommitLogLowerBound()) - CommitLogPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out); - if (version.hasCommitLogIntervals()) - commitLogPositionSetSerializer.serialize(stats.commitLogIntervals, out); - } - - /** - * Legacy serializer deserialize all components no matter what types are specified. - */ - @Override - public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException - { - Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); - - File statsFile = new File(descriptor.filenameFor(Component.STATS)); - if (!statsFile.exists() && types.contains(MetadataType.STATS)) - { - components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata()); - } - else - { - try (DataInputStreamPlus in = new DataInputStreamPlus(new BufferedInputStream(new FileInputStream(statsFile)))) - { - EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in); - EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in); - CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE; - CommitLogPosition commitLogUpperBound = CommitLogPosition.serializer.deserialize(in); - long minTimestamp = in.readLong(); - long maxTimestamp = in.readLong(); - int maxLocalDeletionTime = in.readInt(); - double bloomFilterFPChance = in.readDouble(); - double compressionRatio = in.readDouble(); - String partitioner = in.readUTF(); - int nbAncestors = in.readInt(); //skip compaction ancestors - in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors)); - StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in); - int sstableLevel = 0; - if (in.available() > 0) - sstableLevel = in.readInt(); - - int colCount = in.readInt(); - List<ByteBuffer> minColumnNames = new ArrayList<>(colCount); - for (int i = 0; i < colCount; i++) - minColumnNames.add(ByteBufferUtil.readWithShortLength(in)); - - colCount = in.readInt(); - List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount); - for (int i = 0; i < colCount; i++) - maxColumnNames.add(ByteBufferUtil.readWithShortLength(in)); - - if (descriptor.version.hasCommitLogLowerBound()) - commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); - IntervalSet<CommitLogPosition> commitLogIntervals; - if (descriptor.version.hasCommitLogIntervals()) - commitLogIntervals = commitLogPositionSetSerializer.deserialize(in); - else - commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound); - - if (types.contains(MetadataType.VALIDATION)) - components.put(MetadataType.VALIDATION, - new ValidationMetadata(partitioner, bloomFilterFPChance)); - if (types.contains(MetadataType.STATS)) - components.put(MetadataType.STATS, - new StatsMetadata(partitionSizes, - columnCounts, - commitLogIntervals, - minTimestamp, - maxTimestamp, - Integer.MAX_VALUE, - maxLocalDeletionTime, - 0, - Integer.MAX_VALUE, - compressionRatio, - tombstoneHistogram, - sstableLevel, - minColumnNames, - maxColumnNames, - true, - ActiveRepairService.UNREPAIRED_SSTABLE, - -1, - -1)); - if (types.contains(MetadataType.COMPACTION)) - components.put(MetadataType.COMPACTION, - new CompactionMetadata(null)); - } - } - return components; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index c83c2cf..0f6434b 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -236,10 +236,7 @@ public class StatsMetadata extends MetadataComponent size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize); size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount); size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE)); - if (version.storeRows()) - size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long) - else - size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long) + size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long) size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime); size += TypeSizes.sizeof(component.sstableLevel); // min column names @@ -251,8 +248,7 @@ public class StatsMetadata extends MetadataComponent for (ByteBuffer value : component.maxClusteringValues) size += 2 + value.remaining(); // with short length size += TypeSizes.sizeof(component.hasLegacyCounterShards); - if (version.storeRows()) - size += 8 + 8; // totalColumnsSet, totalRows + size += 8 + 8; // totalColumnsSet, totalRows if (version.hasCommitLogLowerBound()) size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE)); if (version.hasCommitLogIntervals()) @@ -267,14 +263,10 @@ public class StatsMetadata extends MetadataComponent CommitLogPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out); out.writeLong(component.minTimestamp); out.writeLong(component.maxTimestamp); - if (version.storeRows()) - out.writeInt(component.minLocalDeletionTime); + out.writeInt(component.minLocalDeletionTime); out.writeInt(component.maxLocalDeletionTime); - if (version.storeRows()) - { - out.writeInt(component.minTTL); - out.writeInt(component.maxTTL); - } + out.writeInt(component.minTTL); + out.writeInt(component.maxTTL); out.writeDouble(component.compressionRatio); StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out); out.writeInt(component.sstableLevel); @@ -287,11 +279,8 @@ public class StatsMetadata extends MetadataComponent ByteBufferUtil.writeWithShortLength(value, out); out.writeBoolean(component.hasLegacyCounterShards); - if (version.storeRows()) - { - out.writeLong(component.totalColumnsSet); - out.writeLong(component.totalRows); - } + out.writeLong(component.totalColumnsSet); + out.writeLong(component.totalRows); if (version.hasCommitLogLowerBound()) CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out); @@ -307,17 +296,14 @@ public class StatsMetadata extends MetadataComponent commitLogUpperBound = CommitLogPosition.serializer.deserialize(in); long minTimestamp = in.readLong(); long maxTimestamp = in.readLong(); - // We use MAX_VALUE as that's the default value for "no deletion time" - int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE; + int minLocalDeletionTime = in.readInt(); int maxLocalDeletionTime = in.readInt(); - int minTTL = version.storeRows() ? in.readInt() : 0; - int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE; + int minTTL = in.readInt(); + int maxTTL = in.readInt(); double compressionRatio = in.readDouble(); StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in); int sstableLevel = in.readInt(); - long repairedAt = 0; - if (version.hasRepairedAt()) - repairedAt = in.readLong(); + long repairedAt = in.readLong(); int colCount = in.readInt(); List<ByteBuffer> minClusteringValues = new ArrayList<>(colCount); @@ -329,12 +315,10 @@ public class StatsMetadata extends MetadataComponent for (int i = 0; i < colCount; i++) maxClusteringValues.add(ByteBufferUtil.readWithShortLength(in)); - boolean hasLegacyCounterShards = true; - if (version.tracksLegacyCounterShards()) - hasLegacyCounterShards = in.readBoolean(); + boolean hasLegacyCounterShards = in.readBoolean(); - long totalColumnsSet = version.storeRows() ? in.readLong() : -1L; - long totalRows = version.storeRows() ? in.readLong() : -1L; + long totalColumnsSet = in.readLong(); + long totalRows = in.readLong(); if (version.hasCommitLogLowerBound()) commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java index 8f00ce7..219f0eb 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java @@ -29,6 +29,7 @@ import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.compress.CorruptBlockException; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.utils.ChecksumType; public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader { @@ -142,7 +143,7 @@ public abstract class CompressedChunkReader extends AbstractReaderFileProxy impl if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) { compressed.rewind(); - int checksum = (int) metadata.checksumType.of(compressed); + int checksum = (int) ChecksumType.CRC32.of(compressed); compressed.clear().limit(Integer.BYTES); if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES @@ -204,7 +205,7 @@ public abstract class CompressedChunkReader extends AbstractReaderFileProxy impl { compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); - int checksum = (int) metadata.checksumType.of(compressedChunk); + int checksum = (int) ChecksumType.CRC32.of(compressedChunk); compressedChunk.limit(compressedChunk.capacity()); if (compressedChunk.getInt() != checksum) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index cee23c9..91b189d 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -44,7 +44,7 @@ public class DataIntegrityMetadata public ChecksumValidator(Descriptor descriptor) throws IOException { - this(descriptor.version.uncompressedChecksumType(), + this(ChecksumType.CRC32, RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))), descriptor.filenameFor(Component.DATA)); } @@ -99,8 +99,8 @@ public class DataIntegrityMetadata public FileDigestValidator(Descriptor descriptor) throws IOException { this.descriptor = descriptor; - checksum = descriptor.version.uncompressedChecksumType().newInstance(); - digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.digestFor(descriptor.version.uncompressedChecksumType())))); + checksum = ChecksumType.CRC32.newInstance(); + digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST))); dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA))); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 9878590..53e53a4 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -86,9 +86,9 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos { try { - if (version < MessagingService.VERSION_20) + if (version < MessagingService.VERSION_30) throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; " - + "The earliest version supported is 2.0.0", + + "The earliest version supported is 3.0.0", version)); receiveMessages(); @@ -155,18 +155,11 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos if (compressed) { logger.trace("Upgrading incoming connection to be compressed"); - if (version < MessagingService.VERSION_21) - { - in = new DataInputStreamPlus(new SnappyInputStream(socket.getInputStream())); - } - else - { - LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); - Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum(); - in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(), - decompressor, - checksum)); - } + LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum(); + in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(), + decompressor, + checksum)); } else { @@ -183,11 +176,8 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException { - int id; - if (version < MessagingService.VERSION_20) - id = Integer.parseInt(input.readUTF()); - else - id = input.readInt(); + int id = input.readInt(); + long currentTime = ApproximateTime.currentTimeMillis(); MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime)); if (message == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 4f41ee5..94019f2 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -104,7 +104,7 @@ public class MessageOut<T> { CompactEndpointSerializationHelper.serialize(from, out); - out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).ordinal()); + out.writeInt(verb.ordinal()); out.writeInt(parameters.size()); for (Map.Entry<String, byte[]> entry : parameters.entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index f82e80b..38c1cd2 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -88,10 +88,6 @@ public final class MessagingService implements MessagingServiceMBean public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions - public static final int VERSION_12 = 6; - public static final int VERSION_20 = 7; - public static final int VERSION_21 = 8; - public static final int VERSION_22 = 9; public static final int VERSION_30 = 10; public static final int current_version = VERSION_30; @@ -105,9 +101,6 @@ public final class MessagingService implements MessagingServiceMBean */ public static final int PROTOCOL_MAGIC = 0xCA552DFA; - private boolean allNodesAtLeast22 = true; - private boolean allNodesAtLeast30 = true; - public final MessagingMetrics metrics = new MessagingMetrics(); /* All verb handler identifiers */ @@ -236,16 +229,6 @@ public final class MessagingService implements MessagingServiceMBean UNUSED_5, ; - // This is to support a "late" choice of the verb based on the messaging service version. - // See CASSANDRA-12249 for more details. - public static Verb convertForMessagingServiceVersion(Verb verb, int version) - { - if (verb == PAGED_RANGE && version >= VERSION_30) - return RANGE_SLICE; - - return verb; - } - public long getTimeout() { return DatabaseDescriptor.getRpcTimeout(); @@ -319,9 +302,9 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); - put(Verb.READ, ReadCommand.readSerializer); - put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer); - put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer); + put(Verb.READ, ReadCommand.serializer); + put(Verb.RANGE_SLICE, ReadCommand.serializer); + put(Verb.PAGED_RANGE, ReadCommand.serializer); put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance); put(Verb.REPAIR_MESSAGE, RepairMessage.serializer); put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer); @@ -350,8 +333,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.HINT, HintResponse.serializer); put(Verb.READ_REPAIR, WriteResponse.serializer); put(Verb.COUNTER_MUTATION, WriteResponse.serializer); - put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer); - put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer); + put(Verb.RANGE_SLICE, ReadResponse.serializer); + put(Verb.PAGED_RANGE, ReadResponse.serializer); put(Verb.READ, ReadResponse.serializer); put(Verb.TRUNCATE, TruncateResponse.serializer); put(Verb.SNAPSHOT, null); @@ -1041,16 +1024,6 @@ public final class MessagingService implements MessagingServiceMBean return packed >>> (start + 1) - count & ~(-1 << count); } - public boolean areAllNodesAtLeast22() - { - return allNodesAtLeast22; - } - - public boolean areAllNodesAtLeast30() - { - return allNodesAtLeast30; - } - /** * @return the last version associated with address, or @param version if this is the first such version */ @@ -1058,50 +1031,16 @@ public final class MessagingService implements MessagingServiceMBean { // We can't talk to someone from the future version = Math.min(version, current_version); - logger.trace("Setting version {} for {}", version, endpoint); - if (version < VERSION_22) - allNodesAtLeast22 = false; - if (version < VERSION_30) - allNodesAtLeast30 = false; - Integer v = versions.put(endpoint, version); - - // if the version was increased to 2.2 or later see if the min version across the cluster has changed - if (v != null && (v < VERSION_30 && version >= VERSION_22)) - refreshAllNodeMinVersions(); - return v == null ? version : v; } public void resetVersion(InetAddress endpoint) { logger.trace("Resetting version for {}", endpoint); - Integer removed = versions.remove(endpoint); - if (removed != null && removed <= VERSION_30) - refreshAllNodeMinVersions(); - } - - private void refreshAllNodeMinVersions() - { - boolean anyNodeLowerThan30 = false; - for (Integer version : versions.values()) - { - if (version < MessagingService.VERSION_30) - { - anyNodeLowerThan30 = true; - allNodesAtLeast30 = false; - } - - if (version < MessagingService.VERSION_22) - { - allNodesAtLeast22 = false; - return; - } - } - allNodesAtLeast22 = true; - allNodesAtLeast30 = !anyNodeLowerThan30; + versions.remove(endpoint); } public int getVersion(InetAddress endpoint) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1f47334..c32154e 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -336,11 +336,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread private void writeInternal(MessageOut message, int id, long timestamp) throws IOException { out.writeInt(MessagingService.PROTOCOL_MAGIC); - - if (targetVersion < MessagingService.VERSION_20) - out.writeUTF(String.valueOf(id)); - else - out.writeInt(id); + out.writeInt(id); // int cast cuts off the high-order half of the timestamp, which we can assume remains // the same between now and when the recipient reconstructs it. @@ -427,9 +423,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread int maxTargetVersion = handshakeVersion(in); if (maxTargetVersion == NO_VERSION) { - // no version is returned, so disconnect an try again: we will either get - // a different target version (targetVersion < MessagingService.VERSION_12) - // or if the same version the handshake will finally succeed + // no version is returned, so disconnect an try again logger.trace("Target max version is {}; no version information yet, will retry", maxTargetVersion); if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint())) logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion); @@ -461,22 +455,15 @@ public class OutboundTcpConnection extends FastThreadLocalThread { out.flush(); logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint()); - if (targetVersion < MessagingService.VERSION_21) - { - // Snappy is buffered, so no need for extra buffering output stream - out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream())); - } - else - { - // TODO: custom LZ4 OS that supports BB write methods - LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); - Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(); - out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(), - 1 << 14, // 16k block size - compressor, - checksum, - true)); // no async flushing - } + + // TODO: custom LZ4 OS that supports BB write methods + LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); + Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(); + out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(), + 1 << 14, // 16k block size + compressor, + checksum, + true)); // no async flushing } logger.debug("Done connecting to {}", poolReference.endPoint()); return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/repair/RepairJobDesc.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 05adbf9..be3daef 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -93,12 +93,10 @@ public class RepairJobDesc { public void serialize(RepairJobDesc desc, DataOutputPlus out, int version) throws IOException { - if (version >= MessagingService.VERSION_21) - { - out.writeBoolean(desc.parentSessionId != null); - if (desc.parentSessionId != null) - UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version); - } + out.writeBoolean(desc.parentSessionId != null); + if (desc.parentSessionId != null) + UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version); + UUIDSerializer.serializer.serialize(desc.sessionId, out, version); out.writeUTF(desc.keyspace); out.writeUTF(desc.columnFamily); @@ -111,11 +109,8 @@ public class RepairJobDesc public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException { UUID parentSessionId = null; - if (version >= MessagingService.VERSION_21) - { - if (in.readBoolean()) - parentSessionId = UUIDSerializer.serializer.deserialize(in, version); - } + if (in.readBoolean()) + parentSessionId = UUIDSerializer.serializer.deserialize(in, version); UUID sessionId = UUIDSerializer.serializer.deserialize(in, version); String keyspace = in.readUTF(); String columnFamily = in.readUTF(); @@ -136,13 +131,9 @@ public class RepairJobDesc public long serializedSize(RepairJobDesc desc, int version) { - int size = 0; - if (version >= MessagingService.VERSION_21) - { - size += TypeSizes.sizeof(desc.parentSessionId != null); - if (desc.parentSessionId != null) - size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version); - } + int size = TypeSizes.sizeof(desc.parentSessionId != null); + if (desc.parentSessionId != null) + size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version); size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version); size += TypeSizes.sizeof(desc.keyspace); size += TypeSizes.sizeof(desc.columnFamily); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index a2a2512..e20995e 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -218,7 +218,7 @@ public class Validator implements Runnable validated++; // MerkleTree uses XOR internally, so we want lots of output bits here CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256")); - UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version); + UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 return digest.count > 0 ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)