Record previous row size

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/525855d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/525855d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/525855d2

Branch: refs/heads/10378
Commit: 525855d2f37b2fe9376b4ce2dab9107d0d227f6a
Parents: 424b59a
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Sep 23 14:36:04 2015 -0700
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Sep 23 14:36:04 2015 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnIndex.java    | 10 ++-
 .../rows/UnfilteredRowIteratorSerializer.java   |  2 +
 .../cassandra/db/rows/UnfilteredSerializer.java | 69 +++++++++++++++-----
 3 files changed, 60 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index add5fa7..ede3f79 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -76,6 +76,7 @@ public class ColumnIndex
         private long startPosition = -1;
 
         private int written;
+        private long previousRowStart;
 
         private ClusteringPrefix firstClustering;
         private ClusteringPrefix lastClustering;
@@ -99,7 +100,7 @@ public class ColumnIndex
             
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
             
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
             if (header.hasStatic())
-                
UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, 
version);
+                
UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), 
header, writer, version);
         }
 
         public ColumnIndex build() throws IOException
@@ -131,15 +132,18 @@ public class ColumnIndex
 
         private void add(Unfiltered unfiltered) throws IOException
         {
+            long pos = currentPosition();
+
             if (firstClustering == null)
             {
                 // Beginning of an index block. Remember the start and position
                 firstClustering = unfiltered.clustering();
-                startPosition = currentPosition();
+                startPosition = pos;
             }
 
-            UnfilteredSerializer.serializer.serialize(unfiltered, header, 
writer, version);
+            UnfilteredSerializer.serializer.serialize(unfiltered, header, 
writer, pos - previousRowStart, version);
             lastClustering = unfiltered.clustering();
+            previousRowStart = pos;
             ++written;
 
             if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 3c5cdbf..3a0558e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -90,6 +90,8 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, SerializationHeader 
header, ColumnFilter selection, DataOutputPlus out, int version, int 
rowEstimate) throws IOException
     {
+        assert !header.isForSSTable();
+
         ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), 
out);
 
         int flags = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/525855d2/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 1f77529..fac8863 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -92,17 +92,31 @@ public class UnfilteredSerializer
     public void serialize(Unfiltered unfiltered, SerializationHeader header, 
DataOutputPlus out, int version)
     throws IOException
     {
+        assert !header.isForSSTable();
+        serialize(unfiltered, header, out, 0, version);
+    }
+
+    public void serialize(Unfiltered unfiltered, SerializationHeader header, 
DataOutputPlus out, long previousUnfilteredSize, int version)
+    throws IOException
+    {
         if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
-            serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+            serialize((RangeTombstoneMarker) unfiltered, header, out, 
previousUnfilteredSize, version);
         }
         else
         {
-            serialize((Row) unfiltered, header, out, version);
+            serialize((Row) unfiltered, header, out, previousUnfilteredSize, 
version);
         }
     }
 
