http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java deleted file mode 100644 index d6aac64..0000000 --- a/src/java/org/apache/cassandra/db/Serializers.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.sstable.IndexInfo; -import org.apache.cassandra.io.sstable.format.big.BigFormat; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * Holds references on serializers that depend on the table definition. - */ -public class Serializers -{ - private final CFMetaData metadata; - - private Map<Version, IndexInfo.Serializer> otherVersionClusteringSerializers; - - private final IndexInfo.Serializer latestVersionIndexSerializer; - - public Serializers(CFMetaData metadata) - { - this.metadata = metadata; - this.latestVersionIndexSerializer = new IndexInfo.Serializer(BigFormat.latestVersion, - indexEntryClusteringPrefixSerializer(BigFormat.latestVersion, SerializationHeader.makeWithoutStats(metadata))); - } - - IndexInfo.Serializer indexInfoSerializer(Version version, SerializationHeader header) - { - // null header indicates streaming from pre-3.0 sstables - if (version.equals(BigFormat.latestVersion) && header != null) - return latestVersionIndexSerializer; - - if (otherVersionClusteringSerializers == null) - otherVersionClusteringSerializers = new ConcurrentHashMap<>(); - IndexInfo.Serializer serializer = otherVersionClusteringSerializers.get(version); - if (serializer == null) - { - serializer = new IndexInfo.Serializer(version, - indexEntryClusteringPrefixSerializer(version, header)); - otherVersionClusteringSerializers.put(version, serializer); - } - return serializer; - } - - // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this method and inline the calls to - // ClusteringPrefix.serializer directly. At which point this whole class probably becomes - // unecessary (since IndexInfo.Serializer won't depend on the metadata either). 
- private ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(Version version, SerializationHeader header) - { - if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables - { - return oldFormatSerializer(version); - } - - return new NewFormatSerializer(version, header.clusteringTypes()); - } - - private ISerializer<ClusteringPrefix> oldFormatSerializer(Version version) - { - return new ISerializer<ClusteringPrefix>() - { - List<AbstractType<?>> clusteringTypes = SerializationHeader.makeWithoutStats(metadata).clusteringTypes(); - - public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException - { - //we deserialize in the old format and serialize in the new format - ClusteringPrefix.serializer.serialize(clustering, out, - version.correspondingMessagingVersion(), - clusteringTypes); - } - - @Override - public void skip(DataInputPlus in) throws IOException - { - ByteBufferUtil.skipShortLength(in); - } - - public ClusteringPrefix deserialize(DataInputPlus in) throws IOException - { - // We're reading the old cellname/composite - ByteBuffer bb = ByteBufferUtil.readWithShortLength(in); - assert bb.hasRemaining(); // empty cellnames were invalid - - int clusteringSize = metadata.clusteringColumns().size(); - // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here. - if (clusteringSize == 0) - return Clustering.EMPTY; - - if (!metadata.isCompound()) - return Clustering.make(bb); - - List<ByteBuffer> components = CompositeType.splitName(bb); - byte eoc = CompositeType.lastEOC(bb); - - if (eoc == 0 || components.size() >= clusteringSize) - { - // That's a clustering. - if (components.size() > clusteringSize) - components = components.subList(0, clusteringSize); - - return Clustering.make(components.toArray(new ByteBuffer[clusteringSize])); - } - else - { - // It's a range tombstone bound. It is a start since that's the only part we've ever included - // in the index entries. - ClusteringPrefix.Kind boundKind = eoc > 0 - ? 
ClusteringPrefix.Kind.EXCL_START_BOUND - : ClusteringPrefix.Kind.INCL_START_BOUND; - - return ClusteringBound.create(boundKind, components.toArray(new ByteBuffer[components.size()])); - } - } - - public long serializedSize(ClusteringPrefix clustering) - { - return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), - clusteringTypes); - } - }; - } - - private static class NewFormatSerializer implements ISerializer<ClusteringPrefix> - { - private final Version version; - private final List<AbstractType<?>> clusteringTypes; - - NewFormatSerializer(Version version, List<AbstractType<?>> clusteringTypes) - { - this.version = version; - this.clusteringTypes = clusteringTypes; - } - - public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException - { - ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), clusteringTypes); - } - - @Override - public void skip(DataInputPlus in) throws IOException - { - ClusteringPrefix.serializer.skip(in, version.correspondingMessagingVersion(), clusteringTypes); - } - - public ClusteringPrefix deserialize(DataInputPlus in) throws IOException - { - return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), clusteringTypes); - } - - public long serializedSize(ClusteringPrefix clustering) - { - return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), clusteringTypes); - } - } -}
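The class removed above existed only so that index entries could serialize clusterings for pre-3.0 sstables (the old cellname/composite encoding); its own TODO noted that once that support is dropped, callers can delegate to ClusteringPrefix.serializer directly, which is what this commit enables. As a minimal sketch of that direct delegation — mirroring the deleted NewFormatSerializer, and assuming the caller already holds the sstable Version and SerializationHeader (how IndexInfo.Serializer obtains them after this commit is not shown here) — it would look roughly like:

    // Sketch only: index-entry clustering (de)serialization going straight through
    // ClusteringPrefix.serializer, as the deleted NewFormatSerializer did.
    ISerializer<ClusteringPrefix> clusteringSerializer = new ISerializer<ClusteringPrefix>()
    {
        public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
        {
            ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes());
        }

        public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
        {
            return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes());
        }

        public void skip(DataInputPlus in) throws IOException
        {
            ClusteringPrefix.serializer.skip(in, version.correspondingMessagingVersion(), header.clusteringTypes());
        }

        public long serializedSize(ClusteringPrefix clustering)
        {
            return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes());
        }
    };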
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index d87d277..1d6ec7b 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -935,9 +935,9 @@ public class SinglePartitionReadCommand extends ReadCommand nowInSec()); } - public MessageOut<ReadCommand> createMessage(int version) + public MessageOut<ReadCommand> createMessage() { - return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer); + return new MessageOut<>(MessagingService.Verb.READ, this, serializer); } protected void appendCQLWhereClause(StringBuilder sb) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 31a461b..91adf3a 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -102,16 +102,6 @@ public final class SystemKeyspace public static final String BUILT_VIEWS = "built_views"; public static final String PREPARED_STATEMENTS = "prepared_statements"; - @Deprecated public static final String LEGACY_HINTS = "hints"; - @Deprecated public static final String LEGACY_BATCHLOG = "batchlog"; - @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces"; - @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies"; - @Deprecated public static final String LEGACY_COLUMNS = "schema_columns"; - @Deprecated public static final String LEGACY_TRIGGERS = "schema_triggers"; - @Deprecated public static final String LEGACY_USERTYPES = "schema_usertypes"; - @Deprecated public static final String LEGACY_FUNCTIONS = "schema_functions"; - @Deprecated public static final String LEGACY_AGGREGATES = "schema_aggregates"; - public static final CFMetaData Batches = compile(BATCHES, "batches awaiting replay", @@ -288,148 +278,6 @@ public final class SystemKeyspace + "query_string text," + "PRIMARY KEY ((prepared_id)))"); - @Deprecated - public static final CFMetaData LegacyHints = - compile(LEGACY_HINTS, - "*DEPRECATED* hints awaiting delivery", - "CREATE TABLE %s (" - + "target_id uuid," - + "hint_id timeuuid," - + "message_version int," - + "mutation blob," - + "PRIMARY KEY ((target_id), hint_id, message_version)) " - + "WITH COMPACT STORAGE") - .compaction(CompactionParams.scts(singletonMap("enabled", "false"))) - .gcGraceSeconds(0); - - @Deprecated - public static final CFMetaData LegacyBatchlog = - compile(LEGACY_BATCHLOG, - "*DEPRECATED* batchlog entries", - "CREATE TABLE %s (" - + "id uuid," - + "data blob," - + "version int," - + "written_at timestamp," - + "PRIMARY KEY ((id)))") - .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) - .gcGraceSeconds(0); - - @Deprecated - public static final CFMetaData LegacyKeyspaces = - compile(LEGACY_KEYSPACES, - "*DEPRECATED* keyspace definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "durable_writes boolean," - + "strategy_class text," - + "strategy_options text," - + "PRIMARY KEY ((keyspace_name))) " - + 
"WITH COMPACT STORAGE"); - - @Deprecated - public static final CFMetaData LegacyColumnfamilies = - compile(LEGACY_COLUMNFAMILIES, - "*DEPRECATED* table definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "bloom_filter_fp_chance double," - + "caching text," - + "cf_id uuid," // post-2.1 UUID cfid - + "comment text," - + "compaction_strategy_class text," - + "compaction_strategy_options text," - + "comparator text," - + "compression_parameters text," - + "default_time_to_live int," - + "default_validator text," - + "dropped_columns map<text, bigint>," - + "gc_grace_seconds int," - + "is_dense boolean," - + "key_validator text," - + "local_read_repair_chance double," - + "max_compaction_threshold int," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_compaction_threshold int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "subcomparator text," - + "type text," - + "PRIMARY KEY ((keyspace_name), columnfamily_name))"); - - @Deprecated - public static final CFMetaData LegacyColumns = - compile(LEGACY_COLUMNS, - "*DEPRECATED* column definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "column_name text," - + "component_index int," - + "index_name text," - + "index_options text," - + "index_type text," - + "type text," - + "validator text," - + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))"); - - @Deprecated - public static final CFMetaData LegacyTriggers = - compile(LEGACY_TRIGGERS, - "*DEPRECATED* trigger definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "trigger_name text," - + "trigger_options map<text, text>," - + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))"); - - @Deprecated - public static final CFMetaData LegacyUsertypes = - compile(LEGACY_USERTYPES, - "*DEPRECATED* user defined type definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "type_name text," - + "field_names list<text>," - + "field_types list<text>," - + "PRIMARY KEY ((keyspace_name), type_name))"); - - @Deprecated - public static final CFMetaData LegacyFunctions = - compile(LEGACY_FUNCTIONS, - "*DEPRECATED* user defined function definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "function_name text," - + "signature frozen<list<text>>," - + "argument_names list<text>," - + "argument_types list<text>," - + "body text," - + "language text," - + "return_type text," - + "called_on_null_input boolean," - + "PRIMARY KEY ((keyspace_name), function_name, signature))"); - - @Deprecated - public static final CFMetaData LegacyAggregates = - compile(LEGACY_AGGREGATES, - "*DEPRECATED* user defined aggregate definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "aggregate_name text," - + "signature frozen<list<text>>," - + "argument_types list<text>," - + "final_func text," - + "initcond blob," - + "return_type text," - + "state_func text," - + "state_type text," - + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))"); - private static CFMetaData compile(String name, String description, String schema) { return CFMetaData.compile(String.format(schema, name), SchemaConstants.SYSTEM_KEYSPACE_NAME) @@ -457,16 +305,7 @@ public final class SystemKeyspace TransferredRanges, ViewsBuildsInProgress, BuiltViews, - LegacyHints, - LegacyBatchlog, - PreparedStatements, - LegacyKeyspaces, - LegacyColumnfamilies, - LegacyColumns, - LegacyTriggers, - 
LegacyUsertypes, - LegacyFunctions, - LegacyAggregates); + PreparedStatements); } private static Functions functions() @@ -1131,18 +970,27 @@ public final class SystemKeyspace if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); + + // Note: Pre-3.0, we used to not store the versions at which things were serialized. As 3.0 is a mandatory + // upgrade to 4.0+ and the paxos table is TTLed, it's _very_ unlikely we'll ever read a proposal or MRC without + // a version. But if we do (say gc_grace, on which the TTL is based, happens to be super large), we consider + // the commit too old and ignore it. + if (!row.has("proposal_version") || !row.has("most_recent_commit_version")) + return new PaxosState(key, metadata); + + Commit promised = row.has("in_progress_ballot") ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.partitionColumns(), 1)) : Commit.emptyCommit(key, metadata); // either we have both a recently accepted ballot and update or we have neither - int proposalVersion = row.has("proposal_version") ? row.getInt("proposal_version") : MessagingService.VERSION_21; - Commit accepted = row.has("proposal") - ? new Commit(row.getUUID("proposal_ballot"), PartitionUpdate.fromBytes(row.getBytes("proposal"), proposalVersion, key)) + Commit accepted = row.has("proposal_version") && row.has("proposal") + ? new Commit(row.getUUID("proposal_ballot"), + PartitionUpdate.fromBytes(row.getBytes("proposal"), row.getInt("proposal_version"))) : Commit.emptyCommit(key, metadata); // either most_recent_commit and most_recent_commit_at will both be set, or neither - int mostRecentVersion = row.has("most_recent_commit_version") ? row.getInt("most_recent_commit_version") : MessagingService.VERSION_21; - Commit mostRecent = row.has("most_recent_commit") - ? new Commit(row.getUUID("most_recent_commit_at"), PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), mostRecentVersion, key)) + Commit mostRecent = row.has("most_recent_commit_version") && row.has("most_recent_commit") + ? 
new Commit(row.getUUID("most_recent_commit_at"), + PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), row.getInt("most_recent_commit_version"))) : Commit.emptyCommit(key, metadata); return new PaxosState(promised, accepted, mostRecent); } @@ -1404,45 +1252,17 @@ public final class SystemKeyspace return result.one().getString("release_version"); } - /** - * Check data directories for old files that can be removed when migrating from 2.1 or 2.2 to 3.0, - * these checks can be removed in 4.0, see CASSANDRA-7066 - */ - public static void migrateDataDirs() - { - Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()); - for (String dataDir : dirs) - { - logger.trace("Checking {} for old files", dataDir); - File dir = new File(dataDir); - assert dir.exists() : dir + " should have been created by startup checks"; - - for (File ksdir : dir.listFiles((d, n) -> new File(d, n).isDirectory())) - { - logger.trace("Checking {} for old files", ksdir); - - for (File cfdir : ksdir.listFiles((d, n) -> new File(d, n).isDirectory())) - { - logger.trace("Checking {} for old files", cfdir); - - if (Descriptor.isLegacyFile(cfdir)) - { - FileUtils.deleteRecursive(cfdir); - } - else - { - FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(new File(d, n)))); - } - } - } - } - } - private static ByteBuffer rangeToBytes(Range<Token> range) { try (DataOutputBuffer out = new DataOutputBuffer()) { - Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_22); + // The format with which token ranges are serialized in the system tables is the pre-3.0 serialization + // format for ranges, so we should maintain that for now. And while we don't really support pre-3.0 + // messaging versions, we know AbstractBounds.Serializer still supports it _exactly_ for this use case, so we + // pass 0 as the version to trigger that legacy code. + // In the future, it might be worth switching to a stable text format for the ranges to 1) avoid relying on that legacy code and 2) + // be more user friendly (the serialization format we currently use is pretty custom). + Range.tokenSerializer.serialize(range, out, 0); return out.buffer(); } catch (IOException e) @@ -1456,9 +1276,10 @@ public final class SystemKeyspace { try { + // See rangeToBytes above for why version is 0. return (Range<Token>) Range.tokenSerializer.deserialize(ByteStreams.newDataInput(ByteBufferUtil.getArray(rawRange)), partitioner, - MessagingService.VERSION_22); + 0); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 9e39105..c32a642 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -39,7 +39,7 @@ import org.apache.cassandra.net.MessagingService; * we don't do more work than necessary (i.e. we don't allocate/deserialize * objects for things we don't care about). 
*/ -public abstract class UnfilteredDeserializer +public class UnfilteredDeserializer { private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class); @@ -47,32 +47,67 @@ public abstract class UnfilteredDeserializer protected final DataInputPlus in; protected final SerializationHelper helper; - protected UnfilteredDeserializer(CFMetaData metadata, - DataInputPlus in, - SerializationHelper helper) + private final ClusteringPrefix.Deserializer clusteringDeserializer; + private final SerializationHeader header; + + private int nextFlags; + private int nextExtendedFlags; + private boolean isReady; + private boolean isDone; + + private final Row.Builder builder; + + private UnfilteredDeserializer(CFMetaData metadata, + DataInputPlus in, + SerializationHeader header, + SerializationHelper helper) { this.metadata = metadata; this.in = in; this.helper = helper; + this.header = header; + this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header); + this.builder = BTreeRow.sortedBuilder(); } public static UnfilteredDeserializer create(CFMetaData metadata, DataInputPlus in, SerializationHeader header, - SerializationHelper helper, - DeletionTime partitionDeletion, - boolean readAllAsDynamic) + SerializationHelper helper) { - if (helper.version >= MessagingService.VERSION_30) - return new CurrentDeserializer(metadata, in, header, helper); - else - return new OldFormatDeserializer(metadata, in, helper, partitionDeletion, readAllAsDynamic); + return new UnfilteredDeserializer(metadata, in, header, helper); } /** * Whether or not there is more atom to read. */ - public abstract boolean hasNext() throws IOException; + public boolean hasNext() throws IOException + { + if (isReady) + return true; + + prepareNext(); + return !isDone; + } + + private void prepareNext() throws IOException + { + if (isDone) + return; + + nextFlags = in.readUnsignedByte(); + if (UnfilteredSerializer.isEndOfPartition(nextFlags)) + { + isDone = true; + isReady = false; + return; + } + + nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags); + + clusteringDeserializer.prepare(nextFlags, nextExtendedFlags); + isReady = true; + } /** * Compare the provided bound to the next atom to read on disk. @@ -81,585 +116,68 @@ public abstract class UnfilteredDeserializer * comparison. Whenever we know what to do with this atom (read it or skip it), * readNext or skipNext should be called. */ - public abstract int compareNextTo(ClusteringBound bound) throws IOException; - - /** - * Returns whether the next atom is a row or not. - */ - public abstract boolean nextIsRow() throws IOException; - - /** - * Returns whether the next atom is the static row or not. - */ - public abstract boolean nextIsStatic() throws IOException; + public int compareNextTo(ClusteringBound bound) throws IOException + { + if (!isReady) + prepareNext(); - /** - * Returns the next atom. - */ - public abstract Unfiltered readNext() throws IOException; + assert !isDone; - /** - * Clears any state in this deserializer. - */ - public abstract void clearState() throws IOException; + return clusteringDeserializer.compareNextTo(bound); + } /** - * Skips the next atom. + * Returns whether the next atom is a row or not. 
*/ - public abstract void skipNext() throws IOException; + public boolean nextIsRow() throws IOException + { + if (!isReady) + prepareNext(); + return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW; + } /** - * For the legacy layout deserializer, we have to deal with the fact that a row can span multiple index blocks and that - * the call to hasNext() reads the next element upfront. We must take that into account when we check in AbstractSSTableIterator if - * we're past the end of an index block boundary as that check expect to account for only consumed data (that is, if hasNext has - * been called and made us cross an index boundary but neither readNext() or skipNext() as yet been called, we shouldn't consider - * the index block boundary crossed yet). - * - * TODO: we don't care about this for the current file format because a row can never span multiple index blocks (further, hasNext() - * only just basically read 2 bytes from disk in that case). So once we drop backward compatibility with pre-3.0 sstable, we should - * remove this. + * Returns the next atom. */ - public abstract long bytesReadForUnconsumedData(); - - private static class CurrentDeserializer extends UnfilteredDeserializer + public Unfiltered readNext() throws IOException { - private final ClusteringPrefix.Deserializer clusteringDeserializer; - private final SerializationHeader header; - - private int nextFlags; - private int nextExtendedFlags; - private boolean isReady; - private boolean isDone; - - private final Row.Builder builder; - - private CurrentDeserializer(CFMetaData metadata, - DataInputPlus in, - SerializationHeader header, - SerializationHelper helper) + isReady = false; + if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - super(metadata, in, helper); - this.header = header; - this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header); - this.builder = BTreeRow.sortedBuilder(); + ClusteringBoundOrBoundary bound = clusteringDeserializer.deserializeNextBound(); + return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound); } - - public boolean hasNext() throws IOException - { - if (isReady) - return true; - - prepareNext(); - return !isDone; - } - - private void prepareNext() throws IOException - { - if (isDone) - return; - - nextFlags = in.readUnsignedByte(); - if (UnfilteredSerializer.isEndOfPartition(nextFlags)) - { - isDone = true; - isReady = false; - return; - } - - nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags); - - clusteringDeserializer.prepare(nextFlags, nextExtendedFlags); - isReady = true; - } - - public int compareNextTo(ClusteringBound bound) throws IOException - { - if (!isReady) - prepareNext(); - - assert !isDone; - - return clusteringDeserializer.compareNextTo(bound); - } - - public boolean nextIsRow() throws IOException - { - if (!isReady) - prepareNext(); - - return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW; - } - - public boolean nextIsStatic() throws IOException - { - // This exists only for the sake of the OldFormatDeserializer - throw new UnsupportedOperationException(); - } - - public Unfiltered readNext() throws IOException - { - isReady = false; - if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) - { - ClusteringBoundOrBoundary bound = clusteringDeserializer.deserializeNextBound(); - return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound); - } - else - { - 
builder.newRow(clusteringDeserializer.deserializeNextClustering()); - return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder); - } - } - - public void skipNext() throws IOException - { - isReady = false; - clusteringDeserializer.skipNext(); - if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) - { - UnfilteredSerializer.serializer.skipMarkerBody(in); - } - else - { - UnfilteredSerializer.serializer.skipRowBody(in); - } - } - - public void clearState() - { - isReady = false; - isDone = false; - } - - public long bytesReadForUnconsumedData() + else { - // In theory, hasNext() does consume 2-3 bytes, but we don't care about this for the current file format so returning - // 0 to mean "do nothing". - return 0; + builder.newRow(clusteringDeserializer.deserializeNextClustering()); + return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder); } } - public static class OldFormatDeserializer extends UnfilteredDeserializer + /** + * Clears any state in this deserializer. + */ + public void clearState() { - private final boolean readAllAsDynamic; - private boolean skipStatic; - - // The next Unfiltered to return, computed by hasNext() - private Unfiltered next; - // A temporary storage for an unfiltered that isn't returned next but should be looked at just afterwards - private Unfiltered saved; - - private boolean isFirst = true; - - // The Unfiltered as read from the old format input - private final UnfilteredIterator iterator; - - // The position in the input after the last data consumption (readNext/skipNext). - private long lastConsumedPosition; - - private OldFormatDeserializer(CFMetaData metadata, - DataInputPlus in, - SerializationHelper helper, - DeletionTime partitionDeletion, - boolean readAllAsDynamic) - { - super(metadata, in, helper); - this.iterator = new UnfilteredIterator(partitionDeletion); - this.readAllAsDynamic = readAllAsDynamic; - this.lastConsumedPosition = currentPosition(); - } - - public void setSkipStatic() - { - this.skipStatic = true; - } - - private boolean isStatic(Unfiltered unfiltered) - { - return unfiltered.isRow() && ((Row)unfiltered).isStatic(); - } - - public boolean hasNext() throws IOException - { - try - { - while (next == null) - { - if (saved == null && !iterator.hasNext()) - return false; - - next = saved == null ? iterator.next() : saved; - saved = null; - - // The sstable iterators assume that if there is one, the static row is the first thing this deserializer will return. - // However, in the old format, a range tombstone with an empty start would sort before any static cell. So we should - // detect that case and return the static parts first if necessary. - if (isFirst && iterator.hasNext() && isStatic(iterator.peek())) - { - saved = next; - next = iterator.next(); - } - isFirst = false; - - // When reading old tables, we sometimes want to skip static data (due to how staticly defined column of compact - // tables are handled). 
- if (skipStatic && isStatic(next)) - next = null; - } - return true; - } - catch (IOError e) - { - if (e.getCause() != null && e.getCause() instanceof IOException) - throw (IOException)e.getCause(); - throw e; - } - } - - private boolean isRow(LegacyLayout.LegacyAtom atom) - { - if (atom.isCell()) - return true; - - LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone(); - return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata); - } - - public int compareNextTo(ClusteringBound bound) throws IOException - { - if (!hasNext()) - throw new IllegalStateException(); - return metadata.comparator.compare(next.clustering(), bound); - } - - public boolean nextIsRow() throws IOException - { - if (!hasNext()) - throw new IllegalStateException(); - return next.isRow(); - } - - public boolean nextIsStatic() throws IOException - { - return nextIsRow() && ((Row)next).isStatic(); - } - - private long currentPosition() - { - // We return a bogus value if the input is not file based, but check we never rely - // on that value in that case in bytesReadForUnconsumedData - return in instanceof FileDataInput ? ((FileDataInput)in).getFilePointer() : 0; - } - - public Unfiltered readNext() throws IOException - { - if (!hasNext()) - throw new IllegalStateException(); - Unfiltered toReturn = next; - next = null; - lastConsumedPosition = currentPosition(); - return toReturn; - } - - public void skipNext() throws IOException - { - if (!hasNext()) - throw new UnsupportedOperationException(); - next = null; - lastConsumedPosition = currentPosition(); - } - - public long bytesReadForUnconsumedData() - { - if (!(in instanceof FileDataInput)) - throw new AssertionError(); - - return currentPosition() - lastConsumedPosition; - } - - public void clearState() - { - next = null; - saved = null; - iterator.clearState(); - lastConsumedPosition = currentPosition(); - } + isReady = false; + isDone = false; + } - // Groups atoms from the input into proper Unfiltered. - // Note: this could use guava AbstractIterator except that we want to be able to clear - // the internal state of the iterator so it's cleaner to do it ourselves. - private class UnfilteredIterator implements PeekingIterator<Unfiltered> + /** + * Skips the next atom. + */ + public void skipNext() throws IOException + { + isReady = false; + clusteringDeserializer.skipNext(); + if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - private final AtomIterator atoms; - private final LegacyLayout.CellGrouper grouper; - private final TombstoneTracker tombstoneTracker; - - private Unfiltered next; - - private UnfilteredIterator(DeletionTime partitionDeletion) - { - this.grouper = new LegacyLayout.CellGrouper(metadata, helper); - this.tombstoneTracker = new TombstoneTracker(partitionDeletion); - this.atoms = new AtomIterator(); - } - - public boolean hasNext() - { - // Note that we loop on next == null because TombstoneTracker.openNew() could return null below or the atom might be shadowed. - while (next == null) - { - if (atoms.hasNext()) - { - // If a range tombstone closes strictly before the next row/RT, we need to return that close (or boundary) marker first. - if (tombstoneTracker.hasClosingMarkerBefore(atoms.peek())) - { - next = tombstoneTracker.popClosingMarker(); - } - else - { - LegacyLayout.LegacyAtom atom = atoms.next(); - if (!tombstoneTracker.isShadowed(atom)) - next = isRow(atom) ? 
readRow(atom) : tombstoneTracker.openNew(atom.asRangeTombstone()); - } - } - else if (tombstoneTracker.hasOpenTombstones()) - { - next = tombstoneTracker.popClosingMarker(); - } - else - { - return false; - } - } - return true; - } - - private Unfiltered readRow(LegacyLayout.LegacyAtom first) - { - LegacyLayout.CellGrouper grouper = first.isStatic() - ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper) - : this.grouper; - grouper.reset(); - grouper.addAtom(first); - // As long as atoms are part of the same row, consume them. Note that the call to addAtom() uses - // atoms.peek() so that the atom is only consumed (by next) if it's part of the row (addAtom returns true) - while (atoms.hasNext() && grouper.addAtom(atoms.peek())) - { - atoms.next(); - } - return grouper.getRow(); - } - - public Unfiltered next() - { - if (!hasNext()) - throw new UnsupportedOperationException(); - Unfiltered toReturn = next; - next = null; - return toReturn; - } - - public Unfiltered peek() - { - if (!hasNext()) - throw new UnsupportedOperationException(); - return next; - } - - public void clearState() - { - atoms.clearState(); - tombstoneTracker.clearState(); - next = null; - } - - public void remove() - { - throw new UnsupportedOperationException(); - } + UnfilteredSerializer.serializer.skipMarkerBody(in); } - - // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms). - // Note: this could use guava AbstractIterator except that we want to be able to clear - // the internal state of the iterator so it's cleaner to do it ourselves. - private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom> - { - private boolean isDone; - private LegacyLayout.LegacyAtom next; - - private AtomIterator() - { - } - - public boolean hasNext() - { - if (isDone) - return false; - - if (next == null) - { - next = readAtom(); - if (next == null) - { - isDone = true; - return false; - } - } - return true; - } - - private LegacyLayout.LegacyAtom readAtom() - { - try - { - return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - public LegacyLayout.LegacyAtom next() - { - if (!hasNext()) - throw new UnsupportedOperationException(); - LegacyLayout.LegacyAtom toReturn = next; - next = null; - return toReturn; - } - - public LegacyLayout.LegacyAtom peek() - { - if (!hasNext()) - throw new UnsupportedOperationException(); - return next; - } - - public void clearState() - { - this.next = null; - this.isDone = false; - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - /** - * Tracks which range tombstones are open when deserializing the old format. - */ - private class TombstoneTracker + else { - private final DeletionTime partitionDeletion; - - // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close). - // As we only track non-fully-shadowed ranges, the first range is necessarily the currently - // open tombstone (the one with the higher timestamp). - private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones; - - public TombstoneTracker(DeletionTime partitionDeletion) - { - this.partitionDeletion = partitionDeletion; - this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound)); - } - - /** - * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion). 
- */ - public boolean isShadowed(LegacyLayout.LegacyAtom atom) - { - assert !hasClosingMarkerBefore(atom); - long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt(); - - if (partitionDeletion.deletes(timestamp)) - return true; - - SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone()); - return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp)); - } - - /** - * Whether the currently open marker closes stricly before the provided row/RT. - */ - public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom) - { - return !openTombstones.isEmpty() - && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0; - } - - /** - * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly). - */ - public Unfiltered popClosingMarker() - { - assert !openTombstones.isEmpty(); - - Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); - LegacyLayout.LegacyRangeTombstone first = iter.next(); - iter.remove(); - - // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the - // next tombstone - if (!iter.hasNext()) - return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime); - - LegacyLayout.LegacyRangeTombstone next = iter.next(); - return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime); - } - - /** - * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening - * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one) - * or even null (if the new tombston start is supersedes by the currently open tombstone). - * - * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also - * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)). - */ - public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone) - { - if (openTombstones.isEmpty()) - { - openTombstones.add(tombstone); - return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); - } - - Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator(); - LegacyLayout.LegacyRangeTombstone first = iter.next(); - if (tombstone.deletionTime.supersedes(first.deletionTime)) - { - // We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open - // one and open the new one. We should also add the tombstone, but if it stop after the first one, we should - // also remove that first tombstone as it won't be useful anymore. 
- if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0) - iter.remove(); - - openTombstones.add(tombstone); - return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime); - } - else - { - // If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we - // just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone - // simply extend after the first one and we'll deal with it later) - assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) > 0; - openTombstones.add(tombstone); - return null; - } - } - - public boolean hasOpenTombstones() - { - return !openTombstones.isEmpty(); - } - - public void clearState() - { - openTombstones.clear(); - } + UnfilteredSerializer.serializer.skipRowBody(in); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index f7e614a..d435832 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -89,12 +89,6 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator // - we're querying static columns. boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty(); - // For CQL queries on static compact tables, we only want to consider static value (only those are exposed), - // but readStaticRow have already read them and might in fact have consumed the whole partition (when reading - // the legacy file format), so set the reader to null so we don't try to read anything more. We can remove this - // once we drop support for the legacy file format - boolean needsReader = sstable.descriptor.version.storeRows() || isForThrift || !sstable.metadata.isStaticCompactTable(); - if (needSeekAtPartitionStart) { // Not indexed (or is reading static), set to the beginning of the partition and read partition level deletion there @@ -108,14 +102,14 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator // Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow // (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format). - this.reader = needsReader ? createReader(indexEntry, file, shouldCloseFile) : null; - this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer); + this.reader = createReader(indexEntry, file, shouldCloseFile); + this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader.deserializer); } else { this.partitionLevelDeletion = indexEntry.deletionTime(); this.staticRow = Rows.EMPTY_STATIC_ROW; - this.reader = needsReader ? 
createReader(indexEntry, file, shouldCloseFile) : null; + this.reader = createReader(indexEntry, file, shouldCloseFile); } if (reader != null && !slices.isEmpty()) @@ -168,33 +162,6 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator boolean isForThrift, UnfilteredDeserializer deserializer) throws IOException { - if (!sstable.descriptor.version.storeRows()) - { - if (!sstable.metadata.isCompactTable()) - { - assert deserializer != null; - return deserializer.hasNext() && deserializer.nextIsStatic() - ? (Row)deserializer.readNext() - : Rows.EMPTY_STATIC_ROW; - } - - // For compact tables, we use statics for the "column_metadata" definition. However, in the old format, those - // "column_metadata" are intermingled as any other "cell". In theory, this means that we'd have to do a first - // pass to extract the static values. However, for thrift, we'll use the ThriftResultsMerger right away which - // will re-merge static values with dynamic ones, so we can just ignore static and read every cell as a - // "dynamic" one. For CQL, if the table is a "static compact", then is has only static columns exposed and no - // dynamic ones. So we do a pass to extract static columns here, but will have no more work to do. Otherwise, - // the table won't have static columns. - if (statics.isEmpty() || isForThrift) - return Rows.EMPTY_STATIC_ROW; - - assert sstable.metadata.isStaticCompactTable(); - - // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the - // static ones. So we don't have to mark the position to seek back later. - return LegacyLayout.extractStaticColumns(sstable.metadata, file, statics); - } - if (!sstable.header.hasStatic()) return Rows.EMPTY_STATIC_ROW; @@ -345,7 +312,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator private void createDeserializer() { assert file != null && deserializer == null; - deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion, isForThrift); + deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper); } protected void seekToPosition(long position) throws IOException @@ -550,8 +517,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator public boolean isPastCurrentBlock() throws IOException { assert reader.deserializer != null; - long correction = reader.deserializer.bytesReadForUnconsumedData(); - return reader.file.bytesPastMark(mark) - correction >= currentIndex().width; + return reader.file.bytesPastMark(mark) >= currentIndex().width; } public int currentBlockIdx() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java index b3c2e94..aa0a390 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -257,12 +257,10 @@ public class SSTableIterator extends AbstractSSTableIterator // so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the // whole slice is between the previous block end and this block start, and thus has no corresponding // data. 
One exception is if the previous block ends with an openMarker as it will cover our slice - // and we need to return it (we also don't skip the slice for the old format because we didn't have the openMarker - // info in that case and can't rely on this optimization). + // and we need to return it. if (indexState.currentBlockIdx() == lastBlockIdx && metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0 - && openMarker == null - && sstable.descriptor.version.storeRows()) + && openMarker == null) { sliceDone = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index c74b5db..ca0cce2 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -310,23 +310,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator int currentBlock = indexState.currentBlockIdx(); boolean canIncludeSliceStart = currentBlock == lastBlockIdx; - - // When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can - // start at the end of a block and end at the beginning of the next one. That's not a problem per se for - // UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index - // blocks, but as we reading index block in reverse we must be careful to not read the end of the row at - // beginning of a block before we're reading the beginning of that row. So what we do is that if we detect - // that the row starting this block is also the row ending the previous one, we skip that first result and - // let it be read when we'll read the previous block. - boolean includeFirst = true; - if (!sstable.descriptor.version.storeRows() && currentBlock > 0) - { - ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName; - ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName; - includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0; - } - - loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst); + loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? 
slice.end() : null, true); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index a30ca0e..c7c5971 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -233,7 +233,7 @@ public class CommitLogArchiver throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath()); else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName)) throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath())); - else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21) + else if (fromName != null && fromHeader == null) throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath()); else if (fromHeader != null) descriptor = fromHeader; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 088d44a..0ab191d 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -57,10 +57,7 @@ public class CommitLogDescriptor static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters"; static final String COMPRESSION_CLASS_KEY = "compressionClass"; - public static final int VERSION_12 = 2; - public static final int VERSION_20 = 3; - public static final int VERSION_21 = 4; - public static final int VERSION_22 = 5; + // We don't support anything pre-3.0 public static final int VERSION_30 = 6; /** @@ -104,20 +101,15 @@ public class CommitLogDescriptor out.putLong(descriptor.id); updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL)); updateChecksumInt(crc, (int) (descriptor.id >>> 32)); - if (descriptor.version >= VERSION_22) - { - String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders); - byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8); - if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF)) - throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.", - parametersBytes.length)); - out.putShort((short) parametersBytes.length); - updateChecksumInt(crc, parametersBytes.length); - out.put(parametersBytes); - crc.update(parametersBytes, 0, parametersBytes.length); - } - else - assert descriptor.compression == null; + String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders); + byte[] parametersBytes = 
parametersString.getBytes(StandardCharsets.UTF_8); + if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF)) + throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.", + parametersBytes.length)); + out.putShort((short) parametersBytes.length); + updateChecksumInt(crc, parametersBytes.length); + out.put(parametersBytes); + crc.update(parametersBytes, 0, parametersBytes.length); out.putInt((int) crc.getValue()); } @@ -157,16 +149,15 @@ public class CommitLogDescriptor { CRC32 checkcrc = new CRC32(); int version = input.readInt(); + if (version < VERSION_30) + throw new IllegalArgumentException("Unsupported pre-3.0 commit log found; cannot read."); + updateChecksumInt(checkcrc, version); long id = input.readLong(); updateChecksumInt(checkcrc, (int) (id & 0xFFFFFFFFL)); updateChecksumInt(checkcrc, (int) (id >>> 32)); - int parametersLength = 0; - if (version >= VERSION_22) - { - parametersLength = input.readShort() & 0xFFFF; - updateChecksumInt(checkcrc, parametersLength); - } + int parametersLength = input.readShort() & 0xFFFF; + updateChecksumInt(checkcrc, parametersLength); // This should always succeed as parametersLength cannot be too long even for a // corrupt segment file. byte[] parametersBytes = new byte[parametersLength]; @@ -213,14 +204,6 @@ public class CommitLogDescriptor { switch (version) { - case VERSION_12: - return MessagingService.VERSION_12; - case VERSION_20: - return MessagingService.VERSION_20; - case VERSION_21: - return MessagingService.VERSION_21; - case VERSION_22: - return MessagingService.VERSION_22; case VERSION_30: return MessagingService.VERSION_30; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index e6e2e1a..eb745c7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -122,19 +122,6 @@ public class CommitLogReader try(RandomAccessReader reader = RandomAccessReader.open(file)) { - if (desc.version < CommitLogDescriptor.VERSION_21) - { - if (!shouldSkipSegmentId(file, desc, minPosition)) - { - if (minPosition.segmentId == desc.id) - reader.seek(minPosition.position); - ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); - statusTracker.errorContext = desc.fileName(); - readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc); - } - return; - } - final long segmentIdFromFilename = desc.id; try { @@ -430,42 +417,17 @@ public class CommitLogReader { public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException { - switch (commitLogVersion) - { - case CommitLogDescriptor.VERSION_12: - case CommitLogDescriptor.VERSION_20: - return input.readLong(); - // Changed format in 2.1 - default: - return input.readInt() & 0xffffffffL; - } + return input.readInt() & 0xffffffffL; } public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion) { - switch (commitLogVersion) - { - case CommitLogDescriptor.VERSION_12: - checksum.update(serializedSize); - break; - // Changed format in 2.0 - default: - updateChecksumInt(checksum, serializedSize); - break; - } + 
updateChecksumInt(checksum, serializedSize); } public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException { - switch (commitLogVersion) - { - case CommitLogDescriptor.VERSION_12: - case CommitLogDescriptor.VERSION_20: - return input.readLong(); - // Changed format in 2.1 - default: - return input.readInt() & 0xffffffffL; - } + return input.readInt() & 0xffffffffL; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a22cda5..78650f1 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1242,7 +1242,7 @@ public class CompactionManager implements CompactionManagerMBean header = SerializationHeader.make(sstable.metadata, Collections.singleton(sstable)); return SSTableWriter.create(cfs.metadata, - Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), + cfs.newSSTableDescriptor(compactionFileLocation), expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel(), @@ -1274,7 +1274,7 @@ public class CompactionManager implements CompactionManagerMBean break; } } - return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), + return SSTableWriter.create(cfs.newSSTableDescriptor(compactionFileLocation), (long) expectedBloomFilterSize, repairedAt, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 7a5b719..aedb208 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -70,7 +70,7 @@ public class Upgrader { MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator()); sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); - return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)), + return SSTableWriter.create(cfs.newSSTableDescriptor(directory), estimatedRows, repairedAt, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index df659e4..a52dd82 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -97,8 +97,7 @@ public class Verifier implements Closeable { validator = null; - if (sstable.descriptor.digestComponent != null && - new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent)).exists()) + if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists()) { validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor); validator.validate(); 
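The CompactionManager and Upgrader hunks above, and the compaction-writer hunks below, all make the same substitution: instead of generating an sstable path with cfs.getSSTablePath(...) and parsing it back with Descriptor.fromFilename(...), the ColumnFamilyStore now hands out the next Descriptor directly. A minimal before/after sketch, assuming cfs is the ColumnFamilyStore and location the target data directory as in those hunks (the remaining SSTableWriter.create(...) arguments are unchanged):

    // Old pattern, removed throughout this commit: rebuild the Descriptor from a generated file path.
    // Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(location));

    // New pattern: ask the ColumnFamilyStore for the next sstable Descriptor directly,
    // then pass it to SSTableWriter.create(...) exactly as before.
    Descriptor desc = cfs.newSSTableDescriptor(location);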
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index f8ecd87..d279321 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -69,7 +69,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter public void switchCompactionLocation(Directories.DataDirectory directory) { @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))), + SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)), estimatedTotalKeys, minRepairedAt, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 0beb505..a3d8c98 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -105,7 +105,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter { this.sstableDirectory = location; averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); - sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))), + sstableWriter.switchWriter(SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)), keysPerSSTable, minRepairedAt, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 864185e..7acb870 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -108,7 +108,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter { sstableDirectory = location; @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))), + SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)), estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 46cb891..a01672e 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -104,7 +104,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter this.location = location; long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), + SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(location)), currentPartitionsToWrite, minRepairedAt, cfs.metadata,