http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/db/marshal/UserType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java index cd181cc..176ab84 100644 --- a/src/java/org/apache/cassandra/db/marshal/UserType.java +++ b/src/java/org/apache/cassandra/db/marshal/UserType.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; @@ -143,7 +144,7 @@ public class UserType extends TupleType return ShortType.instance; } - public ByteBuffer serializeForNativeProtocol(Iterator<Cell> cells, int protocolVersion) + public ByteBuffer serializeForNativeProtocol(Iterator<Cell> cells, ProtocolVersion protocolVersion) { assert isMultiCell; @@ -249,7 +250,7 @@ public class UserType extends TupleType } @Override - public String toJSONString(ByteBuffer buffer, int protocolVersion) + public String toJSONString(ByteBuffer buffer, ProtocolVersion protocolVersion) { ByteBuffer[] buffers = split(buffer); StringBuilder sb = new StringBuilder("{");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index f1ee3c1..2dffe58 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -44,7 +44,7 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.view.View; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -856,7 +856,7 @@ public final class SchemaKeyspace .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null) .add("initcond", aggregate.initialCondition() != null // must use the frozen state type here, as 'null' for unfrozen collections may mean 'empty' - ? aggregate.stateType().freeze().asCQL3Type().toCQLLiteral(aggregate.initialCondition(), Server.CURRENT_VERSION) + ? aggregate.stateType().freeze().asCQL3Type().toCQLLiteral(aggregate.initialCondition(), ProtocolVersion.CURRENT) : null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/CollectionSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java index 3d6be67..95a0388 100644 --- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java +++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; public abstract class CollectionSerializer<T> implements TypeSerializer<T> @@ -30,14 +30,14 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> protected abstract List<ByteBuffer> serializeValues(T value); protected abstract int getElementCount(T value); - public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version); - public abstract void validateForNativeProtocol(ByteBuffer buffer, int version); + public abstract T deserializeForNativeProtocol(ByteBuffer buffer, ProtocolVersion version); + public abstract void validateForNativeProtocol(ByteBuffer buffer, ProtocolVersion version); public ByteBuffer serialize(T value) { List<ByteBuffer> values = serializeValues(value); // See deserialize() for why using the protocol v3 variant is the right thing to do. - return pack(values, getElementCount(value), Server.VERSION_3); + return pack(values, getElementCount(value), ProtocolVersion.V3); } public T deserialize(ByteBuffer bytes) @@ -47,16 +47,16 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> // 1) when collections are frozen // 2) for internal calls. // In both case, using the protocol 3 version variant is the right thing to do. - return deserializeForNativeProtocol(bytes, Server.VERSION_3); + return deserializeForNativeProtocol(bytes, ProtocolVersion.V3); } public void validate(ByteBuffer bytes) throws MarshalException { // Same thing as above - validateForNativeProtocol(bytes, Server.VERSION_3); + validateForNativeProtocol(bytes, ProtocolVersion.V3); } - public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements, int version) + public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements, ProtocolVersion version) { int size = 0; for (ByteBuffer bb : buffers) @@ -69,22 +69,22 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> return (ByteBuffer)result.flip(); } - protected static void writeCollectionSize(ByteBuffer output, int elements, int version) + protected static void writeCollectionSize(ByteBuffer output, int elements, ProtocolVersion version) { output.putInt(elements); } - public static int readCollectionSize(ByteBuffer input, int version) + public static int readCollectionSize(ByteBuffer input, ProtocolVersion version) { return input.getInt(); } - protected static int sizeOfCollectionSize(int elements, int version) + protected static int sizeOfCollectionSize(int elements, ProtocolVersion version) { return 4; } - public static void writeValue(ByteBuffer output, ByteBuffer value, int version) + public static void writeValue(ByteBuffer output, ByteBuffer value, ProtocolVersion version) { if (value == null) { @@ -96,7 +96,7 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> output.put(value.duplicate()); } - public static ByteBuffer readValue(ByteBuffer input, int version) + public static ByteBuffer readValue(ByteBuffer input, ProtocolVersion version) { int size = input.getInt(); if (size < 0) @@ -105,7 +105,7 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> return ByteBufferUtil.readBytes(input, size); } - public static int sizeOfValue(ByteBuffer value, int version) + public static int sizeOfValue(ByteBuffer value, ProtocolVersion version) { return value == null ? 4 : 4 + value.remaining(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/ListSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java index 3fd0803..44c33a6 100644 --- a/src/java/org/apache/cassandra/serializers/ListSerializer.java +++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java @@ -18,7 +18,7 @@ package org.apache.cassandra.serializers; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -60,7 +60,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>> return value.size(); } - public void validateForNativeProtocol(ByteBuffer bytes, int version) + public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { @@ -78,7 +78,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>> } } - public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version) + public List<T> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { @@ -130,7 +130,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>> try { ByteBuffer input = serializedList.duplicate(); - int n = readCollectionSize(input, Server.VERSION_3); + int n = readCollectionSize(input, ProtocolVersion.V3); if (n <= index) return null; @@ -139,7 +139,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>> int length = input.getInt(); input.position(input.position() + length); } - return readValue(input, Server.VERSION_3); + return readValue(input, ProtocolVersion.V3); } catch (BufferUnderflowException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/MapSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java index 67e5637..1722832 100644 --- a/src/java/org/apache/cassandra/serializers/MapSerializer.java +++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java @@ -23,7 +23,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.Pair; public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> @@ -74,7 +74,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> return value.size(); } - public void validateForNativeProtocol(ByteBuffer bytes, int version) + public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { @@ -94,7 +94,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> } } - public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version) + public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { @@ -141,11 +141,11 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> try { ByteBuffer input = serializedMap.duplicate(); - int n = readCollectionSize(input, Server.VERSION_3); + int n = readCollectionSize(input, ProtocolVersion.V3); for (int i = 0; i < n; i++) { - ByteBuffer kbb = readValue(input, Server.VERSION_3); - ByteBuffer vbb = readValue(input, Server.VERSION_3); + ByteBuffer kbb = readValue(input, ProtocolVersion.V3); + ByteBuffer vbb = readValue(input, ProtocolVersion.V3); int comparison = keyType.compare(kbb, serializedKey); if (comparison == 0) return vbb; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/SetSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java index da7744b..a440234 100644 --- a/src/java/org/apache/cassandra/serializers/SetSerializer.java +++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java @@ -22,6 +22,8 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.transport.ProtocolVersion; + public class SetSerializer<T> extends CollectionSerializer<Set<T>> { // interning instances @@ -61,7 +63,7 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>> return value.size(); } - public void validateForNativeProtocol(ByteBuffer bytes, int version) + public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { @@ -78,7 +80,7 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>> } } - public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version) + public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4354c32..07eb1d8 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -91,6 +91,7 @@ import org.apache.cassandra.thrift.EndpointDetails; import org.apache.cassandra.thrift.TokenRange; import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; @@ -602,7 +603,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); logger.info("Thrift API version: {}", cassandraConstants.VERSION); logger.info("CQL supported versions: {} (default: {})", - StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION); + StringUtils.join(ClientState.getCQLSupportedVersion(), ", "), ClientState.DEFAULT_CQL_VERSION); + logger.info("Native protocol supported versions: {} (default: {})", + StringUtils.join(ProtocolVersion.supportedVersions(), ", "), ProtocolVersion.CURRENT); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index d9b3632..22ddc84 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -23,12 +23,13 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.ProtocolVersion; abstract class AbstractQueryPager implements QueryPager { protected final ReadCommand command; protected final DataLimits limits; - protected final int protocolVersion; + protected final ProtocolVersion protocolVersion; private int remaining; @@ -40,7 +41,7 @@ abstract class AbstractQueryPager implements QueryPager private boolean exhausted; - protected AbstractQueryPager(ReadCommand command, int protocolVersion) + protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion) { this.command = command; this.protocolVersion = protocolVersion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 75cc71f..da388d0 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service.pager; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.AbstractIterator; import java.util.Arrays; @@ -53,7 +54,7 @@ public class MultiPartitionPager implements QueryPager private int remaining; private int current; - public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion) + public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion) { this.limit = group.limits(); this.nowInSec = group.nowInSec(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index 30e14c3..4a9ac39 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -52,7 +52,7 @@ public class PagingState this.remainingInPartition = remainingInPartition; } - public static PagingState deserialize(ByteBuffer bytes, int protocolVersion) + public static PagingState deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) { if (bytes == null) return null; @@ -62,7 +62,7 @@ public class PagingState ByteBuffer pk; RowMark mark; int remaining, remainingInPartition; - if (protocolVersion <= Server.VERSION_3) + if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)) { pk = ByteBufferUtil.readWithShortLength(in); mark = new RowMark(ByteBufferUtil.readWithShortLength(in), protocolVersion); @@ -91,14 +91,14 @@ public class PagingState } } - public ByteBuffer serialize(int protocolVersion) + public ByteBuffer serialize(ProtocolVersion protocolVersion) { assert rowMark == null || protocolVersion == rowMark.protocolVersion; try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize(protocolVersion))) { ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey; ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark; - if (protocolVersion <= Server.VERSION_3) + if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)) { ByteBufferUtil.writeWithShortLength(pk, out); ByteBufferUtil.writeWithShortLength(mark, out); @@ -120,12 +120,12 @@ public class PagingState } } - public int serializedSize(int protocolVersion) + public int serializedSize(ProtocolVersion protocolVersion) { assert rowMark == null || protocolVersion == rowMark.protocolVersion; ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey; ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark; - if (protocolVersion <= Server.VERSION_3) + if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)) { return ByteBufferUtil.serializedSizeWithShortLength(pk) + ByteBufferUtil.serializedSizeWithShortLength(mark) @@ -180,9 +180,9 @@ public class PagingState { // This can be null for convenience if no row is marked. private final ByteBuffer mark; - private final int protocolVersion; + private final ProtocolVersion protocolVersion; - private RowMark(ByteBuffer mark, int protocolVersion) + private RowMark(ByteBuffer mark, ProtocolVersion protocolVersion) { this.mark = mark; this.protocolVersion = protocolVersion; @@ -202,10 +202,10 @@ public class PagingState return l; } - public static RowMark create(CFMetaData metadata, Row row, int protocolVersion) + public static RowMark create(CFMetaData metadata, Row row, ProtocolVersion protocolVersion) { ByteBuffer mark; - if (protocolVersion <= Server.VERSION_3) + if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)) { // We need to be backward compatible with 2.1/2.2 nodes paging states. Which means we have to send // the full cellname of the "last" cell in the row we get (since that's how 2.1/2.2 nodes will start after @@ -238,7 +238,7 @@ public class PagingState if (mark == null) return null; - return protocolVersion <= Server.VERSION_3 + return protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3) ? LegacyLayout.decodeClustering(metadata, mark) : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java index 5a7cccf..5ba13a4 100644 --- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@ -26,6 +26,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.index.Index; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.transport.ProtocolVersion; /** * Pages a PartitionRangeReadCommand. @@ -38,7 +39,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager private volatile DecoratedKey lastReturnedKey; private volatile PagingState.RowMark lastReturnedRow; - public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) + public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion) { super(command, protocolVersion); @@ -51,7 +52,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager } public PartitionRangeQueryPager(ReadCommand command, - int protocolVersion, + ProtocolVersion protocolVersion, DecoratedKey lastReturnedKey, PagingState.RowMark lastReturnedRow, int remaining, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index 7fb4e70..15311ab 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -24,7 +24,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; /** * Static utility methods for paging. @@ -49,7 +49,7 @@ public class QueryPagers long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter); - final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION); + final SinglePartitionPager pager = new SinglePartitionPager(command, null, ProtocolVersion.CURRENT); int count = 0; while (!pager.isExhausted()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index 59b2a51..e400fb6 100644 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.transport.ProtocolVersion; /** * Common interface to single partition queries (by slice and by name). @@ -34,7 +35,7 @@ public class SinglePartitionPager extends AbstractQueryPager private volatile PagingState.RowMark lastReturned; - public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, int protocolVersion) + public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion) { super(command, protocolVersion); this.command = command; @@ -47,7 +48,7 @@ public class SinglePartitionPager extends AbstractQueryPager } private SinglePartitionPager(SinglePartitionReadCommand command, - int protocolVersion, + ProtocolVersion protocolVersion, PagingState.RowMark rowMark, int remaining, int remainingInPartition) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/CBCodec.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java index 0ef619e..9b0847b 100644 --- a/src/java/org/apache/cassandra/transport/CBCodec.java +++ b/src/java/org/apache/cassandra/transport/CBCodec.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; public interface CBCodec<T> { - public T decode(ByteBuf body, int version); - public void encode(T t, ByteBuf dest, int version); - public int encodedSize(T t, int version); + public T decode(ByteBuf body, ProtocolVersion version); + public void encode(T t, ByteBuf dest, ProtocolVersion version); + public int encodedSize(T t, ProtocolVersion version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 91b9cc7..66e5e73 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -391,12 +391,12 @@ public abstract class CBUtil return ByteBuffer.wrap(readRawBytes(slice)); } - public static ByteBuffer readBoundValue(ByteBuf cb, int protocolVersion) + public static ByteBuffer readBoundValue(ByteBuf cb, ProtocolVersion protocolVersion) { int length = cb.readInt(); if (length < 0) { - if (protocolVersion < Server.VERSION_4) // backward compatibility for pre-version 4 + if (protocolVersion.isSmallerThan(ProtocolVersion.V4)) // backward compatibility for pre-version 4 return null; if (length == -1) return null; @@ -454,7 +454,7 @@ public abstract class CBUtil return 4 + (valueSize < 0 ? 0 : valueSize); } - public static List<ByteBuffer> readValueList(ByteBuf cb, int protocolVersion) + public static List<ByteBuffer> readValueList(ByteBuf cb, ProtocolVersion protocolVersion) { int size = cb.readUnsignedShort(); if (size == 0) @@ -481,7 +481,7 @@ public abstract class CBUtil return size; } - public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb, int protocolVersion) + public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb, ProtocolVersion protocolVersion) { int size = cb.readUnsignedShort(); if (size == 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index f6216e1..e428b06 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -43,7 +43,7 @@ public class Client extends SimpleClient { private final SimpleEventHandler eventHandler = new SimpleEventHandler(); - public Client(String host, int port, int version, ClientEncryptionOptions encryptionOptions) + public Client(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions) { super(host, port, version, encryptionOptions); setEventHandler(eventHandler); @@ -136,7 +136,7 @@ public class Client extends SimpleClient return null; } } - return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null)); + return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version)); } else if (msgType.equals("PREPARE")) { @@ -251,7 +251,7 @@ public class Client extends SimpleClient // Parse options. String host = args[0]; int port = Integer.parseInt(args[1]); - int version = args.length == 3 ? Integer.parseInt(args[2]) : Server.CURRENT_VERSION; + ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT; ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions(); System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Connection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java index af26557..a04a055 100644 --- a/src/java/org/apache/cassandra/transport/Connection.java +++ b/src/java/org/apache/cassandra/transport/Connection.java @@ -25,12 +25,12 @@ public class Connection static final AttributeKey<Connection> attributeKey = AttributeKey.valueOf("CONN"); private final Channel channel; - private final int version; + private final ProtocolVersion version; private final Tracker tracker; private volatile FrameCompressor frameCompressor; - public Connection(Channel channel, int version, Tracker tracker) + public Connection(Channel channel, ProtocolVersion version, Tracker tracker) { this.channel = channel; this.version = version; @@ -54,7 +54,7 @@ public class Connection return tracker; } - public int getVersion() + public ProtocolVersion getVersion() { return version; } @@ -66,7 +66,7 @@ public class Connection public interface Factory { - Connection newConnection(Channel channel, int version); + Connection newConnection(Channel channel, ProtocolVersion version); } public interface Tracker http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/DataType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java index eb1f1f4..aef3fa1 100644 --- a/src/java/org/apache/cassandra/transport/DataType.java +++ b/src/java/org/apache/cassandra/transport/DataType.java @@ -35,37 +35,37 @@ import org.apache.cassandra.utils.Pair; public enum DataType implements OptionCodec.Codecable<DataType> { - CUSTOM (0, null, 1), - ASCII (1, AsciiType.instance, 1), - BIGINT (2, LongType.instance, 1), - BLOB (3, BytesType.instance, 1), - BOOLEAN (4, BooleanType.instance, 1), - COUNTER (5, CounterColumnType.instance, 1), - DECIMAL (6, DecimalType.instance, 1), - DOUBLE (7, DoubleType.instance, 1), - FLOAT (8, FloatType.instance, 1), - INT (9, Int32Type.instance, 1), - TEXT (10, UTF8Type.instance, 1), - TIMESTAMP(11, TimestampType.instance, 1), - UUID (12, UUIDType.instance, 1), - VARCHAR (13, UTF8Type.instance, 1), - VARINT (14, IntegerType.instance, 1), - TIMEUUID (15, TimeUUIDType.instance, 1), - INET (16, InetAddressType.instance, 1), - DATE (17, SimpleDateType.instance, 4), - TIME (18, TimeType.instance, 4), - SMALLINT (19, ShortType.instance, 4), - BYTE (20, ByteType.instance, 4), - LIST (32, null, 1), - MAP (33, null, 1), - SET (34, null, 1), - UDT (48, null, 3), - TUPLE (49, null, 3); + CUSTOM (0, null, ProtocolVersion.V1), + ASCII (1, AsciiType.instance, ProtocolVersion.V1), + BIGINT (2, LongType.instance, ProtocolVersion.V1), + BLOB (3, BytesType.instance, ProtocolVersion.V1), + BOOLEAN (4, BooleanType.instance, ProtocolVersion.V1), + COUNTER (5, CounterColumnType.instance, ProtocolVersion.V1), + DECIMAL (6, DecimalType.instance, ProtocolVersion.V1), + DOUBLE (7, DoubleType.instance, ProtocolVersion.V1), + FLOAT (8, FloatType.instance, ProtocolVersion.V1), + INT (9, Int32Type.instance, ProtocolVersion.V1), + TEXT (10, UTF8Type.instance, ProtocolVersion.V1), + TIMESTAMP(11, TimestampType.instance, ProtocolVersion.V1), + UUID (12, UUIDType.instance, ProtocolVersion.V1), + VARCHAR (13, UTF8Type.instance, ProtocolVersion.V1), + VARINT (14, IntegerType.instance, ProtocolVersion.V1), + TIMEUUID (15, TimeUUIDType.instance, ProtocolVersion.V1), + INET (16, InetAddressType.instance, ProtocolVersion.V1), + DATE (17, SimpleDateType.instance, ProtocolVersion.V4), + TIME (18, TimeType.instance, ProtocolVersion.V4), + SMALLINT (19, ShortType.instance, ProtocolVersion.V4), + BYTE (20, ByteType.instance, ProtocolVersion.V4), + LIST (32, null, ProtocolVersion.V1), + MAP (33, null, ProtocolVersion.V1), + SET (34, null, ProtocolVersion.V1), + UDT (48, null, ProtocolVersion.V3), + TUPLE (49, null, ProtocolVersion.V3); public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class); private final int id; - private final int protocolVersion; + private final ProtocolVersion protocolVersion; private final AbstractType type; private final Pair<DataType, Object> pair; private static final Map<AbstractType, DataType> dataTypeMap = new HashMap<AbstractType, DataType>(); @@ -78,7 +78,7 @@ public enum DataType implements OptionCodec.Codecable<DataType> } } - DataType(int id, AbstractType type, int protocolVersion) + DataType(int id, AbstractType type, ProtocolVersion protocolVersion) { this.id = id; this.type = type; @@ -86,14 +86,14 @@ public enum DataType implements OptionCodec.Codecable<DataType> pair = Pair.create(this, null); } - public int getId(int version) + public int getId(ProtocolVersion version) { - if (version < protocolVersion) + if (version.isSmallerThan(protocolVersion)) return DataType.CUSTOM.getId(version); return id; } - public Object readValue(ByteBuf cb, int version) + public Object readValue(ByteBuf cb, ProtocolVersion version) { switch (this) { @@ -131,10 +131,10 @@ public enum DataType implements OptionCodec.Codecable<DataType> } } - public void writeValue(Object value, ByteBuf cb, int version) + public void writeValue(Object value, ByteBuf cb, ProtocolVersion version) { // Serialize as CUSTOM if client on the other side's version is < required for type - if (version < protocolVersion) + if (version.isSmallerThan(protocolVersion)) { CBUtil.writeString(value.toString(), cb); return; @@ -177,10 +177,10 @@ public enum DataType implements OptionCodec.Codecable<DataType> } } - public int serializedValueSize(Object value, int version) + public int serializedValueSize(Object value, ProtocolVersion version) { // Serialize as CUSTOM if client on the other side's version is < required for type - if (version < protocolVersion) + if (version.isSmallerThan(protocolVersion)) return CBUtil.sizeOfString(value.toString()); switch (this) @@ -219,7 +219,7 @@ public enum DataType implements OptionCodec.Codecable<DataType> } } - public static Pair<DataType, Object> fromType(AbstractType type, int version) + public static Pair<DataType, Object> fromType(AbstractType type, ProtocolVersion version) { // For CQL3 clients, ReversedType is an implementation detail and they // shouldn't have to care about it. @@ -251,10 +251,10 @@ public enum DataType implements OptionCodec.Codecable<DataType> throw new AssertionError(); } - if (type instanceof UserType && version >= UDT.protocolVersion) + if (type instanceof UserType && version.isGreaterOrEqualTo(UDT.protocolVersion)) return Pair.<DataType, Object>create(UDT, type); - if (type instanceof TupleType && version >= TUPLE.protocolVersion) + if (type instanceof TupleType && version.isGreaterOrEqualTo(TUPLE.protocolVersion)) return Pair.<DataType, Object>create(TUPLE, type); return Pair.<DataType, Object>create(CUSTOM, type.toString()); @@ -262,7 +262,7 @@ public enum DataType implements OptionCodec.Codecable<DataType> else { // Fall back to CUSTOM if target doesn't know this data type - if (version < dt.protocolVersion) + if (version.isSmallerThan(dt.protocolVersion)) return Pair.<DataType, Object>create(CUSTOM, type.toString()); return dt.pair; } @@ -298,7 +298,7 @@ public enum DataType implements OptionCodec.Codecable<DataType> } @VisibleForTesting - public int getProtocolVersion() + public ProtocolVersion getProtocolVersion() { return protocolVersion; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index 1b72cbe..ed77e59 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -29,14 +29,14 @@ public abstract class Event { public enum Type { - TOPOLOGY_CHANGE(Server.VERSION_3), - STATUS_CHANGE(Server.VERSION_3), - SCHEMA_CHANGE(Server.VERSION_3), - TRACE_COMPLETE(Server.VERSION_4); + TOPOLOGY_CHANGE(ProtocolVersion.V3), + STATUS_CHANGE(ProtocolVersion.V3), + SCHEMA_CHANGE(ProtocolVersion.V3), + TRACE_COMPLETE(ProtocolVersion.V4); - public final int minimumVersion; + public final ProtocolVersion minimumVersion; - Type(int minimumVersion) + Type(ProtocolVersion minimumVersion) { this.minimumVersion = minimumVersion; } @@ -49,10 +49,10 @@ public abstract class Event this.type = type; } - public static Event deserialize(ByteBuf cb, int version) + public static Event deserialize(ByteBuf cb, ProtocolVersion version) { Type eventType = CBUtil.readEnumValue(Type.class, cb); - if (eventType.minimumVersion > version) + if (eventType.minimumVersion.isGreaterThan(version)) throw new ProtocolException("Event " + eventType.name() + " not valid for protocol version " + version); switch (eventType) { @@ -66,21 +66,21 @@ public abstract class Event throw new AssertionError(); } - public void serialize(ByteBuf dest, int version) + public void serialize(ByteBuf dest, ProtocolVersion version) { - if (type.minimumVersion > version) + if (type.minimumVersion.isGreaterThan(version)) throw new ProtocolException("Event " + type.name() + " not valid for protocol version " + version); CBUtil.writeEnumValue(type, dest); serializeEvent(dest, version); } - public int serializedSize(int version) + public int serializedSize(ProtocolVersion version) { return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version); } - protected abstract void serializeEvent(ByteBuf dest, int version); - protected abstract int eventSerializedSize(int version); + protected abstract void serializeEvent(ByteBuf dest, ProtocolVersion version); + protected abstract int eventSerializedSize(ProtocolVersion version); public static abstract class NodeEvent extends Event { @@ -126,20 +126,20 @@ public abstract class Event } // Assumes the type has already been deserialized - private static TopologyChange deserializeEvent(ByteBuf cb, int version) + private static TopologyChange deserializeEvent(ByteBuf cb, ProtocolVersion version) { Change change = CBUtil.readEnumValue(Change.class, cb); InetSocketAddress node = CBUtil.readInet(cb); return new TopologyChange(change, node); } - protected void serializeEvent(ByteBuf dest, int version) + protected void serializeEvent(ByteBuf dest, ProtocolVersion version) { CBUtil.writeEnumValue(change, dest); CBUtil.writeInet(node, dest); } - protected int eventSerializedSize(int version) + protected int eventSerializedSize(ProtocolVersion version) { return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node); } @@ -192,20 +192,20 @@ public abstract class Event } // Assumes the type has already been deserialized - private static StatusChange deserializeEvent(ByteBuf cb, int version) + private static StatusChange deserializeEvent(ByteBuf cb, ProtocolVersion version) { Status status = CBUtil.readEnumValue(Status.class, cb); InetSocketAddress node = CBUtil.readInet(cb); return new StatusChange(status, node); } - protected void serializeEvent(ByteBuf dest, int version) + protected void serializeEvent(ByteBuf dest, ProtocolVersion version) { CBUtil.writeEnumValue(status, dest); CBUtil.writeInet(node, dest); } - protected int eventSerializedSize(int version) + protected int eventSerializedSize(ProtocolVersion version) { return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node); } @@ -268,10 +268,10 @@ public abstract class Event } // Assumes the type has already been deserialized - public static SchemaChange deserializeEvent(ByteBuf cb, int version) + public static SchemaChange deserializeEvent(ByteBuf cb, ProtocolVersion version) { Change change = CBUtil.readEnumValue(Change.class, cb); - if (version >= Server.VERSION_3) + if (version.isGreaterOrEqualTo(ProtocolVersion.V3)) { Target target = CBUtil.readEnumValue(Target.class, cb); String keyspace = CBUtil.readString(cb); @@ -290,11 +290,11 @@ public abstract class Event } } - public void serializeEvent(ByteBuf dest, int version) + public void serializeEvent(ByteBuf dest, ProtocolVersion version) { if (target == Target.FUNCTION || target == Target.AGGREGATE) { - if (version >= Server.VERSION_4) + if (version.isGreaterOrEqualTo(ProtocolVersion.V4)) { // available since protocol version 4 CBUtil.writeEnumValue(change, dest); @@ -307,7 +307,7 @@ public abstract class Event { // not available in protocol versions < 4 - just say the keyspace was updated. CBUtil.writeEnumValue(Change.UPDATED, dest); - if (version >= 3) + if (version.isGreaterOrEqualTo(ProtocolVersion.V3)) CBUtil.writeEnumValue(Target.KEYSPACE, dest); CBUtil.writeString(keyspace, dest); CBUtil.writeString("", dest); @@ -315,7 +315,7 @@ public abstract class Event return; } - if (version >= Server.VERSION_3) + if (version.isGreaterOrEqualTo(ProtocolVersion.V3)) { CBUtil.writeEnumValue(change, dest); CBUtil.writeEnumValue(target, dest); @@ -342,17 +342,17 @@ public abstract class Event } } - public int eventSerializedSize(int version) + public int eventSerializedSize(ProtocolVersion version) { if (target == Target.FUNCTION || target == Target.AGGREGATE) { - if (version >= Server.VERSION_4) + if (version.isGreaterOrEqualTo(ProtocolVersion.V4)) return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfEnumValue(target) + CBUtil.sizeOfString(keyspace) + CBUtil.sizeOfString(name) + CBUtil.sizeOfStringList(argTypes); - if (version >= Server.VERSION_3) + if (version.isGreaterOrEqualTo(ProtocolVersion.V3)) return CBUtil.sizeOfEnumValue(Change.UPDATED) + CBUtil.sizeOfEnumValue(Target.KEYSPACE) + CBUtil.sizeOfString(keyspace); @@ -361,7 +361,7 @@ public abstract class Event + CBUtil.sizeOfString(""); } - if (version >= Server.VERSION_3) + if (version.isGreaterOrEqualTo(ProtocolVersion.V3)) { int size = CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfEnumValue(target) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index f2f6174..6cd8b1e 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -67,7 +67,7 @@ public class Frame return body.release(); } - public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body) + public static Frame create(Message.Type type, int streamId, ProtocolVersion version, EnumSet<Header.Flag> flags, ByteBuf body) { Header header = new Header(version, flags, streamId, type); return new Frame(header, body); @@ -80,12 +80,12 @@ public class Frame public static final int BODY_LENGTH_SIZE = 4; - public final int version; + public final ProtocolVersion version; public final EnumSet<Flag> flags; public final int streamId; public final Message.Type type; - private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type) + private Header(ProtocolVersion version, EnumSet<Flag> flags, int streamId, Message.Type type) { this.version = version; this.flags = flags; @@ -93,7 +93,7 @@ public class Frame this.type = type; } - public static enum Flag + public enum Flag { // The order of that enum matters!! COMPRESSED, @@ -169,9 +169,8 @@ public class Frame // 1 and 2 use a shorter header, so we may never have a complete header's worth of bytes. int firstByte = buffer.getByte(idx++); Message.Direction direction = Message.Direction.extractFromVersion(firstByte); - int version = firstByte & PROTOCOL_VERSION_MASK; - if (version < Server.MIN_SUPPORTED_VERSION) - throw new ProtocolException(protocolVersionExceptionMessage(version), version); + int versionNum = firstByte & PROTOCOL_VERSION_MASK; + ProtocolVersion version = ProtocolVersion.decode(versionNum); // Wait until we have the complete header if (readableBytes < Header.LENGTH) @@ -179,17 +178,10 @@ public class Frame int flags = buffer.getByte(idx++); EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags); - if (version > Server.CURRENT_VERSION) - { - if (version == Server.BETA_VERSION) - { - if (!decodedFlags.contains(Header.Flag.USE_BETA)) - throw new ProtocolException(String.format("Beta version of the protocol used (%d), but USE_BETA flag is unset", - version)); - } - else - throw new ProtocolException(protocolVersionExceptionMessage(version)); - } + + if (version.isBeta() && !decodedFlags.contains(Header.Flag.USE_BETA)) + throw new ProtocolException(String.format("Beta version of the protocol used (%s), but USE_BETA flag is unset", version), + version); int streamId = buffer.getShort(idx); idx += 2; @@ -227,7 +219,7 @@ public class Frame // extract body ByteBuf body = buffer.slice(idx, (int) bodyLength); body.retain(); - + idx += bodyLength; buffer.readerIndex(idx); @@ -243,7 +235,7 @@ public class Frame { throw ErrorMessage.wrap( new ProtocolException(String.format( - "Invalid message version. Got %d but previous messages on this connection had version %d", + "Invalid message version. Got %s but previous messages on this connection had version %s", version, connection.getVersion())), streamId); } @@ -251,12 +243,6 @@ public class Frame results.add(new Frame(new Header(version, decodedFlags, streamId, type), body)); } - private static String protocolVersionExceptionMessage(int version) - { - return String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d", - version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION); - } - private void fail() { // Reset to the initial state and throw the exception @@ -285,12 +271,12 @@ public class Frame ByteBuf header = CBUtil.allocator.buffer(Header.LENGTH); Message.Type type = frame.header.type; - header.writeByte(type.direction.addToVersion(frame.header.version)); + header.writeByte(type.direction.addToVersion(frame.header.version.asInt())); header.writeByte(Header.Flag.serialize(frame.header.flags)); // Continue to support writing pre-v3 headers so that we can give proper error messages to drivers that // connect with the v1/v2 protocol. See CASSANDRA-11464. - if (frame.header.version >= Server.VERSION_3) + if (frame.header.version.isGreaterOrEqualTo(ProtocolVersion.V3)) header.writeShort(frame.header.streamId); else header.writeByte(frame.header.streamId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 6f3b0f8..fc8cd93 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -121,7 +121,7 @@ public abstract class Message } } - private Type(int opcode, Direction direction, Codec<?> codec) + Type(int opcode, Direction direction, Codec<?> codec) { this.opcode = opcode; this.direction = direction; @@ -150,7 +150,7 @@ public abstract class Message private int streamId; private Frame sourceFrame; private Map<String, ByteBuffer> customPayload; - protected Integer forcedProtocolVersion = null; + protected ProtocolVersion forcedProtocolVersion = null; protected Message(Type type) { @@ -275,7 +275,7 @@ public abstract class Message try { - if (isCustomPayload && frame.header.version < Server.VERSION_4) + if (isCustomPayload && frame.header.version.isSmallerThan(ProtocolVersion.V4)) throw new ProtocolException("Received frame with CUSTOM_PAYLOAD flag for native protocol version < 4"); Message message = frame.header.type.codec.decode(frame.body, frame.header.version); @@ -319,11 +319,8 @@ public abstract class Message { Connection connection = ctx.channel().attr(Connection.attributeKey).get(); // The only case the connection can be null is when we send the initial STARTUP message (client side thus) - int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion(); - + ProtocolVersion version = connection == null ? ProtocolVersion.CURRENT : connection.getVersion(); EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class); - if (version == Server.BETA_VERSION) - flags.add(Frame.Header.Flag.USE_BETA); Codec<Message> codec = (Codec<Message>)message.type.codec; try @@ -339,13 +336,13 @@ public abstract class Message List<String> warnings = ((Response)message).getWarnings(); if (warnings != null) { - if (version < Server.VERSION_4) + if (version.isSmallerThan(ProtocolVersion.V4)) throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4"); messageSize += CBUtil.sizeOfStringList(warnings); } if (customPayload != null) { - if (version < Server.VERSION_4) + if (version.isSmallerThan(ProtocolVersion.V4)) throw new ProtocolException("Must not send frame with CUSTOM_PAYLOAD flag for native protocol version < 4"); messageSize += CBUtil.sizeOfBytesMap(customPayload); } @@ -394,9 +391,13 @@ public abstract class Message // if the driver attempted to connect with a protocol version lower than the minimum supported // version, respond with a protocol error message with the correct frame header for that version - int responseVersion = message.forcedProtocolVersion == null + ProtocolVersion responseVersion = message.forcedProtocolVersion == null ? version : message.forcedProtocolVersion; + + if (responseVersion.isBeta()) + flags.add(Frame.Header.Flag.USE_BETA); + results.add(Frame.create(message.type, message.getStreamId(), responseVersion, flags, body)); } catch (Throwable e) @@ -507,7 +508,7 @@ public abstract class Message { assert request.connection() instanceof ServerConnection; connection = (ServerConnection)request.connection(); - if (connection.getVersion() >= Server.VERSION_4) + if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) ClientWarn.instance.captureWarnings(); QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/OptionCodec.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java index 3a8b813..cdfadf6 100644 --- a/src/java/org/apache/cassandra/transport/OptionCodec.java +++ b/src/java/org/apache/cassandra/transport/OptionCodec.java @@ -30,11 +30,11 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> { public interface Codecable<T extends Enum<T>> { - public int getId(int version); + public int getId(ProtocolVersion version); - public Object readValue(ByteBuf cb, int version); - public void writeValue(Object value, ByteBuf cb, int version); - public int serializedValueSize(Object obj, int version); + public Object readValue(ByteBuf cb, ProtocolVersion version); + public void writeValue(Object value, ByteBuf cb, ProtocolVersion version); + public int serializedValueSize(Object obj, ProtocolVersion version); } private final Class<T> klass; @@ -48,13 +48,13 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> T[] values = klass.getEnumConstants(); int maxId = -1; for (T opt : values) - maxId = Math.max(maxId, opt.getId(Server.CURRENT_VERSION)); + maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT)); ids = (T[])Array.newInstance(klass, maxId + 1); for (T opt : values) { - if (ids[opt.getId(Server.CURRENT_VERSION)] != null) - throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId(Server.CURRENT_VERSION))); - ids[opt.getId(Server.CURRENT_VERSION)] = opt; + if (ids[opt.getId(ProtocolVersion.CURRENT)] != null) + throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId(ProtocolVersion.CURRENT))); + ids[opt.getId(ProtocolVersion.CURRENT)] = opt; } } @@ -66,7 +66,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> return opt; } - public Map<T, Object> decode(ByteBuf body, int version) + public Map<T, Object> decode(ByteBuf body, ProtocolVersion version) { EnumMap<T, Object> options = new EnumMap<T, Object>(klass); int n = body.readUnsignedShort(); @@ -81,7 +81,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> return options; } - public ByteBuf encode(Map<T, Object> options, int version) + public ByteBuf encode(Map<T, Object> options, ProtocolVersion version) { int optLength = 2; for (Map.Entry<T, Object> entry : options.entrySet()) @@ -97,14 +97,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> return cb; } - public Pair<T, Object> decodeOne(ByteBuf body, int version) + public Pair<T, Object> decodeOne(ByteBuf body, ProtocolVersion version) { T opt = fromId(body.readUnsignedShort()); Object value = opt.readValue(body, version); return Pair.create(opt, value); } - public void writeOne(Pair<T, Object> option, ByteBuf dest, int version) + public void writeOne(Pair<T, Object> option, ByteBuf dest, ProtocolVersion version) { T opt = option.left; Object obj = option.right; @@ -112,7 +112,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> opt.writeValue(obj, dest, version); } - public int oneSerializedSize(Pair<T, Object> option, int version) + public int oneSerializedSize(Pair<T, Object> option, ProtocolVersion version) { T opt = option.left; Object obj = option.right; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ProtocolException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolException.java b/src/java/org/apache/cassandra/transport/ProtocolException.java index 9d8c270..6ef17ac 100644 --- a/src/java/org/apache/cassandra/transport/ProtocolException.java +++ b/src/java/org/apache/cassandra/transport/ProtocolException.java @@ -25,19 +25,17 @@ import org.apache.cassandra.exceptions.TransportException; */ public class ProtocolException extends RuntimeException implements TransportException { - private final Integer attemptedLowProtocolVersion; + private final ProtocolVersion forcedProtocolVersion; public ProtocolException(String msg) { this(msg, null); } - public ProtocolException(String msg, Integer attemptedLowProtocolVersion) + public ProtocolException(String msg, ProtocolVersion forcedProtocolVersion) { super(msg); - assert attemptedLowProtocolVersion == null || attemptedLowProtocolVersion < Server.MIN_SUPPORTED_VERSION; - - this.attemptedLowProtocolVersion = attemptedLowProtocolVersion; + this.forcedProtocolVersion = forcedProtocolVersion; } public ExceptionCode code() @@ -45,13 +43,8 @@ public class ProtocolException extends RuntimeException implements TransportExce return ExceptionCode.PROTOCOL_ERROR; } - /** - * If the ProtocolException is due to a connection being made with a protocol version that is lower - * than Server.MIN_SUPPORTED_VERSION, this will return that unsupported protocol version. Otherwise, - * null is returned. - */ - public Integer getAttemptedLowProtocolVersion() + public ProtocolVersion getForcedProtocolVersion() { - return attemptedLowProtocolVersion; + return forcedProtocolVersion; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ProtocolVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java new file mode 100644 index 0000000..cd73c86 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.ArrayUtils; + +/** + * The native (CQL binary) protocol version. + * + * Some versions may be in beta, which means that the client must + * specify the beta flag in the frame for the version to be considered valid. + * Beta versions must have the word "beta" in their description, this is mandated + * by the specs. + * + */ +public enum ProtocolVersion implements Comparable<ProtocolVersion> +{ + // The order is important as it defines the chronological history of versions, which is used + // to determine if a feature is supported or some serdes formats + V1(1, "v1", false), // no longer supported + V2(2, "v2", false), // no longer supported + V3(3, "v3", false), + V4(4, "v4", false), + V5(5, "v5-beta", true); + + /** The version number */ + private final int num; + + /** A description of the version, beta versions should have the word "-beta" */ + private final String descr; + + /** Set this to true for beta versions */ + private final boolean beta; + + ProtocolVersion(int num, String descr, boolean beta) + { + this.num = num; + this.descr = descr; + this.beta = beta; + } + + /** The supported versions stored as an array, these should be private and are required for fast decoding*/ + private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5 }; + final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0]; + final static ProtocolVersion MAX_SUPPORTED_VERSION = SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1]; + + /** All supported versions, published as an enumset */ + public final static EnumSet<ProtocolVersion> SUPPORTED = EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) ArrayUtils.addAll(SUPPORTED_VERSIONS))); + + /** Old unsupported versions, this is OK as long as we never add newer unsupported versions */ + public final static EnumSet<ProtocolVersion> UNSUPPORTED = EnumSet.complementOf(SUPPORTED); + + /** The preferred versions */ + public final static ProtocolVersion CURRENT = V4; + public final static Optional<ProtocolVersion> BETA = Optional.of(V5); + + public static List<String> supportedVersions() + { + List<String> ret = new ArrayList<>(SUPPORTED.size()); + for (ProtocolVersion version : SUPPORTED) + ret.add(version.toString()); + return ret; + } + + public static ProtocolVersion decode(int versionNum) + { + ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= MAX_SUPPORTED_VERSION.num + ? SUPPORTED_VERSIONS[versionNum - MIN_SUPPORTED_VERSION.num] + : null; + + if (ret == null) + { + // if this is not a supported version check the old versions + for (ProtocolVersion version : UNSUPPORTED) + { + // if it is an old version that is no longer supported this ensures that we reply + // with that same version + if (version.num == versionNum) + throw new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version); + } + + // If the version is invalid reply with the highest version that we support + throw new ProtocolException(invalidVersionMessage(versionNum), MAX_SUPPORTED_VERSION); + } + + return ret; + } + + public boolean isBeta() + { + return beta; + } + + public static String invalidVersionMessage(int version) + { + return String.format("Invalid or unsupported protocol version (%d); supported versions are (%s)", + version, String.join(", ", ProtocolVersion.supportedVersions())); + } + + public int asInt() + { + return num; + } + + @Override + public String toString() + { + // This format is mandated by the protocl specs for the SUPPORTED message, see OptionsMessage execute(). + return String.format("%d/%s", num, descr); + } + + public final boolean isGreaterThan(ProtocolVersion other) + { + return num > other.num; + } + + public final boolean isGreaterOrEqualTo(ProtocolVersion other) + { + return num >= other.num; + } + + public final boolean isSmallerThan(ProtocolVersion other) + { + return num < other.num; + } + + public final boolean isSmallerOrEqualTo(ProtocolVersion other) + { + return num <= other.num; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 267d532..1eeecac 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -64,18 +64,11 @@ public class Server implements CassandraDaemon.Server private static final Logger logger = LoggerFactory.getLogger(Server.class); private static final boolean useEpoll = NativeTransportService.useEpoll(); - public static final int VERSION_3 = 3; - public static final int VERSION_4 = 4; - public static final int VERSION_5 = 5; - public static final int CURRENT_VERSION = VERSION_4; - public static final int BETA_VERSION = VERSION_5; - public static final int MIN_SUPPORTED_VERSION = VERSION_3; - private final ConnectionTracker connectionTracker = new ConnectionTracker(); private final Connection.Factory connectionFactory = new Connection.Factory() { - public Connection newConnection(Channel channel, int version) + public Connection newConnection(Channel channel, ProtocolVersion version) { return new ServerConnection(channel, version, connectionTracker); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index 1ef6c73..9374ca0 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -36,7 +36,7 @@ public class ServerConnection extends Connection private final ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>(); - public ServerConnection(Channel channel, int version, Connection.Tracker tracker) + public ServerConnection(Channel channel, ProtocolVersion version, Connection.Tracker tracker) { super(channel, version, tracker); this.clientState = ClientState.forExternalCalls(channel.remoteAddress()); @@ -56,7 +56,7 @@ public class ServerConnection extends Connection return qState; } - public QueryState validateNewMessage(Message.Type type, int version, int streamId) + public QueryState validateNewMessage(Message.Type type, ProtocolVersion version, int streamId) { switch (state) { @@ -67,7 +67,7 @@ public class ServerConnection extends Connection case AUTHENTICATION: // Support both SASL auth from protocol v2 and the older style Credentials auth from v1 if (type != Message.Type.AUTH_RESPONSE && type != Message.Type.CREDENTIALS) - throw new ProtocolException(String.format("Unexpected message %s, expecting %s", type, version == 1 ? "CREDENTIALS" : "SASL_RESPONSE")); + throw new ProtocolException(String.format("Unexpected message %s, expecting %s", type, version == ProtocolVersion.V1 ? "CREDENTIALS" : "SASL_RESPONSE")); break; case READY: if (type == Message.Type.STARTUP) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 4d8a30b..1bb081b 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -75,7 +75,7 @@ public class SimpleClient implements Closeable protected final ResponseHandler responseHandler = new ResponseHandler(); protected final Connection.Tracker tracker = new ConnectionTracker(); - protected final int version; + protected final ProtocolVersion version; // We don't track connection really, so we don't need one Connection per channel protected Connection connection; protected Bootstrap bootstrap; @@ -84,32 +84,32 @@ public class SimpleClient implements Closeable private final Connection.Factory connectionFactory = new Connection.Factory() { - public Connection newConnection(Channel channel, int version) + public Connection newConnection(Channel channel, ProtocolVersion version) { return connection; } }; - public SimpleClient(String host, int port, int version, ClientEncryptionOptions encryptionOptions) + public SimpleClient(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions) { this(host, port, version, false, encryptionOptions); } public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions) { - this(host, port, Server.CURRENT_VERSION, encryptionOptions); + this(host, port, ProtocolVersion.CURRENT, encryptionOptions); } - public SimpleClient(String host, int port, int version) + public SimpleClient(String host, int port, ProtocolVersion version) { this(host, port, version, new ClientEncryptionOptions()); } - public SimpleClient(String host, int port, int version, boolean useBeta, ClientEncryptionOptions encryptionOptions) + public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, ClientEncryptionOptions encryptionOptions) { this.host = host; this.port = port; - if (version == Server.BETA_VERSION && !useBeta) + if (version.isBeta() && !useBeta) throw new IllegalArgumentException(String.format("Beta version of server used (%s), but USE_BETA flag is not set", version)); this.version = version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java index 15a9a9a..fda83f9 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java @@ -20,6 +20,7 @@ package org.apache.cassandra.transport.messages; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Message; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.transport.ProtocolVersion; import java.nio.ByteBuffer; @@ -30,7 +31,7 @@ public class AuthChallenge extends Message.Response { public static final Message.Codec<AuthChallenge> codec = new Message.Codec<AuthChallenge>() { - public AuthChallenge decode(ByteBuf body, int version) + public AuthChallenge decode(ByteBuf body, ProtocolVersion version) { ByteBuffer b = CBUtil.readValue(body); byte[] token = new byte[b.remaining()]; @@ -38,12 +39,12 @@ public class AuthChallenge extends Message.Response return new AuthChallenge(token); } - public void encode(AuthChallenge challenge, ByteBuf dest, int version) + public void encode(AuthChallenge challenge, ByteBuf dest, ProtocolVersion version) { CBUtil.writeValue(challenge.token, dest); } - public int encodedSize(AuthChallenge challenge, int version) + public int encodedSize(AuthChallenge challenge, ProtocolVersion version) { return CBUtil.sizeOfValue(challenge.token); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java index e90f740..332b024 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java @@ -36,9 +36,9 @@ public class AuthResponse extends Message.Request { public static final Message.Codec<AuthResponse> codec = new Message.Codec<AuthResponse>() { - public AuthResponse decode(ByteBuf body, int version) + public AuthResponse decode(ByteBuf body, ProtocolVersion version) { - if (version == 1) + if (version == ProtocolVersion.V1) throw new ProtocolException("SASL Authentication is not supported in version 1 of the protocol"); ByteBuffer b = CBUtil.readValue(body); @@ -47,12 +47,12 @@ public class AuthResponse extends Message.Request return new AuthResponse(token); } - public void encode(AuthResponse response, ByteBuf dest, int version) + public void encode(AuthResponse response, ByteBuf dest, ProtocolVersion version) { CBUtil.writeValue(response.token, dest); } - public int encodedSize(AuthResponse response, int version) + public int encodedSize(AuthResponse response, ProtocolVersion version) { return CBUtil.sizeOfValue(response.token); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java index 8c1b5b1..b8ed7f0 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java @@ -20,6 +20,7 @@ package org.apache.cassandra.transport.messages; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Message; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.transport.ProtocolVersion; import java.nio.ByteBuffer; @@ -33,7 +34,7 @@ public class AuthSuccess extends Message.Response { public static final Message.Codec<AuthSuccess> codec = new Message.Codec<AuthSuccess>() { - public AuthSuccess decode(ByteBuf body, int version) + public AuthSuccess decode(ByteBuf body, ProtocolVersion version) { ByteBuffer b = CBUtil.readValue(body); byte[] token = null; @@ -45,12 +46,12 @@ public class AuthSuccess extends Message.Response return new AuthSuccess(token); } - public void encode(AuthSuccess success, ByteBuf dest, int version) + public void encode(AuthSuccess success, ByteBuf dest, ProtocolVersion version) { CBUtil.writeValue(success.token, dest); } - public int encodedSize(AuthSuccess success, int version) + public int encodedSize(AuthSuccess success, ProtocolVersion version) { return CBUtil.sizeOfValue(success.token); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java index 230f0f2..1261083 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolVersion; /** * Message to indicate that the server is ready to receive requests. @@ -29,18 +30,18 @@ public class AuthenticateMessage extends Message.Response { public static final Message.Codec<AuthenticateMessage> codec = new Message.Codec<AuthenticateMessage>() { - public AuthenticateMessage decode(ByteBuf body, int version) + public AuthenticateMessage decode(ByteBuf body, ProtocolVersion version) { String authenticator = CBUtil.readString(body); return new AuthenticateMessage(authenticator); } - public void encode(AuthenticateMessage msg, ByteBuf dest, int version) + public void encode(AuthenticateMessage msg, ByteBuf dest, ProtocolVersion version) { CBUtil.writeString(msg.authenticator, dest); } - public int encodedSize(AuthenticateMessage msg, int version) + public int encodedSize(AuthenticateMessage msg, ProtocolVersion version) { return CBUtil.sizeOfString(msg.authenticator); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 6675565..bb6411f 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -43,7 +43,7 @@ public class BatchMessage extends Message.Request { public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>() { - public BatchMessage decode(ByteBuf body, int version) + public BatchMessage decode(ByteBuf body, ProtocolVersion version) { byte type = body.readByte(); int n = body.readUnsignedShort(); @@ -65,7 +65,7 @@ public class BatchMessage extends Message.Request return new BatchMessage(toType(type), queryOrIds, variables, options); } - public void encode(BatchMessage msg, ByteBuf dest, int version) + public void encode(BatchMessage msg, ByteBuf dest, ProtocolVersion version) { int queries = msg.queryOrIdList.size(); @@ -84,13 +84,13 @@ public class BatchMessage extends Message.Request CBUtil.writeValueList(msg.values.get(i), dest); } - if (version < Server.VERSION_3) + if (version.isSmallerThan(ProtocolVersion.V3)) CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest); else QueryOptions.codec.encode(msg.options, dest, version); } - public int encodedSize(BatchMessage msg, int version) + public int encodedSize(BatchMessage msg, ProtocolVersion version) { int size = 3; // type + nb queries for (int i = 0; i < msg.queryOrIdList.size(); i++) @@ -102,7 +102,7 @@ public class BatchMessage extends Message.Request size += CBUtil.sizeOfValueList(msg.values.get(i)); } - size += version < Server.VERSION_3 + size += version.isSmallerThan(ProtocolVersion.V3) ? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency()) : QueryOptions.codec.encodedSize(msg.options, version); return size;