-    public void serialize(Row row, SerializationHeader header, DataOutputPlus 
out, int version)
+    public void serializeStaticRow(Row row, SerializationHeader header, 
DataOutputPlus out, int version)
+    throws IOException
+    {
+        assert row.isStatic();
+        serialize(row, header, out, 0, version);
+    }
+
+    private void serialize(Row row, SerializationHeader header, DataOutputPlus 
out, long previousUnfilteredSize, int version)
     throws IOException
     {
         int flags = 0;
@@ -151,7 +165,10 @@ public class UnfilteredSerializer
             Clustering.serializer.serialize(row.clustering(), out, version, 
header.clusteringTypes());
 
         if (header.isForSSTable())
-            out.writeUnsignedVInt(serializedRowBodySize(row, header, version));
+        {
+            out.writeUnsignedVInt(serializedRowBodySize(row, header, 
previousUnfilteredSize, version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
 
         if ((flags & HAS_TIMESTAMP) != 0)
             header.writeTimestamp(pkLiveness.timestamp(), out);
@@ -186,14 +203,17 @@ public class UnfilteredSerializer
             Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    public void serialize(RangeTombstoneMarker marker, SerializationHeader 
header, DataOutputPlus out, int version)
+    private void serialize(RangeTombstoneMarker marker, SerializationHeader 
header, DataOutputPlus out, long previousUnfilteredSize, int version)
     throws IOException
     {
         out.writeByte((byte)IS_MARKER);
         RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, 
version, header.clusteringTypes());
 
         if (header.isForSSTable())
-            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, 
version));
+        {
+            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, 
previousUnfilteredSize, version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
 
         if (marker.isBoundary())
         {
@@ -214,8 +234,9 @@ public class UnfilteredSerializer
              : serializedSize((Row) unfiltered, header, version);
     }
 
-    public long serializedSize(Row row, SerializationHeader header, int 
version)
+    private long serializedSize(Row row, SerializationHeader header, int 
version)
     {
+        assert !header.isForSSTable();
         long size = 1; // flags
 
         if (row.isStatic() || row.deletion().isShadowable())
@@ -224,13 +245,16 @@ public class UnfilteredSerializer
         if (!row.isStatic())
             size += Clustering.serializer.serializedSize(row.clustering(), 
version, header.clusteringTypes());
 
-        return size + serializedRowBodySize(row, header, version);
+        return size + serializedRowBodySize(row, header, 0, version);
     }
 
-    public long serializedRowBodySize(Row row, SerializationHeader header, int 
version)
+    private long serializedRowBodySize(Row row, SerializationHeader header, 
long previousUnfilteredSize, int version)
     {
         long size = 0;
 
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -238,7 +262,6 @@ public class UnfilteredSerializer
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
-
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
         if (pkLiveness.isExpiring())
@@ -277,16 +300,20 @@ public class UnfilteredSerializer
         return size;
     }
 
-    public long serializedSize(RangeTombstoneMarker marker, 
SerializationHeader header, int version)
+    private long serializedSize(RangeTombstoneMarker marker, 
SerializationHeader header, int version)
     {
+        assert !header.isForSSTable();
         return 1 // flags
              + 
RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, 
header.clusteringTypes())
-             + serializedMarkerBodySize(marker, header, version);
+             + serializedMarkerBodySize(marker, header, 0, version);
     }
 
-    public long serializedMarkerBodySize(RangeTombstoneMarker marker, 
SerializationHeader header, int version)
+    private long serializedMarkerBodySize(RangeTombstoneMarker marker, 
SerializationHeader header, long previousUnfilteredSize, int version)
     {
         long size = 0;
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = 
(RangeTombstoneBoundaryMarker)marker;
@@ -349,8 +376,11 @@ public class UnfilteredSerializer
     public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, 
SerializationHeader header, RangeTombstone.Bound bound)
     throws IOException
     {
-            if (header.isForSSTable())
-                in.readUnsignedVInt(); // Skip marker size
+        if (header.isForSSTable())
+        {
+            in.readUnsignedVInt(); // marker size
+            in.readUnsignedVInt(); // previous unfiltered size
+        }
 
         if (bound.isBoundary())
             return new RangeTombstoneBoundaryMarker(bound, 
header.readDeletionTime(in), header.readDeletionTime(in));
@@ -368,9 +398,6 @@ public class UnfilteredSerializer
     {
         try
         {
-            if (header.isForSSTable())
-                in.readUnsignedVInt(); // Skip row size
-
             boolean isStatic = isStatic(extendedFlags);
             boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
             boolean hasTTL = (flags & HAS_TTL) != 0;
@@ -380,6 +407,12 @@ public class UnfilteredSerializer
             boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
             Columns headerColumns = header.columns(isStatic);
 
+            if (header.isForSSTable())
+            {
+                in.readUnsignedVInt(); // Skip row size
+                in.readUnsignedVInt(); // previous unfiltered size
+            }
+
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
             {

Reply via email to