Wait until the message is being sent to decide which serializer must be used
patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-11393 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbd287ad Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbd287ad Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbd287ad Branch: refs/heads/cassandra-3.8 Commit: fbd287ad2ed09190dd9c6e152b82215e81020847 Parents: e99ee19 Author: Benjamin Lerer <b.le...@gmail.com> Authored: Thu Jul 14 11:33:08 2016 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Thu Jul 14 11:33:08 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 1 + .../cassandra/db/PartitionRangeReadCommand.java | 7 +-- .../org/apache/cassandra/db/ReadCommand.java | 65 ++++++++++---------- .../org/apache/cassandra/db/ReadResponse.java | 43 +++++-------- .../db/SinglePartitionReadCommand.java | 2 +- .../io/ForwardingVersionedSerializer.java | 57 +++++++++++++++++ .../apache/cassandra/net/MessagingService.java | 6 +- 8 files changed, 113 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 70210a8..3829046 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.9 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393) * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315) * Fix reverse queries ignoring range tombstones (CASSANDRA-11733) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 100bcf4..b71ebf6 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1124,6 +1124,7 @@ public class DatabaseDescriptor case READ: return getReadRpcTimeout(); case RANGE_SLICE: + case PAGED_RANGE: return getRangeRpcTimeout(); case TRUNCATE: return getTruncateRpcTimeout(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 842ad5f..99e24c8 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -253,12 +253,9 @@ public class PartitionRangeReadCommand extends ReadCommand public MessageOut<ReadCommand> createMessage(int version) { - if (version >= MessagingService.VERSION_30) - return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer); - return dataRange().isPaging() - ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer) - : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer); + ? 
new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer) + : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer); } protected void appendCQLWhereClause(StringBuilder sb) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index c792a5a..36969f8 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -58,9 +59,39 @@ public abstract class ReadCommand implements ReadQuery protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); + + // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>() + { + protected IVersionedSerializer<ReadCommand> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? legacyReadCommandSerializer : serializer; + } + }; + // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version. 
// Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. - public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer(); + public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>() + { + protected IVersionedSerializer<ReadCommand> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? legacyRangeSliceCommandSerializer : serializer; + } + }; + + // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>() + { + protected IVersionedSerializer<ReadCommand> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? legacyPagedRangeCommandSerializer : serializer; + } + }; public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer(); public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer(); @@ -567,7 +598,6 @@ public abstract class ReadCommand implements ReadQuery public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { - // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly assert version >= MessagingService.VERSION_30; out.writeByte(command.kind.ordinal()); @@ -587,8 +617,7 @@ public abstract class ReadCommand implements ReadQuery public ReadCommand deserialize(DataInputPlus in, int version) throws IOException { - if (version < MessagingService.VERSION_30) - return legacyReadCommandSerializer.deserialize(in, version); + assert version >= MessagingService.VERSION_30; 
Kind kind = Kind.values()[in.readByte()]; int flags = in.readByte(); @@ -628,7 +657,6 @@ public abstract class ReadCommand implements ReadQuery public long serializedSize(ReadCommand command, int version) { - // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly assert version >= MessagingService.VERSION_30; return 2 // kind + flags @@ -643,33 +671,6 @@ public abstract class ReadCommand implements ReadQuery } } - // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0 - // compatibility - private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand> - { - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException - { - if (version < MessagingService.VERSION_30) - legacyRangeSliceCommandSerializer.serialize(command, out, version); - else - serializer.serialize(command, out, version); - } - - public ReadCommand deserialize(DataInputPlus in, int version) throws IOException - { - return version < MessagingService.VERSION_30 - ? legacyRangeSliceCommandSerializer.deserialize(in, version) - : serializer.deserialize(in, version); - } - - public long serializedSize(ReadCommand command, int version) - { - return version < MessagingService.VERSION_30 - ? 
legacyRangeSliceCommandSerializer.serializedSize(command, version) - : serializer.serializedSize(command, version); - } - } - private enum LegacyType { GET_BY_NAMES((byte)1), http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 8bd1be6..12a200f 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; @@ -46,11 +47,20 @@ public abstract class ReadResponse { // Serializer for single partition read response public static final IVersionedSerializer<ReadResponse> serializer = new Serializer(); - // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to - // 'legacyRangeSliceReplySerializer' in older version. - public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new RangeSliceSerializer(); // Serializer for the pre-3.0 rang slice responses. public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer(); + // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to + // 'legacyRangeSliceReplySerializer' in older version. 
+ public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>() + { + @Override + protected IVersionedSerializer<ReadResponse> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? legacyRangeSliceReplySerializer + : serializer; + } + }; // This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough". @@ -411,31 +421,6 @@ public abstract class ReadResponse } } - private static class RangeSliceSerializer implements IVersionedSerializer<ReadResponse> - { - public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException - { - if (version < MessagingService.VERSION_30) - legacyRangeSliceReplySerializer.serialize(response, out, version); - else - serializer.serialize(response, out, version); - } - - public ReadResponse deserialize(DataInputPlus in, int version) throws IOException - { - return version < MessagingService.VERSION_30 - ? legacyRangeSliceReplySerializer.deserialize(in, version) - : serializer.deserialize(in, version); - } - - public long serializedSize(ReadResponse response, int version) - { - return version < MessagingService.VERSION_30 - ? 
legacyRangeSliceReplySerializer.serializedSize(response, version) - : serializer.serializedSize(response, version); - } - } - private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse> { public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException @@ -477,6 +462,8 @@ public abstract class ReadResponse public ReadResponse deserialize(DataInputPlus in, int version) throws IOException { + assert version < MessagingService.VERSION_30; + int partitionCount = in.readInt(); ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount); for (int i = 0; i < partitionCount; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 6784770..73eb9bd 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -894,7 +894,7 @@ public class SinglePartitionReadCommand extends ReadCommand public MessageOut<ReadCommand> createMessage(int version) { - return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? 
legacyReadCommandSerializer : serializer); + return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer); } protected void appendCQLWhereClause(StringBuilder sb) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java new file mode 100644 index 0000000..64f91d7 --- /dev/null +++ b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io; + +import java.io.IOException; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * A serializer which forwards all its method calls to another serializer. Subclasses should override one or more + * methods to modify the behavior of the backing serializer as desired per the decorator pattern. 
+ */ +public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T> +{ + protected ForwardingVersionedSerializer() + { + } + + /** + * Returns the backing delegate instance that methods are forwarded to. + * + * @param version the server version + * @return the backing delegate instance that methods are forwarded to. + */ + protected abstract IVersionedSerializer<T> delegate(int version); + + public void serialize(T t, DataOutputPlus out, int version) throws IOException + { + delegate(version).serialize(t, out, version); + } + + public T deserialize(DataInputPlus in, int version) throws IOException + { + return delegate(version).deserialize(in, version); + } + + public long serializedSize(T t, int version) + { + return delegate(version).serializedSize(t, version); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index fac46eb..d01419f 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -215,9 +215,9 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); - put(Verb.READ, ReadCommand.serializer); + put(Verb.READ, ReadCommand.readSerializer); put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer); - put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer); + put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer); put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance); put(Verb.REPAIR_MESSAGE, RepairMessage.serializer); put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer); @@ -247,7 +247,7 @@ public final class MessagingService implements MessagingServiceMBean 
put(Verb.READ_REPAIR, WriteResponse.serializer); put(Verb.COUNTER_MUTATION, WriteResponse.serializer); put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer); - put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer); + put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer); put(Verb.READ, ReadResponse.serializer); put(Verb.TRUNCATE, TruncateResponse.serializer); put(Verb.SNAPSHOT, null);