http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java new file mode 100644 index 0000000..560fbe9 --- /dev/null +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.pager.PagingState; +import org.apache.cassandra.service.pager.PartitionRangeQueryPager; +import org.apache.cassandra.service.pager.QueryPager; +import org.apache.cassandra.transport.ProtocolVersion; + +/** + * A {@code ReadQuery} for a range of partitions. + */ +public interface PartitionRangeReadQuery extends ReadQuery +{ + static ReadQuery create(TableMetadata table, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) + { + if (table.isVirtual()) + return VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange); + + return PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange); + } + + DataRange dataRange(); + + /** + * Creates a new {@code PartitionRangeReadQuery} with the updated limits. + * + * @param newLimits the new limits + * @return the new {@code PartitionRangeReadQuery} + */ + PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits); + + /** + * Creates a new {@code PartitionRangeReadQuery} with the updated limits and data range. 
+ * + * @param newLimits the new limits + * @return the new {@code PartitionRangeReadQuery} + */ + PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange); + + default QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) + { + return new PartitionRangeQueryPager(this, pagingState, protocolVersion); + } + + default boolean selectsKey(DecoratedKey key) + { + if (!dataRange().contains(key)) + return false; + + return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType); + } + + default boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + if (!dataRange().clusteringIndexFilter(key).selects(clustering)) + return false; + return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); + } + + default boolean selectsFullPartition() + { + return metadata().isStaticCompactTable() || + (dataRange().selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns()); + } +}
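A note on intended usage: the factory above keeps callers agnostic of whether the queried table is virtual or regular. The snippet below is a minimal sketch, not part of this patch, of building and running an unrestricted range read through PartitionRangeReadQuery.create(); the Schema.instance lookup and the ColumnFilter.all/DataRange.allData helpers are assumed from the surrounding codebase, and the class name and limit value are purely illustrative.

import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;

public final class RangeReadSketch
{
    // Reads up to 100 rows across all partitions of the given table, on the local node only.
    public static void scan(String keyspace, String table)
    {
        TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table);

        // The factory returns either a VirtualTablePartitionRangeReadQuery or a PartitionRangeReadCommand,
        // but callers only see the ReadQuery contract.
        ReadQuery query = PartitionRangeReadQuery.create(metadata,
                                                         FBUtilities.nowInSeconds(),   // one "now" for the whole query
                                                         ColumnFilter.all(metadata),   // fetch every column
                                                         RowFilter.NONE,               // no row-level restrictions
                                                         DataLimits.cqlLimits(100),    // LIMIT 100
                                                         DataRange.allData(metadata.partitioner)); // full token range

        try (ReadExecutionController controller = query.executionController();
             PartitionIterator partitions = query.executeInternal(controller))
        {
            while (partitions.hasNext())
            {
                try (RowIterator partition = partitions.next())
                {
                    partition.forEachRemaining(row -> { /* consume rows */ });
                }
            }
        }
    }
}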
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 128f8f3..064dd77 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.function.LongPredicate; -import java.util.function.Predicate; import javax.annotation.Nullable; @@ -29,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.monitoring.ApproximateTime; -import org.apache.cassandra.db.monitoring.MonitorableImpl; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.StoppingTransformation; @@ -37,6 +35,7 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -57,19 +56,13 @@ import org.apache.cassandra.utils.FBUtilities; * <p> * This contains all the informations needed to do a local read. */ -public abstract class ReadCommand extends MonitorableImpl implements ReadQuery +public abstract class ReadCommand extends AbstractReadQuery { private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0")); protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); private final Kind kind; - private final TableMetadata metadata; - private final int nowInSec; - - private final ColumnFilter columnFilter; - private final RowFilter rowFilter; - private final DataLimits limits; private final boolean isDigestQuery; // if a digest query, the version for which the digest is expected. Ignored if not a digest. @@ -115,14 +108,10 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery DataLimits limits, IndexMetadata index) { + super(metadata, nowInSec, columnFilter, rowFilter, limits); this.kind = kind; this.isDigestQuery = isDigestQuery; this.digestVersion = digestVersion; - this.metadata = metadata; - this.nowInSec = nowInSec; - this.columnFilter = columnFilter; - this.rowFilter = rowFilter; - this.limits = limits; this.index = index; } @@ -140,30 +129,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery public abstract ReadCommand withUpdatedLimit(DataLimits newLimits); /** - * The metadata for the table queried. - * - * @return the metadata for the table queried. - */ - public TableMetadata metadata() - { - return metadata; - } - - /** - * The time in seconds to use as "now" for this query. - * <p> - * We use the same time as "now" for the whole query to avoid considering different - * values as expired during the query, which would be buggy (would throw of counting amongst other - * things). - * - * @return the time (in seconds) to use as "now". 
- */ - public int nowInSec() - { - return nowInSec; - } - - /** * The configured timeout for this command. * * @return the configured timeout for this command. @@ -171,43 +136,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery public abstract long getTimeout(); /** - * A filter on which (non-PK) columns must be returned by the query. - * - * @return which columns must be fetched by this query. - */ - public ColumnFilter columnFilter() - { - return columnFilter; - } - - /** - * Filters/Resrictions on CQL rows. - * <p> - * This contains the restrictions that are not directly handled by the - * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column - * restrictions and can include some PK columns restrictions when those can't be - * satisfied entirely by the clustering index filter (because not all clustering columns - * have been restricted for instance). If there is 2ndary indexes on the table, - * one of this restriction might be handled by a 2ndary index. - * - * @return the filter holding the expression that rows must satisfy. - */ - public RowFilter rowFilter() - { - return rowFilter; - } - - /** - * The limits set on this query. - * - * @return the limits set on this query. - */ - public DataLimits limits() - { - return limits; - } - - /** * Whether this query is a digest one or not. * * @return Whether this query is a digest query. @@ -327,9 +255,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery */ public void maybeValidateIndex() { - Index index = getIndex(Keyspace.openAndGetStore(metadata)); if (null != index) - index.validate(this); + IndexRegistry.obtain(metadata()).getIndex(index).validate(this); } /** @@ -388,11 +315,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery protected abstract void recordLatency(TableMetrics metric, long latencyNanos); - public PartitionIterator executeInternal(ReadExecutionController controller) - { - return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()); - } - public ReadExecutionController executionController() { return ReadExecutionController.forCommand(this); @@ -410,7 +332,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace); - private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness(); + private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness(); private int liveRows = 0; private int tombstones = 0; @@ -553,7 +475,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery private void maybeDelayForTesting() { - if (!metadata.keyspace.startsWith("system")) + if (!metadata().keyspace.startsWith("system")) FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); } } @@ -605,7 +527,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery { StringBuilder sb = new StringBuilder(); sb.append("SELECT ").append(columnFilter()); - sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata.name); + sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata().name); appendCQLWhereClause(sb); if (limits() != DataLimits.NONE) @@ -657,11 +579,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery 
out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata())); if (command.isDigestQuery()) out.writeUnsignedVInt(command.digestVersion()); - command.metadata.id.serialize(out); + command.metadata().id.serialize(out); out.writeInt(command.nowInSec()); ColumnFilter.serializer.serialize(command.columnFilter(), out, version); RowFilter.serializer.serialize(command.rowFilter(), out, version); - DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator); + DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator); if (null != command.index) IndexMetadata.serializer.serialize(command.index, out, version); @@ -714,11 +636,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery { return 2 // kind + flags + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) - + command.metadata.id.serializedSize() + + command.metadata().id.serializedSize() + TypeSizes.sizeof(command.nowInSec()) + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + RowFilter.serializer.serializedSize(command.rowFilter(), version) - + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator) + + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator) + command.selectionSerializedSize(version) + command.indexSerializedSize(version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java index d527d28..fd94aa1 100644 --- a/src/java/org/apache/cassandra/db/ReadQuery.java +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -17,74 +17,113 @@ */ package org.apache.cassandra.db; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.service.pager.PagingState; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; /** * Generic abstraction for read queries. - * <p> - * The main implementation of this is {@link ReadCommand} but we have this interface because - * {@link SinglePartitionReadCommand.Group} is also consider as a "read query" but is not a - * {@code ReadCommand}. 
*/ public interface ReadQuery { - ReadQuery EMPTY = new ReadQuery() + public static ReadQuery empty(final TableMetadata metadata) { - public ReadExecutionController executionController() + return new ReadQuery() { - return ReadExecutionController.empty(); - } + public TableMetadata metadata() + { + return metadata; + } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException - { - return EmptyIterators.partition(); - } + public ReadExecutionController executionController() + { + return ReadExecutionController.empty(); + } - public PartitionIterator executeInternal(ReadExecutionController controller) - { - return EmptyIterators.partition(); - } + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException + { + return EmptyIterators.partition(); + } - public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) - { - return EmptyIterators.unfilteredPartition(executionController.metadata()); - } + public PartitionIterator executeInternal(ReadExecutionController controller) + { + return EmptyIterators.partition(); + } - public DataLimits limits() - { - // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means - // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging" - // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this. - return DataLimits.cqlLimits(0); - } + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return EmptyIterators.unfilteredPartition(executionController.metadata()); + } - public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion) - { - return QueryPager.EMPTY; - } + public DataLimits limits() + { + // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means + // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging" + // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this. + return DataLimits.cqlLimits(0); + } - public boolean selectsKey(DecoratedKey key) - { - return false; - } + public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion) + { + return QueryPager.EMPTY; + } - public boolean selectsClustering(DecoratedKey key, Clustering clustering) - { - return false; - } + public boolean selectsKey(DecoratedKey key) + { + return false; + } - @Override - public boolean selectsFullPartition() - { - return false; - } - }; + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + return false; + } + + @Override + public int nowInSec() + { + return FBUtilities.nowInSeconds(); + } + + @Override + public boolean selectsFullPartition() + { + return false; + } + + @Override + public boolean isEmpty() + { + return true; + } + + @Override + public RowFilter rowFilter() + { + return RowFilter.NONE; + } + + @Override + public ColumnFilter columnFilter() + { + return ColumnFilter.NONE; + } + }; + } + + /** + * The metadata for the table this is a query on. + * + * @return the metadata for the table this is a query on. + */ + public TableMetadata metadata(); /** * Starts a new read operation. 
@@ -156,8 +195,63 @@ public interface ReadQuery
     public boolean selectsClustering(DecoratedKey key, Clustering clustering);
 
     /**
+     * The time in seconds to use as "now" for this query.
+     * <p>
+     * We use the same time as "now" for the whole query to avoid considering different
+     * values as expired during the query, which would be buggy (would throw off counting, amongst other
+     * things).
+     *
+     * @return the time (in seconds) to use as "now".
+     */
+    public int nowInSec();
+
+    /**
      * Checks if this {@code ReadQuery} selects full partitions, that is it has no filtering on clustering or regular columns.
      * @return {@code true} if this {@code ReadQuery} selects full partitions, {@code false} otherwise.
      */
     public boolean selectsFullPartition();
+
+    /**
+     * Filters/Restrictions on CQL rows.
+     * <p>
+     * This contains the restrictions that are not directly handled by the
+     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
+     * restrictions and can include some PK column restrictions when those can't be
+     * satisfied entirely by the clustering index filter (because not all clustering columns
+     * have been restricted, for instance). If there are secondary indexes on the table,
+     * one of these restrictions might be handled by a secondary index.
+     *
+     * @return the filter holding the expression that rows must satisfy.
+     */
+    public RowFilter rowFilter();
+
+    /**
+     * A filter on which (non-PK) columns must be returned by the query.
+     *
+     * @return which columns must be fetched by this query.
+     */
+    public ColumnFilter columnFilter();
+
+    /**
+     * Whether this query is known to return nothing upfront.
+     * <p>
+     * This is overridden by the {@code ReadQuery} created through {@link #empty(TableMetadata)}, and that's probably the
+     * only place that should override it.
+     *
+     * @return whether this query is guaranteed to return no results whatsoever.
+     */
+    public default boolean isEmpty()
+    {
+        return false;
+    }
+
+    /**
+     * If the index manager for the table determines that there's an applicable
+     * 2i that can be used to execute this query, call its (optional)
+     * validation method to check that nothing in this query's parameters
+     * violates the implementation-specific validation rules.
+ */ + default void maybeValidateIndex() + { + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 64a25f8..7214106 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -20,14 +20,11 @@ package org.apache.cassandra.db; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.Pair; - import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; @@ -53,9 +50,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.pager.*; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTreeSet; @@ -63,7 +58,7 @@ import org.apache.cassandra.utils.btree.BTreeSet; /** * A read command that selects a (part of a) single partition. */ -public class SinglePartitionReadCommand extends ReadCommand +public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery { protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); @@ -314,6 +309,7 @@ public class SinglePartitionReadCommand extends ReadCommand indexMetadata()); } + @Override public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits) { return new SinglePartitionReadCommand(isDigestQuery(), @@ -328,11 +324,13 @@ public class SinglePartitionReadCommand extends ReadCommand indexMetadata()); } + @Override public DecoratedKey partitionKey() { return partitionKey; } + @Override public ClusteringIndexFilter clusteringIndexFilter() { return clusteringIndexFilter; @@ -348,35 +346,7 @@ public class SinglePartitionReadCommand extends ReadCommand return DatabaseDescriptor.getReadRpcTimeout(); } - public boolean selectsKey(DecoratedKey key) - { - if (!this.partitionKey().equals(key)) - return false; - - return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType); - } - - public boolean selectsClustering(DecoratedKey key, Clustering clustering) - { - if (clustering == Clustering.STATIC_CLUSTERING) - return !columnFilter().fetchedColumns().statics.isEmpty(); - - if (!clusteringIndexFilter().selects(clustering)) - return false; - - return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); - } - - /** - * Returns a new command suitable to paging from the last returned row. - * - * @param lastReturned the last row returned by the previous page. The newly created command - * will only query row that comes after this (in query order). This can be {@code null} if this - * is the first page. - * @param limits the limits to use for the page to query. 
- * - * @return the newly create command. - */ + @Override public SinglePartitionReadCommand forPaging(Clustering lastReturned, DataLimits limits) { // We shouldn't have set digest yet when reaching that point @@ -395,16 +365,6 @@ public class SinglePartitionReadCommand extends ReadCommand return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime); } - public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) - { - return getPager(this, pagingState, protocolVersion); - } - - private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, ProtocolVersion protocolVersion) - { - return new SinglePartitionPager(command, pagingState, protocolVersion); - } - protected void recordLatency(TableMetrics metric, long latencyNanos) { metric.readLatency.addNano(latencyNanos); @@ -1054,23 +1014,34 @@ public class SinglePartitionReadCommand extends ReadCommand /** * Groups multiple single partition read commands. */ - public static class Group implements ReadQuery + public static class Group extends SinglePartitionReadQuery.Group<SinglePartitionReadCommand> { - public final List<SinglePartitionReadCommand> commands; - private final DataLimits limits; - private final int nowInSec; - private final boolean selectsFullPartitions; + public static Group create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + List<DecoratedKey> partitionKeys, + ClusteringIndexFilter clusteringIndexFilter) + { + List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size()); + for (DecoratedKey partitionKey : partitionKeys) + { + commands.add(SinglePartitionReadCommand.create(metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter)); + } + + return new Group(commands, limits); + } public Group(List<SinglePartitionReadCommand> commands, DataLimits limits) { - assert !commands.isEmpty(); - this.commands = commands; - this.limits = limits; - SinglePartitionReadCommand firstCommand = commands.get(0); - this.nowInSec = firstCommand.nowInSec(); - this.selectsFullPartitions = firstCommand.selectsFullPartition(); - for (int i = 1; i < commands.size(); i++) - assert commands.get(i).nowInSec() == nowInSec; + super(commands, limits); } public static Group one(SinglePartitionReadCommand command) @@ -1082,97 +1053,6 @@ public class SinglePartitionReadCommand extends ReadCommand { return StorageProxy.read(this, consistency, clientState, queryStartNanoTime); } - - public int nowInSec() - { - return nowInSec; - } - - public DataLimits limits() - { - return limits; - } - - public TableMetadata metadata() - { - return commands.get(0).metadata(); - } - - @Override - public boolean selectsFullPartition() - { - return selectsFullPartitions; - } - - public ReadExecutionController executionController() - { - // Note that the only difference between the command in a group must be the partition key on which - // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one. - return commands.get(0).executionController(); - } - - public PartitionIterator executeInternal(ReadExecutionController controller) - { - // Note that the only difference between the command in a group must be the partition key on which - // they applied. 
- boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness(); - return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), - nowInSec, - selectsFullPartitions, - enforceStrictLiveness); - } - - public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) - { - return executeLocally(executionController, true); - } - - /** - * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}. - * - * @param executionController - the {@code ReadExecutionController} protecting the read. - * @param sort - whether to sort the inner commands by partition key, required for merging the iterator - * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)} - * because in this case it is safe to do so as there is no merging involved and we don't want to - * change the old behavior which was to not sort by partition. - * - * @return - the iterator that can be used to retrieve the query result. - */ - private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort) - { - List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size()); - for (SinglePartitionReadCommand cmd : commands) - partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController))); - - if (sort) - Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft())); - - return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList())); - } - - public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) - { - if (commands.size() == 1) - return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion); - - return new MultiPartitionPager(this, pagingState, protocolVersion); - } - - public boolean selectsKey(DecoratedKey key) - { - return Iterables.any(commands, c -> c.selectsKey(key)); - } - - public boolean selectsClustering(DecoratedKey key, Clustering clustering) - { - return Iterables.any(commands, c -> c.selectsClustering(key, clustering)); - } - - @Override - public String toString() - { - return commands.toString(); - } } private static class Deserializer extends SelectionDeserializer http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java new file mode 100644 index 0000000..f9f0014 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterables; + +import org.apache.commons.lang3.tuple.Pair; + +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.pager.MultiPartitionPager; +import org.apache.cassandra.service.pager.PagingState; +import org.apache.cassandra.service.pager.QueryPager; +import org.apache.cassandra.service.pager.SinglePartitionPager; +import org.apache.cassandra.transport.ProtocolVersion; + +/** + * A {@code ReadQuery} for a single partition. + */ +public interface SinglePartitionReadQuery extends ReadQuery +{ + public static Group<? extends SinglePartitionReadQuery> createGroup(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + List<DecoratedKey> partitionKeys, + ClusteringIndexFilter clusteringIndexFilter) + { + return metadata.isVirtual() + ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter) + : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter); + } + + + /** + * Creates a new read query on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * @param columnFilter the column filter to use for the query. + * @param filter the clustering index filter to use for the query. + * + * @return a newly created read query. The returned query will use no row filter and have no limits. + */ + public static SinglePartitionReadQuery create(TableMetadata metadata, + int nowInSec, + DecoratedKey key, + ColumnFilter columnFilter, + ClusteringIndexFilter filter) + { + return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter); + } + + /** + * Creates a new read query on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * + * @return a newly created read query. + */ + public static SinglePartitionReadQuery create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + return metadata.isVirtual() + ? 
VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter) + : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + /** + * Returns the key of the partition queried by this {@code ReadQuery} + * @return the key of the partition queried + */ + DecoratedKey partitionKey(); + + /** + * Creates a new {@code SinglePartitionReadQuery} with the specified limits. + * + * @param newLimits the new limits + * @return the new {@code SinglePartitionReadQuery} + */ + SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits); + + /** + * Returns a new {@code SinglePartitionReadQuery} suitable to paging from the last returned row. + * + * @param lastReturned the last row returned by the previous page. The newly created query + * will only query row that comes after this (in query order). This can be {@code null} if this + * is the first page. + * @param limits the limits to use for the page to query. + * + * @return the newly create query. + */ + SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits); + + @Override + default SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) + { + return new SinglePartitionPager(this, pagingState, protocolVersion); + } + + ClusteringIndexFilter clusteringIndexFilter(); + + default boolean selectsKey(DecoratedKey key) + { + if (!this.partitionKey().equals(key)) + return false; + + return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType); + } + + default boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + if (!clusteringIndexFilter().selects(clustering)) + return false; + + return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); + } + + /** + * Groups multiple single partition read queries. + */ + abstract class Group<T extends SinglePartitionReadQuery> implements ReadQuery + { + public final List<T> queries; + private final DataLimits limits; + private final int nowInSec; + private final boolean selectsFullPartitions; + + public Group(List<T> queries, DataLimits limits) + { + assert !queries.isEmpty(); + this.queries = queries; + this.limits = limits; + T firstQuery = queries.get(0); + this.nowInSec = firstQuery.nowInSec(); + this.selectsFullPartitions = firstQuery.selectsFullPartition(); + for (int i = 1; i < queries.size(); i++) + assert queries.get(i).nowInSec() == nowInSec; + } + + public int nowInSec() + { + return nowInSec; + } + + public DataLimits limits() + { + return limits; + } + + public TableMetadata metadata() + { + return queries.get(0).metadata(); + } + + @Override + public boolean selectsFullPartition() + { + return selectsFullPartitions; + } + + public ReadExecutionController executionController() + { + // Note that the only difference between the queries in a group must be the partition key on which + // they applied. So as far as ReadOrderGroup is concerned, we can use any of the queries to start one. + return queries.get(0).executionController(); + } + + public PartitionIterator executeInternal(ReadExecutionController controller) + { + // Note that the only difference between the queries in a group must be the partition key on which + // they applied. 
+ boolean enforceStrictLiveness = queries.get(0).metadata().enforceStrictLiveness(); + return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), + nowInSec, + selectsFullPartitions, + enforceStrictLiveness); + } + + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return executeLocally(executionController, true); + } + + /** + * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}. + * + * @param executionController - the {@code ReadExecutionController} protecting the read. + * @param sort - whether to sort the inner queries by partition key, required for merging the iterator + * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)} + * because in this case it is safe to do so as there is no merging involved and we don't want to + * change the old behavior which was to not sort by partition. + * + * @return - the iterator that can be used to retrieve the query result. + */ + private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort) + { + List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(queries.size()); + for (T query : queries) + partitions.add(Pair.of(query.partitionKey(), query.executeLocally(executionController))); + + if (sort) + Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft())); + + return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList())); + } + + public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) + { + if (queries.size() == 1) + return new SinglePartitionPager(queries.get(0), pagingState, protocolVersion); + + return new MultiPartitionPager<T>(this, pagingState, protocolVersion); + } + + public boolean selectsKey(DecoratedKey key) + { + return Iterables.any(queries, c -> c.selectsKey(key)); + } + + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + return Iterables.any(queries, c -> c.selectsClustering(key, clustering)); + } + + @Override + public RowFilter rowFilter() + { + // Note that the only difference between the query in a group must be the partition key on which + // they applied. + return queries.get(0).rowFilter(); + } + + @Override + public ColumnFilter columnFilter() + { + // Note that the only difference between the query in a group must be the partition key on which + // they applied. + return queries.get(0).columnFilter(); + } + + @Override + public String toString() + { + return queries.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java new file mode 100644 index 0000000..48cafa1 --- /dev/null +++ b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.db.virtual.VirtualTable; +import org.apache.cassandra.schema.TableMetadata; + +/** + * A read query that selects a (part of a) range of partitions of a virtual table. + */ +public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery +{ + private final DataRange dataRange; + + public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) + { + return new VirtualTablePartitionRangeReadQuery(metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + dataRange); + } + + private VirtualTablePartitionRangeReadQuery(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) + { + super(metadata, nowInSec, columnFilter, rowFilter, limits); + this.dataRange = dataRange; + } + + @Override + public DataRange dataRange() + { + return dataRange; + } + + @Override + public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits) + { + return new VirtualTablePartitionRangeReadQuery(metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + dataRange()); + } + + @Override + public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange) + { + return new VirtualTablePartitionRangeReadQuery(metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + newDataRange); + } + + @Override + protected UnfilteredPartitionIterator queryVirtualTable() + { + VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id); + return view.select(dataRange, columnFilter()); + } + + @Override + protected void appendCQLWhereClause(StringBuilder sb) + { + if (dataRange.isUnrestricted() && rowFilter().isEmpty()) + return; + + sb.append(" WHERE "); + // We put the row filter first because the data range can end by "ORDER BY" + if (!rowFilter().isEmpty()) + { + sb.append(rowFilter()); + if (!dataRange.isUnrestricted()) + sb.append(" AND "); + } + if (!dataRange.isUnrestricted()) + sb.append(dataRange.toCQLString(metadata())); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java new file mode 100644 index 0000000..f2c9a49 --- /dev/null +++ 
b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; + +/** + * Base class for the {@code ReadQuery} implementations use to query virtual tables. + */ +public abstract class VirtualTableReadQuery extends AbstractReadQuery +{ + protected VirtualTableReadQuery(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits) + { + super(metadata, nowInSec, columnFilter, rowFilter, limits); + } + + @Override + public ReadExecutionController executionController() + { + return ReadExecutionController.empty(); + } + + @Override + public PartitionIterator execute(ConsistencyLevel consistency, + ClientState clientState, + long queryStartNanoTime) throws RequestExecutionException + { + return executeInternal(executionController()); + } + + @Override + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + UnfilteredPartitionIterator resultIterator = queryVirtualTable(); + return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition()); + } + + protected abstract UnfilteredPartitionIterator queryVirtualTable(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java new file mode 100644 index 0000000..11f1f77 --- /dev/null +++ b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.db.virtual.VirtualTable; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ClientState; + +/** + * A read query that selects a (part of a) single partition of a virtual table. + */ +public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery +{ + private final DecoratedKey partitionKey; + private final ClusteringIndexFilter clusteringIndexFilter; + + public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + return new VirtualTableSinglePartitionReadQuery(metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter); + } + + private VirtualTableSinglePartitionReadQuery(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + super(metadata, nowInSec, columnFilter, rowFilter, limits); + this.partitionKey = partitionKey; + this.clusteringIndexFilter = clusteringIndexFilter; + } + + @Override + protected void appendCQLWhereClause(StringBuilder sb) + { + sb.append(" WHERE "); + + sb.append(ColumnMetadata.toCQLString(metadata().partitionKeyColumns())).append(" = "); + DataRange.appendKeyString(sb, metadata().partitionKeyType, partitionKey().getKey()); + + // We put the row filter first because the clustering index filter can end by "ORDER BY" + if (!rowFilter().isEmpty()) + sb.append(" AND ").append(rowFilter()); + + String filterString = clusteringIndexFilter().toCQLString(metadata()); + if (!filterString.isEmpty()) + sb.append(" AND ").append(filterString); + } + + @Override + public ClusteringIndexFilter clusteringIndexFilter() + { + return clusteringIndexFilter; + } + + @Override + public boolean selectsFullPartition() + { + return clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns(); + } + + @Override + public DecoratedKey partitionKey() + { + return partitionKey; + } + + @Override + public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits) + { + return new 
VirtualTableSinglePartitionReadQuery(metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + partitionKey(), + clusteringIndexFilter); + } + + @Override + public SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits) + { + return new VirtualTableSinglePartitionReadQuery(metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits, + partitionKey(), + lastReturned == null ? clusteringIndexFilter + : clusteringIndexFilter.forPaging(metadata().comparator, + lastReturned, + false)); + } + + @Override + protected UnfilteredPartitionIterator queryVirtualTable() + { + VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id); + return view.select(partitionKey, clusteringIndexFilter, columnFilter()); + } + + /** + * Groups multiple single partition read queries. + */ + public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery> + { + public static Group create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + List<DecoratedKey> partitionKeys, + ClusteringIndexFilter clusteringIndexFilter) + { + List<VirtualTableSinglePartitionReadQuery> queries = new ArrayList<>(partitionKeys.size()); + for (DecoratedKey partitionKey : partitionKeys) + { + queries.add(VirtualTableSinglePartitionReadQuery.create(metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter)); + } + + return new Group(queries, limits); + } + + public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits) + { + super(queries, limits); + } + + public static Group one(VirtualTableSinglePartitionReadQuery query) + { + return new Group(Collections.singletonList(query), query.limits()); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException + { + if (queries.size() == 1) + return queries.get(0).execute(consistency, clientState, queryStartNanoTime); + + return PartitionIterators.concat(queries.stream() + .map(q -> q.execute(consistency, clientState, queryStartNanoTime)) + .collect(Collectors.toList())); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index 1d7d1c8..20a1656 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -65,6 +65,8 @@ import org.apache.cassandra.schema.TableMetadata; */ public class ColumnFilter { + public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE); + public static final Serializer serializer = new Serializer(); // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index 70759cf..9064b0f 100644 --- 
a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -24,7 +24,7 @@ import org.apache.cassandra.db.transform.MorePartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.SinglePartitionReadQuery; import org.apache.cassandra.db.rows.*; public abstract class PartitionIterators @@ -32,15 +32,15 @@ public abstract class PartitionIterators private PartitionIterators() {} @SuppressWarnings("resource") // The created resources are returned right away - public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand command) + public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadQuery query) { // If the query has no results, we'll get an empty iterator, but we still // want a RowIterator out of this method, so we return an empty one. RowIterator toReturn = iter.hasNext() ? iter.next() - : EmptyIterators.row(command.metadata(), - command.partitionKey(), - command.clusteringIndexFilter().isReversed()); + : EmptyIterators.row(query.metadata(), + query.partitionKey(), + query.clusteringIndexFilter().isReversed()); // Note that in general, we should wrap the result so that it's close method actually // close the whole PartitionIterator. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index a549458..2dc566a 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; @@ -427,7 +428,7 @@ public class PartitionUpdate extends AbstractBTreePartition public void validateIndexedColumns() { - Keyspace.openAndGetStore(metadata()).indexManager.validate(this); + IndexRegistry.obtain(metadata()).validate(this); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java new file mode 100644 index 0000000..a776d01 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.util.Iterator; +import java.util.NavigableMap; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand. + */ +public abstract class AbstractVirtualTable implements VirtualTable +{ + private final TableMetadata metadata; + + protected AbstractVirtualTable(TableMetadata metadata) + { + if (!metadata.isVirtual()) + throw new IllegalArgumentException(); + + this.metadata = metadata; + } + + public TableMetadata metadata() + { + return metadata; + } + + /** + * Provide a {@link DataSet} that contains all of the virtual table's data. + */ + public abstract DataSet data(); + + /** + * Provide a {@link DataSet} that is potentially restricted to the provided partition - but is allowed to contain + * other partitions.
+ */ + public DataSet data(DecoratedKey partitionKey) + { + return data(); + } + + @Override + public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter) + { + Partition partition = data(partitionKey).getPartition(partitionKey); + + if (null == partition) + return EmptyIterators.unfilteredPartition(metadata); + + long now = System.currentTimeMillis(); + UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now); + return new SingletonUnfilteredPartitionIterator(rowIterator); + } + + @Override + public final UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter) + { + DataSet data = data(); + + if (data.isEmpty()) + return EmptyIterators.unfilteredPartition(metadata); + + Iterator<Partition> iterator = data.getPartitions(dataRange); + + long now = System.currentTimeMillis(); + + return new AbstractUnfilteredPartitionIterator() + { + @Override + public UnfilteredRowIterator next() + { + Partition partition = iterator.next(); + return partition.toRowIterator(metadata, dataRange.clusteringIndexFilter(partition.key()), columnFilter, now); + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public TableMetadata metadata() + { + return metadata; + } + }; + } + + @Override + public void apply(PartitionUpdate update) + { + throw new InvalidRequestException("Modification is not supported by table " + metadata); + } + + public interface DataSet + { + boolean isEmpty(); + Partition getPartition(DecoratedKey partitionKey); + Iterator<Partition> getPartitions(DataRange range); + } + + public interface Partition + { + DecoratedKey key(); + UnfilteredRowIterator toRowIterator(TableMetadata metadata, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, long now); + } + + /** + * An abstract, map-backed DataSet implementation. Can be backed by any {@link NavigableMap}, then either maintained + * persistently, or built on demand and thrown away after use, depending on the implementing class. 
+ */ + public static abstract class AbstractDataSet implements DataSet + { + protected final NavigableMap<DecoratedKey, Partition> partitions; + + protected AbstractDataSet(NavigableMap<DecoratedKey, Partition> partitions) + { + this.partitions = partitions; + } + + public boolean isEmpty() + { + return partitions.isEmpty(); + } + + public Partition getPartition(DecoratedKey key) + { + return partitions.get(key); + } + + public Iterator<Partition> getPartitions(DataRange dataRange) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + PartitionPosition startKey = keyRange.left; + PartitionPosition endKey = keyRange.right; + + NavigableMap<DecoratedKey, Partition> selection = partitions; + + if (startKey.isMinimum() && endKey.isMinimum()) + return selection.values().iterator(); + + if (startKey.isMinimum() && endKey instanceof DecoratedKey) + return selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive()).values().iterator(); + + if (startKey instanceof DecoratedKey && endKey instanceof DecoratedKey) + { + return selection.subMap((DecoratedKey) startKey, keyRange.isStartInclusive(), (DecoratedKey) endKey, keyRange.isEndInclusive()) + .values() + .iterator(); + } + + if (startKey instanceof DecoratedKey) + selection = selection.tailMap((DecoratedKey) startKey, keyRange.isStartInclusive()); + + if (endKey instanceof DecoratedKey) + selection = selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive()); + + // If we have reached this point, it means that one of the PartitionPositions is a KeyBound and we have + // to use filtering to eliminate the unwanted partitions. + Iterator<Partition> iterator = selection.values().iterator(); + + return new AbstractIterator<Partition>() + { + private boolean encounteredPartitionsWithinRange; + + @Override + protected Partition computeNext() + { + while (iterator.hasNext()) + { + Partition partition = iterator.next(); + if (dataRange.contains(partition.key())) + { + encounteredPartitionsWithinRange = true; + return partition; + } + + // we encountered some partitions within the range, but the last one is outside of the range: we are done + if (encounteredPartitionsWithinRange) + return endOfData(); + } + + return endOfData(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java new file mode 100644 index 0000000..2af3b6a --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * A DataSet implementation that is filled on demand and has an easy to use API for adding rows. + */ +public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet +{ + private final TableMetadata metadata; + + private Row currentRow; + + public SimpleDataSet(TableMetadata metadata) + { + super(new TreeMap<>(DecoratedKey.comparator)); + this.metadata = metadata; + } + + public SimpleDataSet row(Object... primaryKeyValues) + { + if (Iterables.size(metadata.primaryKeyColumns()) != primaryKeyValues.length) + throw new IllegalArgumentException(); + + Object[] partitionKeyValues = new Object[metadata.partitionKeyColumns().size()]; + Object[] clusteringValues = new Object[metadata.clusteringColumns().size()]; + + System.arraycopy(primaryKeyValues, 0, partitionKeyValues, 0, partitionKeyValues.length); + System.arraycopy(primaryKeyValues, partitionKeyValues.length, clusteringValues, 0, clusteringValues.length); + + DecoratedKey partitionKey = makeDecoratedKey(partitionKeyValues); + Clustering clustering = makeClustering(clusteringValues); + + currentRow = new Row(metadata, clustering); + SimplePartition partition = (SimplePartition) partitions.computeIfAbsent(partitionKey, pk -> new SimplePartition(metadata, pk)); + partition.add(currentRow); + + return this; + } + + public SimpleDataSet column(String columnName, Object value) + { + if (null == currentRow) + throw new IllegalStateException(); + currentRow.add(columnName, value); + return this; + } + + private DecoratedKey makeDecoratedKey(Object... partitionKeyValues) + { + ByteBuffer partitionKey = partitionKeyValues.length == 1 + ? decompose(metadata.partitionKeyType, partitionKeyValues[0]) + : ((CompositeType) metadata.partitionKeyType).decompose(partitionKeyValues); + return metadata.partitioner.decorateKey(partitionKey); + } + + private Clustering makeClustering(Object... 
clusteringValues) + { + if (clusteringValues.length == 0) + return Clustering.EMPTY; + + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + return Clustering.make(clusteringByteBuffers); + } + + private static final class SimplePartition implements AbstractVirtualTable.Partition + { + private final DecoratedKey key; + private final NavigableMap<Clustering, Row> rows; + + private SimplePartition(TableMetadata metadata, DecoratedKey key) + { + this.key = key; + this.rows = new TreeMap<>(metadata.comparator); + } + + private void add(Row row) + { + rows.put(row.clustering, row); + } + + public DecoratedKey key() + { + return key; + } + + public UnfilteredRowIterator toRowIterator(TableMetadata metadata, + ClusteringIndexFilter clusteringIndexFilter, + ColumnFilter columnFilter, + long now) + { + Iterator<Row> iterator = (clusteringIndexFilter.isReversed() ? rows.descendingMap() : rows).values().iterator(); + + return new AbstractUnfilteredRowIterator(metadata, + key, + DeletionTime.LIVE, + columnFilter.queriedColumns(), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS) + { + protected Unfiltered computeNext() + { + while (iterator.hasNext()) + { + Row row = iterator.next(); + if (clusteringIndexFilter.selects(row.clustering)) + return row.toTableRow(columns, now); + } + return endOfData(); + } + }; + } + } + + private static class Row + { + private final TableMetadata metadata; + private final Clustering clustering; + + private final Map<ColumnMetadata, Object> values = new HashMap<>(); + + private Row(TableMetadata metadata, Clustering clustering) + { + this.metadata = metadata; + this.clustering = clustering; + } + + private void add(String columnName, Object value) + { + ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName)); + if (null == column || !column.isRegular()) + throw new IllegalArgumentException(); + values.put(column, value); + } + + private org.apache.cassandra.db.rows.Row toTableRow(RegularAndStaticColumns columns, long now) + { + org.apache.cassandra.db.rows.Row.Builder builder = BTreeRow.unsortedBuilder((int) TimeUnit.MILLISECONDS.toSeconds(now)); + builder.newRow(clustering); + + columns.forEach(c -> + { + Object value = values.get(c); + if (null != value) + builder.addCell(BufferCell.live(c, now, decompose(c.type, value))); + }); + + return builder.build(); + } + } + + @SuppressWarnings("unchecked") + private static <T> ByteBuffer decompose(AbstractType<?> type, T value) + { + return ((AbstractType<T>) type).decompose(value); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java new file mode 100644 index 0000000..8d6f59b --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import com.google.common.collect.ImmutableList; + +public final class SystemViewsKeyspace extends VirtualKeyspace +{ + private static final String NAME = "system_views"; + + public static SystemViewsKeyspace instance = new SystemViewsKeyspace(); + + private SystemViewsKeyspace() + { + super(NAME, ImmutableList.of()); + } +}
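For anyone experimenting with the new API, below is a minimal sketch of a table built on the AbstractVirtualTable and SimpleDataSet classes added above. The table name, its columns, and the TableMetadata builder calls (builder(), kind(), partitioner(), addPartitionKeyColumn(), addRegularColumn()) are assumptions drawn from the surrounding codebase rather than from this patch, so treat it as illustrative only, not as part of the commit.

package org.apache.cassandra.db.virtual;

import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.schema.TableMetadata;

// Hypothetical read-only table with a single text partition key and a single text value column.
final class ExampleVirtualTable extends AbstractVirtualTable
{
    ExampleVirtualTable(String keyspace)
    {
        // The builder/kind/partitioner calls below are assumed to match trunk at the time of this commit.
        super(TableMetadata.builder(keyspace, "example")
                           .kind(TableMetadata.Kind.VIRTUAL)
                           .partitioner(new LocalPartitioner(UTF8Type.instance))
                           .addPartitionKeyColumn("name", UTF8Type.instance)
                           .addRegularColumn("value", UTF8Type.instance)
                           .build());
    }

    @Override
    public DataSet data()
    {
        // The data set is rebuilt on every read; rows are added through SimpleDataSet's fluent API.
        SimpleDataSet result = new SimpleDataSet(metadata());
        result.row("greeting").column("value", "hello");
        result.row("farewell").column("value", "goodbye");
        return result;
    }
}

Registering such a table would presumably amount to handing it to the VirtualKeyspace constructor in place of the empty ImmutableList in SystemViewsKeyspace above, but that wiring is not part of this diff.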