http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/DataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataInputPlus.java b/src/java/org/apache/cassandra/io/util/DataInputPlus.java new file mode 100644 index 0000000..d4e25d6 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/DataInputPlus.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cassandra.utils.vint.VIntCoding; + +/** + * Extension to DataInput that provides support for reading varints + */ +public interface DataInputPlus extends DataInput +{ + + default long readVInt() throws IOException + { + return VIntCoding.readVInt(this); + } + + /** + * Think hard before opting for an unsigned encoding. Is this going to bite someone because some day + * they might need to pass in a sentinel value using negative numbers? Is the risk worth it + * to save a few bytes? + * + * Signed, not a fan of unsigned values in protocols and formats + */ + default long readUnsignedVInt() throws IOException + { + return VIntCoding.readUnsignedVInt(this); + } + + public static class ForwardingDataInput implements DataInput + { + protected final DataInput in; + + public ForwardingDataInput(DataInput in) + { + this.in = in; + } + + @Override + public void readFully(byte[] b) throws IOException + { + in.readFully(b); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException + { + in.readFully(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException + { + return in.skipBytes(n); + } + + @Override + public boolean readBoolean() throws IOException + { + return in.readBoolean(); + } + + @Override + public byte readByte() throws IOException + { + return in.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException + { + return in.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException + { + return in.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException + { + return in.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException + { + return in.readChar(); + } + + @Override + public int readInt() throws IOException + { + return in.readInt(); + } + + @Override + public long readLong() throws IOException + { + return in.readLong(); + } + + @Override + public float readFloat() throws IOException + { + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException + { + return in.readDouble(); + } + + @Override + public String readLine() throws IOException + { + return in.readLine(); + } + + @Override + public String readUTF() throws IOException + { + return in.readUTF(); + } + } + + public static class DataInputPlusAdapter extends ForwardingDataInput implements DataInputPlus + { + public DataInputPlusAdapter(DataInput in) + { + super(in); + } + } + + /** + * Wrapper around an InputStream that provides no buffering but can decode varints + */ + public class DataInputStreamPlus extends DataInputStream implements DataInputPlus + { + public DataInputStreamPlus(InputStream is) + { + super(is); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/DataOutputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java index f6a3648..b46d23a 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java @@ -59,4 +59,113 @@ public interface DataOutputPlus extends DataOutput { VIntCoding.writeUnsignedVInt(i, this); } + + + public static class ForwardingDataOutput implements DataOutput + { + protected final DataOutput out; + + public ForwardingDataOutput(DataOutput out) + { + this.out = out; + } + + public void write(byte[] b) throws IOException + { + out.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException + { + out.write(b, off, len); + } + + public void write(int b) throws IOException + { + out.write(b); + } + + public void writeBoolean(boolean v) throws IOException + { + out.writeBoolean(v); + } + + public void writeByte(int v) throws IOException + { + out.writeByte(v); + } + + public void writeBytes(String s) throws IOException + { + out.writeBytes(s); + } + + public void writeChar(int v) throws IOException + { + out.writeChar(v); + } + + public void writeChars(String s) throws IOException + { + out.writeChars(s); + } + + public void writeDouble(double v) throws IOException + { + out.writeDouble(v); + } + + public void writeFloat(float v) throws IOException + { + out.writeFloat(v); + } + + public void writeInt(int v) throws IOException + { + out.writeInt(v); + } + + public void writeLong(long v) throws IOException + { + out.writeLong(v); + } + + public void writeShort(int v) throws IOException + { + out.writeShort(v); + } + + public void writeUTF(String s) throws IOException + { + out.writeUTF(s); + } + + } + + public static class DataOutputPlusAdapter extends ForwardingDataOutput implements DataOutputPlus + { + + public DataOutputPlusAdapter(DataOutput out) + { + super(out); + } + + public void write(ByteBuffer buffer) throws IOException + { + if (buffer.hasArray()) + out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + else + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + + public void write(Memory memory, long offset, long length) throws IOException + { + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + + public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException + { + throw new UnsupportedOperationException("IMPLEMENT ME"); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/FileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java index d94075c..b63b750 100644 --- a/src/java/org/apache/cassandra/io/util/FileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java @@ -18,11 +18,10 @@ package org.apache.cassandra.io.util; import java.io.Closeable; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; -public interface FileDataInput extends DataInput, Closeable +public interface FileDataInput extends DataInputPlus, Closeable { public String getPath(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/io/util/NIODataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java index 4816379..edbf660 100644 --- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java +++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java @@ -18,7 +18,6 @@ package org.apache.cassandra.io.util; import java.io.Closeable; -import java.io.DataInput; import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; @@ -43,11 +42,35 @@ import com.google.common.base.Preconditions; * * NIODataInputStream is not thread safe. */ -public class NIODataInputStream extends InputStream implements DataInput, Closeable +public class NIODataInputStream extends InputStream implements DataInputPlus, Closeable { private final ReadableByteChannel rbc; private final ByteBuffer buf; + /* + * Used when wrapping a fixed buffer of data instead of a channel + */ + private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel() + { + + @Override + public boolean isOpen() + { + return true; + } + + @Override + public void close() throws IOException + { + } + + @Override + public int read(ByteBuffer dst) throws IOException + { + return -1; + } + + }; public NIODataInputStream(ReadableByteChannel rbc, int bufferSize) { @@ -59,6 +82,53 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea buf.limit(0); } + /** + * + * @param buf + * @param duplicate Whether or not to duplicate the buffer to ensure thread safety + */ + public NIODataInputStream(ByteBuffer buf, boolean duplicate) + { + Preconditions.checkNotNull(buf); + Preconditions.checkArgument(buf.capacity() >= 9, "Buffer size must be large enough to accomadate a varint"); + if (duplicate) + this.buf = buf.duplicate(); + else + this.buf = buf; + this.rbc = emptyReadableByteChannel; + } + + /* + * The decision to duplicate or not really needs to conscious since it a real impact + * in terms of thread safety so don't expose this constructor with an implicit default. + */ + private NIODataInputStream(ByteBuffer buf) + { + this(buf, false); + } + + private static ByteBuffer slice(byte buffer[], int offset, int length) + { + ByteBuffer buf = ByteBuffer.wrap(buffer); + if (offset > 0 || length < buf.capacity()) + { + buf.position(offset); + buf.limit(offset + length); + buf = buf.slice(); + } + return buf; + } + + public NIODataInputStream(byte buffer[], int offset, int length) + { + this(slice(buffer, offset, length)); + } + + public NIODataInputStream(byte buffer[]) + { + this(ByteBuffer.wrap(buffer)); + } + @Override public void readFully(byte[] b) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 274e47b..4b66c4e 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -18,15 +18,14 @@ package org.apache.cassandra.net; import java.io.Closeable; -import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; @@ -59,7 +58,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable if (version != StreamMessage.CURRENT_VERSION) throw new IOException(String.format("Received stream using protocol version %d (my version %d). Terminating connection", version, MessagingService.current_version)); - DataInput input = new DataInputStream(socket.getInputStream()); + DataInputPlus input = new DataInputStreamPlus(socket.getInputStream()); StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version); // The initiator makes two connections, one for incoming and one for outgoing. @@ -74,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable close(); } } - + @Override public void close() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index c325717..25f850d 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -36,7 +36,8 @@ import org.apache.cassandra.config.Config; import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.UnknownColumnFamilyException; -import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.NIODataInputStream; public class IncomingTcpConnection extends Thread implements Closeable @@ -106,7 +107,7 @@ public class IncomingTcpConnection extends Thread implements Closeable close(); } } - + @Override public void close() { @@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread implements Closeable // to connect with, the other node will disconnect out.writeInt(MessagingService.current_version); out.flush(); - DataInput in = new DataInputStream(socket.getInputStream()); + DataInputPlus in = new DataInputStreamPlus(socket.getInputStream()); int maxVersion = in.readInt(); // outbound side will reconnect if necessary to upgrade version assert version <= MessagingService.current_version; @@ -149,13 +150,13 @@ public class IncomingTcpConnection extends Thread implements Closeable logger.debug("Upgrading incoming connection to be compressed"); if (version < MessagingService.VERSION_21) { - in = new DataInputStream(new SnappyInputStream(socket.getInputStream())); + in = new DataInputStreamPlus(new SnappyInputStream(socket.getInputStream())); } else { LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum(); - in = new DataInputStream(new LZ4BlockInputStream(socket.getInputStream(), + in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(), decompressor, checksum)); } @@ -172,7 +173,7 @@ public class IncomingTcpConnection extends Thread implements Closeable } } - private InetAddress receiveMessage(DataInput input, int version) throws IOException + private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException { int id; if (version < MessagingService.VERSION_20) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index 10260c2..82f4000 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.net; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import java.util.Collections; @@ -27,10 +26,10 @@ import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.FileUtils; public class MessageIn<T> @@ -57,7 +56,7 @@ public class MessageIn<T> return new MessageIn<T>(from, payload, parameters, verb, version); } - public static <T2> MessageIn<T2> read(DataInput in, int version, int id) throws IOException + public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id) throws IOException { InetAddress from = CompactEndpointSerializationHelper.deserialize(in); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 28038b3..63e583a 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -129,18 +129,18 @@ public class MessageOut<T> { int size = CompactEndpointSerializationHelper.serializedSize(from); - size += TypeSizes.NATIVE.sizeof(verb.ordinal()); - size += TypeSizes.NATIVE.sizeof(parameters.size()); + size += TypeSizes.sizeof(verb.ordinal()); + size += TypeSizes.sizeof(parameters.size()); for (Map.Entry<String, byte[]> entry : parameters.entrySet()) { - size += TypeSizes.NATIVE.sizeof(entry.getKey()); - size += TypeSizes.NATIVE.sizeof(entry.getValue().length); + size += TypeSizes.sizeof(entry.getKey()); + size += TypeSizes.sizeof(entry.getValue().length); size += entry.getValue().length; } long longSize = payloadSize(version); assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages - size += TypeSizes.NATIVE.sizeof((int) longSize); + size += TypeSizes.sizeof((int) longSize); size += longSize; return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 3f2160f..22bfdbf 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -37,10 +37,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.cliffc.high_scale_lib.NonBlockingHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -57,6 +55,7 @@ import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestAck2; import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.ILatencySubscriber; @@ -257,7 +256,7 @@ public final class MessagingService implements MessagingServiceMBean { public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer(); - public Object deserialize(DataInput in, int version) throws IOException + public Object deserialize(DataInputPlus in, int version) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/NodePair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java index bb6be04..a73c61a 100644 --- a/src/java/org/apache/cassandra/repair/NodePair.java +++ b/src/java/org/apache/cassandra/repair/NodePair.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.repair; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import com.google.common.base.Objects; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; @@ -69,7 +69,7 @@ public class NodePair CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out); } - public NodePair deserialize(DataInput in, int version) throws IOException + public NodePair deserialize(DataInputPlus in, int version) throws IOException { InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in); InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/RepairJobDesc.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 8382136..1dd67c7 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.io.DataInput; import java.io.IOException; import java.util.UUID; @@ -28,6 +27,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -104,7 +104,7 @@ public class RepairJobDesc AbstractBounds.tokenSerializer.serialize(desc.range, out, version); } - public RepairJobDesc deserialize(DataInput in, int version) throws IOException + public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException { UUID parentSessionId = null; if (version >= MessagingService.VERSION_21) @@ -124,13 +124,13 @@ public class RepairJobDesc int size = 0; if (version >= MessagingService.VERSION_21) { - size += TypeSizes.NATIVE.sizeof(desc.parentSessionId != null); + size += TypeSizes.sizeof(desc.parentSessionId != null); if (desc.parentSessionId != null) size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version); } size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version); - size += TypeSizes.NATIVE.sizeof(desc.keyspace); - size += TypeSizes.NATIVE.sizeof(desc.columnFamily); + size += TypeSizes.sizeof(desc.keyspace); + size += TypeSizes.sizeof(desc.columnFamily); size += AbstractBounds.tokenSerializer.serializedSize(desc.range, version); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java index b554500..3e47374 100644 --- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -26,6 +25,7 @@ import java.util.UUID; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -59,7 +59,7 @@ public class AnticompactionRequest extends RepairMessage } } - public AnticompactionRequest deserialize(DataInput in, int version) throws IOException + public AnticompactionRequest deserialize(DataInputPlus in, int version) throws IOException { UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); int rangeCount = in.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java index 6d702ce..43a8f02 100644 --- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import java.util.UUID; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.UUIDSerializer; @@ -47,7 +47,7 @@ public class CleanupMessage extends RepairMessage UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); } - public CleanupMessage deserialize(DataInput in, int version) throws IOException + public CleanupMessage deserialize(DataInputPlus in, int version) throws IOException { UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); return new CleanupMessage(parentRepairSession); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 37dc07c..cd1b99d 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -27,6 +26,7 @@ import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -67,7 +67,7 @@ public class PrepareMessage extends RepairMessage out.writeBoolean(message.isIncremental); } - public PrepareMessage deserialize(DataInput in, int version) throws IOException + public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException { int cfIdCount = in.readInt(); List<UUID> cfIds = new ArrayList<>(cfIdCount); @@ -85,15 +85,14 @@ public class PrepareMessage extends RepairMessage public long serializedSize(PrepareMessage message, int version) { long size; - TypeSizes sizes = TypeSizes.NATIVE; - size = sizes.sizeof(message.cfIds.size()); + size = TypeSizes.sizeof(message.cfIds.size()); for (UUID cfId : message.cfIds) size += UUIDSerializer.serializer.serializedSize(cfId, version); size += UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); - size += sizes.sizeof(message.ranges.size()); + size += TypeSizes.sizeof(message.ranges.size()); for (Range<Token> r : message.ranges) size += Range.tokenSerializer.serializedSize(r, version); - size += sizes.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.isIncremental); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 6b5226d..55fdb66 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -90,7 +90,7 @@ public abstract class RepairMessage message.messageType.serializer.serialize(message, out, version); } - public RepairMessage deserialize(DataInput in, int version) throws IOException + public RepairMessage deserialize(DataInputPlus in, int version) throws IOException { RepairMessage.Type messageType = RepairMessage.Type.fromByte(in.readByte()); return messageType.serializer.deserialize(in, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java index caccc82..1b15126 100644 --- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java @@ -17,9 +17,9 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.RepairJobDesc; @@ -39,7 +39,7 @@ public class SnapshotMessage extends RepairMessage RepairJobDesc.serializer.serialize(message.desc, out, version); } - public SnapshotMessage deserialize(DataInput in, int version) throws IOException + public SnapshotMessage deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); return new SnapshotMessage(desc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SyncComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java index c9548ca..35cf5d4 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java @@ -17,11 +17,11 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.NodePair; import org.apache.cassandra.repair.RepairJobDesc; @@ -62,7 +62,7 @@ public class SyncComplete extends RepairMessage out.writeBoolean(message.success); } - public SyncComplete deserialize(DataInput in, int version) throws IOException + public SyncComplete deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); NodePair nodes = NodePair.serializer.deserialize(in, version); @@ -73,7 +73,7 @@ public class SyncComplete extends RepairMessage { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); size += NodePair.serializer.serializedSize(message.nodes, version); - size += TypeSizes.NATIVE.sizeof(message.success); + size += TypeSizes.sizeof(message.success); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index 68aaf4d..2c9799e 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; @@ -28,6 +27,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; @@ -73,7 +73,7 @@ public class SyncRequest extends RepairMessage } } - public SyncRequest deserialize(DataInput in, int version) throws IOException + public SyncRequest deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); InetAddress owner = CompactEndpointSerializationHelper.deserialize(in); @@ -90,7 +90,7 @@ public class SyncRequest extends RepairMessage { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator); - size += TypeSizes.NATIVE.sizeof(message.ranges.size()); + size += TypeSizes.sizeof(message.ranges.size()); for (Range<Token> range : message.ranges) size += AbstractBounds.tokenSerializer.serializedSize(range, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java index 8328979..ef0c4ec 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.utils.MerkleTree; @@ -64,7 +64,7 @@ public class ValidationComplete extends RepairMessage MerkleTree.serializer.serialize(message.tree, out, version); } - public ValidationComplete deserialize(DataInput in, int version) throws IOException + public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); if (in.readBoolean()) @@ -81,7 +81,7 @@ public class ValidationComplete extends RepairMessage public long serializedSize(ValidationComplete message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += TypeSizes.NATIVE.sizeof(message.success); + size += TypeSizes.sizeof(message.success); if (message.success) size += MerkleTree.serializer.serializedSize(message.tree, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java index 43bcf23..0dfab6a 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.repair.messages; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.RepairJobDesc; @@ -73,7 +73,7 @@ public class ValidationRequest extends RepairMessage out.writeInt(message.gcBefore); } - public ValidationRequest deserialize(DataInput dis, int version) throws IOException + public ValidationRequest deserialize(DataInputPlus dis, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(dis, version); return new ValidationRequest(desc, dis.readInt()); @@ -82,7 +82,7 @@ public class ValidationRequest extends RepairMessage public long serializedSize(ValidationRequest message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += TypeSizes.NATIVE.sizeof(message.gcBefore); + size += TypeSizes.sizeof(message.gcBefore); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index e82e8a4..6c25793 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.io.DataInputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; @@ -29,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -37,7 +37,6 @@ import com.google.common.util.concurrent.Futures; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.cache.*; import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer; import org.apache.cassandra.concurrent.Stage; @@ -52,6 +51,7 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -362,7 +362,7 @@ public class CacheService implements CacheServiceMBean ByteBufferUtil.writeWithLength(key.cellName, out); } - public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException + public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException { final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in); final ByteBuffer cellName = ByteBufferUtil.readWithLength(in); @@ -416,7 +416,7 @@ public class CacheService implements CacheServiceMBean ByteBufferUtil.writeWithLength(key.key, out); } - public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException + public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException { final ByteBuffer buffer = ByteBufferUtil.readWithLength(in); final int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache; @@ -455,7 +455,7 @@ public class CacheService implements CacheServiceMBean key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out); } - public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException + public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException { int keyLength = input.readInt(); if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 9deb313..d979c02 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -17,21 +17,15 @@ */ package org.apache.cassandra.service; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.*; - import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -45,6 +39,7 @@ import org.apache.cassandra.exceptions.AlreadyExistsException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -521,7 +516,7 @@ public class MigrationManager Mutation.serializer.serialize(mutation, out, version); } - public Collection<Mutation> deserialize(DataInput in, int version) throws IOException + public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException { int count = in.readInt(); Collection<Mutation> schema = new ArrayList<>(count); @@ -534,7 +529,7 @@ public class MigrationManager public long serializedSize(Collection<Mutation> schema, int version) { - int size = TypeSizes.NATIVE.sizeof(schema.size()); + int size = TypeSizes.sizeof(schema.size()); for (Mutation mutation : schema) size += Mutation.serializer.serializedSize(mutation, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 6077166..9a5e619 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -1,6 +1,6 @@ package org.apache.cassandra.service.paxos; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,20 +8,19 @@ package org.apache.cassandra.service.paxos; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -import java.io.DataInput; import java.io.IOException; import java.util.UUID; @@ -33,6 +32,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -164,7 +164,7 @@ public class Commit PartitionUpdate.serializer.serialize(commit.update, out, version); } - public Commit deserialize(DataInput in, int version) throws IOException + public Commit deserialize(DataInputPlus in, int version) throws IOException { DecoratedKey key = null; if (version < MessagingService.VERSION_30) @@ -177,15 +177,13 @@ public class Commit public long serializedSize(Commit commit, int version) { - TypeSizes sizes = TypeSizes.NATIVE; - int size = 0; if (version < MessagingService.VERSION_30) - size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey(), sizes); + size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey()); return size + UUIDSerializer.serializer.serializedSize(commit.ballot, version) - + PartitionUpdate.serializer.serializedSize(commit.update, version, sizes); + + PartitionUpdate.serializer.serializedSize(commit.update, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java index bf07402..f843b8d 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java @@ -1,6 +1,6 @@ package org.apache.cassandra.service.paxos; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,20 +8,19 @@ package org.apache.cassandra.service.paxos; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -import java.io.DataInput; import java.io.IOException; import java.util.UUID; @@ -29,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -81,7 +81,7 @@ public class PrepareResponse } } - public PrepareResponse deserialize(DataInput in, int version) throws IOException + public PrepareResponse deserialize(DataInputPlus in, int version) throws IOException { boolean success = in.readBoolean(); Commit inProgress = Commit.serializer.deserialize(in, version); @@ -101,14 +101,13 @@ public class PrepareResponse public long serializedSize(PrepareResponse response, int version) { - TypeSizes sizes = TypeSizes.NATIVE; - long size = sizes.sizeof(response.promised) + long size = TypeSizes.sizeof(response.promised) + Commit.serializer.serializedSize(response.inProgressCommit, version); if (version < MessagingService.VERSION_30) { size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version); - size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version, sizes); + size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 66eb220..2876f08 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -25,8 +25,8 @@ import java.util.Collection; import java.util.UUID; import com.google.common.base.Throwables; - import com.google.common.collect.UnmodifiableIterator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +42,11 @@ import org.apache.cassandra.io.sstable.SSTableSimpleIterator; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -174,7 +174,7 @@ public class StreamReader public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator { private final CFMetaData metadata; - private final DataInput in; + private final DataInputPlus in; private final SerializationHeader header; private final SerializationHelper helper; @@ -186,7 +186,7 @@ public class StreamReader private final CounterFilteredRow counterRow; - public StreamDeserializer(CFMetaData metadata, DataInput in, Version version, SerializationHeader header) + public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header) { assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes"; this.metadata = metadata; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index 0fe40cf..93726e7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -28,6 +27,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; @@ -65,7 +65,7 @@ public class StreamRequest out.writeUTF(cf); } - public StreamRequest deserialize(DataInput in, int version) throws IOException + public StreamRequest deserialize(DataInputPlus in, int version) throws IOException { String keyspace = in.readUTF(); long repairedAt = in.readLong(); @@ -86,17 +86,17 @@ public class StreamRequest public long serializedSize(StreamRequest request, int version) { - int size = TypeSizes.NATIVE.sizeof(request.keyspace); - size += TypeSizes.NATIVE.sizeof(request.repairedAt); - size += TypeSizes.NATIVE.sizeof(request.ranges.size()); + int size = TypeSizes.sizeof(request.keyspace); + size += TypeSizes.sizeof(request.repairedAt); + size += TypeSizes.sizeof(request.ranges.size()); for (Range<Token> range : request.ranges) { size += Token.serializer.serializedSize(range.left, version); size += Token.serializer.serializedSize(range.right, version); } - size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size()); + size += TypeSizes.sizeof(request.columnFamilies.size()); for (String cf : request.columnFamilies) - size += TypeSizes.NATIVE.sizeof(cf); + size += TypeSizes.sizeof(cf); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/StreamSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java index dc332cb..c427283 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.io.DataInput; import java.io.IOException; import java.io.Serializable; import java.util.UUID; @@ -26,6 +25,7 @@ import com.google.common.base.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -88,7 +88,7 @@ public class StreamSummary implements Serializable out.writeLong(summary.totalSize); } - public StreamSummary deserialize(DataInput in, int version) throws IOException + public StreamSummary deserialize(DataInputPlus in, int version) throws IOException { UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); int files = in.readInt(); @@ -99,8 +99,8 @@ public class StreamSummary implements Serializable public long serializedSize(StreamSummary summary, int version) { long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version); - size += TypeSizes.NATIVE.sizeof(summary.files); - size += TypeSizes.NATIVE.sizeof(summary.totalSize); + size += TypeSizes.sizeof(summary.files); + size += TypeSizes.sizeof(summary.totalSize); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java index 907a1c7..924a656 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.streaming.compress; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -61,7 +61,7 @@ public class CompressionInfo CompressionParameters.serializer.serialize(info.parameters, out, version); } - public CompressionInfo deserialize(DataInput in, int version) throws IOException + public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException { // chunks int chunkCount = in.readInt(); @@ -80,11 +80,11 @@ public class CompressionInfo public long serializedSize(CompressionInfo info, int version) { if (info == null) - return TypeSizes.NATIVE.sizeof(-1); + return TypeSizes.sizeof(-1); // chunks int chunkCount = info.chunks.length; - long size = TypeSizes.NATIVE.sizeof(chunkCount); + long size = TypeSizes.sizeof(chunkCount); for (int i = 0; i < chunkCount; i++) size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version); // compression params http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index b8e7979..04d65d7 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.messages; -import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -30,6 +29,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.net.MessagingService; @@ -165,7 +165,7 @@ public class FileMessageHeader SerializationHeader.serializer.serialize(header.header, out); } - public FileMessageHeader deserialize(DataInput in, int version) throws IOException + public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException { UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); int sequenceNumber = in.readInt(); @@ -193,22 +193,22 @@ public class FileMessageHeader public long serializedSize(FileMessageHeader header, int version) { long size = UUIDSerializer.serializer.serializedSize(header.cfId, version); - size += TypeSizes.NATIVE.sizeof(header.sequenceNumber); - size += TypeSizes.NATIVE.sizeof(header.version.toString()); + size += TypeSizes.sizeof(header.sequenceNumber); + size += TypeSizes.sizeof(header.version.toString()); if (version >= StreamMessage.VERSION_22) - size += TypeSizes.NATIVE.sizeof(header.format.name); + size += TypeSizes.sizeof(header.format.name); - size += TypeSizes.NATIVE.sizeof(header.estimatedKeys); + size += TypeSizes.sizeof(header.estimatedKeys); - size += TypeSizes.NATIVE.sizeof(header.sections.size()); + size += TypeSizes.sizeof(header.sections.size()); for (Pair<Long, Long> section : header.sections) { - size += TypeSizes.NATIVE.sizeof(section.left); - size += TypeSizes.NATIVE.sizeof(section.right); + size += TypeSizes.sizeof(section.left); + size += TypeSizes.sizeof(section.right); } size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); - size += TypeSizes.NATIVE.sizeof(header.sstableLevel); + size += TypeSizes.sizeof(header.sstableLevel); if (version >= StreamMessage.VERSION_30) size += SerializationHeader.serializer.serializedSize(header.header); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index fdfb32e..bce9691 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -23,6 +23,8 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; @@ -39,7 +41,7 @@ public class IncomingFileMessage extends StreamMessage @SuppressWarnings("resource") public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { - DataInputStream input = new DataInputStream(Channels.newInputStream(in)); + DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in)); FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version); StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session) : new CompressedStreamReader(header, session); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java index 004df18..7ce1a2a 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java @@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.Collection; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamRequest; import org.apache.cassandra.streaming.StreamSession; @@ -34,7 +36,7 @@ public class PrepareMessage extends StreamMessage { public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { - DataInput input = new DataInputStream(Channels.newInputStream(in)); + DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in)); PrepareMessage message = new PrepareMessage(); // requests int numRequests = input.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java index 1255947..71c8c2a 100644 --- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -22,6 +22,8 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.UUID; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamSession; @@ -33,7 +35,7 @@ public class ReceivedMessage extends StreamMessage { public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { - DataInput input = new DataInputStream(Channels.newInputStream(in)); + DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in)); return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java index 29e84bf..fa9c30f 100644 --- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java @@ -22,6 +22,8 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.UUID; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamSession; @@ -33,7 +35,7 @@ public class RetryMessage extends StreamMessage { public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { - DataInput input = new DataInputStream(Channels.newInputStream(in)); + DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in)); return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index e8b3f82..6d807e9 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.messages; -import java.io.DataInput; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -25,6 +24,7 @@ import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.DataOutputPlus; @@ -116,7 +116,7 @@ public class StreamInitMessage out.writeBoolean(message.isIncremental); } - public StreamInitMessage deserialize(DataInput in, int version) throws IOException + public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException { InetAddress from = CompactEndpointSerializationHelper.deserialize(in); int sessionIndex = in.readInt(); @@ -131,12 +131,12 @@ public class StreamInitMessage public long serializedSize(StreamInitMessage message, int version) { long size = CompactEndpointSerializationHelper.serializedSize(message.from); - size += TypeSizes.NATIVE.sizeof(message.sessionIndex); + size += TypeSizes.sizeof(message.sessionIndex); size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); - size += TypeSizes.NATIVE.sizeof(message.description); - size += TypeSizes.NATIVE.sizeof(message.isForOutgoing); - size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel); - size += TypeSizes.NATIVE.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.description); + size += TypeSizes.sizeof(message.isForOutgoing); + size += TypeSizes.sizeof(message.keepSSTableLevel); + size += TypeSizes.sizeof(message.isIncremental); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index 9de202c..dbd489f 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable; import org.apache.cassandra.utils.obs.IBitSet; -import org.apache.cassandra.db.TypeSizes; public class BloomFilter extends WrappedSharedCloseable implements IFilter { @@ -54,7 +53,7 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter public long serializedSize() { - return serializer.serializedSize(this, TypeSizes.NATIVE); + return serializer.serializedSize(this); } // Murmur is faster than an SHA-based approach and provides as-good collision http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java index 6f80ac0..00bb153 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java +++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.obs.IBitSet; import org.apache.cassandra.utils.obs.OffHeapBitSet; @@ -35,7 +36,7 @@ class BloomFilterSerializer implements ISerializer<BloomFilter> bf.bitset.serialize(out); } - public BloomFilter deserialize(DataInput in) throws IOException + public BloomFilter deserialize(DataInputPlus in) throws IOException { return deserialize(in, false); } @@ -55,16 +56,15 @@ class BloomFilterSerializer implements ISerializer<BloomFilter> /** * Calculates a serialized size of the given Bloom Filter - * @see org.apache.cassandra.io.ISerializer#serialize(Object, org.apache.cassandra.io.util.DataOutputPlus) - * * @param bf Bloom filter to calculate serialized size + * @see org.apache.cassandra.io.ISerializer#serialize(Object, org.apache.cassandra.io.util.DataOutputPlus) * * @return serialized size of the given bloom filter */ - public long serializedSize(BloomFilter bf, TypeSizes typeSizes) + public long serializedSize(BloomFilter bf) { - int size = typeSizes.sizeof(bf.hashCount); // hash count - size += bf.bitset.serializedSize(typeSizes); + int size = TypeSizes.sizeof(bf.hashCount); // hash count + size += bf.bitset.serializedSize(); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BooleanSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BooleanSerializer.java b/src/java/org/apache/cassandra/utils/BooleanSerializer.java index 8f3abde..1fe7702 100644 --- a/src/java/org/apache/cassandra/utils/BooleanSerializer.java +++ b/src/java/org/apache/cassandra/utils/BooleanSerializer.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.utils; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; public class BooleanSerializer implements IVersionedSerializer<Boolean> @@ -32,7 +32,7 @@ public class BooleanSerializer implements IVersionedSerializer<Boolean> out.writeBoolean(b); } - public Boolean deserialize(DataInput in, int version) throws IOException + public Boolean deserialize(DataInputPlus in, int version) throws IOException { return in.readBoolean(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 69915bf..65ed23c 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -323,10 +323,10 @@ public class ByteBufferUtil return ByteBufferUtil.read(in, length); } - public static int serializedSizeWithLength(ByteBuffer buffer, TypeSizes sizes) + public static int serializedSizeWithLength(ByteBuffer buffer) { int size = buffer.remaining(); - return sizes.sizeof(size) + size; + return TypeSizes.sizeof(size) + size; } /* @return An unsigned short in an integer. */ @@ -345,10 +345,10 @@ public class ByteBufferUtil return ByteBufferUtil.read(in, readShortLength(in)); } - public static int serializedSizeWithShortLength(ByteBuffer buffer, TypeSizes sizes) + public static int serializedSizeWithShortLength(ByteBuffer buffer) { int size = buffer.remaining(); - return sizes.sizeof((short)size) + size; + return TypeSizes.sizeof((short)size) + size; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/BytesReadTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BytesReadTracker.java b/src/java/org/apache/cassandra/utils/BytesReadTracker.java index f363513..0da51a5 100644 --- a/src/java/org/apache/cassandra/utils/BytesReadTracker.java +++ b/src/java/org/apache/cassandra/utils/BytesReadTracker.java @@ -21,10 +21,12 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; +import org.apache.cassandra.io.util.DataInputPlus; + /** * This class is to track bytes read from given DataInput */ -public class BytesReadTracker implements DataInput +public class BytesReadTracker implements DataInputPlus { private long bytesRead; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/EstimatedHistogram.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java index a5c51c8..89fbe4e 100644 --- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java +++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.utils; -import java.io.DataInput; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLongArray; @@ -26,8 +25,8 @@ import com.google.common.base.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; - import org.slf4j.Logger; public class EstimatedHistogram @@ -330,7 +329,7 @@ public class EstimatedHistogram } } - public EstimatedHistogram deserialize(DataInput in) throws IOException + public EstimatedHistogram deserialize(DataInputPlus in) throws IOException { int size = in.readInt(); long[] offsets = new long[size - 1]; @@ -343,17 +342,17 @@ public class EstimatedHistogram return new EstimatedHistogram(offsets, buckets); } - public long serializedSize(EstimatedHistogram eh, TypeSizes typeSizes) + public long serializedSize(EstimatedHistogram eh) { int size = 0; long[] offsets = eh.getBucketOffsets(); long[] buckets = eh.getBuckets(false); - size += typeSizes.sizeof(buckets.length); + size += TypeSizes.sizeof(buckets.length); for (int i = 0; i < buckets.length; i++) { - size += typeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]); - size += typeSizes.sizeof(buckets[i]); + size += TypeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]); + size += TypeSizes.sizeof(buckets[i]); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/IntervalTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java b/src/java/org/apache/cassandra/utils/IntervalTree.java index e857ee7..f40e9a3 100644 --- a/src/java/org/apache/cassandra/utils/IntervalTree.java +++ b/src/java/org/apache/cassandra/utils/IntervalTree.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.utils; -import java.io.DataInput; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -26,12 +25,13 @@ import java.util.*; import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.AsymmetricOrdering.Op; @@ -329,12 +329,12 @@ public class IntervalTree<C extends Comparable<? super C>, D, I extends Interval * tree is to use a custom comparator, as the comparator is *not* * serialized. */ - public IntervalTree<C, D, I> deserialize(DataInput in, int version) throws IOException + public IntervalTree<C, D, I> deserialize(DataInputPlus in, int version) throws IOException { return deserialize(in, version, null); } - public IntervalTree<C, D, I> deserialize(DataInput in, int version, Comparator<C> comparator) throws IOException + public IntervalTree<C, D, I> deserialize(DataInputPlus in, int version, Comparator<C> comparator) throws IOException { try { @@ -355,21 +355,16 @@ public class IntervalTree<C extends Comparable<? super C>, D, I extends Interval } } - public long serializedSize(IntervalTree<C, D, I> it, TypeSizes typeSizes, int version) + public long serializedSize(IntervalTree<C, D, I> it, int version) { - long size = typeSizes.sizeof(0); + long size = TypeSizes.sizeof(0); for (Interval<C, D> interval : it) { - size += pointSerializer.serializedSize(interval.min, typeSizes); - size += pointSerializer.serializedSize(interval.max, typeSizes); - size += dataSerializer.serializedSize(interval.data, typeSizes); + size += pointSerializer.serializedSize(interval.min); + size += pointSerializer.serializedSize(interval.max); + size += dataSerializer.serializedSize(interval.data); } return size; } - - public long serializedSize(IntervalTree<C, D, I> it, int version) - { - return serializedSize(it, TypeSizes.NATIVE, version); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/MerkleTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java index 4fec62d..3840622 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTree.java +++ b/src/java/org/apache/cassandra/utils/MerkleTree.java @@ -33,6 +33,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -91,7 +92,7 @@ public class MerkleTree implements Serializable Hashable.serializer.serialize(mt.root, out, version); } - public MerkleTree deserialize(DataInput in, int version) throws IOException + public MerkleTree deserialize(DataInputPlus in, int version) throws IOException { byte hashdepth = in.readByte(); long maxsize = in.readLong(); @@ -120,9 +121,9 @@ public class MerkleTree implements Serializable public long serializedSize(MerkleTree mt, int version) { long size = 1 // mt.hashdepth - + TypeSizes.NATIVE.sizeof(mt.maxsize) - + TypeSizes.NATIVE.sizeof(mt.size) - + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName()); + + TypeSizes.sizeof(mt.maxsize) + + TypeSizes.sizeof(mt.size) + + TypeSizes.sizeof(mt.partitioner.getClass().getCanonicalName()); // full range size += Token.serializer.serializedSize(mt.fullRange.left, version); @@ -843,8 +844,8 @@ public class MerkleTree implements Serializable public long serializedSize(Inner inner, int version) { int size = inner.hash == null - ? TypeSizes.NATIVE.sizeof(-1) - : TypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length; + ? TypeSizes.sizeof(-1) + : TypeSizes.sizeof(inner.hash().length) + inner.hash().length; size += Token.serializer.serializedSize(inner.token, version) + Hashable.serializer.serializedSize(inner.lchild, version) @@ -920,8 +921,8 @@ public class MerkleTree implements Serializable public long serializedSize(Leaf leaf, int version) { return leaf.hash == null - ? TypeSizes.NATIVE.sizeof(-1) - : TypeSizes.NATIVE.sizeof(leaf.hash().length) + leaf.hash().length; + ? TypeSizes.sizeof(-1) + : TypeSizes.sizeof(leaf.hash().length) + leaf.hash().length; } } }