Switch to DataInputPlus. Patch by ariel; reviewed by benedict for CASSANDRA-9499
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03f72acd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03f72acd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03f72acd Branch: refs/heads/trunk Commit: 03f72acd546407c7f9de2a976de31dcd565dba9a Parents: 1491a40 Author: Ariel Weisberg <ar...@weisberg.ws> Authored: Wed Jul 1 16:27:43 2015 -0400 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 2 09:39:58 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/cache/AutoSavingCache.java | 8 +- .../org/apache/cassandra/cache/OHCProvider.java | 123 ++------------ .../cassandra/cache/SerializingCache.java | 12 +- .../cache/SerializingCacheProvider.java | 13 +- .../org/apache/cassandra/config/CFMetaData.java | 2 +- .../cassandra/db/AbstractLivenessInfo.java | 6 +- .../apache/cassandra/db/BatchlogManager.java | 7 +- .../org/apache/cassandra/db/Clustering.java | 4 +- .../apache/cassandra/db/ClusteringPrefix.java | 10 +- src/java/org/apache/cassandra/db/Columns.java | 6 +- .../apache/cassandra/db/CounterMutation.java | 6 +- src/java/org/apache/cassandra/db/DataRange.java | 2 +- .../org/apache/cassandra/db/DeletionInfo.java | 2 +- .../org/apache/cassandra/db/DeletionTime.java | 9 +- .../cassandra/db/HintedHandOffManager.java | 7 +- .../org/apache/cassandra/db/LegacyLayout.java | 13 +- src/java/org/apache/cassandra/db/Mutation.java | 17 +- .../apache/cassandra/db/PartitionPosition.java | 2 +- .../org/apache/cassandra/db/RangeTombstone.java | 6 +- .../apache/cassandra/db/RangeTombstoneList.java | 29 ++-- .../org/apache/cassandra/db/ReadCommand.java | 13 +- .../org/apache/cassandra/db/ReadResponse.java | 12 +- .../org/apache/cassandra/db/RowIndexEntry.java | 20 +-- .../cassandra/db/SerializationHeader.java | 36 ++-- .../org/apache/cassandra/db/Serializers.java | 9 +- .../db/SinglePartitionReadCommand.java | 
3 +- src/java/org/apache/cassandra/db/Slice.java | 12 +- src/java/org/apache/cassandra/db/Slices.java | 6 +- .../apache/cassandra/db/SnapshotCommand.java | 12 +- .../org/apache/cassandra/db/SystemKeyspace.java | 6 +- .../apache/cassandra/db/TruncateResponse.java | 10 +- .../org/apache/cassandra/db/Truncation.java | 6 +- src/java/org/apache/cassandra/db/TypeSizes.java | 91 +++------- .../cassandra/db/UnfilteredDeserializer.java | 13 +- .../org/apache/cassandra/db/WriteResponse.java | 4 +- .../db/commitlog/CommitLogReplayer.java | 10 +- .../cassandra/db/commitlog/ReplayPosition.java | 8 +- .../cassandra/db/context/CounterContext.java | 8 +- .../filter/AbstractClusteringIndexFilter.java | 7 +- .../db/filter/ClusteringIndexNamesFilter.java | 4 +- .../db/filter/ClusteringIndexSliceFilter.java | 4 +- .../cassandra/db/filter/ColumnFilter.java | 10 +- .../cassandra/db/filter/ColumnSubselection.java | 10 +- .../apache/cassandra/db/filter/DataLimits.java | 17 +- .../apache/cassandra/db/filter/RowFilter.java | 16 +- .../cassandra/db/marshal/AbstractType.java | 4 +- .../cassandra/db/marshal/CollectionType.java | 5 +- .../partitions/ArrayBackedCachedPartition.java | 10 +- .../db/partitions/PartitionUpdate.java | 17 +- .../org/apache/cassandra/db/rows/CellPath.java | 3 +- .../org/apache/cassandra/db/rows/RowStats.java | 10 +- .../rows/UnfilteredRowIteratorSerializer.java | 24 +-- .../cassandra/db/rows/UnfilteredSerializer.java | 64 +++---- .../apache/cassandra/dht/AbstractBounds.java | 2 +- .../org/apache/cassandra/dht/BootStrapper.java | 8 +- src/java/org/apache/cassandra/dht/Token.java | 2 +- .../org/apache/cassandra/gms/EchoMessage.java | 4 +- .../org/apache/cassandra/gms/EndpointState.java | 9 +- .../org/apache/cassandra/gms/GossipDigest.java | 7 +- .../apache/cassandra/gms/GossipDigestAck.java | 6 +- .../apache/cassandra/gms/GossipDigestAck2.java | 5 +- .../apache/cassandra/gms/GossipDigestSyn.java | 11 +- .../apache/cassandra/gms/HeartBeatState.java | 5 +- 
.../apache/cassandra/gms/VersionedValue.java | 7 +- .../org/apache/cassandra/io/ISerializer.java | 7 +- .../cassandra/io/IVersionedSerializer.java | 4 +- .../io/compress/CompressionMetadata.java | 7 +- .../io/compress/CompressionParameters.java | 14 +- .../cassandra/io/sstable/IndexHelper.java | 17 +- .../io/sstable/SSTableSimpleIterator.java | 5 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 4 +- .../io/sstable/metadata/CompactionMetadata.java | 10 +- .../metadata/IMetadataComponentSerializer.java | 4 +- .../metadata/LegacyMetadataSerializer.java | 4 +- .../io/sstable/metadata/StatsMetadata.java | 17 +- .../io/sstable/metadata/ValidationMetadata.java | 6 +- .../cassandra/io/util/AbstractDataInput.java | 36 +--- .../apache/cassandra/io/util/DataInputPlus.java | 168 +++++++++++++++++++ .../cassandra/io/util/DataOutputPlus.java | 109 ++++++++++++ .../apache/cassandra/io/util/FileDataInput.java | 3 +- .../cassandra/io/util/NIODataInputStream.java | 74 +++++++- .../net/IncomingStreamingConnection.java | 9 +- .../cassandra/net/IncomingTcpConnection.java | 13 +- .../org/apache/cassandra/net/MessageIn.java | 5 +- .../org/apache/cassandra/net/MessageOut.java | 10 +- .../apache/cassandra/net/MessagingService.java | 5 +- .../org/apache/cassandra/repair/NodePair.java | 4 +- .../apache/cassandra/repair/RepairJobDesc.java | 10 +- .../repair/messages/AnticompactionRequest.java | 4 +- .../repair/messages/CleanupMessage.java | 4 +- .../repair/messages/PrepareMessage.java | 11 +- .../repair/messages/RepairMessage.java | 4 +- .../repair/messages/SnapshotMessage.java | 4 +- .../cassandra/repair/messages/SyncComplete.java | 6 +- .../cassandra/repair/messages/SyncRequest.java | 6 +- .../repair/messages/ValidationComplete.java | 6 +- .../repair/messages/ValidationRequest.java | 6 +- .../apache/cassandra/service/CacheService.java | 10 +- .../cassandra/service/MigrationManager.java | 11 +- .../apache/cassandra/service/paxos/Commit.java | 18 +- .../service/paxos/PrepareResponse.java 
| 17 +- .../cassandra/streaming/StreamReader.java | 8 +- .../cassandra/streaming/StreamRequest.java | 14 +- .../cassandra/streaming/StreamSummary.java | 8 +- .../streaming/compress/CompressionInfo.java | 8 +- .../streaming/messages/FileMessageHeader.java | 20 +-- .../streaming/messages/IncomingFileMessage.java | 4 +- .../streaming/messages/PrepareMessage.java | 4 +- .../streaming/messages/ReceivedMessage.java | 4 +- .../streaming/messages/RetryMessage.java | 4 +- .../streaming/messages/StreamInitMessage.java | 14 +- .../org/apache/cassandra/utils/BloomFilter.java | 3 +- .../cassandra/utils/BloomFilterSerializer.java | 12 +- .../cassandra/utils/BooleanSerializer.java | 4 +- .../apache/cassandra/utils/ByteBufferUtil.java | 8 +- .../cassandra/utils/BytesReadTracker.java | 4 +- .../cassandra/utils/EstimatedHistogram.java | 13 +- .../apache/cassandra/utils/IntervalTree.java | 23 +-- .../org/apache/cassandra/utils/MerkleTree.java | 17 +- .../cassandra/utils/StreamingHistogram.java | 10 +- .../apache/cassandra/utils/UUIDSerializer.java | 6 +- .../org/apache/cassandra/utils/obs/IBitSet.java | 4 +- .../cassandra/utils/obs/OffHeapBitSet.java | 6 +- .../apache/cassandra/utils/obs/OpenBitSet.java | 6 +- .../utils/vint/EncodedDataInputStream.java | 94 ----------- .../utils/vint/EncodedDataOutputStream.java | 69 -------- .../db/commitlog/CommitLogStressTest.java | 11 +- .../cassandra/AbstractSerializationsTester.java | 8 +- .../org/apache/cassandra/db/PartitionTest.java | 11 +- .../apache/cassandra/db/ReadMessageTest.java | 9 +- .../db/commitlog/CommitLogTestReplayer.java | 8 +- .../apache/cassandra/gms/GossipDigestTest.java | 9 +- .../cassandra/gms/SerializationsTest.java | 6 +- .../cassandra/service/SerializationsTest.java | 11 +- .../cassandra/utils/EncodedStreamsTest.java | 158 ----------------- .../cassandra/utils/IntervalTreeTest.java | 17 +- .../apache/cassandra/utils/MerkleTreeTest.java | 8 +- .../cassandra/utils/SerializationsTest.java | 4 +- 
.../cassandra/utils/StreamingHistogramTest.java | 4 +- 139 files changed, 971 insertions(+), 1146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 9dda019..0b334f5 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -39,6 +38,7 @@ import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.*; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; @@ -132,11 +132,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION); if (path.exists()) { - DataInputStream in = null; + DataInputStreamPlus in = null; try { logger.info(String.format("reading saved cache %s", path)); - in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length())); + in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), 
path.length())); List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>(); while (in.available() > 0) { @@ -356,6 +356,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { void serialize(K key, DataOutputPlus out) throws IOException; - Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; + Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java index 981a1f8..46cbb8b 100644 --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@ -20,19 +20,15 @@ package org.apache.cassandra.cache; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; import java.util.Iterator; import java.util.UUID; -import com.google.common.base.Function; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.partitions.CachedPartition; +import org.apache.cassandra.io.util.DataInputPlus.DataInputPlusAdapter; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.Memory; -import org.apache.cassandra.net.MessagingService; import org.caffinitas.ohc.OHCache; import org.caffinitas.ohc.OHCacheBuilder; @@ -42,8 +38,8 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> { OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder(); builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024) - .keySerializer(new KeySerializer()) - 
.valueSerializer(new ValueSerializer()) + .keySerializer(KeySerializer.instance) + .valueSerializer(ValueSerializer.instance) .throwOOME(true); return new OHCacheAdapter(builder.build()); @@ -70,7 +66,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> public void put(RowCacheKey key, IRowCacheEntry value) { - ohCache.put(key, value); + ohCache.put(key, value); } public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value) @@ -126,6 +122,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey> { + private static KeySerializer instance = new KeySerializer(); public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException { dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits()); @@ -151,6 +148,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry> { + private static ValueSerializer instance = new ValueSerializer(); public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException { assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache @@ -159,7 +157,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> if (isSentinel) out.writeLong(((RowCacheSentinel) entry).sentinelId); else - CachedPartition.cacheSerializer.serialize((CachedPartition)entry, new DataOutputPlusAdapter(out)); + CachedPartition.cacheSerializer.serialize((CachedPartition)entry, new DataOutputPlus.DataOutputPlusAdapter(out)); } public IRowCacheEntry deserialize(DataInput in) throws IOException @@ -167,116 +165,17 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> boolean isSentinel = in.readBoolean(); if (isSentinel) return new RowCacheSentinel(in.readLong()); - 
return CachedPartition.cacheSerializer.deserialize(in); + return CachedPartition.cacheSerializer.deserialize(new DataInputPlusAdapter(in)); } public int serializedSize(IRowCacheEntry entry) { - TypeSizes typeSizes = TypeSizes.NATIVE; - int size = typeSizes.sizeof(true); + int size = TypeSizes.sizeof(true); if (entry instanceof RowCacheSentinel) - size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); + size += TypeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); else - size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry, typeSizes); + size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry); return size; } } - - static class DataOutputPlusAdapter implements DataOutputPlus - { - private final DataOutput out; - - public void write(byte[] b) throws IOException - { - out.write(b); - } - - public void write(byte[] b, int off, int len) throws IOException - { - out.write(b, off, len); - } - - public void write(int b) throws IOException - { - out.write(b); - } - - public void writeBoolean(boolean v) throws IOException - { - out.writeBoolean(v); - } - - public void writeByte(int v) throws IOException - { - out.writeByte(v); - } - - public void writeBytes(String s) throws IOException - { - out.writeBytes(s); - } - - public void writeChar(int v) throws IOException - { - out.writeChar(v); - } - - public void writeChars(String s) throws IOException - { - out.writeChars(s); - } - - public void writeDouble(double v) throws IOException - { - out.writeDouble(v); - } - - public void writeFloat(float v) throws IOException - { - out.writeFloat(v); - } - - public void writeInt(int v) throws IOException - { - out.writeInt(v); - } - - public void writeLong(long v) throws IOException - { - out.writeLong(v); - } - - public void writeShort(int v) throws IOException - { - out.writeShort(v); - } - - public void writeUTF(String s) throws IOException - { - out.writeUTF(s); - } - - public DataOutputPlusAdapter(DataOutput out) - { - 
this.out = out; - } - - public void write(ByteBuffer buffer) throws IOException - { - if (buffer.hasArray()) - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); - else - throw new UnsupportedOperationException("IMPLEMENT ME"); - } - - public void write(Memory memory, long offset, long length) throws IOException - { - throw new UnsupportedOperationException("IMPLEMENT ME"); - } - - public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException - { - throw new UnsupportedOperationException("IMPLEMENT ME"); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/SerializingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java index 0e38922..ae1c428 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCache.java +++ b/src/java/org/apache/cassandra/cache/SerializingCache.java @@ -26,12 +26,11 @@ import org.slf4j.LoggerFactory; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.EvictionListener; import com.googlecode.concurrentlinkedhashmap.Weigher; -import org.apache.cassandra.db.TypeSizes; + import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.util.MemoryInputStream; import org.apache.cassandra.io.util.MemoryOutputStream; -import org.apache.cassandra.utils.vint.EncodedDataInputStream; -import org.apache.cassandra.utils.vint.EncodedDataOutputStream; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; /** * Serializes cache values off-heap. 
@@ -39,7 +38,6 @@ import org.apache.cassandra.utils.vint.EncodedDataOutputStream; public class SerializingCache<K, V> implements ICache<K, V> { private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class); - private static final TypeSizes ENCODED_TYPE_SIZES = TypeSizes.VINT; private static final int DEFAULT_CONCURENCY_LEVEL = 64; @@ -88,7 +86,7 @@ public class SerializingCache<K, V> implements ICache<K, V> { try { - return serializer.deserialize(new EncodedDataInputStream(new MemoryInputStream(mem))); + return serializer.deserialize(new MemoryInputStream(mem)); } catch (IOException e) { @@ -99,7 +97,7 @@ public class SerializingCache<K, V> implements ICache<K, V> private RefCountedMemory serialize(V value) { - long serializedSize = serializer.serializedSize(value, ENCODED_TYPE_SIZES); + long serializedSize = serializer.serializedSize(value); if (serializedSize > Integer.MAX_VALUE) throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes"); @@ -115,7 +113,7 @@ public class SerializingCache<K, V> implements ICache<K, V> try { - serializer.serialize(value, new EncodedDataOutputStream(new MemoryOutputStream(freeableMemory))); + serializer.serialize(value, new WrappedDataOutputStreamPlus(new MemoryOutputStream(freeableMemory))); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java index 70d9e73..1119295 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java @@ -17,15 +17,14 @@ */ package org.apache.cassandra.cache; -import java.io.DataInput; import java.io.IOException; import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> { @@ -48,7 +47,7 @@ public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRow CachedPartition.cacheSerializer.serialize((CachedPartition)entry, out); } - public IRowCacheEntry deserialize(DataInput in) throws IOException + public IRowCacheEntry deserialize(DataInputPlus in) throws IOException { boolean isSentinel = in.readBoolean(); if (isSentinel) @@ -57,13 +56,13 @@ public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRow return CachedPartition.cacheSerializer.deserialize(in); } - public long serializedSize(IRowCacheEntry entry, TypeSizes typeSizes) + public long serializedSize(IRowCacheEntry entry) { - int size = typeSizes.sizeof(true); + int size = TypeSizes.sizeof(true); if (entry instanceof RowCacheSentinel) - size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); + size += TypeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); else - size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry, typeSizes); + size += CachedPartition.cacheSerializer.serializedSize((CachedPartition) entry); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 4db53e7..6deee6d 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java 
@@ -1579,7 +1579,7 @@ public final class CFMetaData return metadata; } - public long serializedSize(CFMetaData metadata, int version, TypeSizes sizes) + public long serializedSize(CFMetaData metadata, int version) { // We've made sure it was encoded as 16 bytes whatever the TypeSizes is. return 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java index bbda598..4f4b5d1 100644 --- a/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java +++ b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java @@ -85,11 +85,11 @@ public abstract class AbstractLivenessInfo implements LivenessInfo { int size = 0; if (hasTimestamp()) - size += TypeSizes.NATIVE.sizeof(timestamp()); + size += TypeSizes.sizeof(timestamp()); if (hasTTL()) - size += TypeSizes.NATIVE.sizeof(ttl()); + size += TypeSizes.sizeof(ttl()); if (hasLocalDeletionTime()) - size += TypeSizes.NATIVE.sizeof(localDeletionTime()); + size += TypeSizes.sizeof(localDeletionTime()); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 83a0654..39a7aa6 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -34,7 +35,6 @@ import com.google.common.util.concurrent.RateLimiter; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -47,7 +47,9 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -57,6 +59,7 @@ import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; + import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; public class BatchlogManager implements BatchlogManagerMBean @@ -315,7 +318,7 @@ public class BatchlogManager implements BatchlogManagerMBean private List<Mutation> replayingMutations() throws IOException { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); + DataInputPlus in = new NIODataInputStream(data, true); int size = in.readInt(); List<Mutation> mutations = new ArrayList<>(size); for (int i = 0; i < size; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java index 5ac9671..541556b 100644 --- a/src/java/org/apache/cassandra/db/Clustering.java +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -151,9 +151,9 @@ public abstract 
class Clustering extends AbstractClusteringPrefix ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types); } - public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types) { - return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types, sizes); + return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types); } public void deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 73cedb8..36d91e7 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -273,14 +273,14 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab return Slice.Bound.serializer.deserializeValues(in, kind, version, types); } - public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types) { // We shouldn't serialize static clusterings assert clustering.kind() != Kind.STATIC_CLUSTERING; if (clustering.kind() == Kind.CLUSTERING) - return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types, sizes); + return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types); else - return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types, sizes); + return 
Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types); } void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException @@ -299,7 +299,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab } } - long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types) { if (clustering.size() == 0) return 0; @@ -311,7 +311,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab if (v == null || !v.hasRemaining()) continue; // handled in the header - size += types.get(i).writtenLength(v, sizes); + size += types.get(i).writtenLength(v); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 83d39db..94e45dc 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -502,11 +502,11 @@ public class Columns implements Iterable<ColumnDefinition> ByteBufferUtil.writeWithShortLength(column.name.bytes, out); } - public long serializedSize(Columns columns, TypeSizes sizes) + public long serializedSize(Columns columns) { - long size = sizes.sizeof((short)columns.columnCount()); + long size = TypeSizes.sizeof((short)columns.columnCount()); for (ColumnDefinition column : columns) - size += sizes.sizeofWithShortLength(column.name.bytes); + size += TypeSizes.sizeofWithShortLength(column.name.bytes); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/CounterMutation.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index f87c66c..978417f 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -37,6 +36,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -322,7 +322,7 @@ public class CounterMutation implements IMutation out.writeUTF(cm.consistency.name()); } - public CounterMutation deserialize(DataInput in, int version) throws IOException + public CounterMutation deserialize(DataInputPlus in, int version) throws IOException { Mutation m = Mutation.serializer.deserialize(in, version); ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF()); @@ -332,7 +332,7 @@ public class CounterMutation implements IMutation public long serializedSize(CounterMutation cm, int version) { return Mutation.serializer.serializedSize(cm.mutation, version) - + TypeSizes.NATIVE.sizeof(cm.consistency.name()); + + TypeSizes.sizeof(cm.consistency.name()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 909d6ed..0d7a762 100644 --- 
a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -397,7 +397,7 @@ public class DataRange if (range instanceof Paging) { - size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes(), TypeSizes.NATIVE); + size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes()); size += 1; // inclusive boolean } return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index e54d6b1..a441c48 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -192,7 +192,7 @@ public class DeletionInfo implements IMeasurableMemory public int dataSize() { - int size = TypeSizes.NATIVE.sizeof(partitionDeletion.markedForDeleteAt()); + int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt()); return size + (ranges == null ? 
0 : ranges.dataSize()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index f070778..67842a7 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -25,6 +25,7 @@ import com.google.common.base.Objects; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.FBUtilities; @@ -144,7 +145,7 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura out.writeLong(delTime.markedForDeleteAt()); } - public DeletionTime deserialize(DataInput in) throws IOException + public DeletionTime deserialize(DataInputPlus in) throws IOException { int ldt = in.readInt(); long mfda = in.readLong(); @@ -158,10 +159,10 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura FileUtils.skipBytesFully(in, 4 + 8); } - public long serializedSize(DeletionTime delTime, TypeSizes typeSizes) + public long serializedSize(DeletionTime delTime) { - return typeSizes.sizeof(delTime.localDeletionTime()) - + typeSizes.sizeof(delTime.markedForDeleteAt()); + return TypeSizes.sizeof(delTime.localDeletionTime()) + + TypeSizes.sizeof(delTime.markedForDeleteAt()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 848ba01..38113c8 
100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -33,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.ColumnDefinition; @@ -53,6 +54,8 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -418,7 +421,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean Cell cell = hint.getCell(hintColumn); final long timestamp = cell.livenessInfo().timestamp(); - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(cell.value())); + DataInputPlus in = new NIODataInputStream(cell.value(), true); Mutation mutation; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index c1a7fd0..9eb7145 100644 
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -26,15 +26,16 @@ import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.thrift.ColumnDef; import org.apache.cassandra.utils.*; @@ -330,7 +331,7 @@ public abstract class LegacyLayout }; } - public static Row extractStaticColumns(CFMetaData metadata, DataInput in, Columns statics) throws IOException + public static Row extractStaticColumns(CFMetaData metadata, DataInputPlus in, Columns statics) throws IOException { assert !statics.isEmpty(); assert metadata.isCompactTable(); @@ -609,7 +610,7 @@ public abstract class LegacyLayout }; } - public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInput in, boolean readAllAsDynamic) throws IOException + public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInputPlus in, boolean readAllAsDynamic) throws IOException { while (true) { @@ -677,14 +678,14 @@ public abstract class LegacyLayout } } - public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInput in) throws IOException + public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInputPlus in) throws IOException { ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in); in.readUnsignedByte(); return readLegacyRangeTombstoneBody(metadata, in, boundname); } - public static LegacyRangeTombstone 
readLegacyRangeTombstoneBody(CFMetaData metadata, DataInput in, ByteBuffer boundname) throws IOException + public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException { LegacyBound min = decodeBound(metadata, boundname, true); LegacyBound max = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false); @@ -1214,7 +1215,7 @@ public abstract class LegacyLayout //rtlSerializer.serialize(info.ranges, out, version); } - public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInput in, int version) throws IOException + public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in, int version) throws IOException { DeletionTime topLevel = DeletionTime.serializer.deserialize(in); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index b2eaf8e..aca6622 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -17,24 +17,22 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.util.*; import org.apache.commons.lang3.StringUtils; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.utils.ByteBufferUtil; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,7 +259,7 @@ public class Mutation implements IMutation PartitionUpdate.serializer.serialize(entry.getValue(), out, version); } - public Mutation deserialize(DataInput in, int version, SerializationHelper.Flag flag) throws IOException + public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that if (version < MessagingService.VERSION_20) @@ -293,28 +291,27 @@ public class Mutation implements IMutation return new Mutation(keyspaceName, key, modifications); } - public Mutation deserialize(DataInput in, int version) throws IOException + public Mutation deserialize(DataInputPlus in, int version) throws IOException { return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE); } public long serializedSize(Mutation mutation, int version) { - TypeSizes sizes = TypeSizes.NATIVE; int size = 0; if (version < MessagingService.VERSION_20) - size += sizes.sizeof(mutation.getKeyspaceName()); + size += TypeSizes.sizeof(mutation.getKeyspaceName()); if (version < MessagingService.VERSION_30) { int keySize = mutation.key().getKey().remaining(); - size += sizes.sizeof((short) keySize) + keySize; + size += TypeSizes.sizeof((short) keySize) + keySize; } - size += sizes.sizeof(mutation.modifications.size()); + size += TypeSizes.sizeof(mutation.modifications.size()); for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet()) - size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version, sizes); + size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/PartitionPosition.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java index 1dc940e..afb446d 100644 --- a/src/java/org/apache/cassandra/db/PartitionPosition.java +++ b/src/java/org/apache/cassandra/db/PartitionPosition.java @@ -100,7 +100,7 @@ public interface PartitionPosition extends RingPosition<PartitionPosition> if (kind == Kind.ROW_KEY) { int keySize = ((DecoratedKey)pos).getKey().remaining(); - size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize; + size += TypeSizes.sizeof((short) keySize) + keySize; } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 3373afa..de21950 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -153,11 +153,11 @@ public class RangeTombstone ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types); } - public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes) + public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types) { return 1 // kind ordinal - + sizes.sizeof((short)bound.size()) - + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes); + + TypeSizes.sizeof((short)bound.size()) + + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types); } public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index 0c27bc4..c377d10 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -25,12 +24,13 @@ import java.util.Iterator; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -289,12 +289,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable public int dataSize() { - int dataSize = TypeSizes.NATIVE.sizeof(size); + int dataSize = TypeSizes.sizeof(size); for (int i = 0; i < size; i++) { dataSize += starts[i].dataSize() + ends[i].dataSize(); - dataSize += TypeSizes.NATIVE.sizeof(markedAts[i]); - dataSize += TypeSizes.NATIVE.sizeof(delTimes[i]); + dataSize += TypeSizes.sizeof(markedAts[i]); + dataSize += TypeSizes.sizeof(delTimes[i]); } return dataSize; } @@ -463,7 +463,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable RangeTombstoneList that = (RangeTombstoneList)o; if (size != that.size) return false; - + for (int i = 0; i < size; i++) { if (!starts[i].equals(that.starts[i])) @@ -796,7 +796,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable //} } - public RangeTombstoneList deserialize(DataInput in, int version) throws IOException + public 
RangeTombstoneList deserialize(DataInputPlus in, int version) throws IOException { // TODO throw new UnsupportedOperationException(); @@ -834,27 +834,22 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable //return tombstones; } - public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version) + public long serializedSize(RangeTombstoneList tombstones, int version) { // TODO throw new UnsupportedOperationException(); //if (tombstones == null) - // return typeSizes.sizeof(0); + // return TypeSizes.sizeof(0); - //long size = typeSizes.sizeof(tombstones.size); + //long size = TypeSizes.sizeof(tombstones.size); //for (int i = 0; i < tombstones.size; i++) //{ // size += type.serializer().serializedSize(tombstones.starts[i], typeSizes); // size += type.serializer().serializedSize(tombstones.ends[i], typeSizes); - // size += typeSizes.sizeof(tombstones.delTimes[i]); - // size += typeSizes.sizeof(tombstones.markedAts[i]); + // size += TypeSizes.sizeof(tombstones.delTimes[i]); + // size += TypeSizes.sizeof(tombstones.markedAts[i]); //} //return size; } - - public long serializedSize(RangeTombstoneList tombstones, int version) - { - return serializedSize(tombstones, TypeSizes.NATIVE, version); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index bad096f..7cc4884 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -21,9 +21,9 @@ import java.io.DataInput; import java.io.IOException; import com.google.common.collect.Iterables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import 
org.apache.cassandra.config.DatabaseDescriptor; @@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.ColumnFamilyMetrics; import org.apache.cassandra.net.MessageOut; @@ -481,7 +482,7 @@ public abstract class ReadCommand implements ReadQuery command.serializeSelection(out, version); } - public ReadCommand deserialize(DataInput in, int version) throws IOException + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException { if (version < MessagingService.VERSION_30) throw new UnsupportedOperationException(); @@ -504,12 +505,10 @@ public abstract class ReadCommand implements ReadQuery if (version < MessagingService.VERSION_30) throw new UnsupportedOperationException(); - TypeSizes sizes = TypeSizes.NATIVE; - return 2 // kind + flags - + CFMetaData.serializer.serializedSize(command.metadata(), version, sizes) - + sizes.sizeof(command.nowInSec()) - + ColumnFilter.serializer.serializedSize(command.columnFilter(), version, sizes) + + CFMetaData.serializer.serializedSize(command.metadata(), version) + + TypeSizes.sizeof(command.nowInSec()) + + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + RowFilter.serializer.serializedSize(command.rowFilter(), version) + DataLimits.serializer.serializedSize(command.limits(), version) + command.selectionSerializedSize(version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 6453077..6a61416 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java 
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; @@ -161,7 +162,7 @@ public abstract class ReadResponse } } - public ReadResponse deserialize(DataInput in, int version) throws IOException + public ReadResponse deserialize(DataInputPlus in, int version) throws IOException { if (version < MessagingService.VERSION_30) { @@ -186,9 +187,8 @@ public abstract class ReadResponse throw new UnsupportedOperationException(); } - TypeSizes sizes = TypeSizes.NATIVE; boolean isDigest = response.isDigestQuery(); - long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, sizes); + long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER); if (!isDigest) { @@ -196,7 +196,7 @@ public abstract class ReadResponse // version, we'll have to deserialize/re-serialize the data to be in the proper version. 
assert version == MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; - size += ByteBufferUtil.serializedSizeWithLength(data, sizes); + size += ByteBufferUtil.serializedSizeWithLength(data); } return size; } @@ -213,7 +213,7 @@ public abstract class ReadResponse // Row.serializer.serialize(row, out, version); } - public ReadResponse deserialize(DataInput in, int version) throws IOException + public ReadResponse deserialize(DataInputPlus in, int version) throws IOException { // TODO throw new UnsupportedOperationException(); @@ -228,7 +228,7 @@ public abstract class ReadResponse { // TODO throw new UnsupportedOperationException(); - // int size = TypeSizes.NATIVE.sizeof(rsr.rows.size()); + // int size = TypeSizes.sizeof(rsr.rows.size()); // for (Row row : rsr.rows) // size += Row.serializer.serializedSize(row, version); // return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 016e26e..e783508 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -31,6 +31,7 @@ import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ObjectSizes; @@ -101,7 +102,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory public static interface IndexSerializer<T> { void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException; - RowIndexEntry<T> deserialize(DataInput in) throws 
IOException; + RowIndexEntry<T> deserialize(DataInputPlus in) throws IOException; public int serializedSize(RowIndexEntry<T> rie); } @@ -133,7 +134,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory } } - public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in) throws IOException + public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus in) throws IOException { long position = in.readLong(); @@ -173,18 +174,18 @@ public class RowIndexEntry<T> implements IMeasurableMemory public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie) { - int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(metadata, version, header)); + int size = TypeSizes.sizeof(rie.position) + TypeSizes.sizeof(rie.promotedSize(metadata, version, header)); if (rie.isIndexed()) { List<IndexHelper.IndexInfo> index = rie.columnsIndex(); - size += DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE); - size += TypeSizes.NATIVE.sizeof(index.size()); + size += DeletionTime.serializer.serializedSize(rie.deletionTime()); + size += TypeSizes.sizeof(index.size()); IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version); for (IndexHelper.IndexInfo info : index) - size += idxSerializer.serializedSize(info, header, TypeSizes.NATIVE); + size += idxSerializer.serializedSize(info, header); } @@ -227,12 +228,11 @@ public class RowIndexEntry<T> implements IMeasurableMemory @Override public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header) { - TypeSizes typeSizes = TypeSizes.NATIVE; - long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes); - size += typeSizes.sizeof(columnsIndex.size()); // number of entries + long size = DeletionTime.serializer.serializedSize(deletionTime); + size += TypeSizes.sizeof(columnsIndex.size()); // number of entries IndexHelper.IndexInfo.Serializer idxSerializer = 
metadata.serializers().indexSerializer(version); for (IndexHelper.IndexInfo info : columnsIndex) - size += idxSerializer.serializedSize(info, header, typeSizes); + size += idxSerializer.serializedSize(info, header); return Ints.checkedCast(size); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 304332e..c720804 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -377,13 +378,13 @@ public class SerializationHeader return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null); } - public long serializedSizeForMessaging(SerializationHeader header, TypeSizes sizes, boolean hasStatic) + public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic) { - long size = RowStats.serializer.serializedSize(header.stats, sizes); + long size = RowStats.serializer.serializedSize(header.stats); if (hasStatic) - size += Columns.serializer.serializedSize(header.columns.statics, sizes); - size += Columns.serializer.serializedSize(header.columns.regulars, sizes); + size += Columns.serializer.serializedSize(header.columns.statics); + size += Columns.serializer.serializedSize(header.columns.regulars); return size; } @@ -402,7 
+403,7 @@ public class SerializationHeader } // For SSTables - public Component deserialize(Version version, DataInput in) throws IOException + public Component deserialize(Version version, DataInputPlus in) throws IOException { RowStats stats = RowStats.serializer.deserialize(in); @@ -424,16 +425,15 @@ public class SerializationHeader // For SSTables public int serializedSize(Component header) { - TypeSizes sizes = TypeSizes.NATIVE; - int size = RowStats.serializer.serializedSize(header.stats, sizes); + int size = RowStats.serializer.serializedSize(header.stats); - size += sizeofType(header.keyType, sizes); - size += sizes.sizeof((short)header.clusteringTypes.size()); + size += sizeofType(header.keyType); + size += TypeSizes.sizeof((short)header.clusteringTypes.size()); for (AbstractType<?> type : header.clusteringTypes) - size += sizeofType(type, sizes); + size += sizeofType(type); - size += sizeofColumnsWithTypes(header.staticColumns, sizes); - size += sizeofColumnsWithTypes(header.regularColumns, sizes); + size += sizeofColumnsWithTypes(header.staticColumns); + size += sizeofColumnsWithTypes(header.regularColumns); return size; } @@ -447,13 +447,13 @@ public class SerializationHeader } } - private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns, TypeSizes sizes) + private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns) { - long size = sizes.sizeof((short)columns.size()); + long size = TypeSizes.sizeof((short)columns.size()); for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet()) { - size += sizes.sizeofWithShortLength(entry.getKey()); - size += sizeofType(entry.getValue(), sizes); + size += TypeSizes.sizeofWithShortLength(entry.getKey()); + size += sizeofType(entry.getValue()); } return size; } @@ -480,9 +480,9 @@ public class SerializationHeader return TypeParser.parse(UTF8Type.instance.compose(raw)); } - private int sizeofType(AbstractType<?> type, TypeSizes sizes) + private int 
sizeofType(AbstractType<?> type) { - return sizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString())); + return TypeSizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString())); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java index 862d02e..8056222 100644 --- a/src/java/org/apache/cassandra/db/Serializers.java +++ b/src/java/org/apache/cassandra/db/Serializers.java @@ -21,6 +21,7 @@ import java.io.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; @@ -43,7 +44,7 @@ public class Serializers return new IndexInfo.Serializer(metadata, version); } - // Note that for the old layout, this will actually discard the cellname parts that are not strictly + // Note that for the old layout, this will actually discard the cellname parts that are not strictly // part of the clustering prefix. Don't use this if that's not what you want. 
public ISerializer<ClusteringPrefix> clusteringPrefixSerializer(final Version version, final SerializationHeader header) { @@ -57,14 +58,14 @@ public class Serializers ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes()); } - public ClusteringPrefix deserialize(DataInput in) throws IOException + public ClusteringPrefix deserialize(DataInputPlus in) throws IOException { return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes()); } - public long serializedSize(ClusteringPrefix clustering, TypeSizes sizes) + public long serializedSize(ClusteringPrefix clustering) { - return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes(), sizes); + return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes()); } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 38651c1..3fa8486 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -401,8 +401,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter protected long selectionSerializedSize(int version) { - TypeSizes sizes = TypeSizes.NATIVE; - return metadata().getKeyValidator().writtenLength(partitionKey().getKey(), sizes) + return metadata().getKeyValidator().writtenLength(partitionKey().getKey()) + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Slice.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java index dae491e..05c2977 100644 --- a/src/java/org/apache/cassandra/db/Slice.java +++ b/src/java/org/apache/cassandra/db/Slice.java @@ -325,10 +325,10 @@ public class Slice Bound.serializer.serialize(slice.end, out, version, types); } - public long serializedSize(Slice slice, int version, List<AbstractType<?>> types, TypeSizes sizes) + public long serializedSize(Slice slice, int version, List<AbstractType<?>> types) { - return Bound.serializer.serializedSize(slice.start, version, types, sizes) - + Bound.serializer.serializedSize(slice.end, version, types, sizes); + return Bound.serializer.serializedSize(slice.start, version, types) + + Bound.serializer.serializedSize(slice.end, version, types); } public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException @@ -615,11 +615,11 @@ public class Slice ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types); } - public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes) + public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types) { return 1 // kind ordinal - + sizes.sizeof((short)bound.size()) - + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes); + + TypeSizes.sizeof((short)bound.size()) + + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types); } public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java index ec7797d..a6c690b 100644 --- a/src/java/org/apache/cassandra/db/Slices.java +++ b/src/java/org/apache/cassandra/db/Slices.java @@ -301,9 +301,9 @@ public abstract class Slices implements Iterable<Slice> Slice.serializer.serialize(slice, out, version, types); } - public long serializedSize(Slices slices, int version, TypeSizes sizes) + public long serializedSize(Slices slices, int version) { - long size = sizes.sizeof(slices.size()); + long size = TypeSizes.sizeof(slices.size()); if (slices.size() == 0) return size; @@ -313,7 +313,7 @@ public abstract class Slices implements Iterable<Slice> : ((ArrayBackedSlices)slices).comparator.subtypes(); for (Slice slice : slices) - size += Slice.serializer.serializedSize(slice, version, types, sizes); + size += Slice.serializer.serializedSize(slice, version, types); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SnapshotCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java index 427e9ec..eb6f67a 100644 --- a/src/java/org/apache/cassandra/db/SnapshotCommand.java +++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -67,7 +67,7 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand> out.writeBoolean(snapshot_command.clear_snapshot); } - public SnapshotCommand deserialize(DataInput in, int version) throws IOException + public 
SnapshotCommand deserialize(DataInputPlus in, int version) throws IOException { String keyspace = in.readUTF(); String column_family = in.readUTF(); @@ -78,9 +78,9 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand> public long serializedSize(SnapshotCommand sc, int version) { - return TypeSizes.NATIVE.sizeof(sc.keyspace) - + TypeSizes.NATIVE.sizeof(sc.column_family) - + TypeSizes.NATIVE.sizeof(sc.snapshot_name) - + TypeSizes.NATIVE.sizeof(sc.clear_snapshot); + return TypeSizes.sizeof(sc.keyspace) + + TypeSizes.sizeof(sc.column_family) + + TypeSizes.sizeof(sc.snapshot_name) + + TypeSizes.sizeof(sc.clear_snapshot); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index ffbc1eb..f38436b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -22,15 +22,16 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; + import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import com.google.common.base.Function; import com.google.common.collect.*; import com.google.common.io.ByteStreams; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; @@ -47,6 +48,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import 
org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.net.MessagingService; @@ -519,7 +521,7 @@ public final class SystemKeyspace { try { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); + NIODataInputStream in = new NIODataInputStream(bytes, true); return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/TruncateResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TruncateResponse.java b/src/java/org/apache/cassandra/db/TruncateResponse.java index d8f5ad2..af4ed8f 100644 --- a/src/java/org/apache/cassandra/db/TruncateResponse.java +++ b/src/java/org/apache/cassandra/db/TruncateResponse.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -58,7 +58,7 @@ public class TruncateResponse out.writeBoolean(tr.success); } - public TruncateResponse deserialize(DataInput in, int version) throws IOException + public TruncateResponse deserialize(DataInputPlus in, int version) throws IOException { String keyspace = in.readUTF(); String columnFamily = in.readUTF(); @@ -68,9 +68,9 @@ public class TruncateResponse public long serializedSize(TruncateResponse tr, int version) { - return TypeSizes.NATIVE.sizeof(tr.keyspace) - + TypeSizes.NATIVE.sizeof(tr.columnFamily) - + TypeSizes.NATIVE.sizeof(tr.success); + return TypeSizes.sizeof(tr.keyspace) + + TypeSizes.sizeof(tr.columnFamily) + + TypeSizes.sizeof(tr.success); } 
} } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/Truncation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Truncation.java b/src/java/org/apache/cassandra/db/Truncation.java index 88742cd..39a2ec6 100644 --- a/src/java/org/apache/cassandra/db/Truncation.java +++ b/src/java/org/apache/cassandra/db/Truncation.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -60,7 +60,7 @@ class TruncationSerializer implements IVersionedSerializer<Truncation> out.writeUTF(t.columnFamily); } - public Truncation deserialize(DataInput in, int version) throws IOException + public Truncation deserialize(DataInputPlus in, int version) throws IOException { String keyspace = in.readUTF(); String columnFamily = in.readUTF(); @@ -69,6 +69,6 @@ class TruncationSerializer implements IVersionedSerializer<Truncation> public long serializedSize(Truncation truncation, int version) { - return TypeSizes.NATIVE.sizeof(truncation.keyspace) + TypeSizes.NATIVE.sizeof(truncation.columnFamily); + return TypeSizes.sizeof(truncation.keyspace) + TypeSizes.sizeof(truncation.columnFamily); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/TypeSizes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java index 79d5774..a9e432f 100644 --- a/src/java/org/apache/cassandra/db/TypeSizes.java +++ b/src/java/org/apache/cassandra/db/TypeSizes.java @@ -22,10 +22,10 @@ import java.util.UUID; import 
org.apache.cassandra.utils.vint.VIntCoding; -public abstract class TypeSizes +public final class TypeSizes { - public static final TypeSizes NATIVE = new NativeDBTypeSizes(); - public static final TypeSizes VINT = new VIntEncodedTypeSizes(); + + private TypeSizes(){} private static final int BOOL_SIZE = 1; private static final int SHORT_SIZE = 2; @@ -33,14 +33,8 @@ public abstract class TypeSizes private static final int LONG_SIZE = 8; private static final int UUID_SIZE = 16; - public abstract int sizeof(boolean value); - public abstract int sizeof(short value); - public abstract int sizeof(int value); - public abstract int sizeof(long value); - public abstract int sizeof(UUID value); - /** assumes UTF8 */ - public int sizeof(String value) + public static int sizeof(String value) { int length = encodedUTF8Length(value); assert length <= Short.MAX_VALUE; @@ -64,76 +58,43 @@ public abstract class TypeSizes return utflen; } - public int sizeofWithShortLength(ByteBuffer value) + public static int sizeofWithShortLength(ByteBuffer value) { return sizeof((short) value.remaining()) + value.remaining(); } - public int sizeofWithLength(ByteBuffer value) + public static int sizeofWithLength(ByteBuffer value) { return sizeof(value.remaining()) + value.remaining(); } - public static class NativeDBTypeSizes extends TypeSizes + public static int sizeof(boolean value) { - public int sizeof(boolean value) - { - return BOOL_SIZE; - } - - public int sizeof(short value) - { - return SHORT_SIZE; - } - - public int sizeof(int value) - { - return INT_SIZE; - } - - public int sizeof(long value) - { - return LONG_SIZE; - } - - public int sizeof(UUID value) - { - return UUID_SIZE; - } + return BOOL_SIZE; } - public static class VIntEncodedTypeSizes extends TypeSizes + public static int sizeof(short value) { - private static final int BOOL_SIZE = 1; - - public int sizeofVInt(long i) - { - return VIntCoding.computeVIntSize(i); - } - - public int sizeof(long i) - { - return sizeofVInt(i); - } 
+ return SHORT_SIZE; + } - public int sizeof(boolean i) - { - return BOOL_SIZE; - } + public static int sizeof(int value) + { + return INT_SIZE; + } - public int sizeof(short i) - { - return sizeofVInt(i); - } + public static int sizeof(long value) + { + return LONG_SIZE; + } - public int sizeof(int i) - { - return sizeofVInt(i); - } + public static int sizeof(UUID value) + { + return UUID_SIZE; + } - public int sizeof(UUID value) - { - return sizeofVInt(value.getMostSignificantBits()) + sizeofVInt(value.getLeastSignificantBits()); - } + public static int sizeofVInt(long value) + { + return VIntCoding.computeVIntSize(value); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index a15fb61..cf7c2dd 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.net.MessagingService; /** @@ -39,11 +38,11 @@ public abstract class UnfilteredDeserializer private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class); protected final CFMetaData metadata; - protected final DataInput in; + protected final DataInputPlus in; protected final SerializationHelper helper; protected UnfilteredDeserializer(CFMetaData metadata, - DataInput in, + DataInputPlus in, SerializationHelper helper) { this.metadata = metadata; @@ -52,7 +51,7 @@ public abstract class 
UnfilteredDeserializer } public static UnfilteredDeserializer create(CFMetaData metadata, - DataInput in, + DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion, @@ -116,7 +115,7 @@ public abstract class UnfilteredDeserializer private final RangeTombstoneMarker.Builder markerBuilder; private CurrentDeserializer(CFMetaData metadata, - DataInput in, + DataInputPlus in, SerializationHeader header, SerializationHelper helper) { @@ -237,7 +236,7 @@ public abstract class UnfilteredDeserializer private RangeTombstoneMarker closingMarker; private OldFormatDeserializer(CFMetaData metadata, - DataInput in, + DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion, boolean readAllAsDynamic) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/db/WriteResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java index a7b108b..824368e 100644 --- a/src/java/org/apache/cassandra/db/WriteResponse.java +++ b/src/java/org/apache/cassandra/db/WriteResponse.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -43,7 +43,7 @@ public class WriteResponse { } - public WriteResponse deserialize(DataInput in, int version) throws IOException + public WriteResponse deserialize(DataInputPlus in, int version) throws IOException { return new WriteResponse(); }