Repository: cassandra Updated Branches: refs/heads/trunk f8358b8ea -> e8d8941cc
Add row size to sstable format for faster skipping patch by slebresne; reviewed by benedict for CASSANDRA-10378 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6584331c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6584331c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6584331c Branch: refs/heads/trunk Commit: 6584331c881329c2cb9afbcef19997e8a2a612d9 Parents: 028c3f7 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Sep 22 13:53:22 2015 -0700 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Oct 9 18:07:01 2015 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnIndex.java | 10 +- src/java/org/apache/cassandra/db/Memtable.java | 2 +- .../cassandra/db/SerializationHeader.java | 26 ++- .../cassandra/db/UnfilteredDeserializer.java | 8 +- .../rows/UnfilteredRowIteratorSerializer.java | 10 +- .../cassandra/db/rows/UnfilteredSerializer.java | 174 +++++++++++-------- .../io/sstable/AbstractSSTableSimpleWriter.java | 2 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 4 +- .../apache/cassandra/db/RowIndexEntryTest.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 3 +- .../db/compaction/AntiCompactionTest.java | 2 +- .../io/sstable/BigTableWriterTest.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 4 +- .../cassandra/io/sstable/SSTableUtils.java | 2 +- 14 files changed, 155 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index add5fa7..ede3f79 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ 
b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -76,6 +76,7 @@ public class ColumnIndex private long startPosition = -1; private int written; + private long previousRowStart; private ClusteringPrefix firstClustering; private ClusteringPrefix lastClustering; @@ -99,7 +100,7 @@ public class ColumnIndex ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer); DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer); if (header.hasStatic()) - UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version); + UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version); } public ColumnIndex build() throws IOException @@ -131,15 +132,18 @@ public class ColumnIndex private void add(Unfiltered unfiltered) throws IOException { + long pos = currentPosition(); + if (firstClustering == null) { // Beginning of an index block. Remember the start and position firstClustering = unfiltered.clustering(); - startPosition = currentPosition(); + startPosition = pos; } - UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version); + UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); lastClustering = unfiltered.clustering(); + previousRowStart = pos; ++written; if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 014467e..f47efe3 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -430,7 +430,7 @@ public class Memtable implements Comparable<Memtable> (long)partitions.size(), ActiveRepairService.UNREPAIRED_SSTABLE, sstableMetadataCollector, - 
new SerializationHeader(cfs.metadata, columns, stats), + new SerializationHeader(true, cfs.metadata, columns, stats), txn)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index decac49..0706d06 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -45,6 +45,8 @@ public class SerializationHeader { public static final Serializer serializer = new Serializer(); + private final boolean isForSSTable; + private final AbstractType<?> keyType; private final List<AbstractType<?>> clusteringTypes; @@ -53,12 +55,14 @@ public class SerializationHeader private final Map<ByteBuffer, AbstractType<?>> typeMap; - private SerializationHeader(AbstractType<?> keyType, + private SerializationHeader(boolean isForSSTable, + AbstractType<?> keyType, List<AbstractType<?>> clusteringTypes, PartitionColumns columns, EncodingStats stats, Map<ByteBuffer, AbstractType<?>> typeMap) { + this.isForSSTable = isForSSTable; this.keyType = keyType; this.clusteringTypes = clusteringTypes; this.columns = columns; @@ -77,7 +81,8 @@ public class SerializationHeader List<AbstractType<?>> clusteringTypes = new ArrayList<>(size); for (int i = 0; i < size; i++) clusteringTypes.add(BytesType.instance); - return new SerializationHeader(BytesType.instance, + return new SerializationHeader(false, + BytesType.instance, clusteringTypes, PartitionColumns.NONE, EncodingStats.NO_STATS, @@ -108,14 +113,16 @@ public class SerializationHeader else columns.addAll(sstable.header.columns()); } - return new SerializationHeader(metadata, columns.build(), stats.get()); + return new SerializationHeader(true, metadata, columns.build(), stats.get()); } - public 
SerializationHeader(CFMetaData metadata, + public SerializationHeader(boolean isForSSTable, + CFMetaData metadata, PartitionColumns columns, EncodingStats stats) { - this(metadata.getKeyValidator(), + this(isForSSTable, + metadata.getKeyValidator(), typesOf(metadata.clusteringColumns()), columns, stats, @@ -137,6 +144,11 @@ public class SerializationHeader return !columns.statics.isEmpty(); } + public boolean isForSSTable() + { + return isForSSTable; + } + public EncodingStats stats() { return stats; @@ -320,7 +332,7 @@ public class SerializationHeader } builder.add(column); } - return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats, typeMap); + return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap); } @Override @@ -390,7 +402,7 @@ public class SerializationHeader regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in); } - return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null); + return new SerializationHeader(false, keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null); } public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 5c76c63..ef30289 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -147,7 +147,7 @@ public abstract class UnfilteredDeserializer return; } - nextExtendedFlags = UnfilteredSerializer.isExtended(nextFlags) ? 
in.readUnsignedByte() : 0; + nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags); clusteringDeserializer.prepare(nextFlags, nextExtendedFlags); isReady = true; @@ -195,14 +195,14 @@ public abstract class UnfilteredDeserializer public void skipNext() throws IOException { isReady = false; - ClusteringPrefix.Kind kind = clusteringDeserializer.skipNext(); + clusteringDeserializer.skipNext(); if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - UnfilteredSerializer.serializer.skipMarkerBody(in, header, kind.isBoundary()); + UnfilteredSerializer.serializer.skipMarkerBody(in); } else { - UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags, nextExtendedFlags); + UnfilteredSerializer.serializer.skipRowBody(in); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index df006d7..3a0558e 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -80,7 +80,8 @@ public class UnfilteredRowIteratorSerializer // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { - SerializationHeader header = new SerializationHeader(iterator.metadata(), + SerializationHeader header = new SerializationHeader(false, + iterator.metadata(), iterator.columns(), iterator.stats()); serialize(iterator, header, selection, out, version, rowEstimate); @@ -89,6 +90,8 @@ public class UnfilteredRowIteratorSerializer // Should only be used for the on-wire format. 
public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { + assert !header.isForSSTable(); + ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); int flags = 0; @@ -134,7 +137,8 @@ public class UnfilteredRowIteratorSerializer // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition. public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate) { - SerializationHeader header = new SerializationHeader(iterator.metadata(), + SerializationHeader header = new SerializationHeader(false, + iterator.metadata(), iterator.columns(), iterator.stats()); @@ -175,7 +179,7 @@ public class UnfilteredRowIteratorSerializer boolean isReversed = (flags & IS_REVERSED) != 0; if ((flags & IS_EMPTY) != 0) { - SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, EncodingStats.NO_STATS); + SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS); return new Header(sh, key, isReversed, true, null, null, 0); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index b83ccf9..4efc5eb 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -35,10 +35,12 @@ import org.apache.cassandra.io.util.DataOutputPlus; * flag is defined/explained below as the "Unfiltered flags" constants. 
One of those flags * is an extension flag, and if present, triggers the read of another byte that contains more * flags. If the extension is not set, defaults are assumed for the flags of that 2nd byte. - * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where - * <clustering> is the row clustering as serialized by - * {@code Clustering.serializer}. Note that static row are an exception and - * don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion + * <row> is <clustering><size>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where + * <clustering> is the row clustering as serialized by {@code Clustering.serializer} (note + * that static row are an exception and don't have this). + * <size> is the size of the whole unfiltered on disk (it's only used for sstables and is + * used to efficiently skip rows). + * <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion * whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the * complex ones. 
* The columns for the row are then serialized if they differ from those in the header, @@ -90,22 +92,35 @@ public class UnfilteredSerializer public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version) throws IOException { + assert !header.isForSSTable(); + serialize(unfiltered, header, out, 0, version); + } + + public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) + throws IOException + { if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - serialize((RangeTombstoneMarker) unfiltered, header, out, version); + serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version); } else { - serialize((Row) unfiltered, header, out, version); + serialize((Row) unfiltered, header, out, previousUnfilteredSize, version); } } - public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version) + public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version) + throws IOException + { + assert row.isStatic(); + serialize(row, header, out, 0, version); + } + + private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { int flags = 0; int extendedFlags = 0; - boolean hasExtendedFlags = false; boolean isStatic = row.isStatic(); Columns headerColumns = header.columns(isStatic); @@ -113,12 +128,10 @@ public class UnfilteredSerializer Row.Deletion deletion = row.deletion(); boolean hasComplexDeletion = row.hasComplexDeletion(); boolean hasAllColumns = (row.size() == headerColumns.size()); + boolean hasExtendedFlags = hasExtendedFlags(row); if (isStatic) - { - hasExtendedFlags = true; extendedFlags |= IS_STATIC; - } if (!pkLiveness.isEmpty()) flags |= HAS_TIMESTAMP; @@ -128,10 +141,7 @@ public class UnfilteredSerializer { flags |= HAS_DELETION; if (deletion.isShadowable()) - { 
- hasExtendedFlags = true; extendedFlags |= HAS_SHADOWABLE_DELETION; - } } if (hasComplexDeletion) flags |= HAS_COMPLEX_DELETION; @@ -148,6 +158,12 @@ public class UnfilteredSerializer if (!isStatic) Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes()); + if (header.isForSSTable()) + { + out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version)); + out.writeUnsignedVInt(previousUnfilteredSize); + } + if ((flags & HAS_TIMESTAMP) != 0) header.writeTimestamp(pkLiveness.timestamp(), out); if ((flags & HAS_TTL) != 0) @@ -181,12 +197,18 @@ public class UnfilteredSerializer Cell.serializer.serialize(cell, out, rowLiveness, header); } - public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version) + private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { out.writeByte((byte)IS_MARKER); RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes()); + if (header.isForSSTable()) + { + out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize, version)); + out.writeUnsignedVInt(previousUnfilteredSize); + } + if (marker.isBoundary()) { RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; @@ -201,15 +223,37 @@ public class UnfilteredSerializer public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version) { + assert !header.isForSSTable(); + return serializedSize(unfiltered, header, 0, version); + } + + public long serializedSize(Unfiltered unfiltered, SerializationHeader header, long previousUnfilteredSize,int version) + { return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER - ? serializedSize((RangeTombstoneMarker) unfiltered, header, version) - : serializedSize((Row) unfiltered, header, version); + ? 
serializedSize((RangeTombstoneMarker) unfiltered, header, previousUnfilteredSize, version) + : serializedSize((Row) unfiltered, header, previousUnfilteredSize, version); } - public long serializedSize(Row row, SerializationHeader header, int version) + private long serializedSize(Row row, SerializationHeader header, long previousUnfilteredSize, int version) { long size = 1; // flags + if (hasExtendedFlags(row)) + size += 1; // extended flags + + if (!row.isStatic()) + size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes()); + + return size + serializedRowBodySize(row, header, previousUnfilteredSize, version); + } + + private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version) + { + long size = 0; + + if (header.isForSSTable()) + size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize); + boolean isStatic = row.isStatic(); Columns headerColumns = header.columns(isStatic); LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); @@ -217,12 +261,6 @@ public class UnfilteredSerializer boolean hasComplexDeletion = row.hasComplexDeletion(); boolean hasAllColumns = (row.size() == headerColumns.size()); - if (isStatic || deletion.isShadowable()) - size += 1; // extended flags - - if (!isStatic) - size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes()); - if (!pkLiveness.isEmpty()) size += header.timestampSerializedSize(pkLiveness.timestamp()); if (pkLiveness.isExpiring()) @@ -261,10 +299,19 @@ public class UnfilteredSerializer return size; } - public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version) + private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version) { - long size = 1 // flags - + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes()); + assert !header.isForSSTable(); + 
return 1 // flags + + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes()) + + serializedMarkerBodySize(marker, header, previousUnfilteredSize, version); + } + + private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version) + { + long size = 0; + if (header.isForSSTable()) + size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize); if (marker.isBoundary()) { @@ -299,7 +346,7 @@ public class UnfilteredSerializer if (isEndOfPartition(flags)) return null; - int extendedFlags = isExtended(flags) ? in.readUnsignedByte() : 0; + int extendedFlags = readExtendedFlags(in, flags); if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { @@ -328,6 +375,12 @@ public class UnfilteredSerializer public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound) throws IOException { + if (header.isForSSTable()) + { + in.readUnsignedVInt(); // marker size + in.readUnsignedVInt(); // previous unfiltered size + } + if (bound.isBoundary()) return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in)); else @@ -353,6 +406,12 @@ public class UnfilteredSerializer boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0; Columns headerColumns = header.columns(isStatic); + if (header.isForSSTable()) + { + in.readUnsignedVInt(); // Skip row size + in.readUnsignedVInt(); // previous unfiltered size + } + LivenessInfo rowLiveness = LivenessInfo.EMPTY; if (hasTimestamp) { @@ -430,36 +489,10 @@ public class UnfilteredSerializer } } - public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int extendedFlags) throws IOException + public void skipRowBody(DataInputPlus in) throws IOException { - boolean isStatic = isStatic(extendedFlags); - boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; - boolean hasTTL = (flags & HAS_TTL) != 0; - boolean 
hasDeletion = (flags & HAS_DELETION) != 0; - boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0; - boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0; - Columns headerColumns = header.columns(isStatic); - - // Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have - // the size we think due to VINT encoding - if (hasTimestamp) - header.skipTimestamp(in); - if (hasTTL) - { - header.skipLocalDeletionTime(in); - header.skipTTL(in); - } - if (hasDeletion) - header.skipDeletionTime(in); - - Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in); - for (ColumnDefinition column : columns) - { - if (column.isSimple()) - Cell.serializer.skip(in, column, header); - else - skipComplexColumn(in, column, header, hasComplexDeletion); - } + int rowSize = (int)in.readUnsignedVInt(); + in.skipBytesFully(rowSize); } public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException @@ -468,20 +501,13 @@ public class UnfilteredSerializer assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : "Flags is " + flags; int extendedFlags = in.readUnsignedByte(); assert isStatic(extendedFlags); - skipRowBody(in, header, flags, extendedFlags); + skipRowBody(in); } - public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException + public void skipMarkerBody(DataInputPlus in) throws IOException { - if (isBoundary) - { - header.skipDeletionTime(in); - header.skipDeletionTime(in); - } - else - { - header.skipDeletionTime(in); - } + int markerSize = (int)in.readUnsignedVInt(); + in.skipBytesFully(markerSize); } private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion) @@ -510,8 +536,18 @@ public class UnfilteredSerializer return (extendedFlags & IS_STATIC) != 0; } - public static boolean 
isExtended(int flags) + private static boolean isExtended(int flags) { return (flags & EXTENSION_FLAG) != 0; } + + public static int readExtendedFlags(DataInputPlus in, int flags) throws IOException + { + return isExtended(flags) ? in.readUnsignedByte() : 0; + } + + public static boolean hasExtendedFlags(Row row) + { + return row.isStatic() || row.deletion().isShadowable(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index d94b219..62348ec 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -65,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable 0, ActiveRepairService.UNREPAIRED_SSTABLE, 0, - new SerializationHeader(metadata, columns, EncodingStats.NO_STATS)); + new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS)); } private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index f4b9adf..6d3a714 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -63,7 +63,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter { super(directory, metadata, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; - this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS); + this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS); diskWriter.start(); } @@ -89,7 +89,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing // to count since it will take a lot more space in memory and the bufferSize if first and foremost used to avoid OOM when // using this writer. - currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, formatType.info.getLatestVersion().correspondingMessagingVersion()); + currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion()); } private void maybeSync() throws SyncException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index 25baa4e..62c88a0 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -60,7 +60,7 @@ public class RowIndexEntryTest extends CQLTester DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()); - SerializationHeader header = new SerializationHeader(cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS); + SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS); IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta, 
BigFormat.latestVersion, header); DataOutputBuffer dob = new DataOutputBuffer(); @@ -119,7 +119,7 @@ public class RowIndexEntryTest extends CQLTester final RowIndexEntry simple = new RowIndexEntry(123); DataOutputBuffer buffer = new DataOutputBuffer(); - SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS); + SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS); RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header); serializer.serialize(simple, buffer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 2fc8436..ab99750 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -325,7 +325,8 @@ public class ScrubTest keys.size(), 0L, 0, - new SerializationHeader(cfs.metadata, + new SerializationHeader(true, + cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index cd82b19..db07eb8 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -166,7 +166,7 @@ public class AntiCompactionTest File dir = cfs.getDirectories().getDirectoryForNewSSTables(); String filename = 
cfs.getSSTablePath(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) { for (int i = 0; i < count; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index e1ab48f..78964f4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest private TestableBTW(String file) { - this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); + this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); } private TestableBTW(String file, SSTableTxnWriter sw) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 093bffd..bd286e4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -943,7 +943,7 @@ public class SSTableRewriterTest extends SchemaLoader File dir = 
cfs.getDirectories().getDirectoryForNewSSTables(); String filename = cfs.getSSTablePath(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) { int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; for ( ; i < end ; i++) @@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) { String filename = cfs.getSSTablePath(directory); - return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn); + return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn); } public static ByteBuffer random(int i, int size) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index fcd2d71..5c7ff02 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -187,7 +187,7 @@ public class SSTableUtils { public SerializationHeader header() { - return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS); + return new SerializationHeader(true, Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS); } @Override