Updated Branches: refs/heads/trunk b21a0dab5 -> 4f5242cfb
Remove 1.2 network compatibility code

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-5960

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f5242cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f5242cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f5242cf

Branch: refs/heads/trunk
Commit: 4f5242cfbfb302e8099cb514ea78f134fef84d45
Parents: b21a0da
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Sep 10 17:13:01 2013 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Sep 10 17:13:01 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt | 2 +
 .../apache/cassandra/db/RangeSliceCommand.java | 130 ++++---------------
 .../org/apache/cassandra/db/ReadCommand.java | 46 +------
 .../cassandra/db/SliceByNamesReadCommand.java | 102 +++------------
 .../cassandra/db/SliceFromReadCommand.java | 89 +++----------
 .../apache/cassandra/db/filter/QueryPath.java | 111 ----------------
 .../cassandra/net/IncomingTcpConnection.java | 15 +--
 .../apache/cassandra/service/StorageProxy.java | 2 +-
 8 files changed, 73 insertions(+), 424 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1440cc..1f20fa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,8 @@
  * change logging from log4j to logback (CASSANDRA-5883)
  * switch to LZ4 compression for internode communication (CASSANDRA-5887)
  * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+
 
 2.0.1
  * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index 5e8788c..28b86f8 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -20,16 +20,15 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import com.google.common.base.Objects; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.filter.ExtendedFilter; import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; @@ -46,37 +45,37 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable public final boolean isPaging; public RangeSliceCommand(String keyspace, - String column_family, + String columnFamily, long timestamp, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults) { - this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false); + this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false); } public RangeSliceCommand(String keyspace, - String column_family, + String columnFamily, long timestamp, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults) { - this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, 
false); + this(keyspace, columnFamily, timestamp, predicate, range, row_filter, maxResults, false, false); } public RangeSliceCommand(String keyspace, - String column_family, + String columnFamily, long timestamp, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, - List<IndexExpression> row_filter, + List<IndexExpression> rowFilter, int maxResults, boolean countCQL3Rows, boolean isPaging) { - super(keyspace, column_family, timestamp, range, predicate, row_filter); + super(keyspace, columnFamily, timestamp, range, predicate, rowFilter); this.maxResults = maxResults; this.countCQL3Rows = countCQL3Rows; this.isPaging = isPaging; @@ -84,7 +83,7 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable public MessageOut<RangeSliceCommand> createMessage() { - return new MessageOut<RangeSliceCommand>(MessagingService.Verb.RANGE_SLICE, this, serializer); + return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer); } public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange) @@ -137,16 +136,16 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable @Override public String toString() { - return "RangeSliceCommand{" + - "keyspace='" + keyspace + '\'' + - ", columnFamily='" + columnFamily + '\'' + - ", timestamp=" + timestamp + - ", predicate=" + predicate + - ", range=" + keyRange + - ", rowFilter =" + rowFilter + - ", maxResults=" + maxResults + - ", countCQL3Rows=" + countCQL3Rows + - "}"; + return Objects.toStringHelper(this) + .add("keyspace", keyspace) + .add("columnFamily", columnFamily) + .add("predicate", predicate) + .add("keyRange", keyRange) + .add("rowFilter", rowFilter) + .add("maxResults", maxResults) + .add("counterCQL3Rows", countCQL3Rows) + .add("timestamp", timestamp) + .toString(); } } @@ -156,31 +155,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm { out.writeUTF(sliceCommand.keyspace); 
out.writeUTF(sliceCommand.columnFamily); + out.writeLong(sliceCommand.timestamp); - if (version >= MessagingService.VERSION_20) - out.writeLong(sliceCommand.timestamp); - - IDiskAtomFilter filter = sliceCommand.predicate; - if (version < MessagingService.VERSION_20) - { - // Pre-2.0, we need to know if it's a super column. If it is, we - // must extract the super column name from the predicate (and - // modify the predicate accordingly) - ByteBuffer sc = null; - CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.columnFamily); - if (metadata.cfType == ColumnFamilyType.Super) - { - SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter); - sc = scFilter.scName; - filter = scFilter.updatedFilter; - } - - out.writeInt(sc == null ? 0 : sc.remaining()); - if (sc != null) - ByteBufferUtil.write(sc, out); - } - - IDiskAtomFilter.Serializer.instance.serialize(filter, out, version); + IDiskAtomFilter.Serializer.instance.serialize(sliceCommand.predicate, out, version); if (sliceCommand.rowFilter == null) { @@ -206,47 +183,15 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm { String keyspace = in.readUTF(); String columnFamily = in.readUTF(); - - long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong(); + long timestamp = in.readLong(); CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); - IDiskAtomFilter predicate; - if (version < MessagingService.VERSION_20) - { - int scLength = in.readInt(); - ByteBuffer superColumn = null; - if (scLength > 0) - { - byte[] buf = new byte[scLength]; - in.readFully(buf); - superColumn = ByteBuffer.wrap(buf); - } - - AbstractType<?> comparator; - if (metadata.cfType == ColumnFamilyType.Super) - { - CompositeType type = (CompositeType)metadata.comparator; - comparator = superColumn == null ? 
type.types.get(0) : type.types.get(1); - } - else - { - comparator = metadata.comparator; - } - - predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, comparator); - - if (metadata.cfType == ColumnFamilyType.Super) - predicate = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, superColumn, predicate); - } - else - { - predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator); - } + IDiskAtomFilter predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator); List<IndexExpression> rowFilter; int filterCount = in.readInt(); - rowFilter = new ArrayList<IndexExpression>(filterCount); + rowFilter = new ArrayList<>(filterCount); for (int i = 0; i < filterCount; i++) { IndexExpression expr; @@ -267,32 +212,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm { long size = TypeSizes.NATIVE.sizeof(rsc.keyspace); size += TypeSizes.NATIVE.sizeof(rsc.columnFamily); - - if (version >= MessagingService.VERSION_20) - size += TypeSizes.NATIVE.sizeof(rsc.timestamp); + size += TypeSizes.NATIVE.sizeof(rsc.timestamp); IDiskAtomFilter filter = rsc.predicate; - if (version < MessagingService.VERSION_20) - { - ByteBuffer sc = null; - CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily); - if (metadata.cfType == ColumnFamilyType.Super) - { - SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter); - sc = scFilter.scName; - filter = scFilter.updatedFilter; - } - - if (sc != null) - { - size += TypeSizes.NATIVE.sizeof(sc.remaining()); - size += sc.remaining(); - } - else - { - size += TypeSizes.NATIVE.sizeof(0); - } - } size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index cadcd7d..b6f954e 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -22,13 +22,10 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -60,7 +57,7 @@ public abstract class ReadCommand implements IReadCommand, Pageable public MessageOut<ReadCommand> createMessage() { - return new MessageOut<ReadCommand>(MessagingService.Verb.READ, this, serializer); + return new MessageOut<>(MessagingService.Verb.READ, this, serializer); } public final String ksName; @@ -135,31 +132,14 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> { public void serialize(ReadCommand command, DataOutput out, int version) throws IOException { - // For super columns, when talking to an older node, we need to translate the filter used. - // That translation can change the filter type (names -> slice), and so change the command type. - // Hence we need to detect that early on, before we've written the command type. 
- ReadCommand newCommand = command; - ByteBuffer superColumn = null; - if (version < MessagingService.VERSION_20) - { - CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); - if (metadata.cfType == ColumnFamilyType.Super) - { - SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter()); - newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter); - newCommand.setDigestQuery(command.isDigestQuery()); - superColumn = scFilter.scName; - } - } - - out.writeByte(newCommand.commandType.serializedValue); + out.writeByte(command.commandType.serializedValue); switch (command.commandType) { case GET_BY_NAMES: - SliceByNamesReadCommand.serializer.serialize(newCommand, superColumn, out, version); + SliceByNamesReadCommand.serializer.serialize(command, out, version); break; case GET_SLICES: - SliceFromReadCommand.serializer.serialize(newCommand, superColumn, out, version); + SliceFromReadCommand.serializer.serialize(command, out, version); break; default: throw new AssertionError(); @@ -182,26 +162,12 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> public long serializedSize(ReadCommand command, int version) { - ReadCommand newCommand = command; - ByteBuffer superColumn = null; - if (version < MessagingService.VERSION_20) - { - CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); - if (metadata.cfType == ColumnFamilyType.Super) - { - SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter()); - newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter); - newCommand.setDigestQuery(command.isDigestQuery()); - superColumn = scFilter.scName; - } - } - switch (command.commandType) { case GET_BY_NAMES: - return 1 + 
SliceByNamesReadCommand.serializer.serializedSize(newCommand, superColumn, version); + return 1 + SliceByNamesReadCommand.serializer.serializedSize(command, version); case GET_SLICES: - return 1 + SliceFromReadCommand.serializer.serializedSize(newCommand, superColumn, version); + return 1 + SliceFromReadCommand.serializer.serializedSize(command, version); default: throw new AssertionError(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java index ae3db78..60487c8 100644 --- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java @@ -20,13 +20,12 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; +import com.google.common.base.Objects; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -58,13 +57,13 @@ public class SliceByNamesReadCommand extends ReadCommand @Override public String toString() { - return "SliceByNamesReadCommand(" + - "keyspace='" + ksName + '\'' + - ", key=" + ByteBufferUtil.bytesToHex(key) + - ", cfName='" + cfName + '\'' + - ", timestamp='" + timestamp + '\'' + - ", filter=" + filter + - ')'; + return Objects.toStringHelper(this) + .add("ksName", ksName) + .add("cfName", cfName) + .add("key", ByteBufferUtil.bytesToHex(key)) + .add("filter", filter) + .add("timestamp", 
timestamp) + .toString(); } public IDiskAtomFilter filter() @@ -77,24 +76,12 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm { public void serialize(ReadCommand cmd, DataOutput out, int version) throws IOException { - serialize(cmd, null, out, version); - } - - public void serialize(ReadCommand cmd, ByteBuffer superColumn, DataOutput out, int version) throws IOException - { SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd; out.writeBoolean(command.isDigestQuery()); out.writeUTF(command.ksName); ByteBufferUtil.writeWithShortLength(command.key, out); - - if (version < MessagingService.VERSION_20) - new QueryPath(command.cfName, superColumn).serialize(out); - else - out.writeUTF(command.cfName); - - if (version >= MessagingService.VERSION_20) - out.writeLong(cmd.timestamp); - + out.writeUTF(command.cfName); + out.writeLong(cmd.timestamp); NamesQueryFilter.serializer.serialize(command.filter, out, version); } @@ -103,65 +90,17 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm boolean isDigest = in.readBoolean(); String keyspaceName = in.readUTF(); ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - - String cfName; - ByteBuffer sc = null; - if (version < MessagingService.VERSION_20) - { - QueryPath path = QueryPath.deserialize(in); - cfName = path.columnFamilyName; - sc = path.superColumnName; - } - else - { - cfName = in.readUTF(); - } - - long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong(); - + String cfName = in.readUTF(); + long timestamp = in.readLong(); CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - ReadCommand command; - if (version < MessagingService.VERSION_20) - { - AbstractType<?> comparator; - if (metadata.cfType == ColumnFamilyType.Super) - { - CompositeType type = (CompositeType)metadata.comparator; - comparator = sc == null ? 
type.types.get(0) : type.types.get(1); - } - else - { - comparator = metadata.comparator; - } - - IDiskAtomFilter filter = NamesQueryFilter.serializer.deserialize(in, version, comparator); - - if (metadata.cfType == ColumnFamilyType.Super) - filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, sc, filter); - - // Due to SC compat, it's possible we get back a slice filter at this point - if (filter instanceof NamesQueryFilter) - command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, (NamesQueryFilter)filter); - else - command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, (SliceQueryFilter)filter); - } - else - { - NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator); - command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter); - } - + NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator); + ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter); command.setDigestQuery(isDigest); return command; } public long serializedSize(ReadCommand cmd, int version) { - return serializedSize(cmd, null, version); - } - - public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version) - { TypeSizes sizes = TypeSizes.NATIVE; SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd; int size = sizes.sizeof(command.isDigestQuery()); @@ -169,15 +108,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm size += sizes.sizeof(command.ksName); size += sizes.sizeof((short)keySize) + keySize; - - if (version < MessagingService.VERSION_20) - size += new QueryPath(command.cfName, superColumn).serializedSize(sizes); - else - size += sizes.sizeof(command.cfName); - - if (version >= MessagingService.VERSION_20) - size += sizes.sizeof(cmd.timestamp); - + size += sizes.sizeof(command.cfName); + size += 
sizes.sizeof(cmd.timestamp); size += NamesQueryFilter.serializer.serializedSize(command.filter, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java index 7526796..72de2ca 100644 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java @@ -22,26 +22,18 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.RowDataResolver; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; public class SliceFromReadCommand extends ReadCommand { - static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class); - static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer(); public final SliceQueryFilter filter; @@ -124,13 +116,13 @@ public class SliceFromReadCommand extends ReadCommand @Override public String toString() { - return "SliceFromReadCommand(" + - "keyspace='" + ksName + '\'' + - ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' + - ", cfName='" + cfName + '\'' + - ", timestamp='" + timestamp + 
'\'' + - ", filter='" + filter + '\'' + - ')'; + return Objects.toStringHelper(this) + .add("ksName", ksName) + .add("cfName", cfName) + .add("key", ByteBufferUtil.bytesToHex(key)) + .add("filter", filter) + .add("timestamp", timestamp) + .toString(); } } @@ -138,24 +130,12 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand { public void serialize(ReadCommand rm, DataOutput out, int version) throws IOException { - serialize(rm, null, out, version); - } - - public void serialize(ReadCommand rm, ByteBuffer superColumn, DataOutput out, int version) throws IOException - { SliceFromReadCommand realRM = (SliceFromReadCommand)rm; out.writeBoolean(realRM.isDigestQuery()); out.writeUTF(realRM.ksName); ByteBufferUtil.writeWithShortLength(realRM.key, out); - - if (version < MessagingService.VERSION_20) - new QueryPath(realRM.cfName, superColumn).serialize(out); - else - out.writeUTF(realRM.cfName); - - if (version >= MessagingService.VERSION_20) - out.writeLong(realRM.timestamp); - + out.writeUTF(realRM.cfName); + out.writeLong(realRM.timestamp); SliceQueryFilter.serializer.serialize(realRM.filter, out, version); } @@ -164,36 +144,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand boolean isDigest = in.readBoolean(); String keyspaceName = in.readUTF(); ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - - String cfName; - ByteBuffer sc = null; - if (version < MessagingService.VERSION_20) - { - QueryPath path = QueryPath.deserialize(in); - cfName = path.columnFamilyName; - sc = path.superColumnName; - } - else - { - cfName = in.readUTF(); - } - - long timestamp = version < MessagingService.VERSION_20 ? 
System.currentTimeMillis() : in.readLong(); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - SliceQueryFilter filter; - if (version < MessagingService.VERSION_20) - { - filter = SliceQueryFilter.serializer.deserialize(in, version); - - if (metadata.cfType == ColumnFamilyType.Super) - filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, sc, filter); - } - else - { - filter = SliceQueryFilter.serializer.deserialize(in, version); - } - + String cfName = in.readUTF(); + long timestamp = in.readLong(); + SliceQueryFilter filter = SliceQueryFilter.serializer.deserialize(in, version); ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter); command.setDigestQuery(isDigest); return command; @@ -201,11 +154,6 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand public long serializedSize(ReadCommand cmd, int version) { - return serializedSize(cmd, null, version); - } - - public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version) - { TypeSizes sizes = TypeSizes.NATIVE; SliceFromReadCommand command = (SliceFromReadCommand) cmd; int keySize = command.key.remaining(); @@ -213,15 +161,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand int size = sizes.sizeof(cmd.isDigestQuery()); // boolean size += sizes.sizeof(command.ksName); size += sizes.sizeof((short) keySize) + keySize; - - if (version < MessagingService.VERSION_20) - size += new QueryPath(command.cfName, superColumn).serializedSize(sizes); - else - size += sizes.sizeof(command.cfName); - - if (version >= MessagingService.VERSION_20) - size += sizes.sizeof(cmd.timestamp); - + size += sizes.sizeof(command.cfName); + size += sizes.sizeof(cmd.timestamp); size += SliceQueryFilter.serializer.serializedSize(command.filter, version); return size; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/db/filter/QueryPath.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java deleted file mode 100644 index 26d15a1..0000000 --- a/src/java/org/apache/cassandra/db/filter/QueryPath.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.filter; - -import java.io.*; -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * This class is obsolete internally, but kept for wire compatibility with - * older nodes. I.e. we kept it only for the serialization part. 
- */ -public class QueryPath -{ - public final String columnFamilyName; - public final ByteBuffer superColumnName; - public final ByteBuffer columnName; - - public QueryPath(String columnFamilyName, ByteBuffer superColumnName, ByteBuffer columnName) - { - this.columnFamilyName = columnFamilyName; - this.superColumnName = superColumnName; - this.columnName = columnName; - } - - public QueryPath(String columnFamilyName, ByteBuffer superColumnName) - { - this(columnFamilyName, superColumnName, null); - } - - @Override - public String toString() - { - return getClass().getSimpleName() + "(" + - "columnFamilyName='" + columnFamilyName + '\'' + - ", superColumnName='" + superColumnName + '\'' + - ", columnName='" + columnName + '\'' + - ')'; - } - - public void serialize(DataOutput out) throws IOException - { - assert !"".equals(columnFamilyName); - assert superColumnName == null || superColumnName.remaining() > 0; - assert columnName == null || columnName.remaining() > 0; - out.writeUTF(columnFamilyName == null ? "" : columnFamilyName); - ByteBufferUtil.writeWithShortLength(superColumnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : superColumnName, out); - ByteBufferUtil.writeWithShortLength(columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName, out); - } - - public static QueryPath deserialize(DataInput din) throws IOException - { - String cfName = din.readUTF(); - ByteBuffer scName = ByteBufferUtil.readWithShortLength(din); - ByteBuffer cName = ByteBufferUtil.readWithShortLength(din); - return new QueryPath(cfName.isEmpty() ? null : cfName, - scName.remaining() == 0 ? null : scName, - cName.remaining() == 0 ? 
null : cName); - } - - public int serializedSize(TypeSizes typeSizes) - { - int size = 0; - - if (columnFamilyName == null) - size += typeSizes.sizeof((short) 0); - else - size += typeSizes.sizeof(columnFamilyName); - - if (superColumnName == null) - { - size += typeSizes.sizeof((short) 0); - } - else - { - int scNameSize = superColumnName.remaining(); - size += typeSizes.sizeof((short) scNameSize); - size += scNameSize; - } - - if (columnName == null) - { - size += typeSizes.sizeof((short) 0); - } - else - { - int cNameSize = columnName.remaining(); - size += typeSizes.sizeof((short) cNameSize); - size += cNameSize; - } - - return size; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 3b0bc8f..3524a69 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -73,10 +73,12 @@ public class IncomingTcpConnection extends Thread { try { - if (version < MessagingService.VERSION_12) - handleLegacyVersion(); - else - handleModernVersion(); + if (version < MessagingService.VERSION_20) + throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; " + + "The earliest version supported is 2.0.0", + version)); + + handleModernVersion(); } catch (EOFException e) { @@ -141,11 +143,6 @@ public class IncomingTcpConnection extends Thread } } - private void handleLegacyVersion() - { - throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0"); - } - private InetAddress receiveMessage(DataInputStream input, int version) throws IOException { int id; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f5242cf/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 50dfd07..cff4b02 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -837,7 +837,7 @@ public class StorageProxy implements StorageProxyMBean String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); // direct writes to local DC or old Cassandra versions // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) - if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20) + if (localDataCenter.equals(dc)) { MessagingService.instance().sendRR(message, destination, responseHandler); }