Record previous row size
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/525855d2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/525855d2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/525855d2 Branch: refs/heads/10378 Commit: 525855d2f37b2fe9376b4ce2dab9107d0d227f6a Parents: 424b59a Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Sep 23 14:36:04 2015 -0700 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Sep 23 14:36:04 2015 -0700 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnIndex.java | 10 ++- .../rows/UnfilteredRowIteratorSerializer.java | 2 + .../cassandra/db/rows/UnfilteredSerializer.java | 69 +++++++++++++++----- 3 files changed, 60 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index add5fa7..ede3f79 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -76,6 +76,7 @@ public class ColumnIndex private long startPosition = -1; private int written; + private long previousRowStart; private ClusteringPrefix firstClustering; private ClusteringPrefix lastClustering; @@ -99,7 +100,7 @@ public class ColumnIndex ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer); DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer); if (header.hasStatic()) - UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version); + UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version); } public ColumnIndex build() throws IOException @@ -131,15 +132,18 @@ public class ColumnIndex private void add(Unfiltered unfiltered) throws IOException { + long pos = currentPosition(); + if (firstClustering == null) { // Beginning of an index block. Remember the start and position firstClustering = unfiltered.clustering(); - startPosition = currentPosition(); + startPosition = pos; } - UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version); + UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); lastClustering = unfiltered.clustering(); + previousRowStart = pos; ++written; if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 3c5cdbf..3a0558e 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -90,6 +90,8 @@ public class UnfilteredRowIteratorSerializer // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { + assert !header.isForSSTable(); + ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); int flags = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 1f77529..fac8863 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -92,17 +92,31 @@ public class UnfilteredSerializer public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version) throws IOException { + assert !header.isForSSTable(); + serialize(unfiltered, header, out, 0, version); + } + + public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) + throws IOException + { if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - serialize((RangeTombstoneMarker) unfiltered, header, out, version); + serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version); } else { - serialize((Row) unfiltered, header, out, version); + serialize((Row) unfiltered, header, out, previousUnfilteredSize, version); } } - public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version) + public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version) + throws IOException + { + assert row.isStatic(); + serialize(row, header, out, 0, version); + } + + private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { int flags = 0; @@ -151,7 +165,10 @@ public class UnfilteredSerializer Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes()); if (header.isForSSTable()) - out.writeUnsignedVInt(serializedRowBodySize(row, header, version)); + { + out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version)); + out.writeUnsignedVInt(previousUnfilteredSize); + } if ((flags & HAS_TIMESTAMP) != 0) header.writeTimestamp(pkLiveness.timestamp(), out); @@ -186,14 +203,17 @@ public class UnfilteredSerializer Cell.serializer.serialize(cell, out, rowLiveness, header); } - public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version) + private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { out.writeByte((byte)IS_MARKER); RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes()); if (header.isForSSTable()) - out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, version)); + { + out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize, version)); + out.writeUnsignedVInt(previousUnfilteredSize); + } if (marker.isBoundary()) { @@ -214,8 +234,9 @@ public class UnfilteredSerializer : serializedSize((Row) unfiltered, header, version); } - public long serializedSize(Row row, SerializationHeader header, int version) + private long serializedSize(Row row, SerializationHeader header, int version) { + assert !header.isForSSTable(); long size = 1; // flags if (row.isStatic() || row.deletion().isShadowable()) @@ -224,13 +245,16 @@ public class UnfilteredSerializer if (!row.isStatic()) size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes()); - return size + serializedRowBodySize(row, header, version); + return size + serializedRowBodySize(row, header, 0, version); } - public long serializedRowBodySize(Row row, SerializationHeader header, int version) + private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version) { long size = 0; + if (header.isForSSTable()) + size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize); + boolean isStatic = row.isStatic(); Columns headerColumns = header.columns(isStatic); LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); @@ -238,7 +262,6 @@ public class UnfilteredSerializer boolean hasComplexDeletion = row.hasComplexDeletion(); boolean hasAllColumns = (row.size() == headerColumns.size()); - if (!pkLiveness.isEmpty()) size += header.timestampSerializedSize(pkLiveness.timestamp()); if (pkLiveness.isExpiring()) @@ -277,16 +300,20 @@ public class UnfilteredSerializer return size; } - public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version) + private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version) { + assert !header.isForSSTable(); return 1 // flags + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes()) - + serializedMarkerBodySize(marker, header, version); + + serializedMarkerBodySize(marker, header, 0, version); } - public long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, int version) + private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version) { long size = 0; + if (header.isForSSTable()) + size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize); + if (marker.isBoundary()) { RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; @@ -349,8 +376,11 @@ public class UnfilteredSerializer public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound) throws IOException { - if (header.isForSSTable()) - in.readUnsignedVInt(); // Skip marker size + if (header.isForSSTable()) + { + in.readUnsignedVInt(); // marker size + in.readUnsignedVInt(); // previous unfiltered size + } if (bound.isBoundary()) return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in)); @@ -368,9 +398,6 @@ public class UnfilteredSerializer { try { - if (header.isForSSTable()) - in.readUnsignedVInt(); // Skip row size - boolean isStatic = isStatic(extendedFlags); boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; boolean hasTTL = (flags & HAS_TTL) != 0; @@ -380,6 +407,12 @@ public class UnfilteredSerializer boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0; Columns headerColumns = header.columns(isStatic); + if (header.isForSSTable()) + { + in.readUnsignedVInt(); // Skip row size + in.readUnsignedVInt(); // previous unfiltered size + } + LivenessInfo rowLiveness = LivenessInfo.EMPTY; if (hasTimestamp) {