This is an automated email from the ASF dual-hosted git repository. jake pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 227409d920 Add Mutation Serialization Caching 227409d920 is described below commit 227409d9201fa1aeb9f80b22f499577aedfe25bc Author: Jake Luciani <j...@datastax.com> AuthorDate: Mon Dec 19 16:05:12 2022 -0500 Add Mutation Serialization Caching Patch by T Jake Luciani; Reviewed by Josh McKenzie for CASSANDRA-17998 Co-authored-by: T Jake Luciani (j...@apache.org) Co-authored-by: Mike Adamson (mikeat...@gmail.com) Co-authored-by: Robert Stupp (sn...@apache.org) --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 3 + src/java/org/apache/cassandra/db/Mutation.java | 202 ++++++++++++++++-- .../apache/cassandra/io/util/TeeDataInputPlus.java | 225 +++++++++++++++++++++ .../org/apache/cassandra/net/MessagingService.java | 20 ++ .../org/apache/cassandra/service/StorageProxy.java | 9 + .../cassandra/utils/TeeDataInputPlusTest.java | 141 +++++++++++++ 7 files changed, 580 insertions(+), 21 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 621d84a20d..11de4309e8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Add Mutation Serialization Caching (CASSANDRA-17998) * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874) * CEP-10: Simulator Java11 Support (CASSANDRA-17178) * Set the major compaction type correctly for compactionstats (CASSANDRA-18055) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 8ab3cb7e48..41de975475 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -238,6 +238,9 @@ public enum CassandraRelevantProperties SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"), SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf", "3"), + /** Represents the maximum size (in bytes) of a 
serialized mutation that can be cached **/ + CACHEABLE_MUTATION_SIZE_LIMIT("cassandra.cacheable_mutation_size_limit_bytes", Long.toString(1_000_000)), + MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"), MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step", "100000"), MEMTABLE_TRIE_SIZE_LIMIT("cassandra.trie_size_limit_mb"), diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 7b6a68670f..b9ac32498f 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -23,20 +23,28 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.TeeDataInputPlus; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.Future; @@ -64,6 +72,15 @@ public 
class Mutation implements IMutation, Supplier<Mutation> private final boolean cdcEnabled; + private static final int SERIALIZATION_VERSION_COUNT = MessagingService.Version.values().length; + // Contains serialized representations of this mutation. + // Note: there is no functionality to clear/remove serialized instances, because a mutation must never + // be modified (e.g. calling add(PartitionUpdate)) when it's being serialized. + private final Serialization[] cachedSerializations = new Serialization[SERIALIZATION_VERSION_COUNT]; + + /** @see CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT */ + private static final long CACHEABLE_MUTATION_SIZE_LIMIT = CassandraRelevantProperties.CACHEABLE_MUTATION_SIZE_LIMIT.getLong(); + public Mutation(PartitionUpdate update) { this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc); @@ -298,6 +315,7 @@ public class Mutation implements IMutation, Supplier<Mutation> } return buff.append("])").toString(); } + private int serializedSize30; private int serializedSize3014; private int serializedSize40; @@ -393,34 +411,115 @@ public class Mutation implements IMutation, Supplier<Mutation> { public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException { + serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version); + } + + /** + * Called early during request processing to prevent that {@link #serialization(Mutation, int)} is + * called concurrently. 
+ * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)} + */ + @SuppressWarnings("JavadocReference") + public void prepareSerializedBuffer(Mutation mutation, int version) + { + serialization(mutation, version); + } + + /** + * Retrieve the cached serialization of this mutation, or compute and cache said serialization if it doesn't + * exist yet. Note that this method is _not_ synchronized even though it may (and will often) be called + * concurrently. Concurrent calls are still safe however; the only risk is that, if the value is not cached yet, + * multiple concurrent calls may compute it multiple times instead of just once. This is ok in practice + * as we make sure this doesn't happen in the hot path by forcing the initial caching in + * {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)} + * via {@link #prepareSerializedBuffer(Mutation, int)}, which pre-computes the serialization for the + * current messaging version. + */ + @SuppressWarnings("JavadocReference") + private Serialization serialization(Mutation mutation, int version) + { + int versionOrdinal = MessagingService.getVersionOrdinal(version); + // Retrieves the cached version, or build+cache it if it's not cached already. + Serialization serialization = mutation.cachedSerializations[versionOrdinal]; + if (serialization == null) + { + serialization = new SizeOnlyCacheableSerialization(); + long serializedSize = serialization.serializedSize(PartitionUpdate.serializer, mutation, version); + + // Excessively large mutation objects cause GC pressure and huge allocations when serialized, + // so we only cache serialized mutations when they are below the defined limit. 
+ if (serializedSize < CACHEABLE_MUTATION_SIZE_LIMIT) + { + try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) + { + serializeInternal(PartitionUpdate.serializer, mutation, dob, version); + serialization = new CachedSerialization(dob.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + mutation.cachedSerializations[versionOrdinal] = serialization; + } + + return serialization; + } + + static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer, + Mutation mutation, + DataOutputPlus out, + int version) throws IOException + { + Map<TableId, PartitionUpdate> modifications = mutation.modifications; + /* serialize the modifications in the mutation */ - int size = mutation.modifications.size(); + int size = modifications.size(); out.writeUnsignedVInt(size); assert size > 0; - for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet()) - PartitionUpdate.serializer.serialize(entry.getValue(), out, version); + for (PartitionUpdate partitionUpdate : modifications.values()) + { + serializer.serialize(partitionUpdate, out, version); + } } public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException { - int size = (int)in.readUnsignedVInt(); - assert size > 0; - - PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag); - if (size == 1) - return new Mutation(update); - - ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>(); - DecoratedKey dk = update.partitionKey(); - - modifications.put(update.metadata().id, update); - for (int i = 1; i < size; ++i) + Mutation m; + TeeDataInputPlus teeIn; + try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) { - update = PartitionUpdate.serializer.deserialize(in, version, flag); - modifications.put(update.metadata().id, update); + teeIn = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT); + + int size = 
(int) teeIn.readUnsignedVInt(); + assert size > 0; + + PartitionUpdate update = PartitionUpdate.serializer.deserialize(teeIn, version, flag); + if (size == 1) + { + m = new Mutation(update); + } + else + { + ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>(); + DecoratedKey dk = update.partitionKey(); + + modifications.put(update.metadata().id, update); + for (int i = 1; i < size; ++i) + { + update = PartitionUpdate.serializer.deserialize(teeIn, version, flag); + modifications.put(update.metadata().id, update); + } + m = new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now()); + } + + //Only cache serializations that don't hit the limit + if (!teeIn.isLimitReached()) + m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new CachedSerialization(dob.toByteArray()); + + return m; } - return new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now()); } public Mutation deserialize(DataInputPlus in, int version) throws IOException @@ -430,10 +529,71 @@ public class Mutation implements IMutation, Supplier<Mutation> public long serializedSize(Mutation mutation, int version) { - int size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size()); - for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet()) - size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version); + return serialization(mutation, version).serializedSize(PartitionUpdate.serializer, mutation, version); + } + } + + /** + * There are two implementations of this class. One that keeps the serialized representation on-heap for later + * reuse and one that doesn't. Keeping mutations of all sizes around may lead to "bad" GC pressure (G1 GC) due to humongous objects. + * By default only serialized mutations smaller than 1,000,000 bytes are kept on-heap - see {@link org.apache.cassandra.config.CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT}. 
+ */ + private static abstract class Serialization + { + abstract void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException; + + abstract long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version); + } + + /** + * Represents the cached serialization of a {@link Mutation} as a {@code byte[]}. + */ + private static final class CachedSerialization extends Serialization + { + private final byte[] serialized; + + CachedSerialization(byte[] serialized) + { + this.serialized = Preconditions.checkNotNull(serialized); + } + + @Override + void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException + { + out.write(serialized); + } + @Override + long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version) + { + return serialized.length; + } + } + + /** + * Represents a non-cacheable serialization of a {@link Mutation}, only the size of the mutation is lazily cached. 
+ */ + private static final class SizeOnlyCacheableSerialization extends Serialization + { + private volatile long size; + + @Override + void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException + { + MutationSerializer.serializeInternal(serializer, mutation, out, version); + } + + @Override + long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version) + { + long size = this.size; + if (size == 0L) + { + size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size()); + for (PartitionUpdate partitionUpdate : mutation.modifications.values()) + size += serializer.serializedSize(partitionUpdate, version); + this.size = size; + } return size; } } diff --git a/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java b/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java new file mode 100644 index 0000000000..b16c1cc5f4 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.util; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.utils.Throwables; + +/** + * DataInput that also stores the raw inputs into an output buffer. + * This is useful for storing serialized buffers as they are deserialized. + * + * Note: If a non-zero limit is included it is important for callers to check {@link #isLimitReached()} + * before using the tee buffer as it could be cropped. + */ +public class TeeDataInputPlus implements DataInputPlus +{ + private final DataInputPlus source; + private final DataOutputPlus teeBuffer; + + private final long limit; + private boolean limitReached; + + public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer) + { + this(source, teeBuffer, 0); + } + + public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer, long limit) + { + assert source != null && teeBuffer != null; + this.source = source; + this.teeBuffer = teeBuffer; + this.limit = limit; + this.limitReached = false; + } + + private void maybeWrite(int length, Throwables.DiscreteAction<IOException> writeAction) throws IOException + { + if (limit <= 0 || (!limitReached && (teeBuffer.position() + length) < limit)) + writeAction.perform(); + else + limitReached = true; + } + + @Override + public void readFully(byte[] bytes) throws IOException + { + source.readFully(bytes); + maybeWrite(bytes.length, () -> teeBuffer.write(bytes)); + } + + @Override + public void readFully(byte[] bytes, int offset, int length) throws IOException + { + source.readFully(bytes, offset, length); + maybeWrite(length, () -> teeBuffer.write(bytes, offset, length)); + } + + @Override + public int skipBytes(int n) throws IOException + { + for (int i = 0; i < n; i++) + { + try + { + byte v = source.readByte(); + maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v)); + } + catch (EOFException eof) + { + return i; + } + } + return n; + } + + 
@Override + public boolean readBoolean() throws IOException + { + boolean v = source.readBoolean(); + maybeWrite(TypeSizes.BOOL_SIZE, () -> teeBuffer.writeBoolean(v)); + return v; + } + + @Override + public byte readByte() throws IOException + { + byte v = source.readByte(); + maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v)); + return v; + } + + @Override + public int readUnsignedByte() throws IOException + { + int v = source.readUnsignedByte(); + maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v)); + return v; + } + + @Override + public short readShort() throws IOException + { + short v = source.readShort(); + maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v)); + return v; + } + + @Override + public int readUnsignedShort() throws IOException + { + int v = source.readUnsignedShort(); + maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v)); + return v; + } + + @Override + public char readChar() throws IOException + { + char v = source.readChar(); + maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeChar(v)); + return v; + } + + @Override + public int readInt() throws IOException + { + int v = source.readInt(); + maybeWrite(TypeSizes.INT_SIZE, () -> teeBuffer.writeInt(v)); + return v; + } + + @Override + public long readLong() throws IOException + { + long v = source.readLong(); + maybeWrite(TypeSizes.LONG_SIZE, () -> teeBuffer.writeLong(v)); + return v; + } + + @Override + public float readFloat() throws IOException + { + float v = source.readFloat(); + maybeWrite(TypeSizes.FLOAT_SIZE, () -> teeBuffer.writeFloat(v)); + return v; + } + + @Override + public double readDouble() throws IOException + { + double v = source.readDouble(); + maybeWrite(TypeSizes.DOUBLE_SIZE, () -> teeBuffer.writeDouble(v)); + return v; + } + + @Override + public String readLine() throws IOException + { + //This one isn't safe since we don't know the actual line termination type + throw new UnsupportedOperationException(); + } + + @Override + public 
String readUTF() throws IOException + { + String v = source.readUTF(); + maybeWrite(TypeSizes.sizeof(v), () -> teeBuffer.writeUTF(v)); + return v; + } + + @Override + public long readVInt() throws IOException + { + long v = source.readVInt(); + maybeWrite(TypeSizes.sizeofVInt(v), () -> teeBuffer.writeVInt(v)); + return v; + } + + @Override + public long readUnsignedVInt() throws IOException + { + long v = source.readUnsignedVInt(); + maybeWrite(TypeSizes.sizeofUnsignedVInt(v), () -> teeBuffer.writeUnsignedVInt(v)); + return v; + } + + @Override + public void skipBytesFully(int n) throws IOException + { + source.skipBytesFully(n); + maybeWrite(n, () -> { + for (int i = 0; i < n; i++) + teeBuffer.writeByte(0); + }); + } + + /** + * Used to detect if the teeBuffer hit the supplied limit. + * If true this means the teeBuffer does not contain the full input. + */ + public boolean isLimitReached() + { + return limitReached; + } +} diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index d968a0ce2b..650bc03e9e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -20,10 +20,13 @@ package org.apache.cassandra.net; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl public static final int current_version = VERSION_40; static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version); static AcceptVersions accept_streaming = new AcceptVersions(current_version, 
current_version); + static Map<Integer, Integer> versionOrdinalMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal())); + + /** + * This is an optimisation to speed up the translation of the serialization + * version to the {@link Version} enum ordinal. + * + * @param version the serialization version + * @return a {@link Version} ordinal value + */ + public static int getVersionOrdinal(int version) + { + Integer ordinal = versionOrdinalMap.get(version); + if (ordinal == null) + throw new IllegalStateException("Unkown serialization version: " + version); + + return ordinal; + } public enum Version { diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 3c20aad9ce..52039c1d9f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1475,6 +1475,15 @@ public class StorageProxy implements StorageProxyMBean List<InetAddressAndPort> backPressureHosts = null; + // For performance, Mutation caches serialized buffers that are computed lazily in MutationSerializer.serialization(). That + // computation is not synchronized however and we will potentially call that method concurrently for each + // dispatched message (not that concurrent calls to serialization() are "unsafe" per se, just that they + // may result in multiple computations, making the caching optimization moot). So forcing the serialization + // here to make sure it's already cached/computed when it's concurrently used later. + // Side note: we have one cached buffer for each used messaging version and this only pre-computes the one for + // the current version, but it's just an optimization and we're ok not optimizing for mixed-version clusters. 
+ Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version); + for (Replica destination : plan.contacts()) { checkHintOverload(destination); diff --git a/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java b/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java new file mode 100644 index 0000000000..baac9ca73d --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.TeeDataInputPlus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TeeDataInputPlusTest +{ + @Test + public void testTeeBuffer() throws Exception + { + DataOutputBuffer out = new DataOutputBuffer(); + byte[] testData; + + // boolean + out.writeBoolean(true); + // byte + out.writeByte(0x1); + // char + out.writeChar('a'); + // short + out.writeShort(1); + // int + out.writeInt(1); + // long + out.writeLong(1L); + // float + out.writeFloat(1.0f); + // double + out.writeDouble(1.0d); + // vint + out.writeVInt(-1337L); + //unsigned vint + out.writeUnsignedVInt(1337L); + // String + out.writeUTF("abc"); + //Another string to test skip + out.writeUTF("garbagetoskipattheend"); + testData = out.toByteArray(); + + int LIMITED_SIZE = 40; + DataInputBuffer reader = new DataInputBuffer(testData); + DataInputBuffer reader2 = new DataInputBuffer(testData); + DataOutputBuffer teeOut = new DataOutputBuffer(); + DataOutputBuffer limitedTeeOut = new DataOutputBuffer(); + TeeDataInputPlus tee = new TeeDataInputPlus(reader, teeOut); + TeeDataInputPlus limitedTee = new TeeDataInputPlus(reader2, limitedTeeOut, LIMITED_SIZE); + + // boolean = 1byte + boolean bool = tee.readBoolean(); + assertTrue(bool); + bool = limitedTee.readBoolean(); + assertTrue(bool); + // byte = 1byte + byte b = tee.readByte(); + assertEquals(b, 0x1); + b = limitedTee.readByte(); + assertEquals(b, 0x1); + // char = 2byte + char c = tee.readChar(); + assertEquals('a', c); + c = limitedTee.readChar(); + assertEquals('a', c); + // short = 2bytes + short s = tee.readShort(); + assertEquals(1, s); + s = limitedTee.readShort(); + assertEquals(1, s); + // int = 4bytes + int i = 
tee.readInt(); + assertEquals(1, i); + i = limitedTee.readInt(); + assertEquals(1, i); + // long = 8bytes + long l = tee.readLong(); + assertEquals(1L, l); + l = limitedTee.readLong(); + assertEquals(1L, l); + // float = 4bytes + float f = tee.readFloat(); + assertEquals(1.0f, f, 0); + f = limitedTee.readFloat(); + assertEquals(1.0f, f, 0); + // double = 8bytes + double d = tee.readDouble(); + assertEquals(1.0d, d, 0); + d = limitedTee.readDouble(); + assertEquals(1.0d, d, 0); + long vint = tee.readVInt(); + assertEquals(-1337L, vint); + vint = limitedTee.readVInt(); + assertEquals(-1337L, vint); + long uvint = tee.readUnsignedVInt(); + assertEquals(1337L, uvint); + uvint = limitedTee.readUnsignedVInt(); + assertEquals(1337L, uvint); + // String("abc") = 2(string size) + 3 = 5 bytes + String str = tee.readUTF(); + assertEquals("abc", str); + str = limitedTee.readUTF(); + assertEquals("abc", str); + int skipped = tee.skipBytes(100); + assertEquals(23, skipped); + skipped = limitedTee.skipBytes(100); + assertEquals(23, skipped); + + byte[] teeData = teeOut.toByteArray(); + assertFalse(tee.isLimitReached()); + assertTrue(Arrays.equals(testData, teeData)); + + byte[] limitedTeeData = limitedTeeOut.toByteArray(); + assertTrue(limitedTee.isLimitReached()); + assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE - 1 ), limitedTeeData)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org