On-wire backward compatibility for 3.0. This adds support for mixed-version clusters with Cassandra 2.1 and 2.2.
Patch by Tyler Hobbs and Sylvain Lebresne for CASSANDRA-9704 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c64cefd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c64cefd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c64cefd Branch: refs/heads/trunk Commit: 8c64cefd19d706003d4b33b333274dbf17c9cb34 Parents: 69f0b89 Author: Tyler Hobbs <tylerlho...@gmail.com> Authored: Fri Aug 7 17:42:18 2015 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Fri Aug 7 17:42:18 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/ModificationStatement.java | 2 +- .../org/apache/cassandra/db/Clustering.java | 8 +- src/java/org/apache/cassandra/db/DataRange.java | 28 +- .../org/apache/cassandra/db/LegacyLayout.java | 922 +++++++++++++++-- .../apache/cassandra/db/PartitionColumns.java | 6 + .../cassandra/db/PartitionRangeReadCommand.java | 11 + .../cassandra/db/RangeSliceVerbHandler.java | 29 + .../org/apache/cassandra/db/ReadCommand.java | 995 ++++++++++++++++++- .../cassandra/db/ReadCommandVerbHandler.java | 9 +- .../org/apache/cassandra/db/ReadResponse.java | 250 ++++- .../db/SinglePartitionReadCommand.java | 11 +- src/java/org/apache/cassandra/db/Slice.java | 48 +- .../filter/AbstractClusteringIndexFilter.java | 20 - .../db/filter/ClusteringIndexFilter.java | 20 + .../db/filter/ClusteringIndexNamesFilter.java | 4 +- .../db/filter/ClusteringIndexSliceFilter.java | 4 +- .../cassandra/db/filter/ColumnFilter.java | 3 + .../apache/cassandra/db/filter/DataLimits.java | 12 +- .../db/marshal/AbstractCompositeType.java | 32 - .../cassandra/db/marshal/CompositeType.java | 26 + .../AbstractThreadUnsafePartition.java | 2 +- .../db/partitions/PartitionUpdate.java | 93 +- .../UnfilteredPartitionIterators.java | 13 +- .../cassandra/db/rows/BTreeBackedRow.java | 62 ++ 
src/java/org/apache/cassandra/db/rows/Row.java | 12 + .../apache/cassandra/net/MessagingService.java | 4 +- .../cassandra/service/AbstractReadExecutor.java | 5 +- .../apache/cassandra/service/DataResolver.java | 8 +- .../cassandra/service/DigestResolver.java | 6 +- .../apache/cassandra/service/ReadCallback.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 4 +- .../cassandra/service/StorageService.java | 4 +- .../service/pager/RangeSliceQueryPager.java | 4 +- .../service/pager/SinglePartitionPager.java | 8 +- .../cassandra/thrift/CassandraServer.java | 4 +- 36 files changed, 2353 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 216d3f7..0ba7b4e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta1 + * Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704) * Fix multiple slices on RowSearchers (CASSANDRA-10002) * Fix bug in merging of collections (CASSANDRA-10001) * Optimize batchlog replay to avoid full scans (CASSANDRA-7237) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 9f2c952..5fa1842 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -544,7 +544,7 @@ public abstract class ModificationStatement implements CQLStatement key, new ClusteringIndexNamesFilter(clusterings, false))); - Map<DecoratedKey, Partition> map = new HashMap(); + 
Map<DecoratedKey, Partition> map = new HashMap<>(); SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java index 7754182..a29ce65 100644 --- a/src/java/org/apache/cassandra/db/Clustering.java +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -57,10 +57,16 @@ public class Clustering extends AbstractClusteringPrefix } @Override - public String toString(CFMetaData metadata) + public String toString() { return "STATIC"; } + + @Override + public String toString(CFMetaData metadata) + { + return toString(); + } }; /** Empty clustering for tables having no clustering columns. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 79b2448..ffe041e 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -149,6 +149,16 @@ public class DataRange } /** + * Whether the data range is for a paged request or not. + * + * @return true if for paging, false otherwise + */ + public boolean isPaging() + { + return false; + } + + /** * Whether the range queried by this {@code DataRange} actually wraps around. * * @return whether the range queried by this {@code DataRange} actually wraps around. @@ -307,7 +317,7 @@ public class DataRange * first queried partition (the one for that last result) so it only fetch results that follow that * last result. In other words, this makes sure this resume paging where we left off. 
*/ - private static class Paging extends DataRange + public static class Paging extends DataRange { private final ClusteringComparator comparator; private final Clustering lastReturned; @@ -349,6 +359,20 @@ public class DataRange : new DataRange(range, clusteringIndexFilter); } + /** + * @return the last Clustering that was returned (in the previous page) + */ + public Clustering getLastReturned() + { + return lastReturned; + } + + @Override + public boolean isPaging() + { + return true; + } + @Override public boolean isUnrestricted() { @@ -358,7 +382,7 @@ public class DataRange @Override public String toString(CFMetaData metadata) { - return String.format("range=%s pfilter=%s lastReturned=%s (%s)", + return String.format("range=%s (paging) pfilter=%s lastReturned=%s (%s)", keyRange.getString(metadata.getKeyValidator()), clusteringIndexFilter.toString(metadata), lastReturned.toString(metadata), http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index 696c1c9..50e5d04 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -25,10 +25,9 @@ import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.ColumnFilter; @@ -38,6 +37,7 @@ import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.io.util.DataInputPlus; import 
org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.*; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -47,14 +47,14 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes; */ public abstract class LegacyLayout { - private static final Logger logger = LoggerFactory.getLogger(LegacyLayout.class); - public final static int MAX_CELL_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT; - private final static int DELETION_MASK = 0x01; - private final static int EXPIRATION_MASK = 0x02; - private final static int COUNTER_MASK = 0x04; - private final static int COUNTER_UPDATE_MASK = 0x08; + public final static int STATIC_PREFIX = 0xFFFF; + + public final static int DELETION_MASK = 0x01; + public final static int EXPIRATION_MASK = 0x02; + public final static int COUNTER_MASK = 0x04; + public final static int COUNTER_UPDATE_MASK = 0x08; private final static int RANGE_TOMBSTONE_MASK = 0x10; private LegacyLayout() {} @@ -177,25 +177,69 @@ public abstract class LegacyLayout if (!bound.hasRemaining()) return isStart ? LegacyBound.BOTTOM : LegacyBound.TOP; - List<ByteBuffer> components = metadata.isCompound() - ? CompositeType.splitName(bound) - : Collections.singletonList(bound); + List<CompositeType.CompositeComponent> components = metadata.isCompound() + ? CompositeType.deconstruct(bound) + : Collections.singletonList(new CompositeType.CompositeComponent(bound, (byte) 0)); // Either it's a prefix of the clustering, or it's the bound of a collection range tombstone (and thus has // the collection column name) assert components.size() <= metadata.comparator.size() || (!metadata.isCompactTable() && components.size() == metadata.comparator.size() + 1); - List<ByteBuffer> prefix = components.size() <= metadata.comparator.size() ? components : components.subList(0, metadata.comparator.size()); - Slice.Bound sb = Slice.Bound.create(isStart ? 
Slice.Bound.Kind.INCL_START_BOUND : Slice.Bound.Kind.INCL_END_BOUND, - prefix.toArray(new ByteBuffer[prefix.size()])); + List<CompositeType.CompositeComponent> prefix = components.size() <= metadata.comparator.size() + ? components + : components.subList(0, metadata.comparator.size()); + Slice.Bound.Kind boundKind; + if (isStart) + { + if (components.get(components.size() - 1).eoc > 0) + boundKind = Slice.Bound.Kind.EXCL_START_BOUND; + else + boundKind = Slice.Bound.Kind.INCL_START_BOUND; + } + else + { + if (components.get(components.size() - 1).eoc < 0) + boundKind = Slice.Bound.Kind.EXCL_END_BOUND; + else + boundKind = Slice.Bound.Kind.INCL_END_BOUND; + } + + ByteBuffer[] prefixValues = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + prefixValues[i] = prefix.get(i).value; + Slice.Bound sb = Slice.Bound.create(boundKind, prefixValues); ColumnDefinition collectionName = components.size() == metadata.comparator.size() + 1 - ? metadata.getColumnDefinition(components.get(metadata.comparator.size())) + ? 
metadata.getColumnDefinition(components.get(metadata.comparator.size()).value) : null; return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName); } - public static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement) + public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart) + { + if (bound == Slice.Bound.BOTTOM || bound == Slice.Bound.TOP || metadata.comparator.size() == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + + ClusteringPrefix clustering = bound.clustering(); + + if (!metadata.isCompound()) + { + assert clustering.size() == 1; + return clustering.get(0); + } + + CompositeType ctype = CompositeType.getInstance(metadata.comparator.subtypes()); + CompositeType.Builder builder = ctype.builder(); + for (int i = 0; i < clustering.size(); i++) + builder.add(clustering.get(i)); + + if (isStart) + return bound.isInclusive() ? builder.build() : builder.buildAsEndOfRange(); + else + return bound.isInclusive() ? 
builder.buildAsEndOfRange() : builder.build(); + } + + public static ByteBuffer encodeCellName(CFMetaData metadata, ClusteringPrefix clustering, ByteBuffer columnName, ByteBuffer collectionElement) { boolean isStatic = clustering == Clustering.STATIC_CLUSTERING; @@ -204,7 +248,7 @@ public abstract class LegacyLayout if (isStatic) return columnName; - assert clustering.size() == 1; + assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size(); return clustering.get(0); } @@ -253,8 +297,11 @@ public abstract class LegacyLayout return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); } - public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering clustering) + public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering) { + if (clustering.size() == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (!metadata.isCompound()) { assert clustering.size() == 1; @@ -268,14 +315,151 @@ public abstract class LegacyLayout } // For serializing to old wire format - public static Pair<DeletionInfo, Iterator<LegacyCell>> fromUnfilteredRowIterator(UnfilteredRowIterator iterator) + public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator) { // we need to extract the range tombstone so materialize the partition. Since this is // used for the on-wire format, this is not worst than it used to be. 
final ArrayBackedPartition partition = ArrayBackedPartition.create(iterator); DeletionInfo info = partition.deletionInfo(); - Iterator<LegacyCell> cells = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow()); - return Pair.create(info, cells); + Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> pair = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow()); + + LegacyLayout.LegacyRangeTombstoneList rtl = pair.left; + + // Processing the cell iterator results in the LegacyRangeTombstoneList being populated, so we do this + // before we use the LegacyRangeTombstoneList at all + List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right); + + // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex + // deletions. Go through our normal range tombstones and add then to the LegacyRTL so that the range + // tombstones all get merged and sorted properly. + if (info.hasRanges()) + { + Iterator<RangeTombstone> rangeTombstoneIterator = info.rangeIterator(false); + while (rangeTombstoneIterator.hasNext()) + { + RangeTombstone rt = rangeTombstoneIterator.next(); + Slice slice = rt.deletedSlice(); + LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(slice.start(), false, null); + LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(slice.end(), false, null); + rtl.add(start, end, rt.deletionTime().markedForDeleteAt(), rt.deletionTime().localDeletionTime()); + } + } + + return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells); + } + + public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + out.writeBoolean(true); + + LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition); + + UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, 
version); + DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out); + + legacyPartition.rangeTombstones.serialize(out, partition.metadata()); + + // begin cell serialization + out.writeInt(legacyPartition.cells.size()); + for (LegacyLayout.LegacyCell cell : legacyPartition.cells) + { + ByteBufferUtil.writeWithShortLength(cell.name.encode(partition.metadata()), out); + if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING) + { + out.writeByte(LegacyLayout.EXPIRATION_MASK); // serialization flags + out.writeInt(cell.ttl); + out.writeInt(cell.localDeletionTime); + } + else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED) + { + out.writeByte(LegacyLayout.DELETION_MASK); // serialization flags + out.writeLong(cell.timestamp); + out.writeInt(TypeSizes.sizeof(cell.localDeletionTime)); + out.writeInt(cell.localDeletionTime); + continue; + } + else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER) + { + out.writeByte(LegacyLayout.COUNTER_MASK); // serialization flags + out.writeLong(Long.MIN_VALUE); // timestampOfLastDelete (not used, and MIN_VALUE is the default) + } + else + { + // normal cell + out.writeByte(0); // serialization flags + } + + out.writeLong(cell.timestamp); + ByteBufferUtil.writeWithLength(cell.value, out); + } + } + + // For the old wire format + // Note: this can return null if an empty partition is serialized! 
+ public static UnfilteredRowIterator deserializeLegacyPartition(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException + { + assert version < MessagingService.VERSION_30; + + // This is only used in mutation, and mutation have never allowed "null" column families + boolean present = in.readBoolean(); + if (!present) + return null; + + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + LegacyDeletionInfo info = LegacyDeletionInfo.deserialize(metadata, in); + int size = in.readInt(); + Iterator<LegacyCell> cells = deserializeCells(metadata, in, flag, size); + SerializationHelper helper = new SerializationHelper(metadata, version, flag); + return onWireCellstoUnfilteredRowIterator(metadata, metadata.partitioner.decorateKey(key), info, cells, false, helper); + } + + // For the old wire format + public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version) + { + assert version < MessagingService.VERSION_30; + + if (partition.isEmpty()) + return TypeSizes.sizeof(false); + + long size = TypeSizes.sizeof(true); + + LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition); + + size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version); + size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion); + size += legacyPartition.rangeTombstones.serializedSize(partition.metadata()); + + // begin cell serialization + size += TypeSizes.sizeof(legacyPartition.cells.size()); + for (LegacyLayout.LegacyCell cell : legacyPartition.cells) + { + size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata())); + size += 1; // serialization flags + if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING) + { + size += TypeSizes.sizeof(cell.ttl); + size += TypeSizes.sizeof(cell.localDeletionTime); + } + else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED) + { + 
size += TypeSizes.sizeof(cell.timestamp); + // localDeletionTime replaces cell.value as the body + size += TypeSizes.sizeof(TypeSizes.sizeof(cell.localDeletionTime)); + size += TypeSizes.sizeof(cell.localDeletionTime); + continue; + } + else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER) + { + size += TypeSizes.sizeof(Long.MIN_VALUE); // timestampOfLastDelete + } + + size += TypeSizes.sizeof(cell.timestamp); + size += ByteBufferUtil.serializedSizeWithLength(cell.value); + } + + return size; } // For thrift sake @@ -296,6 +480,7 @@ public abstract class LegacyLayout boolean reversed, SerializationHelper helper) { + // If the table is a static compact, the "column_metadata" are now internally encoded as // static. This has already been recognized by decodeCellName, but it means the cells // provided are not in the expected order (the "static" cells are not necessarily at the front). @@ -441,18 +626,27 @@ public abstract class LegacyLayout }; } - public static Iterator<LegacyCell> fromRowIterator(final RowIterator iterator) + public static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final RowIterator iterator) { return fromRowIterator(iterator.metadata(), iterator, iterator.staticRow()); } - public static Iterator<LegacyCell> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow) + private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow) { - return new AbstractIterator<LegacyCell>() + LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10); + Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>() { - private Iterator<LegacyCell> currentRow = staticRow.isEmpty() - ? 
Collections.<LegacyLayout.LegacyCell>emptyIterator() - : fromRow(metadata, staticRow); + private Iterator<LegacyCell> currentRow = initializeRow(); + + private Iterator<LegacyCell> initializeRow() + { + if (staticRow == null || staticRow.isEmpty()) + return Collections.<LegacyLayout.LegacyCell>emptyIterator(); + + Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, staticRow); + deletions.addAll(row.left); + return row.right; + } protected LegacyCell computeNext() { @@ -462,17 +656,58 @@ public abstract class LegacyLayout if (!iterator.hasNext()) return endOfData(); - currentRow = fromRow(metadata, iterator.next()); + Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, iterator.next()); + deletions.addAll(row.left); + currentRow = row.right; return computeNext(); } }; + + return Pair.create(deletions, cells); } - private static Iterator<LegacyCell> fromRow(final CFMetaData metadata, final Row row) + private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRow(final CFMetaData metadata, final Row row) { - return new AbstractIterator<LegacyCell>() + // convert any complex deletions or row deletion into normal range tombstones so that we can build and send a proper RangeTombstoneList + // to legacy nodes + LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10); + + if (!row.deletion().isLive()) { - private final Iterator<Cell> cells = row.cells().iterator(); + Clustering clustering = row.clustering(); + Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering); + Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering); + + LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, null); + LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, null); + + deletions.add(start, end, row.deletion().markedForDeleteAt(), row.deletion().localDeletionTime()); + } + + for (ColumnData cd : row) + { + ColumnDefinition col 
= cd.column(); + if (col.isSimple()) + continue; + + DeletionTime delTime = ((ComplexColumnData)cd).complexDeletion(); + if (!delTime.isLive()) + { + Clustering clustering = row.clustering(); + + Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering); + Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering); + + LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, col.isStatic(), col); + LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, col.isStatic(), col); + + deletions.add(start, end, delTime.markedForDeleteAt(), delTime.localDeletionTime()); + } + } + + Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>() + { + private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata).iterator(); // we don't have (and shouldn't have) row markers for compact tables. private boolean hasReturnedRowMarker = metadata.isCompactTable(); @@ -481,18 +716,24 @@ public abstract class LegacyLayout if (!hasReturnedRowMarker) { hasReturnedRowMarker = true; - LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null); - LivenessInfo info = row.primaryKeyLivenessInfo(); - return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl()); + + // don't include a row marker if there's no timestamp on the primary key; this is the 3.0+ equivalent + // of a row marker + if (!row.primaryKeyLivenessInfo().isEmpty()) + { + LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null); + LivenessInfo info = row.primaryKeyLivenessInfo(); + return new LegacyCell(info.isExpiring() ? 
LegacyCell.Kind.EXPIRING : LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl()); + } } if (!cells.hasNext()) return endOfData(); - Cell cell = cells.next(); - return makeLegacyCell(row.clustering(), cell); + return makeLegacyCell(row.clustering(), cells.next()); } }; + return Pair.create(deletions, cells); } private static LegacyCell makeLegacyCell(Clustering clustering, Cell cell) @@ -554,6 +795,9 @@ public abstract class LegacyLayout }; } + // Note that this doesn't exactly compare cells as they were pre-3.0 because within a row they sort columns like + // in 3.0, that is, with simple columns before complex columns. In other words, this comparator makes sure cells + // are in the proper order to convert them to actual 3.0 rows. public static Comparator<LegacyCellName> legacyCellNameComparator(final CFMetaData metadata, final boolean reversed) { return new Comparator<LegacyCellName>() @@ -591,13 +835,9 @@ public abstract class LegacyLayout assert c1.column.isRegular() || c1.column.isStatic(); assert c2.column.isRegular() || c2.column.isStatic(); - if (c1.column.kind != c2.column.kind) - return c1.column.isStatic() ? 
-1 : 1; - - AbstractType<?> cmp = metadata.getColumnDefinitionNameComparator(c1.column.kind); - int c = cmp.compare(c1.column.name.bytes, c2.column.name.bytes); - if (c != 0) - return c; + int cmp = c1.column.compareTo(c2.column); + if (cmp != 0) + return cmp; } assert (c1.collectionElement == null) == (c2.collectionElement == null); @@ -748,13 +988,6 @@ public abstract class LegacyLayout } } - public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInputPlus in) throws IOException - { - ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in); - in.readUnsignedByte(); - return readLegacyRangeTombstoneBody(metadata, in, boundname); - } - public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException { LegacyBound min = decodeBound(metadata, boundname, true); @@ -806,7 +1039,7 @@ public abstract class LegacyLayout public final CFMetaData metadata; private final boolean isStatic; private final SerializationHelper helper; - private Row.Builder builder; + private final Row.Builder builder; private Clustering clustering; private LegacyRangeTombstone rowDeletion; @@ -822,7 +1055,11 @@ public abstract class LegacyLayout this.metadata = metadata; this.isStatic = isStatic; this.helper = helper; - this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars); + // We cannot use a sorted builder because we don't have exactly the same ordering in 3.0 and pre-3.0. More precisely, within a row, we + // store all simple columns before the complex ones in 3.0, which we use to sort everything sorted by the column name before. Note however + // that the unsorted builder won't have to reconcile cells, so the exact value we pass for nowInSec doesn't matter. + this.builder = BTreeBackedRow.unsortedBuilder(isStatic ? 
metadata.partitionColumns().statics : metadata.partitionColumns().regulars, FBUtilities.nowInSeconds()); + } public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper) @@ -939,6 +1176,21 @@ public abstract class LegacyLayout } } + public static class LegacyUnfilteredPartition + { + public final DeletionTime partitionDeletion; + public final LegacyRangeTombstoneList rangeTombstones; + public final List<LegacyCell> cells; + + private LegacyUnfilteredPartition(DeletionTime partitionDeletion, LegacyRangeTombstoneList rangeTombstones, List<LegacyCell> cells) + { + this.partitionDeletion = partitionDeletion; + this.rangeTombstones = rangeTombstones; + this.cells = cells; + } + } + + public static class LegacyCellName { public final Clustering clustering; @@ -987,7 +1239,7 @@ public abstract class LegacyLayout public final boolean isStatic; public final ColumnDefinition collectionName; - private LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName) + public LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName) { this.bound = bound; this.isStatic = isStatic; @@ -1131,10 +1383,7 @@ public abstract class LegacyLayout if (isTombstone()) return false; - if (isExpiring()) - return nowInSec < localDeletionTime; - - return true; + return !isExpiring() || nowInSec < localDeletionTime; } @Override @@ -1240,10 +1489,8 @@ public abstract class LegacyLayout public static class LegacyDeletionInfo { - public static final Serializer serializer = new Serializer(); - public final DeletionInfo deletionInfo; - private final List<LegacyRangeTombstone> inRowTombstones; + public final List<LegacyRangeTombstone> inRowTombstones; private LegacyDeletionInfo(DeletionInfo deletionInfo, List<LegacyRangeTombstone> inRowTombstones) { @@ -1253,7 +1500,17 @@ public abstract class LegacyLayout public static LegacyDeletionInfo from(DeletionInfo info) { - return new LegacyDeletionInfo(info, 
Collections.<LegacyRangeTombstone>emptyList()); + List<LegacyRangeTombstone> rangeTombstones = new ArrayList<>(info.rangeCount()); + Iterator<RangeTombstone> iterator = info.rangeIterator(false); + while (iterator.hasNext()) + { + RangeTombstone rt = iterator.next(); + Slice slice = rt.deletedSlice(); + rangeTombstones.add(new LegacyRangeTombstone(new LegacyBound(slice.start(), false, null), + new LegacyBound(slice.end(), false, null), + rt.deletionTime())); + } + return new LegacyDeletionInfo(info, rangeTombstones); } public static LegacyDeletionInfo live() @@ -1266,47 +1523,536 @@ public abstract class LegacyLayout return inRowTombstones.iterator(); } - public static class Serializer + public static LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in) throws IOException { - public void serialize(CFMetaData metadata, LegacyDeletionInfo info, DataOutputPlus out, int version) throws IOException + DeletionTime topLevel = DeletionTime.serializer.deserialize(in); + + int rangeCount = in.readInt(); + if (rangeCount == 0) + return from(new MutableDeletionInfo(topLevel)); + + RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount); + List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>(); + for (int i = 0; i < rangeCount; i++) { - throw new UnsupportedOperationException(); - //DeletionTime.serializer.serialize(info.topLevel, out); - //rtlSerializer.serialize(info.ranges, out, version); + LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true); + LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false); + int delTime = in.readInt(); + long markedAt = in.readLong(); + + LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime)); + if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata)) + inRowTombsones.add(tombstone); + else + ranges.add(start.bound, end.bound, markedAt, delTime); + } + return 
new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombsones); + } + } + + /** + * A helper class for LegacyRangeTombstoneList. This replaces the Comparator<Composite> that RTL used before 3.0. + */ + private static class LegacyBoundComparator implements Comparator<LegacyBound> + { + ClusteringComparator clusteringComparator; + + public LegacyBoundComparator(ClusteringComparator clusteringComparator) + { + this.clusteringComparator = clusteringComparator; + } + + public int compare(LegacyBound a, LegacyBound b) + { + int result = this.clusteringComparator.compare(a.bound, b.bound); + if (result != 0) + return result; + + return UTF8Type.instance.compare(a.collectionName.name.bytes, b.collectionName.name.bytes); + } + } + + /** + * Almost an entire copy of RangeTombstoneList from C* 2.1. The main difference is that LegacyBoundComparator + * is used in place of Comparator<Composite> (because Composite doesn't exist any more). + * + * This class is needed to allow us to convert single-row deletions and complex deletions into range tombstones + * and properly merge them into the normal set of range tombstones. + */ + public static class LegacyRangeTombstoneList + { + private final LegacyBoundComparator comparator; + + // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could + // use a List for starts and ends, but having arrays everywhere is almost simpler. 
+ private LegacyBound[] starts; + private LegacyBound[] ends; + private long[] markedAts; + private int[] delTimes; + + private int size; + + private LegacyRangeTombstoneList(LegacyBoundComparator comparator, LegacyBound[] starts, LegacyBound[] ends, long[] markedAts, int[] delTimes, int size) + { + assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length; + this.comparator = comparator; + this.starts = starts; + this.ends = ends; + this.markedAts = markedAts; + this.delTimes = delTimes; + this.size = size; + } + + public LegacyRangeTombstoneList(LegacyBoundComparator comparator, int capacity) + { + this(comparator, new LegacyBound[capacity], new LegacyBound[capacity], new long[capacity], new int[capacity], 0); + } + + public boolean isEmpty() + { + return size == 0; + } + + public int size() + { + return size; + } + + /** + * Adds a new range tombstone. + * + * This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case), + * but it doesn't assume it. + */ + public void add(LegacyBound start, LegacyBound end, long markedAt, int delTime) + { + if (isEmpty()) + { + addInternal(0, start, end, markedAt, delTime); + return; } - public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in, int version) throws IOException + int c = comparator.compare(ends[size-1], start); + + // Fast path if we add in sorted order + if (c <= 0) + { + addInternal(size, start, end, markedAt, delTime); + } + else { - DeletionTime topLevel = DeletionTime.serializer.deserialize(in); + // Note: insertFrom expect i to be the insertion point in term of interval ends + int pos = Arrays.binarySearch(ends, 0, size, start, comparator); + insertFrom((pos >= 0 ? pos : -pos-1), start, end, markedAt, delTime); + } + } - int rangeCount = in.readInt(); - if (rangeCount == 0) - return from(new MutableDeletionInfo(topLevel)); + /* + * Inserts a new element starting at index i. 
This method assumes that: + * ends[i-1] <= start <= ends[i] + * + * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that: + * - s_i <= e_i + * - e_i <= s_i+1 + * - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1 + * Basically, range are non overlapping except for their bound and in order. And while + * we allow ranges with the same value for the start and end, we don't allow repeating + * such range (so we can't have [0, 0][0, 0] even though it would respect the first 2 + * conditions). + * + */ + + /** + * Adds all the range tombstones of {@code tombstones} to this RangeTombstoneList. + */ + public void addAll(LegacyRangeTombstoneList tombstones) + { + if (tombstones.isEmpty()) + return; + + if (isEmpty()) + { + copyArrays(tombstones, this); + return; + } - RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount); - List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>(); - for (int i = 0; i < rangeCount; i++) + /* + * We basically have 2 techniques we can use here: either we repeatedly call add() on tombstones values, + * or we do a merge of both (sorted) lists. If this lists is bigger enough than the one we add, then + * calling add() will be faster, otherwise it's merging that will be faster. + * + * Let's note that during memtables updates, it might not be uncommon that a new update has only a few range + * tombstones, while the CF we're adding it to (the one in the memtable) has many. In that case, using add() is + * likely going to be faster. + * + * In other cases however, like when diffing responses from multiple nodes, the tombstone lists we "merge" will + * be likely sized, so using add() might be a bit inefficient. + * + * Roughly speaking (this ignore the fact that updating an element is not exactly constant but that's not a big + * deal), if n is the size of this list and m is tombstones size, merging is O(n+m) while using add() is O(m*log(n)). 
+ * + * But let's not crank up a logarithm computation for that. Long story short, merging will be a bad choice only + * if this list size is lot bigger that the other one, so let's keep it simple. + */ + if (size > 10 * tombstones.size) + { + for (int i = 0; i < tombstones.size; i++) + add(tombstones.starts[i], tombstones.ends[i], tombstones.markedAts[i], tombstones.delTimes[i]); + } + else + { + int i = 0; + int j = 0; + while (i < size && j < tombstones.size) { - LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true); - LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false); - int delTime = in.readInt(); - long markedAt = in.readLong(); - - LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime)); - if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata)) - inRowTombsones.add(tombstone); + if (comparator.compare(tombstones.starts[j], ends[i]) <= 0) + { + insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]); + j++; + } else - ranges.add(start.bound, end.bound, markedAt, delTime); + { + i++; + } } - return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombsones); + // Addds the remaining ones from tombstones if any (note that addInternal will increment size if relevant). 
+ for (; j < tombstones.size; j++) + addInternal(size, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]); } + } - public long serializedSize(CFMetaData metadata, LegacyDeletionInfo info, TypeSizes typeSizes, int version) + private static void copyArrays(LegacyRangeTombstoneList src, LegacyRangeTombstoneList dst) + { + dst.grow(src.size); + System.arraycopy(src.starts, 0, dst.starts, 0, src.size); + System.arraycopy(src.ends, 0, dst.ends, 0, src.size); + System.arraycopy(src.markedAts, 0, dst.markedAts, 0, src.size); + System.arraycopy(src.delTimes, 0, dst.delTimes, 0, src.size); + dst.size = src.size; + } + + private void insertFrom(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime) + { + while (i < size) { - throw new UnsupportedOperationException(); - //long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes); - //return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version); + assert i == 0 || comparator.compare(ends[i-1], start) <= 0; + + int c = comparator.compare(start, ends[i]); + assert c <= 0; + if (c == 0) + { + // If start == ends[i], then we can insert from the next one (basically the new element + // really start at the next element), except for the case where starts[i] == ends[i]. + // In this latter case, if we were to move to next element, we could end up with ...[x, x][x, x]... + if (comparator.compare(starts[i], ends[i]) == 0) + { + // The current element cover a single value which is equal to the start of the inserted + // element. If the inserted element overwrites the current one, just remove the current + // (it's included in what we insert) and proceed with the insert. 
+ if (markedAt > markedAts[i]) + { + removeInternal(i); + continue; + } + + // Otherwise (the current singleton interval override the new one), we want to leave the + // current element and move to the next, unless start == end since that means the new element + // is in fact fully covered by the current one (so we're done) + if (comparator.compare(start, end) == 0) + return; + } + i++; + continue; + } + + // Do we overwrite the current element? + if (markedAt > markedAts[i]) + { + // We do overwrite. + + // First deal with what might come before the newly added one. + if (comparator.compare(starts[i], start) < 0) + { + addInternal(i, starts[i], start, markedAts[i], delTimes[i]); + i++; + // We don't need to do the following line, but in spirit that's what we want to do + // setInternal(i, start, ends[i], markedAts, delTime]) + } + + // now, start <= starts[i] + + // Does the new element stops before/at the current one, + int endCmp = comparator.compare(end, starts[i]); + if (endCmp <= 0) + { + // Here start <= starts[i] and end <= starts[i] + // This means the current element is before the current one. However, one special + // case is if end == starts[i] and starts[i] == ends[i]. In that case, + // the new element entirely overwrite the current one and we can just overwrite + if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0) + setInternal(i, start, end, markedAt, delTime); + else + addInternal(i, start, end, markedAt, delTime); + return; + } + + // Do we overwrite the current element fully? + int cmp = comparator.compare(ends[i], end); + if (cmp <= 0) + { + // We do overwrite fully: + // update the current element until it's end and continue + // on with the next element (with the new inserted start == current end). 
+ + // If we're on the last element, we can optimize + if (i == size-1) + { + setInternal(i, start, end, markedAt, delTime); + return; + } + + setInternal(i, start, ends[i], markedAt, delTime); + if (cmp == 0) + return; + + start = ends[i]; + i++; + } + else + { + // We don't ovewrite fully. Insert the new interval, and then update the now next + // one to reflect the not overwritten parts. We're then done. + addInternal(i, start, end, markedAt, delTime); + i++; + setInternal(i, end, ends[i], markedAts[i], delTimes[i]); + return; + } + } + else + { + // we don't overwrite the current element + + // If the new interval starts before the current one, insert that new interval + if (comparator.compare(start, starts[i]) < 0) + { + // If we stop before the start of the current element, just insert the new + // interval and we're done; otherwise insert until the beginning of the + // current element + if (comparator.compare(end, starts[i]) <= 0) + { + addInternal(i, start, end, markedAt, delTime); + return; + } + addInternal(i, start, starts[i], markedAt, delTime); + i++; + } + + // After that, we're overwritten on the current element but might have + // some residual parts after ... + + // ... unless we don't extend beyond it. 
+ if (comparator.compare(end, ends[i]) <= 0) + return; + + start = ends[i]; + i++; + } + } + + // If we got there, then just insert the remainder at the end + addInternal(i, start, end, markedAt, delTime); + } + private int capacity() + { + return starts.length; + } + + private void addInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime) + { + assert i >= 0; + + if (size == capacity()) + growToFree(i); + else if (i < size) + moveElements(i); + + setInternal(i, start, end, markedAt, delTime); + size++; + } + + private void removeInternal(int i) + { + assert i >= 0; + + System.arraycopy(starts, i+1, starts, i, size - i - 1); + System.arraycopy(ends, i+1, ends, i, size - i - 1); + System.arraycopy(markedAts, i+1, markedAts, i, size - i - 1); + System.arraycopy(delTimes, i+1, delTimes, i, size - i - 1); + + --size; + starts[size] = null; + ends[size] = null; + } + + /* + * Grow the arrays, leaving index i "free" in the process. + */ + private void growToFree(int i) + { + int newLength = (capacity() * 3) / 2 + 1; + grow(i, newLength); + } + + /* + * Grow the arrays to match newLength capacity. 
+ */ + private void grow(int newLength) + { + if (capacity() < newLength) + grow(-1, newLength); + } + + private void grow(int i, int newLength) + { + starts = grow(starts, size, newLength, i); + ends = grow(ends, size, newLength, i); + markedAts = grow(markedAts, size, newLength, i); + delTimes = grow(delTimes, size, newLength, i); + } + + private static LegacyBound[] grow(LegacyBound[] a, int size, int newLength, int i) + { + if (i < 0 || i >= size) + return Arrays.copyOf(a, newLength); + + LegacyBound[] newA = new LegacyBound[newLength]; + System.arraycopy(a, 0, newA, 0, i); + System.arraycopy(a, i, newA, i+1, size - i); + return newA; + } + + private static long[] grow(long[] a, int size, int newLength, int i) + { + if (i < 0 || i >= size) + return Arrays.copyOf(a, newLength); + + long[] newA = new long[newLength]; + System.arraycopy(a, 0, newA, 0, i); + System.arraycopy(a, i, newA, i+1, size - i); + return newA; + } + + private static int[] grow(int[] a, int size, int newLength, int i) + { + if (i < 0 || i >= size) + return Arrays.copyOf(a, newLength); + + int[] newA = new int[newLength]; + System.arraycopy(a, 0, newA, 0, i); + System.arraycopy(a, i, newA, i+1, size - i); + return newA; + } + + /* + * Move elements so that index i is "free", assuming the arrays have at least one free slot at the end. 
+ */ + private void moveElements(int i) + { + if (i >= size) + return; + + System.arraycopy(starts, i, starts, i+1, size - i); + System.arraycopy(ends, i, ends, i+1, size - i); + System.arraycopy(markedAts, i, markedAts, i+1, size - i); + System.arraycopy(delTimes, i, delTimes, i+1, size - i); + // we set starts[i] to null to indicate the position is now empty, so that we update boundaryHeapSize + // when we set it + starts[i] = null; + } + + private void setInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime) + { + starts[i] = start; + ends[i] = end; + markedAts[i] = markedAt; + delTimes[i] = delTime; + } + + public void serialize(DataOutputPlus out, CFMetaData metadata) throws IOException + { + out.writeInt(size); + if (size == 0) + return; + + List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes()); + if (!metadata.isDense()) + types.add(UTF8Type.instance); + CompositeType type = CompositeType.getInstance(types); + + for (int i = 0; i < size; i++) + { + LegacyBound start = starts[i]; + LegacyBound end = ends[i]; + + CompositeType.Builder startBuilder = type.builder(); + CompositeType.Builder endBuilder = type.builder(); + for (int j = 0; j < start.bound.clustering().size(); j++) + { + startBuilder.add(start.bound.get(j)); + endBuilder.add(end.bound.get(j)); + } + + if (start.collectionName != null) + startBuilder.add(start.collectionName.name.bytes); + if (end.collectionName != null) + endBuilder.add(end.collectionName.name.bytes); + + ByteBufferUtil.writeWithShortLength(startBuilder.build(), out); + ByteBufferUtil.writeWithShortLength(endBuilder.buildAsEndOfRange(), out); + + out.writeInt(delTimes[i]); + out.writeLong(markedAts[i]); + } + } + + public long serializedSize(CFMetaData metadata) + { + long size = 0; + size += TypeSizes.sizeof(this.size); + + if (this.size == 0) + return size; + + List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes()); + if 
(!metadata.isDense()) + types.add(UTF8Type.instance); + CompositeType type = CompositeType.getInstance(types); + + for (int i = 0; i < this.size; i++) + { + LegacyBound start = starts[i]; + LegacyBound end = ends[i]; + + CompositeType.Builder startBuilder = type.builder(); + CompositeType.Builder endBuilder = type.builder(); + for (int j = 0; j < start.bound.clustering().size(); j++) + { + startBuilder.add(start.bound.get(j)); + endBuilder.add(end.bound.get(j)); + } + + if (start.collectionName != null) + startBuilder.add(start.collectionName.name.bytes); + if (end.collectionName != null) + endBuilder.add(end.collectionName.name.bytes); + + size += ByteBufferUtil.serializedSizeWithShortLength(startBuilder.build()); + size += ByteBufferUtil.serializedSizeWithShortLength(endBuilder.buildAsEndOfRange()); + + size += TypeSizes.sizeof(delTimes[i]); + size += TypeSizes.sizeof(markedAts[i]); } + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java index 5f1da8a..aa60198 100644 --- a/src/java/org/apache/cassandra/db/PartitionColumns.java +++ b/src/java/org/apache/cassandra/db/PartitionColumns.java @@ -91,6 +91,12 @@ public class PartitionColumns implements Iterable<ColumnDefinition> return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator()); } + /** * Returns the total number of static and regular columns. 
*/ + public int size() + { + return regulars.columnCount() + statics.columnCount(); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 18b6950..2219a84 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -37,6 +37,8 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; import org.apache.cassandra.service.pager.*; import org.apache.cassandra.thrift.ThriftResultsMerger; @@ -226,6 +228,15 @@ public class PartitionRangeReadCommand extends ReadCommand }; } + @SuppressWarnings("deprecation") + protected MessageOut<ReadCommand> createLegacyMessage() + { + if (this.dataRange.isPaging()) + return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer); + else + return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer); + } + protected void appendCQLWhereClause(StringBuilder sb) { if (dataRange.isUnrestricted() && rowFilter().isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java new file mode 
100644 index 0000000..3f1d660 --- /dev/null +++ b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.io.IVersionedSerializer; + +public class RangeSliceVerbHandler extends ReadCommandVerbHandler +{ + @Override + protected IVersionedSerializer<ReadResponse> serializer() + { + return ReadResponse.legacyRangeSliceReplySerializer; + } +}