http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 4c0608f..5baf783 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -509,16 +509,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> { public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException { - if (version >= MessagingService.VERSION_30) - out.writeByte(expression.kind().ordinal()); + out.writeByte(expression.kind().ordinal()); // Custom expressions include neither a column or operator, but all - // other expressions do. Also, custom expressions are 3.0+ only, so - // the column & operator will always be the first things written for - // any pre-3.0 version + // other expressions do. 
if (expression.kind() == Kind.CUSTOM) { - assert version >= MessagingService.VERSION_30; IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version); ByteBufferUtil.writeWithShortLength(expression.value, out); return; @@ -526,7 +522,6 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> if (expression.kind() == Kind.USER) { - assert version >= MessagingService.VERSION_30; UserExpression.serialize((UserExpression)expression, out, version); return; } @@ -541,15 +536,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> break; case MAP_EQUALITY: MapEqualityExpression mexpr = (MapEqualityExpression)expression; - if (version < MessagingService.VERSION_30) - { - ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out); - } - else - { - ByteBufferUtil.writeWithShortLength(mexpr.key, out); - ByteBufferUtil.writeWithShortLength(mexpr.value, out); - } + ByteBufferUtil.writeWithShortLength(mexpr.key, out); + ByteBufferUtil.writeWithShortLength(mexpr.value, out); break; case THRIFT_DYN_EXPR: ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out); @@ -559,62 +547,33 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { - Kind kind = null; - ByteBuffer name; - Operator operator; - ColumnDefinition column; + Kind kind = Kind.values()[in.readByte()]; - if (version >= MessagingService.VERSION_30) + // custom expressions (3.0+ only) do not contain a column or operator, only a value + if (kind == Kind.CUSTOM) { - kind = Kind.values()[in.readByte()]; - // custom expressions (3.0+ only) do not contain a column or operator, only a value - if (kind == Kind.CUSTOM) - { - return new CustomExpression(metadata, - IndexMetadata.serializer.deserialize(in, version, metadata), - ByteBufferUtil.readWithShortLength(in)); - } - - if (kind == Kind.USER) - { - 
return UserExpression.deserialize(in, version, metadata); - } + return new CustomExpression(metadata, + IndexMetadata.serializer.deserialize(in, version, metadata), + ByteBufferUtil.readWithShortLength(in)); } - name = ByteBufferUtil.readWithShortLength(in); - operator = Operator.readFrom(in); - column = metadata.getColumnDefinition(name); + if (kind == Kind.USER) + return UserExpression.deserialize(in, version, metadata); + + ByteBuffer name = ByteBufferUtil.readWithShortLength(in); + Operator operator = Operator.readFrom(in); + ColumnDefinition column = metadata.getColumnDefinition(name); + if (!metadata.isCompactTable() && column == null) throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization"); - if (version < MessagingService.VERSION_30) - { - if (column == null) - kind = Kind.THRIFT_DYN_EXPR; - else if (column.type instanceof MapType && operator == Operator.EQ) - kind = Kind.MAP_EQUALITY; - else - kind = Kind.SIMPLE; - } - - assert kind != null; switch (kind) { case SIMPLE: return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in)); case MAP_EQUALITY: - ByteBuffer key, value; - if (version < MessagingService.VERSION_30) - { - ByteBuffer composite = ByteBufferUtil.readWithShortLength(in); - key = CompositeType.extractComponent(composite, 0); - value = CompositeType.extractComponent(composite, 0); - } - else - { - key = ByteBufferUtil.readWithShortLength(in); - value = ByteBufferUtil.readWithShortLength(in); - } + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + ByteBuffer value = ByteBufferUtil.readWithShortLength(in); return new MapEqualityExpression(column, key, operator, value); case THRIFT_DYN_EXPR: return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in)); @@ -622,16 +581,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> throw new AssertionError(); } - public long serializedSize(Expression 
expression, int version) { - // version 3.0+ includes a byte for Kind - long size = version >= MessagingService.VERSION_30 ? 1 : 0; + long size = 1; // kind byte // Custom expressions include neither a column or operator, but all - // other expressions do. Also, custom expressions are 3.0+ only, so - // the column & operator will always be the first things written for - // any pre-3.0 version + // other expressions do. if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER) size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes) + expression.operator.serializedSize(); @@ -643,23 +598,19 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> break; case MAP_EQUALITY: MapEqualityExpression mexpr = (MapEqualityExpression)expression; - if (version < MessagingService.VERSION_30) - size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue()); - else - size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key) - + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value); + size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key) + + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value); break; case THRIFT_DYN_EXPR: size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value); break; case CUSTOM: - if (version >= MessagingService.VERSION_30) - size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version) - + ByteBufferUtil.serializedSizeWithShortLength(expression.value); + size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version) + + ByteBufferUtil.serializedSizeWithShortLength(expression.value); break; case USER: - if (version >= MessagingService.VERSION_30) - size += UserExpression.serializedSize((UserExpression)expression, version); + size += UserExpression.serializedSize((UserExpression)expression, version); + break; } return size; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index b95a310..1ed961f 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -238,12 +238,10 @@ public class PartitionUpdate extends AbstractBTreePartition * * @param bytes the byte buffer that contains the serialized update. * @param version the version with which the update is serialized. - * @param key the partition key for the update. This is only used if {@code version < 3.0} - * and can be {@code null} otherwise. * * @return the deserialized update or {@code null} if {@code bytes == null}. */ - public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key) + public static PartitionUpdate fromBytes(ByteBuffer bytes, int version) { if (bytes == null) return null; @@ -252,8 +250,7 @@ public class PartitionUpdate extends AbstractBTreePartition { return serializer.deserialize(new DataInputBuffer(bytes, true), version, - SerializationHelper.Flag.LOCAL, - version < MessagingService.VERSION_30 ? 
key : null); + SerializationHelper.Flag.LOCAL); } catch (IOException e) { @@ -780,47 +777,12 @@ public class PartitionUpdate extends AbstractBTreePartition { assert !iter.isReverseOrder(); - if (version < MessagingService.VERSION_30) - { - LegacyLayout.serializeAsLegacyPartition(null, iter, out, version); - } - else - { - CFMetaData.serializer.serialize(update.metadata(), out, version); - UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount()); - } + CFMetaData.serializer.serialize(update.metadata(), out, version); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount()); } } - public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException - { - if (version >= MessagingService.VERSION_30) - { - assert key == null; // key is only there for the old format - return deserialize30(in, version, flag); - } - else - { - assert key != null; - return deserializePre30(in, version, flag, key); - } - } - - // Used to share same decorated key between updates. 
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException - { - if (version >= MessagingService.VERSION_30) - { - return deserialize30(in, version, flag); - } - else - { - assert key != null; - return deserializePre30(in, version, flag, key.getKey()); - } - } - - private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag); @@ -854,22 +816,10 @@ public class PartitionUpdate extends AbstractBTreePartition false); } - private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException - { - try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key)) - { - assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families - return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata())); - } - } - public long serializedSize(PartitionUpdate update, int version) { try (UnfilteredRowIterator iter = update.unfilteredIterator()) { - if (version < MessagingService.VERSION_30) - return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version); - return CFMetaData.serializer.serializedSize(update.metadata(), version) + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 852d95e..bcc8d4d 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -252,13 +252,11 @@ public abstract class UnfilteredPartitionIterators /** * Digests the the provided iterator. * - * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30} - * as this is only used when producing digest to be sent to legacy nodes. * @param iterator the iterator to digest. * @param digest the {@code MessageDigest} to use for the digest. * @param version the messaging protocol to use when producing the digest. */ - public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version) + public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version) { try (UnfilteredPartitionIterator iter = iterator) { @@ -266,7 +264,7 @@ public abstract class UnfilteredPartitionIterators { try (UnfilteredRowIterator partition = iter.next()) { - UnfilteredRowIterators.digest(command, partition, digest, version); + UnfilteredRowIterators.digest(partition, digest, version); } } } @@ -303,8 +301,6 @@ public abstract class UnfilteredPartitionIterators { public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException { - assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer - out.writeBoolean(iter.isForThrift()); while (iter.hasNext()) { @@ -319,7 +315,6 @@ public abstract class UnfilteredPartitionIterators public 
UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException { - assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer final boolean isForThrift = in.readBoolean(); return new AbstractUnfilteredPartitionIterator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java index 14730ac..30a0a37 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java @@ -211,12 +211,11 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt /** * @return true if we can use the clustering values in the stats of the sstable: - * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size) - * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones + * we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones */ private boolean canUseMetadataLowerBound() { - return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile(); + return !sstable.hasTombstones(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 46447ec..004783e 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -143,20 +143,12 @@ public abstract class UnfilteredRowIterators /** * Digests the partition represented by the provided iterator. * - * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30} - * as this is only used when producing digest to be sent to legacy nodes. * @param iterator the iterator to digest. * @param digest the {@code MessageDigest} to use for the digest. * @param version the messaging protocol to use when producing the digest. */ - public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version) + public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version) { - if (version < MessagingService.VERSION_30) - { - LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest); - return; - } - digest.update(iterator.partitionKey().getKey().duplicate()); iterator.partitionLevelDeletion().digest(digest); iterator.columns().regulars.digest(digest); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/dht/AbstractBounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java index 298c316..7a603b0 100644 --- a/src/java/org/apache/cassandra/dht/AbstractBounds.java +++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java @@ -184,6 +184,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria * The first int tells us if it's a range or bounds (depending on the value) _and_ if it's tokens or keys (depending on the * sign). 
We use negative kind for keys so as to preserve the serialization of token from older version. */ + // !WARNING! While we don't support the pre-3.0 messaging protocol, we serialize the token range in the + // system table (see SystemKeyspace.rangeToBytes) using the old/pre-3.0 format and until we deal with that + // problem, we have to preserve this code. if (version < MessagingService.VERSION_30) out.writeInt(kindInt(range)); else @@ -195,6 +198,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria public AbstractBounds<T> deserialize(DataInput in, IPartitioner p, int version) throws IOException { boolean isToken, startInclusive, endInclusive; + // !WARNING! See serialize method above for why we still need to have that condition. if (version < MessagingService.VERSION_30) { int kind = in.readInt(); @@ -226,6 +230,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria public long serializedSize(AbstractBounds<T> ab, int version) { + // !WARNING! See serialize method above for why we still need to have that condition. int size = version < MessagingService.VERSION_30 ?
TypeSizes.sizeof(kindInt(ab)) : 1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 212f88c..c4c3872 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -979,12 +979,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private void markAlive(final InetAddress addr, final EndpointState localState) { - if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) - { - realMarkAlive(addr, localState); - return; - } - localState.markDead(); MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java deleted file mode 100644 index 50d8b6e..0000000 --- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hints; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.SchemaConstants; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.FBUtilities; - -/** - * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format. 
- */ -@SuppressWarnings("deprecation") -public final class LegacyHintsMigrator -{ - private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class); - - private final File hintsDirectory; - private final long maxHintsFileSize; - - private final ColumnFamilyStore legacyHintsTable; - private final int pageSize; - - public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize) - { - this.hintsDirectory = hintsDirectory; - this.maxHintsFileSize = maxHintsFileSize; - - legacyHintsTable = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS); - pageSize = calculatePageSize(legacyHintsTable); - } - - // read fewer columns (mutations) per page if they are very large - private static int calculatePageSize(ColumnFamilyStore legacyHintsTable) - { - int size = 128; - - int meanCellCount = legacyHintsTable.getMeanColumns(); - double meanPartitionSize = legacyHintsTable.getMeanPartitionSize(); - - if (meanCellCount != 0 && meanPartitionSize != 0) - { - int avgHintSize = (int) meanPartitionSize / meanCellCount; - size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize)); - } - - return size; - } - - public void migrate() - { - // nothing to migrate - if (legacyHintsTable.isEmpty()) - return; - logger.info("Migrating legacy hints to new storage"); - - // major-compact all of the existing sstables to get rid of the tombstones + expired hints - logger.info("Forcing a major compaction of {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS); - compactLegacyHints(); - - // paginate over legacy hints and write them to the new storage - logger.info("Writing legacy hints to the new storage"); - migrateLegacyHints(); - - // truncate the legacy hints table - logger.info("Truncating {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS); - legacyHintsTable.truncateBlocking(); - } - - private void compactLegacyHints() - { - Collection<Descriptor> 
descriptors = new ArrayList<>(); - legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor)); - if (!descriptors.isEmpty()) - forceCompaction(descriptors); - } - - private void forceCompaction(Collection<Descriptor> descriptors) - { - try - { - CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get(); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } - } - - private void migrateLegacyHints() - { - ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024); - String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS); - //noinspection ConstantConditions - QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer)); - FileUtils.clean(buffer); - } - - private void migrateLegacyHints(UUID hostId, ByteBuffer buffer) - { - String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " + - "FROM %s.%s " + - "WHERE target_id = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_HINTS); - - // read all the old hints (paged iterator), write them in the new format - UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId); - migrateLegacyHints(hostId, rows, buffer); - - // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows - // to not lose progress in case of a terminated conversion - deleteLegacyHintsPartition(hostId); - } - - private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer) - { - migrateLegacyHints(hostId, rows.iterator(), buffer); - } - - private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer) - { - do - { - 
migrateLegacyHintsInternal(hostId, iterator, buffer); - // if there are hints that didn't fit in the previous file, keep calling the method to write to a new - // file until we get everything written. - } - while (iterator.hasNext()); - } - - private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer) - { - HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis()); - - try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor)) - { - try (HintsWriter.Session session = writer.newSession(buffer)) - { - while (iterator.hasNext()) - { - Hint hint = convertLegacyHint(iterator.next()); - if (hint != null) - session.append(hint); - - if (session.position() >= maxHintsFileSize) - break; - } - } - } - catch (IOException e) - { - throw new FSWriteError(e, descriptor.fileName()); - } - } - - private static Hint convertLegacyHint(UntypedResultSet.Row row) - { - Mutation mutation = deserializeLegacyMutation(row); - if (mutation == null) - return null; - - long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table - int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl"); - int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime); - - int gcgs = Math.min(originalGCGS, mutation.smallestGCGS()); - - return Hint.create(mutation, creationTime, gcgs); - } - - private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row) - { - try (DataInputBuffer dib = new DataInputBuffer(row.getBlob("mutation"), true)) - { - Mutation mutation = Mutation.serializer.deserialize(dib, - row.getInt("message_version")); - mutation.getPartitionUpdates().forEach(PartitionUpdate::validate); - return mutation; - } - catch (IOException e) - { - logger.error("Failed to migrate a hint for {} from legacy {}.{} table", - row.getUUID("target_id"), - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_HINTS, - e); - 
return null; - } - catch (MarshalException e) - { - logger.warn("Failed to validate a hint for {} from legacy {}.{} table - skipping", - row.getUUID("target_id"), - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_HINTS, - e); - return null; - } - } - - private static void deleteLegacyHintsPartition(UUID hostId) - { - // intentionally use millis, like the rest of the legacy implementation did, just in case - Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints, - UUIDType.instance.decompose(hostId), - System.currentTimeMillis(), - FBUtilities.nowInSeconds())); - mutation.applyUnsafe(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java deleted file mode 100644 index 64f91d7..0000000 --- a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.io; - -import java.io.IOException; - -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; - -/** - * A serializer which forwards all its method calls to another serializer. Subclasses should override one or more - * methods to modify the behavior of the backing serializer as desired per the decorator pattern. - */ -public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T> -{ - protected ForwardingVersionedSerializer() - { - } - - /** - * Returns the backing delegate instance that methods are forwarded to. - * - * @param version the server version - * @return the backing delegate instance that methods are forwarded to. - */ - protected abstract IVersionedSerializer<T> delegate(int version); - - public void serialize(T t, DataOutputPlus out, int version) throws IOException - { - delegate(version).serialize(t, out, version); - } - - public T deserialize(DataInputPlus in, int version) throws IOException - { - return delegate(version).deserialize(in, version); - } - - public long serializedSize(T t, int version) - { - return delegate(version).serializedSize(t, version); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 9a4d919..7d98570 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -71,7 +71,6 @@ public class CompressionMetadata private final long chunkOffsetsSize; public final String indexFilePath; public final CompressionParams parameters; - public final ChecksumType checksumType; /** * Create metadata about given compressed file 
including uncompressed data length, chunk size @@ -87,14 +86,13 @@ public class CompressionMetadata public static CompressionMetadata create(String dataFilePath) { Descriptor desc = Descriptor.fromFilename(dataFilePath); - return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.compressedChecksumType()); + return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length()); } @VisibleForTesting - public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType) + public CompressionMetadata(String indexFilePath, long compressedLength) { this.indexFilePath = indexFilePath; - this.checksumType = checksumType; try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath))) { @@ -133,7 +131,7 @@ public class CompressionMetadata this.chunkOffsetsSize = chunkOffsets.size(); } - private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, ChecksumType checksumType) + private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength) { this.indexFilePath = filePath; this.parameters = parameters; @@ -141,7 +139,6 @@ public class CompressionMetadata this.compressedFileLength = compressedLength; this.chunkOffsets = offsets; this.chunkOffsetsSize = offsetsSize; - this.checksumType = checksumType; } public ICompressor compressor() @@ -417,7 +414,7 @@ public class CompressionMetadata if (count < this.count) compressedLength = offsets.getLong(count * 8L); - return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, ChecksumType.CRC32); + return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength); } /** 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 9a8f968..7efca63 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -18,7 +18,7 @@ package org.apache.cassandra.io.sstable; import java.io.File; -import java.io.FilenameFilter; +import java.io.FileFilter; import java.io.IOException; import java.io.Closeable; import java.nio.ByteBuffer; @@ -90,12 +90,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable private static int getNextGeneration(File directory, final String columnFamily) { final Set<Descriptor> existing = new HashSet<>(); - directory.list(new FilenameFilter() + directory.listFiles(new FileFilter() { - public boolean accept(File dir, String name) + public boolean accept(File file) { - Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name); - Descriptor desc = p == null ? 
null : p.left; + Descriptor desc = SSTable.tryDescriptorFromFilename(file); if (desc == null) return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 38152af..469a25c 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -50,8 +50,8 @@ public class Component COMPRESSION_INFO("CompressionInfo.db"), // statistical metadata about the content of the sstable STATS("Statistics.db"), - // holds adler32 checksum of the data file - DIGEST("Digest.crc32", "Digest.adler32", "Digest.sha1"), + // holds CRC32 checksum of the data file + DIGEST("Digest.crc32"), // holds the CRC32 for chunks in an a uncompressed file. CRC("CRC.db"), // holds SSTable Index Summary (sampling of Index component) @@ -61,16 +61,11 @@ public class Component // built-in secondary index (may be multiple per sstable) SECONDARY_INDEX("SI_.*.db"), // custom component, used by e.g. custom compaction strategy - CUSTOM(new String[] { null }); + CUSTOM(null); - final String[] repr; + final String repr; Type(String repr) { - this(new String[] { repr }); - } - - Type(String... 
repr) - { this.repr = repr; } @@ -78,9 +73,7 @@ public class Component { for (Type type : TYPES) { - if (type.repr == null || type.repr.length == 0 || type.repr[0] == null) - continue; - if (Pattern.matches(type.repr[0], repr)) + if (type.repr != null && Pattern.matches(type.repr, repr)) return type; } return CUSTOM; @@ -93,36 +86,18 @@ public class Component public final static Component FILTER = new Component(Type.FILTER); public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO); public final static Component STATS = new Component(Type.STATS); - private static final String digestCrc32 = "Digest.crc32"; - private static final String digestAdler32 = "Digest.adler32"; - private static final String digestSha1 = "Digest.sha1"; - public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, digestCrc32); - public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, digestAdler32); - public final static Component DIGEST_SHA1 = new Component(Type.DIGEST, digestSha1); + public final static Component DIGEST = new Component(Type.DIGEST); public final static Component CRC = new Component(Type.CRC); public final static Component SUMMARY = new Component(Type.SUMMARY); public final static Component TOC = new Component(Type.TOC); - public static Component digestFor(ChecksumType checksumType) - { - switch (checksumType) - { - case Adler32: - return DIGEST_ADLER32; - case CRC32: - return DIGEST_CRC32; - } - throw new AssertionError(); - } - public final Type type; public final String name; public final int hashCode; public Component(Type type) { - this(type, type.repr[0]); - assert type.repr.length == 1; + this(type, type.repr); assert type != Type.CUSTOM; } @@ -143,45 +118,32 @@ public class Component } /** - * {@code - * Filename of the form "<ksname>/<cfname>-[tmp-][<version>-]<gen>-<component>", - * } - * @return A Descriptor for the SSTable, and a Component for this particular file. 
- * TODO move descriptor into Component field + * Parse the component part of a sstable filename into a {@code Component} object. + * + * @param name a string representing a sstable component. + * @return the component corresponding to {@code name}. Note that this always return a component as an unrecognized + * name is parsed into a CUSTOM component. */ - public static Pair<Descriptor,Component> fromFilename(File directory, String name) + static Component parse(String name) { - Pair<Descriptor,String> path = Descriptor.fromFilename(directory, name); + Type type = Type.fromRepresentation(name); - // parse the component suffix - Type type = Type.fromRepresentation(path.right); - // build (or retrieve singleton for) the component object - Component component; - switch(type) + // Build (or retrieve singleton for) the component object + switch (type) { - case DATA: component = Component.DATA; break; - case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break; - case FILTER: component = Component.FILTER; break; - case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; - case STATS: component = Component.STATS; break; - case DIGEST: switch (path.right) - { - case digestCrc32: component = Component.DIGEST_CRC32; break; - case digestAdler32: component = Component.DIGEST_ADLER32; break; - case digestSha1: component = Component.DIGEST_SHA1; break; - default: throw new IllegalArgumentException("Invalid digest component " + path.right); - } - break; - case CRC: component = Component.CRC; break; - case SUMMARY: component = Component.SUMMARY; break; - case TOC: component = Component.TOC; break; - case SECONDARY_INDEX: component = new Component(Type.SECONDARY_INDEX, path.right); break; - case CUSTOM: component = new Component(Type.CUSTOM, path.right); break; - default: - throw new IllegalStateException(); + case DATA: return Component.DATA; + case PRIMARY_INDEX: return Component.PRIMARY_INDEX; + case FILTER: return Component.FILTER; + case COMPRESSION_INFO: 
return Component.COMPRESSION_INFO; + case STATS: return Component.STATS; + case DIGEST: return Component.DIGEST; + case CRC: return Component.CRC; + case SUMMARY: return Component.SUMMARY; + case TOC: return Component.TOC; + case SECONDARY_INDEX: return new Component(Type.SECONDARY_INDEX, name); + case CUSTOM: return new Component(Type.CUSTOM, name); + default: throw new AssertionError(); } - - return Pair.create(path.left, component); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 1f7e67f..3804fd8 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -26,12 +26,12 @@ import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Objects; +import com.google.common.base.Splitter; import org.apache.cassandra.db.Directories; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer; -import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer; import org.apache.cassandra.io.sstable.metadata.MetadataSerializer; import org.apache.cassandra.utils.Pair; @@ -46,8 +46,13 @@ import static org.apache.cassandra.io.sstable.Component.separator; */ public class Descriptor { + private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$"; + private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR); + public static String TMP_EXT = ".tmp"; + private static final Splitter filenameSplitter = Splitter.on('-'); 
+ /** canonicalized path to the directory where SSTable resides */ public final File directory; /** version has the following format: <code>[a-z]+</code> */ @@ -56,8 +61,6 @@ public class Descriptor public final String cfname; public final int generation; public final SSTableFormat.Type formatType; - /** digest component - might be {@code null} for old, legacy sstables */ - public final Component digestComponent; private final int hashCode; /** @@ -66,7 +69,7 @@ public class Descriptor @VisibleForTesting public Descriptor(File directory, String ksname, String cfname, int generation) { - this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current(), null); + this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current()); } /** @@ -74,16 +77,10 @@ public class Descriptor */ public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) { - this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType())); - } - - @VisibleForTesting - public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) - { - this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType())); + this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType); } - public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent) + public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) { assert version != null && directory != null && ksname != null && 
cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass()); this.version = version; @@ -99,24 +96,18 @@ public class Descriptor this.cfname = cfname; this.generation = generation; this.formatType = formatType; - this.digestComponent = digestComponent; hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType); } public Descriptor withGeneration(int newGeneration) { - return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType, digestComponent); + return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType); } public Descriptor withFormatType(SSTableFormat.Type newType) { - return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType, digestComponent); - } - - public Descriptor withDigestComponent(Component newDigestComponent) - { - return new Descriptor(version, directory, ksname, cfname, generation, formatType, newDigestComponent); + return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType); } public String tmpFilenameFor(Component component) @@ -139,15 +130,9 @@ public class Descriptor private void appendFileName(StringBuilder buff) { - if (!version.hasNewFileName()) - { - buff.append(ksname).append(separator); - buff.append(cfname).append(separator); - } buff.append(version).append(separator); buff.append(generation); - if (formatType != SSTableFormat.Type.LEGACY) - buff.append(separator).append(formatType.name); + buff.append(separator).append(formatType.name); } public String relativeFilenameFor(Component component) @@ -176,155 +161,156 @@ public class Descriptor return ret; } - /** - * Files obsoleted by CASSANDRA-7066 : temporary files and compactions_in_progress. We support - * versions 2.1 (ka) and 2.2 (la). 
- * Temporary files have tmp- or tmplink- at the beginning for 2.2 sstables or after ks-cf- for 2.1 sstables - */ - - private final static String LEGACY_COMP_IN_PROG_REGEX_STR = "^compactions_in_progress(\\-[\\d,a-f]{32})?$"; - private final static Pattern LEGACY_COMP_IN_PROG_REGEX = Pattern.compile(LEGACY_COMP_IN_PROG_REGEX_STR); - private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$"; - private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR); - - public static boolean isLegacyFile(File file) + public static boolean isValidFile(File file) { - if (file.isDirectory()) - return file.getParentFile() != null && - file.getParentFile().getName().equalsIgnoreCase("system") && - LEGACY_COMP_IN_PROG_REGEX.matcher(file.getName()).matches(); - else - return LEGACY_TMP_REGEX.matcher(file.getName()).matches(); - } - - public static boolean isValidFile(String fileName) - { - return fileName.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(fileName).matches(); + String filename = file.getName(); + return filename.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(filename).matches(); } /** - * @see #fromFilename(File directory, String name) - * @param filename The SSTable filename - * @return Descriptor of the SSTable initialized from filename + * Parse a sstable filename into a Descriptor. + * <p> + * This is a shortcut for {@code fromFilename(new File(filename))}. + * + * @param filename the filename to a sstable component. + * @return the descriptor for the parsed file. + * + * @throws IllegalArgumentException if the provided {@code file} does point to a valid sstable filename. This could + * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported + * versions. 
*/ public static Descriptor fromFilename(String filename) { - return fromFilename(filename, false); - } - - public static Descriptor fromFilename(String filename, SSTableFormat.Type formatType) - { - return fromFilename(filename).withFormatType(formatType); + return fromFilename(new File(filename)); } - public static Descriptor fromFilename(String filename, boolean skipComponent) - { - File file = new File(filename).getAbsoluteFile(); - return fromFilename(file.getParentFile(), file.getName(), skipComponent).left; - } - - public static Pair<Descriptor, String> fromFilename(File directory, String name) + /** + * Parse a sstable filename into a Descriptor. + * <p> + * SSTables files are all located within subdirectories of the form {@code <keyspace>/<table>/}. Normal sstables are + * are directly within that subdirectory structure while 2ndary index, backups and snapshot are each inside an + * additional subdirectory. The file themselves have the form: + * {@code <version>-<gen>-<format>-<component>}. + * <p> + * Note that this method will only sucessfully parse sstable files of supported versions. + * + * @param file the {@code File} object for the filename to parse. + * @return the descriptor for the parsed file. + * + * @throws IllegalArgumentException if the provided {@code file} does point to a valid sstable filename. This could + * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported + * versions. + */ + public static Descriptor fromFilename(File file) { - return fromFilename(directory, name, false); + return fromFilenameWithComponent(file).left; } /** - * Filename of the form is vary by version: + * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part. 
* - * <ul> - * <li><ksname>-<cfname>-(tmp-)?<version>-<gen>-<component> for cassandra 2.0 and before</li> - * <li>(<tmp marker>-)?<version>-<gen>-<component> for cassandra 3.0 and later</li> - * </ul> + * @param file the {@code File} object for the filename to parse. + * @return a pair of the descriptor and component corresponding to the provided {@code file}. * - * If this is for SSTable of secondary index, directory should ends with index name for 2.1+. - * - * @param directory The directory of the SSTable files - * @param name The name of the SSTable file - * @param skipComponent true if the name param should not be parsed for a component tag - * - * @return A Descriptor for the SSTable, and the Component remainder. + * @throws IllegalArgumentException if the provided {@code file} does point to a valid sstable filename. This could + * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported + * versions. */ - public static Pair<Descriptor, String> fromFilename(File directory, String name, boolean skipComponent) + public static Pair<Descriptor, Component> fromFilenameWithComponent(File file) { - File parentDirectory = directory != null ? directory : new File("."); + // We need to extract the keyspace and table names from the parent directories, so make sure we deal with the + // absolute path. + if (!file.isAbsolute()) + file = file.getAbsoluteFile(); - // tokenize the filename - StringTokenizer st = new StringTokenizer(name, String.valueOf(separator)); - String nexttok; + String name = file.getName(); + List<String> tokens = filenameSplitter.splitToList(name); + int size = tokens.size(); - // read tokens backwards to determine version - Deque<String> tokenStack = new ArrayDeque<>(); - while (st.hasMoreTokens()) + if (size != 4) { - tokenStack.push(st.nextToken()); + // This is an invalid sstable file for this version. 
But to provide a more helpful error message, we detect + // old format sstable, which had the format: + // <keyspace>-<table>-(tmp-)?<version>-<gen>-<component> + // Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect + // but we're just trying to be helpful, not perfect. + if (size == 5 || size == 6) + throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.", + name, + tokens.get(size - 3))); + throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name)); } - // component suffix - String component = skipComponent ? null : tokenStack.pop(); + String versionString = tokens.get(0); + if (!Version.validate(versionString)) + throw invalidSSTable(name, "invalid version %s", versionString); - nexttok = tokenStack.pop(); - // generation OR format type - SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY; - if (!CharMatcher.DIGIT.matchesAllOf(nexttok)) + int generation; + try { - fmt = SSTableFormat.Type.validate(nexttok); - nexttok = tokenStack.pop(); + generation = Integer.parseInt(tokens.get(1)); + } + catch (NumberFormatException e) + { + throw invalidSSTable(name, "the 'generation' part of the name doesn't parse as a number"); } - // generation - int generation = Integer.parseInt(nexttok); + String formatString = tokens.get(2); + SSTableFormat.Type format; + try + { + format = SSTableFormat.Type.validate(formatString); + } + catch (IllegalArgumentException e) + { + throw invalidSSTable(name, "unknown 'format' part (%s)", formatString); + } - // version - nexttok = tokenStack.pop(); + Component component = Component.parse(tokens.get(3)); - if (!Version.validate(nexttok)) - throw new UnsupportedOperationException("SSTable " + name + " is too old to open. 
Upgrade to 2.0 first, and run upgradesstables"); + Version version = format.info.getVersion(versionString); + if (!version.isCompatible()) + throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString); - Version version = fmt.info.getVersion(nexttok); + File directory = parentOf(name, file); + File tableDir = directory; - // ks/cf names - String ksname, cfname; - if (version.hasNewFileName()) + // Check if it's a 2ndary index directory (not that it doesn't exclude it to be also a backup or snapshot) + String indexName = ""; + if (tableDir.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR)) { - // for 2.1+ read ks and cf names from directory - File cfDirectory = parentDirectory; - // check if this is secondary index - String indexName = ""; - if (cfDirectory.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR)) - { - indexName = cfDirectory.getName(); - cfDirectory = cfDirectory.getParentFile(); - } - if (cfDirectory.getName().equals(Directories.BACKUPS_SUBDIR)) - { - cfDirectory = cfDirectory.getParentFile(); - } - else if (cfDirectory.getParentFile().getName().equals(Directories.SNAPSHOT_SUBDIR)) - { - cfDirectory = cfDirectory.getParentFile().getParentFile(); - } - cfname = cfDirectory.getName().split("-")[0] + indexName; - ksname = cfDirectory.getParentFile().getName(); + indexName = tableDir.getName(); + tableDir = parentOf(name, tableDir); } - else - { - cfname = tokenStack.pop(); - ksname = tokenStack.pop(); - } - assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory; - return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt, - // _assume_ version from version - Component.digestFor(version.uncompressedChecksumType())), - component); + // Then it can be a backup or a snapshot + if (tableDir.getName().equals(Directories.BACKUPS_SUBDIR)) + tableDir = tableDir.getParentFile(); + else if (parentOf(name, 
tableDir).getName().equals(Directories.SNAPSHOT_SUBDIR)) + tableDir = parentOf(name, parentOf(name, tableDir)); + + String table = tableDir.getName().split("-")[0] + indexName; + String keyspace = parentOf(name, tableDir).getName(); + + return Pair.create(new Descriptor(version, directory, keyspace, table, generation, format), component); + } + + private static File parentOf(String name, File file) + { + File parent = file.getParentFile(); + if (parent == null) + throw invalidSSTable(name, "cannot extract keyspace and table name; make sure the sstable is in the proper sub-directories"); + return parent; + } + + private static IllegalArgumentException invalidSSTable(String name, String msgFormat, Object... parameters) + { + throw new IllegalArgumentException(String.format("Invalid sstable file " + name + ": " + msgFormat, parameters)); } public IMetadataSerializer getMetadataSerializer() { - if (version.hasNewStatsFile()) - return new MetadataSerializer(); - else - return new LegacyMetadataSerializer(); + return new MetadataSerializer(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java index 9ee1996..03246c5 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java @@ -19,11 +19,14 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; +import java.util.List; import org.apache.cassandra.db.ClusteringPrefix; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.ISerializer; import 
org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; @@ -79,6 +82,11 @@ public class IndexInfo this.endOpenMarker = endOpenMarker; } + public static IndexInfo.Serializer serializer(Version version, SerializationHeader header) + { + return new IndexInfo.Serializer(version, header.clusteringTypes()); + } + public static class Serializer implements ISerializer<IndexInfo> { // This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding. @@ -87,21 +95,19 @@ public class IndexInfo // size so using the default is almost surely better than using no base at all. public static final long WIDTH_BASE = 64 * 1024; - private final ISerializer<ClusteringPrefix> clusteringSerializer; - private final Version version; + private final int version; + private final List<AbstractType<?>> clusteringTypes; - public Serializer(Version version, ISerializer<ClusteringPrefix> clusteringSerializer) + public Serializer(Version version, List<AbstractType<?>> clusteringTypes) { - this.clusteringSerializer = clusteringSerializer; - this.version = version; + this.version = version.correspondingMessagingVersion(); + this.clusteringTypes = clusteringTypes; } public void serialize(IndexInfo info, DataOutputPlus out) throws IOException { - assert version.storeRows() : "We read old index files but we should never write them"; - - clusteringSerializer.serialize(info.firstName, out); - clusteringSerializer.serialize(info.lastName, out); + ClusteringPrefix.serializer.serialize(info.firstName, out, version, clusteringTypes); + ClusteringPrefix.serializer.serialize(info.lastName, out, version, clusteringTypes); out.writeUnsignedVInt(info.offset); out.writeVInt(info.width - WIDTH_BASE); @@ -112,53 +118,33 @@ public class IndexInfo public void skip(DataInputPlus in) throws IOException { - clusteringSerializer.skip(in); - clusteringSerializer.skip(in); - if (version.storeRows()) - { - in.readUnsignedVInt(); - 
in.readVInt(); - if (in.readBoolean()) - DeletionTime.serializer.skip(in); - } - else - { - in.skipBytes(TypeSizes.sizeof(0L)); - in.skipBytes(TypeSizes.sizeof(0L)); - } + ClusteringPrefix.serializer.skip(in, version, clusteringTypes); + ClusteringPrefix.serializer.skip(in, version, clusteringTypes); + in.readUnsignedVInt(); + in.readVInt(); + if (in.readBoolean()) + DeletionTime.serializer.skip(in); } public IndexInfo deserialize(DataInputPlus in) throws IOException { - ClusteringPrefix firstName = clusteringSerializer.deserialize(in); - ClusteringPrefix lastName = clusteringSerializer.deserialize(in); - long offset; - long width; + ClusteringPrefix firstName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes); + ClusteringPrefix lastName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes); + long offset = in.readUnsignedVInt(); + long width = in.readVInt() + WIDTH_BASE; DeletionTime endOpenMarker = null; - if (version.storeRows()) - { - offset = in.readUnsignedVInt(); - width = in.readVInt() + WIDTH_BASE; - if (in.readBoolean()) - endOpenMarker = DeletionTime.serializer.deserialize(in); - } - else - { - offset = in.readLong(); - width = in.readLong(); - } + if (in.readBoolean()) + endOpenMarker = DeletionTime.serializer.deserialize(in); return new IndexInfo(firstName, lastName, offset, width, endOpenMarker); } public long serializedSize(IndexInfo info) { - assert version.storeRows() : "We read old index files but we should never write them"; - - long size = clusteringSerializer.serializedSize(info.firstName) - + clusteringSerializer.serializedSize(info.lastName) - + TypeSizes.sizeofUnsignedVInt(info.offset) - + TypeSizes.sizeofVInt(info.width - WIDTH_BASE) - + TypeSizes.sizeof(info.endOpenMarker != null); + long size = ClusteringPrefix.serializer.serializedSize(info.firstName, version, clusteringTypes) + + ClusteringPrefix.serializer.serializedSize(info.lastName, version, clusteringTypes) + + 
TypeSizes.sizeofUnsignedVInt(info.offset) + + TypeSizes.sizeofVInt(info.width - WIDTH_BASE) + + TypeSizes.sizeof(info.endOpenMarker != null); if (info.endOpenMarker != null) size += DeletionTime.serializer.serializedSize(info.endOpenMarker); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index 6de3478..303adfd 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -268,16 +268,13 @@ public class IndexSummary extends WrappedSharedCloseable public static class IndexSummarySerializer { - public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException + public void serialize(IndexSummary t, DataOutputPlus out) throws IOException { out.writeInt(t.minIndexInterval); out.writeInt(t.offsetCount); out.writeLong(t.getOffHeapSize()); - if (withSamplingLevel) - { - out.writeInt(t.samplingLevel); - out.writeInt(t.sizeAtFullSampling); - } + out.writeInt(t.samplingLevel); + out.writeInt(t.sizeAtFullSampling); // our on-disk representation treats the offsets and the summary data as one contiguous structure, // in which the offsets are based from the start of the structure. i.e., if the offsets occupy // X bytes, the value of the first offset will be X. 
In memory we split the two regions up, so that @@ -297,7 +294,7 @@ public class IndexSummary extends WrappedSharedCloseable } @SuppressWarnings("resource") - public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException + public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, int expectedMinIndexInterval, int maxIndexInterval) throws IOException { int minIndexInterval = in.readInt(); if (minIndexInterval != expectedMinIndexInterval) @@ -308,17 +305,8 @@ public class IndexSummary extends WrappedSharedCloseable int offsetCount = in.readInt(); long offheapSize = in.readLong(); - int samplingLevel, fullSamplingSummarySize; - if (haveSamplingLevel) - { - samplingLevel = in.readInt(); - fullSamplingSummarySize = in.readInt(); - } - else - { - samplingLevel = BASE_SAMPLING_LEVEL; - fullSamplingSummarySize = offsetCount; - } + int samplingLevel = in.readInt(); + int fullSamplingSummarySize = in.readInt(); int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval); if (effectiveIndexInterval > maxIndexInterval) @@ -355,13 +343,12 @@ public class IndexSummary extends WrappedSharedCloseable * * Only for use by offline tools like SSTableMetadataViewer, otherwise SSTable.first/last should be used. 
*/ - public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException + public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner) throws IOException { in.skipBytes(4); // minIndexInterval int offsetCount = in.readInt(); long offheapSize = in.readLong(); - if (haveSamplingLevel) - in.skipBytes(8); // samplingLevel, fullSamplingSummarySize + in.skipBytes(8); // samplingLevel, fullSamplingSummarySize in.skip(offsetCount * 4); in.skip(offheapSize - offsetCount * 4); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index 8fb4835..fc326dc 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@ -73,21 +73,9 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder public List<SSTableReader> redistributeSummaries() throws IOException { logger.info("Redistributing index summaries"); - List<SSTableReader> oldFormatSSTables = new ArrayList<>(); List<SSTableReader> redistribute = new ArrayList<>(); for (LifecycleTransaction txn : transactions.values()) { - for (SSTableReader sstable : ImmutableList.copyOf(txn.originals())) - { - // We can't change the sampling level of sstables with the old format, because the serialization format - // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) 
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); - if (!sstable.descriptor.version.hasSamplingLevel()) - { - oldFormatSSTables.add(sstable); - txn.cancel(sstable); - } - } redistribute.addAll(txn.originals()); } @@ -119,7 +107,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); long remainingBytes = memoryPoolBytes; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) + for (SSTableReader sstable : compacting) remainingBytes -= sstable.getIndexSummaryOffHeapSize(); logger.trace("Index summaries for compacting SSTables are using {} MB of space", @@ -130,7 +118,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder txn.finish(); total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) + for (SSTableReader sstable : Iterables.concat(compacting, newSSTables)) total += sstable.getIndexSummaryOffHeapSize(); logger.trace("Completed resizing of index summaries; current approximate memory used: {}", FBUtilities.prettyPrintMemory(total)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 601f5a0..8556cfa 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -168,14 +168,40 @@ public abstract class SSTable } /** - * @return Descriptor and Component pair. null if given file is not acceptable as SSTable component. - * If component is of unknown type, returns CUSTOM component. + * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object. + * + * @param file the filename to parse. 
+ * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to + * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be + * returned as CUSTOM ones. + */ + public static Pair<Descriptor, Component> tryComponentFromFilename(File file) + { + try + { + return Descriptor.fromFilenameWithComponent(file); + } + catch (Throwable e) + { + return null; + } + } + + /** + * Parse a sstable filename into a {@link Descriptor} object. + * <p> + * Note that this method ignores the component part of the filename; if this is not what you want, use + * {@link #tryComponentFromFilename} instead. + * + * @param file the filename to parse. + * @return the {@code Descriptor} corresponding to {@code file} if it corresponds to a valid and supported sstable + * filename, {@code null} otherwise. */ - public static Pair<Descriptor, Component> tryComponentFromFilename(File dir, String name) + public static Descriptor tryDescriptorFromFilename(File file) { try { - return Component.fromFilename(dir, name); + return Descriptor.fromFilename(file); } catch (Throwable e) { @@ -218,17 +244,9 @@ public abstract class SSTable Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size()); for (Component.Type componentType : knownTypes) { - if (componentType == Component.Type.DIGEST) - { - if (desc.digestComponent != null && new File(desc.filenameFor(desc.digestComponent)).exists()) - components.add(desc.digestComponent); - } - else - { - Component component = new Component(componentType); - if (new File(desc.filenameFor(component)).exists()) - components.add(component); - } + Component component = new Component(componentType); + if (new File(desc.filenameFor(component)).exists()) + components.add(component); } return components; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 043f6fa..e00de4a 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -85,7 +85,7 @@ public class SSTableLoader implements StreamEventHandler return false; } - Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name); + Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file); Descriptor desc = p == null ? null : p.left; if (p == null || !p.right.equals(Component.DATA)) return false;