Repository: cassandra Updated Branches: refs/heads/trunk aa6233aa1 -> d5b2fa206
Make paxos reuse the timestamp generation of normal operation patch by slebresne; reviewed by iamaleksey for CASSANDRA-7801 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/85ea3735 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/85ea3735 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/85ea3735 Branch: refs/heads/trunk Commit: 85ea37356e666c2780294bbd29daa89a32ebf333 Parents: cdf80d9 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Nov 6 16:38:27 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Nov 6 16:38:27 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../cql3/statements/BatchStatement.java | 7 +- .../cql3/statements/ModificationStatement.java | 3 +- .../cql3/statements/SelectStatement.java | 8 +- .../apache/cassandra/service/ClientState.java | 36 +++++ .../apache/cassandra/service/QueryState.java | 5 +- .../apache/cassandra/service/StorageProxy.java | 139 ++++++++++++++----- .../service/pager/MultiPartitionPager.java | 13 +- .../service/pager/NamesQueryPager.java | 7 +- .../cassandra/service/pager/QueryPagers.java | 30 ++-- .../service/pager/SliceQueryPager.java | 11 +- .../cassandra/thrift/CassandraServer.java | 35 +++-- 12 files changed, 206 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2e60f3a..5348f2f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +2.1.3 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801) + 2.1.2 * (cqlsh) parse_for_table_meta errors out on queries with undefined grammars (CASSANDRA-8262) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 17d1771..d54e4fd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -275,7 +275,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache throw new InvalidRequestException("Invalid empty serial consistency level"); if (hasConditions) - return executeWithConditions(options, now); + return executeWithConditions(options, queryState); executeWithoutConditions(getMutations(options, local, now), options.getConsistency()); return new ResultMessage.Void(); @@ -297,9 +297,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); } - private ResultMessage executeWithConditions(BatchQueryOptions options, long now) + private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state) throws RequestExecutionException, RequestValidationException { + long now = state.getTimestamp(); ByteBuffer key = null; String ksName = null; String cfName = null; @@ -339,7 +340,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache casRequest.addRowUpdate(clusteringPrefix, statement, statementOptions, timestamp); } - ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency()); + ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency(), state.getClientState()); return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, 
cfName, result, columnsWithConditions, true, options.forStatement(0))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 974ccc8..846ad3e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -524,7 +524,8 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF key, request, options.getSerialConsistency(), - options.getConsistency()); + options.getConsistency(), + queryState.getClientState()); return new ResultMessage.Rows(buildCasResultSet(key, result, options)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 2632ee2..84cbdc0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -212,11 +212,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) { - return execute(command, options, limit, now); + return execute(command, options, limit, now, state); } else { - QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState()); + QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState()); if (parameters.isCount) return 
pageCountQuery(pager, options, pageSize, now, limit); @@ -250,7 +250,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return getPageableCommand(options, getLimit(options), System.currentTimeMillis()); } - private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) throws RequestValidationException, RequestExecutionException { List<Row> rows; if (command == null) @@ -260,7 +260,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache else { rows = command instanceof Pageable.ReadCommands - ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency()) + ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState()) : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index b4b162c..e1df1bd 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service; import java.net.SocketAddress; import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -99,6 +100,9 @@ public class ClientState // The remote address of the client - null for internal clients. 
private final SocketAddress remoteAddress; + // The biggest timestamp that was returned by getTimestamp/assigned to a query + private final AtomicLong lastTimestampMicros = new AtomicLong(0); + /** * Construct a new, empty ClientState for internal calls. */ @@ -132,6 +136,38 @@ public class ClientState return new ClientState(remoteAddress); } + /** + * This clock guarantees that updates for the same ClientState will be ordered + * in the sequence seen, even if multiple updates happen in the same millisecond. + */ + public long getTimestamp() + { + while (true) + { + long current = System.currentTimeMillis() * 1000; + long last = lastTimestampMicros.get(); + long tstamp = last >= current ? last + 1 : current; + if (lastTimestampMicros.compareAndSet(last, tstamp)) + return tstamp; + } + } + + /** + * Can be used when a timestamp has been assigned by a query, but that timestamp is + * not directly one returned by getTimestamp() (see SP.beginAndRepairPaxos()). + * This ensures following calls to getTimestamp() will return a timestamp strictly + * greater than the one provided to this method. 
+ */ + public void updateLastTimestamp(long tstampMicros) + { + while (true) + { + long last = lastTimestampMicros.get(); + if (tstampMicros <= last || lastTimestampMicros.compareAndSet(last, tstampMicros)) + return; + } + } + public static QueryHandler getCQLQueryHandler() { return cqlQueryHandler; http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index 0179a3e..9891ba0 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -29,7 +29,6 @@ import org.apache.cassandra.utils.FBUtilities; public class QueryState { private final ClientState clientState; - private volatile long clock; private volatile UUID preparedTracingSession; public QueryState(ClientState clientState) @@ -56,9 +55,7 @@ public class QueryState */ public long getTimestamp() { - long current = System.currentTimeMillis() * 1000; - clock = clock >= current ? 
clock + 1 : current; - return clock; + return clientState.getTimestamp(); } public boolean traceNextQuery() http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 72f9e15..d45e74b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -201,7 +201,8 @@ public class StorageProxy implements StorageProxyMBean ByteBuffer key, CASRequest request, ConsistencyLevel consistencyForPaxos, - ConsistencyLevel consistencyForCommit) + ConsistencyLevel consistencyForCommit, + ClientState state) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { final long start = System.nanoTime(); @@ -221,7 +222,7 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true); + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); final UUID ballot = pair.left; contentions += pair.right; // read the current values and check they validate the conditions @@ -324,7 +325,15 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. 
*/ - private static Pair<UUID, Integer> beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, final boolean isWrite) + private static Pair<UUID, Integer> beginAndRepairPaxos(long start, + ByteBuffer key, + CFMetaData metadata, + List<InetAddress> liveEndpoints, + int requiredParticipants, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit, + final boolean isWrite, + ClientState state) throws WriteTimeoutException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); @@ -333,9 +342,13 @@ public class StorageProxy implements StorageProxyMBean int contentions = 0; while (System.nanoTime() - start < timeout) { + // We don't want to use a timestamp that is older than the last one assigned by the ClientState or operations + // may appear out-of-order (#7801). But note that state.getTimestamp() is in microseconds while the ballot + // timestamp is only in milliseconds + long currentTime = (state.getTimestamp() / 1000) + 1; long ballotMillis = summary == null - ? System.currentTimeMillis() - : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot)); + ? 
currentTime + : Math.max(currentTime, 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot)); UUID ballot = UUIDGen.getTimeUUID(ballotMillis); // prepare @@ -394,6 +407,10 @@ public class StorageProxy implements StorageProxyMBean continue; } + // We might commit this ballot and we want to ensure operations starting after this CAS succeed will be assigned + // a timestamp greater than the one of this ballot, so operation order is preserved (#7801) + state.updateLastTimestamp(ballotMillis * 1000); + return Pair.create(ballot, contentions); } @@ -1134,11 +1151,19 @@ public class StorageProxy implements StorageProxyMBean return true; } + public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) + throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException + { + // When using serial CL, the ClientState should be provided + assert !consistencyLevel.isSerialConsistency(); + return read(commands, consistencyLevel, null); + } + /** * Performs the actual reading of a row out of the StorageService, fetching * a specific set of column names from a given column family. */ - public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level) + public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException { if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands)) @@ -1148,68 +1173,106 @@ public class StorageProxy implements StorageProxyMBean throw new IsBootstrappingException(); } + return consistencyLevel.isSerialConsistency() + ? 
readWithPaxos(commands, consistencyLevel, state) + : readRegular(commands, consistencyLevel); + } + + private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) + throws InvalidRequestException, UnavailableException, ReadTimeoutException + { + assert state != null; + long start = System.nanoTime(); List<Row> rows = null; + try { - if (consistency_level.isSerialConsistency()) - { - // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - if (commands.size() > 1) - throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time"); - ReadCommand command = commands.get(0); - - CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistency_level); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; + // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read + if (commands.size() > 1) + throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time"); + ReadCommand command = commands.get(0); - // does the work of applying in-progress writes; throws UAE or timeout if it can't - final ConsistencyLevel consistencyForCommitOrFetch = consistency_level == ConsistencyLevel.LOCAL_SERIAL ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; - try - { - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch, false); - if(pair.right > 0) - casReadMetrics.contention.update(pair.right); - } - catch (WriteTimeoutException e) - { - throw new ReadTimeoutException(consistency_level, 0, consistency_level.blockFor(Keyspace.open(command.ksName)), false); - } + CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistencyLevel); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; - rows = fetchRows(commands, consistencyForCommitOrFetch); + // does the work of applying in-progress writes; throws UAE or timeout if it can't + final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL + ? 
ConsistencyLevel.LOCAL_QUORUM + : ConsistencyLevel.QUORUM; + try + { + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); + if (pair.right > 0) + casReadMetrics.contention.update(pair.right); } - else + catch (WriteTimeoutException e) { - rows = fetchRows(commands, consistency_level); + throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false); } + + rows = fetchRows(commands, consistencyForCommitOrFetch); + } + catch (UnavailableException e) + { + readMetrics.unavailables.mark(); + ClientRequestMetrics.readUnavailables.inc(); + casReadMetrics.unavailables.mark(); + throw e; + } + catch (ReadTimeoutException e) + { + readMetrics.timeouts.mark(); + ClientRequestMetrics.readTimeouts.inc(); + casReadMetrics.timeouts.mark(); + throw e; + } + finally + { + long latency = System.nanoTime() - start; + readMetrics.addNano(latency); + casReadMetrics.addNano(latency); + // TODO avoid giving every command the same latency number. 
Can fix this in CASSADRA-5329 + for (ReadCommand command : commands) + Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); + } + + return rows; + } + + private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) + throws UnavailableException, ReadTimeoutException + { + long start = System.nanoTime(); + List<Row> rows = null; + + try + { + rows = fetchRows(commands, consistencyLevel); } catch (UnavailableException e) { readMetrics.unavailables.mark(); ClientRequestMetrics.readUnavailables.inc(); - if(consistency_level.isSerialConsistency()) - casReadMetrics.unavailables.mark(); throw e; } catch (ReadTimeoutException e) { readMetrics.timeouts.mark(); ClientRequestMetrics.readTimeouts.inc(); - if(consistency_level.isSerialConsistency()) - casReadMetrics.timeouts.mark(); throw e; } finally { long latency = System.nanoTime() - start; readMetrics.addNano(latency); - if(consistency_level.isSerialConsistency()) - casReadMetrics.addNano(latency); // TODO avoid giving every command the same latency number. 
Can fix this in CASSADRA-5329 for (ReadCommand command : commands) Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } + return rows; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 35d6752..6ed635f 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.ClientState; /** * Pager over a list of ReadCommand. 
@@ -46,7 +47,7 @@ class MultiPartitionPager implements QueryPager private int remaining; private int current; - MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state) + MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state) { int i = 0; // If it's not the beginning (state != null), we need to find where we were and skip previous commands @@ -65,7 +66,7 @@ class MultiPartitionPager implements QueryPager pagers = new SinglePartitionPager[commands.size() - i]; // 'i' is on the first non exhausted pager for the previous page (or the first one) - pagers[0] = makePager(commands.get(i), consistencyLevel, localQuery, state); + pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state); timestamp = commands.get(i).timestamp; // Following ones haven't been started yet @@ -74,16 +75,16 @@ class MultiPartitionPager implements QueryPager ReadCommand command = commands.get(j); if (command.timestamp != timestamp) throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen."); - pagers[j - i] = makePager(command, consistencyLevel, localQuery, null); + pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null); } remaining = state == null ? computeRemaining(pagers) : state.remaining; } - private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state) + private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state) { return command instanceof SliceFromReadCommand - ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, state) - : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery); + ? 
new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state) + : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery); } private static int computeRemaining(SinglePartitionPager[] pagers) http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java index 663db22..d03e582 100644 --- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java @@ -25,6 +25,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnCounter; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; /** @@ -34,6 +35,7 @@ public class NamesQueryPager implements SinglePartitionPager { private final SliceByNamesReadCommand command; private final ConsistencyLevel consistencyLevel; + private final ClientState state; private final boolean localQuery; private volatile boolean queried; @@ -49,10 +51,11 @@ public class NamesQueryPager implements SinglePartitionPager * count every cell individually) and the names filter asks for more than pageSize columns. 
*/ // Don't use directly, use QueryPagers method instead - NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery) + NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, boolean localQuery) { this.command = command; this.consistencyLevel = consistencyLevel; + this.state = state; this.localQuery = localQuery; } @@ -87,7 +90,7 @@ public class NamesQueryPager implements SinglePartitionPager queried = true; return localQuery ? Collections.singletonList(command.getRow(Keyspace.open(command.ksName))) - : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel); + : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel, state); } public int maxRemaining() http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index 04702d0..c03e8ec 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; /** * Static utility methods to create query pagers. 
@@ -82,27 +83,27 @@ public class QueryPagers } } - private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local, PagingState state) + private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state) { if (command instanceof SliceByNamesReadCommand) - return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local); + return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, local); else - return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local, state); + return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, local, state); } - private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local, PagingState state) + private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state) { if (command instanceof Pageable.ReadCommands) { List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands; if (commands.size() == 1) - return pager(commands.get(0), consistencyLevel, local, state); + return pager(commands.get(0), consistencyLevel, cState, local, state); - return new MultiPartitionPager(commands, consistencyLevel, local, state); + return new MultiPartitionPager(commands, consistencyLevel, cState, local, state); } else if (command instanceof ReadCommand) { - return pager((ReadCommand)command, consistencyLevel, local, state); + return pager((ReadCommand)command, consistencyLevel, cState, local, state); } else { @@ -115,19 +116,19 @@ public class QueryPagers } } - public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel) + public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState) { - return pager(command, consistencyLevel, false, null); + return pager(command, 
consistencyLevel, cState, false, null); } - public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, PagingState state) + public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, PagingState state) { - return pager(command, consistencyLevel, false, state); + return pager(command, consistencyLevel, cState, false, state); } public static QueryPager localPager(Pageable command) { - return pager(command, null, true, null); + return pager(command, null, null, true, null); } /** @@ -137,7 +138,7 @@ public class QueryPagers public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize) { SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter()); - final SliceQueryPager pager = new SliceQueryPager(command, null, true); + final SliceQueryPager pager = new SliceQueryPager(command, null, null, true); return new Iterator<ColumnFamily>() { @@ -176,11 +177,12 @@ public class QueryPagers ByteBuffer key, SliceQueryFilter filter, ConsistencyLevel consistencyLevel, + ClientState cState, final int pageSize, long now) throws RequestValidationException, RequestExecutionException { SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter); - final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false); + final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false); ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now); while (!pager.isExhausted()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java index cd1caf3..05c05b1 100644 --- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,19 +40,21 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti private static final Logger logger = LoggerFactory.getLogger(SliceQueryPager.class); private final SliceFromReadCommand command; + private final ClientState cstate; private volatile Composite lastReturned; // Don't use directly, use QueryPagers method instead - SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery) + SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery) { super(consistencyLevel, command.filter.count, localQuery, command.ksName, command.cfName, command.filter, command.timestamp); this.command = command; + this.cstate = cstate; } - SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state) + SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery, PagingState state) { - this(command, consistencyLevel, localQuery); + this(command, consistencyLevel, cstate, localQuery); if (state != null) { @@ -86,7 +89,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti ReadCommand pageCmd = command.withUpdatedFilter(filter); return localQuery ? 
Collections.singletonList(pageCmd.getRow(Keyspace.open(command.ksName))) - : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel); + : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel, cstate); } protected boolean containsPreviousLast(Row first) http://git-wip-us.apache.org/repos/asf/cassandra/blob/85ea3735/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index e7708df..6955e64 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -103,7 +103,7 @@ public class CassandraServer implements Cassandra.Iface return ThriftSessionManager.instance.currentSession(); } - protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level) + protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { // TODO - Support multiple column families per row, right now row only contains 1 column family @@ -115,7 +115,7 @@ public class CassandraServer implements Cassandra.Iface schedule(DatabaseDescriptor.getReadRpcTimeout()); try { - rows = StorageProxy.read(commands, consistency_level); + rows = StorageProxy.read(commands, consistency_level, cState); } finally { @@ -269,10 +269,10 @@ public class CassandraServer implements Cassandra.Iface return thriftSuperColumns; } - private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, boolean subColumnsOnly, org.apache.cassandra.db.ConsistencyLevel consistency_level) + private Map<ByteBuffer, 
List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, boolean subColumnsOnly, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { - Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level); + Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level, cState); Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>(); for (ReadCommand command: commands) { @@ -322,7 +322,7 @@ public class CassandraServer implements Cassandra.Iface ClientState cState = state(); String keyspace = cState.getKeyspace(); state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT); - return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level); + return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState); } catch (RequestValidationException e) { @@ -339,10 +339,11 @@ public class CassandraServer implements Cassandra.Iface ColumnParent column_parent, long timestamp, SlicePredicate predicate, - ConsistencyLevel consistency_level) + ConsistencyLevel consistency_level, + ClientState cState) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { - return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level).get(key); + return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level, cState).get(key); } public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) @@ -369,7 +370,7 @@ public class 
CassandraServer implements Cassandra.Iface ClientState cState = state(); String keyspace = cState.getKeyspace(); cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT); - return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level); + return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level, cState); } catch (RequestValidationException e) { @@ -431,7 +432,8 @@ public class CassandraServer implements Cassandra.Iface ColumnParent column_parent, long timestamp, SlicePredicate predicate, - ConsistencyLevel consistency_level) + ConsistencyLevel consistency_level, + ClientState cState) throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException { CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family); @@ -452,7 +454,7 @@ public class CassandraServer implements Cassandra.Iface commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), timestamp, filter.cloneShallow())); } - return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel); + return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel, cState); } public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level) @@ -501,7 +503,7 @@ public class CassandraServer implements Cassandra.Iface long now = System.currentTimeMillis(); ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter); - Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel); + Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState); ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key)); @@ -549,7 +551,7 @@ public class CassandraServer 
implements Cassandra.Iface long timestamp = System.currentTimeMillis(); if (predicate.column_names != null) - return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level).size(); + return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level, cState).size(); int pageSize; // request by page if this is a large row @@ -575,6 +577,7 @@ public class CassandraServer implements Cassandra.Iface key, filter, ThriftConversion.fromThrift(consistency_level), + cState, pageSize, timestamp); } @@ -637,7 +640,8 @@ public class CassandraServer implements Cassandra.Iface column_parent, System.currentTimeMillis(), predicate, - consistency_level); + consistency_level, + cState); for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet()) counts.put(cf.getKey(), cf.getValue().size()); @@ -787,7 +791,8 @@ public class CassandraServer implements Cassandra.Iface key, new ThriftCASRequest(cfExpected, cfUpdates), ThriftConversion.fromThrift(serial_consistency_level), - ThriftConversion.fromThrift(commit_consistency_level)); + ThriftConversion.fromThrift(commit_consistency_level), + cState); return result == null ? 
new CASResult(true) : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(result.getSortedColumns(), System.currentTimeMillis())); @@ -2083,7 +2088,7 @@ public class CassandraServer implements Cassandra.Iface SliceQueryFilter filter = new SliceQueryFilter(deoverlapped, request.reversed, request.count); ThriftValidation.validateKey(metadata, request.key); commands.add(ReadCommand.create(keyspace, request.key, request.column_parent.getColumn_family(), System.currentTimeMillis(), filter)); - return getSlice(commands, request.column_parent.isSetSuper_column(), consistencyLevel).entrySet().iterator().next().getValue(); + return getSlice(commands, request.column_parent.isSetSuper_column(), consistencyLevel, cState).entrySet().iterator().next().getValue(); } catch (RequestValidationException e) {