Repository: cassandra Updated Branches: refs/heads/trunk ed9343edf -> 288f2cf4f
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java index 4baf6a3..bb2fbf1 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java @@ -103,38 +103,6 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer> return l.toArray(new ByteBuffer[l.size()]); } - public static class CompositeComponent - { - public AbstractType<?> comparator; - public ByteBuffer value; - - public CompositeComponent( AbstractType<?> comparator, ByteBuffer value ) - { - this.comparator = comparator; - this.value = value; - } - } - - public List<CompositeComponent> deconstruct( ByteBuffer bytes ) - { - List<CompositeComponent> list = new ArrayList<CompositeComponent>(); - - ByteBuffer bb = bytes.duplicate(); - readIsStatic(bb); - int i = 0; - - while (bb.remaining() > 0) - { - AbstractType comparator = getComparator(i, bb); - ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb); - - list.add( new CompositeComponent(comparator,value) ); - - byte b = bb.get(); // Ignore; not relevant here - ++i; - } - return list; - } /* * Escapes all occurences of the ':' character from the input, replacing them by "\:". 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/CompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java index 01eb58f..633a994 100644 --- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java @@ -218,6 +218,32 @@ public class CompositeType extends AbstractCompositeType return null; } + public static class CompositeComponent + { + public ByteBuffer value; + public byte eoc; + + public CompositeComponent(ByteBuffer value, byte eoc) + { + this.value = value; + this.eoc = eoc; + } + } + + public static List<CompositeComponent> deconstruct(ByteBuffer bytes) + { + List<CompositeComponent> list = new ArrayList<>(); + ByteBuffer bb = bytes.duplicate(); + readStatic(bb); + while (bb.remaining() > 0) + { + ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb); + byte eoc = bb.get(); + list.add(new CompositeComponent(value, eoc)); + } + return list; + } + // Extract CQL3 column name from the full column name. 
public ByteBuffer extractLastComponent(ByteBuffer bb) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java index acdd0e2..0b218f5 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java @@ -189,7 +189,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false); } - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed) + public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed) { return new SliceableIterator(this, selection, reversed); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index f2e0617..bb73929 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -23,12 +23,15 @@ import java.util.*; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import 
org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputBuffer; @@ -36,9 +39,10 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.Pair; /** * Stores updates made on a partition. @@ -494,7 +498,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } @Override - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) + public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) { maybeBuild(); return super.sliceableUnfilteredIterator(columns, reversed); @@ -503,7 +507,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition /** * Validates the data contained in this update. * - * @throws MarshalException if some of the data contained in this update is corrupted. + * @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted. 
*/ public void validate() { @@ -701,37 +705,19 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition { public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException { - if (version < MessagingService.VERSION_30) - { - // TODO - throw new UnsupportedOperationException(); - - // if (cf == null) - // { - // out.writeBoolean(false); - // return; - // } - - // out.writeBoolean(true); - // serializeCfId(cf.id(), out, version); - // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version); - // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer(); - // int count = cf.getColumnCount(); - // out.writeInt(count); - // int written = 0; - // for (Cell cell : cf) - // { - // columnSerializer.serialize(cell, out); - // written++; - // } - // assert count == written: "Table had " + count + " columns, but " + written + " written"; - } - - CFMetaData.serializer.serialize(update.metadata(), out, version); try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) { assert !iter.isReverseOrder(); - UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size()); + + if (version < MessagingService.VERSION_30) + { + LegacyLayout.serializeAsLegacyPartition(iter, out, version); + } + else + { + CFMetaData.serializer.serialize(update.metadata(), out, version); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size()); + } } } @@ -745,9 +731,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition else { assert key != null; - CFMetaData metadata = deserializeMetadata(in, version); - DecoratedKey dk = metadata.decorateKey(key); - return deserializePre30(in, version, flag, metadata, dk); + return deserializePre30(in, version, flag, key); } } @@ -761,8 +745,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition else { assert key != null; - CFMetaData metadata = 
deserializeMetadata(in, version); - return deserializePre30(in, version, flag, metadata, key); + return deserializePre30(in, version, flag, key.getKey()); } } @@ -802,48 +785,22 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition false); } - private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException + private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException { - // This is only used in mutation, and mutation have never allowed "null" column families - boolean present = in.readBoolean(); - assert present; - - CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - return metadata; - } - - private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException - { - LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); - int size = in.readInt(); - Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); - SerializationHelper helper = new SerializationHelper(metadata, version, flag); - try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper)) + try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key)) { + assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families return PartitionUpdate.fromIterator(iterator); } } public long serializedSize(PartitionUpdate update, int version) { - if (version < MessagingService.VERSION_30) - { - // TODO - throw new UnsupportedOperationException("Version is " + version); - //if (cf == null) - //{ - // return TypeSizes.sizeof(false); - //} - //else - //{ - // return TypeSizes.sizeof(true) /* nullness bool */ - // + 
cfIdSerializedSize(cf.id(), typeSizes, version) /* id */ - // + contentSerializedSize(cf, typeSizes, version); - //} - } - try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) { + if (version < MessagingService.VERSION_30) + return LegacyLayout.serializedSizeAsLegacyPartition(iter, version); + return CFMetaData.serializer.serializedSize(update.metadata(), version) + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index dd625c4..0418e7f 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -22,13 +22,19 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.*; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.*; /** * Static methods to work with partition iterators. 
@@ -357,8 +363,7 @@ public abstract class UnfilteredPartitionIterators { public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException { - if (version < MessagingService.VERSION_30) - throw new UnsupportedOperationException(); + assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer out.writeBoolean(iter.isForThrift()); while (iter.hasNext()) @@ -374,9 +379,7 @@ public abstract class UnfilteredPartitionIterators public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException { - if (version < MessagingService.VERSION_30) - throw new UnsupportedOperationException(); - + assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer final boolean isForThrift = in.readBoolean(); return new AbstractUnfilteredPartitionIterator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java index 7e9ceb8..8f9e921 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java @@ -23,11 +23,13 @@ import java.util.function.Predicate; import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; import
org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; @@ -363,6 +365,11 @@ public class BTreeBackedRow extends AbstractRow ((ComplexColumnData) current).setValue(path, value); } + public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata) + { + return () -> new CellInLegacyOrderIterator(metadata); + } + private class CellIterator extends AbstractIterator<Cell> { private Iterator<ColumnData> columnData = iterator(); @@ -392,6 +399,61 @@ public class BTreeBackedRow extends AbstractRow } } + private class CellInLegacyOrderIterator extends AbstractIterator<Cell> + { + private final AbstractType<?> comparator; + private final int firstComplexIdx; + private int simpleIdx; + private int complexIdx; + private Iterator<Cell> complexCells; + private final Object[] data; + + private CellInLegacyOrderIterator(CFMetaData metadata) + { + this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR); + + // copy btree into array for simple separate iteration of simple and complex columns + this.data = new Object[BTree.size(btree)]; + BTree.toArray(btree, data, 0); + + int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData); + this.firstComplexIdx = idx < 0 ? 
data.length : idx; + this.complexIdx = firstComplexIdx; + } + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (simpleIdx >= firstComplexIdx) + { + if (complexIdx >= data.length) + return endOfData(); + + complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); + } + else + { + if (complexIdx >= data.length) + return (Cell)data[simpleIdx++]; + + if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0) + return (Cell)data[simpleIdx++]; + else + complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); + } + } + } + } + public static class Builder implements Row.Builder { // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 0c3dc2d..33ad447 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -146,6 +146,18 @@ public interface Row extends Unfiltered, Iterable<ColumnData> public Iterable<Cell> cells(); /** + * An iterable over the cells of this row that return cells in "legacy order". + * <p> + * In 3.0+, columns are sorted so that all simple columns are before all complex columns. Previously + * however, the cells where just sorted by the column name. This iterator return cells in that + * legacy order. It's only ever meaningful for backward/thrift compatibility code. + * + * @param metadata the table this is a row of. + * @return an iterable over the cells of this row in "legacy order". 
+ */ + public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata); + + /** * Whether the row stores any (non-live) complex deletion for any complex column. */ public boolean hasComplexDeletion(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 2c7932b..25eb0d0 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -210,8 +210,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.BATCHLOG_MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); put(Verb.READ, ReadCommand.serializer); - //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer); - //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer); + put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer); + put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer); put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance); put(Verb.REPAIR_MESSAGE, RepairMessage.serializer); put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 487a14c..16a3e6e 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -99,7 +99,7 @@ public abstract class AbstractReadExecutor traceState.trace("reading {} from {}", readCommand.isDigestQuery() 
? "digest" : "data", endpoint); logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); if (message == null) - message = readCommand.createMessage(); + message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint)); MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } @@ -277,7 +277,8 @@ public abstract class AbstractReadExecutor if (traceState != null) traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler); + int version = MessagingService.instance().getVersion(extraReplica); + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler); speculated = true; cfs.metric.speculativeRetries.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index a1b5c96..6bfe94a 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver public PartitionIterator getData() { ReadResponse response = responses.iterator().next().payload; - return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata()), command.nowInSec()); + return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec()); } public PartitionIterator resolve() @@ -62,7 +62,7 @@ public class DataResolver extends ResponseResolver for (int i = 0; i < count; i++) { MessageIn<ReadResponse> msg = responses.get(i); - 
iters.add(msg.payload.makeIterator(command.metadata())); + iters.add(msg.payload.makeIterator(command.metadata(), command)); sources[i] = msg.from; } @@ -406,12 +406,12 @@ public class DataResolver extends ResponseResolver if (StorageProxy.canDoLocalRequest(source)) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); else - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler); + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. handler.awaitResults(); assert resolver.responses.size() == 1; - return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata()), retryCommand); + return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java index 42aee04..db8adf3 100644 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/DigestResolver.java @@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver public PartitionIterator getData() { assert isDataPresent(); - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec()); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); } /* @@ -77,7 +77,7 @@ public class DigestResolver extends 
ResponseResolver { ReadResponse response = message.payload; - ByteBuffer newDigest = response.digest(command.metadata()); + ByteBuffer newDigest = response.digest(command.metadata(), command); if (digest == null) digest = newDigest; else if (!digest.equals(newDigest)) @@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver if (logger.isDebugEnabled()) logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec()); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); } public boolean isDataPresent() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index d548019..8b1ef32 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -238,9 +238,11 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size()); AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); - MessageOut<ReadCommand> message = command.createMessage(); for (InetAddress endpoint : endpoints) + { + MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint)); MessagingService.instance().sendRR(message, endpoint, repairHandler); + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 2c3c018..1e1f847 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1621,9 +1621,9 @@ public class StorageProxy implements StorageProxyMBean keyspace, executor.handler.endpoints); - MessageOut<ReadCommand> message = command.createMessage(); for (InetAddress endpoint : executor.getContactedReplicas()) { + MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint)); Tracing.trace("Enqueuing full data read to {}", endpoint); MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); } @@ -1974,9 +1974,9 @@ public class StorageProxy implements StorageProxyMBean } else { - MessageOut<ReadCommand> message = rangeCommand.createMessage(); for (InetAddress endpoint : toQuery.filteredEndpoints) { + MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint)); Tracing.trace("Enqueuing request to {}", endpoint); MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a8ac8bf..959f7e3 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -296,8 +296,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, 
new ReadRepairVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new ReadCommandVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index 169944b..2e57a8b 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -47,7 +47,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager if (state != null) { lastReturnedKey = command.metadata().decorateKey(state.partitionKey); - lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName); + lastReturnedClustering = state.cellName.hasRemaining() + ? 
LegacyLayout.decodeClustering(command.metadata(), state.cellName) + : null; restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index bb223b8..28c5206 100644 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@ -24,11 +24,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.service.ClientState; /** * Common interface to single partition queries (by slice and by name). @@ -50,7 +46,9 @@ public class SinglePartitionPager extends AbstractQueryPager if (state != null) { - lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName); + lastReturned = state.cellName.hasRemaining() + ? 
LegacyLayout.decodeClustering(command.metadata(), state.cellName) + : null; restoreState(command.partitionKey(), state.remaining, state.remainingInPartition); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 9353d16..733067e 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -237,7 +237,7 @@ public class CassandraServer implements Cassandra.Iface if (partition.isEmpty()) return EMPTY_COLUMNS; - Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition); + Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition).right; List<ColumnOrSuperColumn> result; if (partition.metadata().isSuper()) { @@ -932,7 +932,7 @@ public class CassandraServer implements Cassandra.Iface { return result == null ? new CASResult(true) - : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result))); + : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result).right)); } } catch (UnknownColumnException e)