http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
new file mode 100644
index 0000000..560fbe9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadQuery.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.PartitionRangeQueryPager;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ *  A {@code ReadQuery} for a range of partitions.
+ */
+public interface PartitionRangeReadQuery extends ReadQuery
+{
+    static ReadQuery create(TableMetadata table,
+                            int nowInSec,
+                            ColumnFilter columnFilter,
+                            RowFilter rowFilter,
+                            DataLimits limits,
+                            DataRange dataRange)
+    {
+        if (table.isVirtual())
+            return VirtualTablePartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
+
+        return PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limits, dataRange);
+    }
+
+    DataRange dataRange();
+
+    /**
+     * Creates a new {@code PartitionRangeReadQuery} with the updated limits.
+     *
+     * @param newLimits the new limits
+     * @return the new {@code PartitionRangeReadQuery}
+     */
+    PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits);
+
+    /**
+     * Creates a new {@code PartitionRangeReadQuery} with the updated limits and data range.
+     *
+     * @param newLimits the new limits
+     * @return the new {@code PartitionRangeReadQuery}
+     */
+    PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange);
+
+    default QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+    {
+        return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
+    }
+
+    default boolean selectsKey(DecoratedKey key)
+    {
+        if (!dataRange().contains(key))
+            return false;
+
+        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+    }
+
+    default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+    {
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        if (!dataRange().clusteringIndexFilter(key).selects(clustering))
+            return false;
+        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+    }
+
+    default boolean selectsFullPartition()
+    {
+        return metadata().isStaticCompactTable() ||
+               (dataRange().selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns());
+    }
+}
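
Usage sketch (illustrative only, not part of the patch): how a caller might obtain a range read through the new factory. The "table" variable and the all-columns/unfiltered/unlimited arguments are assumptions for the example, relying on the usual helpers (ColumnFilter.all, RowFilter.NONE, DataLimits.NONE, DataRange.allData).

    int nowInSec = FBUtilities.nowInSeconds();
    ReadQuery query = PartitionRangeReadQuery.create(table,
                                                     nowInSec,
                                                     ColumnFilter.all(table),               // fetch every column
                                                     RowFilter.NONE,                        // no non-PK restrictions
                                                     DataLimits.NONE,                       // no LIMIT
                                                     DataRange.allData(table.partitioner)); // full token range
    // For a virtual table the factory returns a VirtualTablePartitionRangeReadQuery,
    // otherwise a PartitionRangeReadCommand.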

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 128f8f3..064dd77 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.util.function.LongPredicate;
-import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
@@ -29,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
-import org.apache.cassandra.db.monitoring.MonitorableImpl;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.StoppingTransformation;
@@ -37,6 +35,7 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -57,19 +56,13 @@ import org.apache.cassandra.utils.FBUtilities;
  * <p>
  * This contains all the informations needed to do a local read.
  */
-public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
+public abstract class ReadCommand extends AbstractReadQuery
 {
     private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
     private final Kind kind;
-    private final TableMetadata metadata;
-    private final int nowInSec;
-
-    private final ColumnFilter columnFilter;
-    private final RowFilter rowFilter;
-    private final DataLimits limits;
 
     private final boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
@@ -115,14 +108,10 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                           DataLimits limits,
                           IndexMetadata index)
     {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
         this.kind = kind;
         this.isDigestQuery = isDigestQuery;
         this.digestVersion = digestVersion;
-        this.metadata = metadata;
-        this.nowInSec = nowInSec;
-        this.columnFilter = columnFilter;
-        this.rowFilter = rowFilter;
-        this.limits = limits;
         this.index = index;
     }
 
@@ -140,30 +129,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
 
     /**
-     * The metadata for the table queried.
-     *
-     * @return the metadata for the table queried.
-     */
-    public TableMetadata metadata()
-    {
-        return metadata;
-    }
-
-    /**
-     * The time in seconds to use as "now" for this query.
-     * <p>
-     * We use the same time as "now" for the whole query to avoid considering different
-     * values as expired during the query, which would be buggy (would throw of counting amongst other
-     * things).
-     *
-     * @return the time (in seconds) to use as "now".
-     */
-    public int nowInSec()
-    {
-        return nowInSec;
-    }
-
-    /**
      * The configured timeout for this command.
      *
      * @return the configured timeout for this command.
@@ -171,43 +136,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     public abstract long getTimeout();
 
     /**
-     * A filter on which (non-PK) columns must be returned by the query.
-     *
-     * @return which columns must be fetched by this query.
-     */
-    public ColumnFilter columnFilter()
-    {
-        return columnFilter;
-    }
-
-    /**
-     * Filters/Resrictions on CQL rows.
-     * <p>
-     * This contains the restrictions that are not directly handled by the
-     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
-     * restrictions and can include some PK columns restrictions when those can't be
-     * satisfied entirely by the clustering index filter (because not all clustering columns
-     * have been restricted for instance). If there is 2ndary indexes on the table,
-     * one of this restriction might be handled by a 2ndary index.
-     *
-     * @return the filter holding the expression that rows must satisfy.
-     */
-    public RowFilter rowFilter()
-    {
-        return rowFilter;
-    }
-
-    /**
-     * The limits set on this query.
-     *
-     * @return the limits set on this query.
-     */
-    public DataLimits limits()
-    {
-        return limits;
-    }
-
-    /**
      * Whether this query is a digest one or not.
      *
      * @return Whether this query is a digest query.
@@ -327,9 +255,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
      */
     public void maybeValidateIndex()
     {
-        Index index = getIndex(Keyspace.openAndGetStore(metadata));
         if (null != index)
-            index.validate(this);
+            IndexRegistry.obtain(metadata()).getIndex(index).validate(this);
     }
 
     /**
@@ -388,11 +315,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
     protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 
-    public PartitionIterator executeInternal(ReadExecutionController controller)
-    {
-        return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
-    }
-
     public ReadExecutionController executionController()
     {
         return ReadExecutionController.forCommand(this);
@@ -410,7 +332,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 
             private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
-            private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();
+            private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
 
             private int liveRows = 0;
             private int tombstones = 0;
@@ -553,7 +475,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
         private void maybeDelayForTesting()
         {
-            if (!metadata.keyspace.startsWith("system"))
+            if (!metadata().keyspace.startsWith("system"))
                 FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
         }
     }
@@ -605,7 +527,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ").append(columnFilter());
-        sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata.name);
+        sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata().name);
         appendCQLWhereClause(sb);
 
         if (limits() != DataLimits.NONE)
@@ -657,11 +579,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata()));
             if (command.isDigestQuery())
                 out.writeUnsignedVInt(command.digestVersion());
-            command.metadata.id.serialize(out);
+            command.metadata().id.serialize(out);
             out.writeInt(command.nowInSec());
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
-            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
+            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator);
             if (null != command.index)
                 IndexMetadata.serializer.serialize(command.index, out, version);
 
@@ -714,11 +636,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
         {
             return 2 // kind + flags
                    + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
-                   + command.metadata.id.serializedSize()
+                   + command.metadata().id.serializedSize()
                    + TypeSizes.sizeof(command.nowInSec())
                    + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                    + RowFilter.serializer.serializedSize(command.rowFilter(), version)
-                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
+                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
                    + command.selectionSerializedSize(version)
                    + command.indexSerializedSize(version);
         }
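
The accessors deleted above (metadata(), nowInSec(), columnFilter(), rowFilter(), limits()) move into the new AbstractReadQuery base class, which is not included in this excerpt. A rough sketch of what that base holds, inferred only from the super(...) call in the ReadCommand constructor and the accessors declared on ReadQuery (the class and field names below are illustrative, not the committed code):

    abstract class AbstractReadQuerySketch implements ReadQuery
    {
        private final TableMetadata metadata;
        private final int nowInSec;
        private final ColumnFilter columnFilter;
        private final RowFilter rowFilter;
        private final DataLimits limits;

        protected AbstractReadQuerySketch(TableMetadata metadata, int nowInSec,
                                          ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
        {
            this.metadata = metadata;
            this.nowInSec = nowInSec;
            this.columnFilter = columnFilter;
            this.rowFilter = rowFilter;
            this.limits = limits;
        }

        public TableMetadata metadata()    { return metadata; }
        public int nowInSec()              { return nowInSec; }
        public ColumnFilter columnFilter() { return columnFilter; }
        public RowFilter rowFilter()       { return rowFilter; }
        public DataLimits limits()         { return limits; }
    }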

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index d527d28..fd94aa1 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -17,74 +17,113 @@
  */
 package org.apache.cassandra.db;
 
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Generic abstraction for read queries.
- * <p>
- * The main implementation of this is {@link ReadCommand} but we have this interface because
- * {@link SinglePartitionReadCommand.Group} is also consider as a "read query" but is not a
- * {@code ReadCommand}.
  */
 public interface ReadQuery
 {
-    ReadQuery EMPTY = new ReadQuery()
+    public static ReadQuery empty(final TableMetadata metadata)
     {
-        public ReadExecutionController executionController()
+        return new ReadQuery()
         {
-            return ReadExecutionController.empty();
-        }
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
-        {
-            return EmptyIterators.partition();
-        }
+            public ReadExecutionController executionController()
+            {
+                return ReadExecutionController.empty();
+            }
 
-        public PartitionIterator executeInternal(ReadExecutionController controller)
-        {
-            return EmptyIterators.partition();
-        }
+            public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+            {
+                return EmptyIterators.partition();
+            }
 
-        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
-        {
-            return EmptyIterators.unfilteredPartition(executionController.metadata());
-        }
+            public PartitionIterator executeInternal(ReadExecutionController controller)
+            {
+                return EmptyIterators.partition();
+            }
 
-        public DataLimits limits()
-        {
-            // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
-            // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
-            // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
-            return DataLimits.cqlLimits(0);
-        }
+            public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+            {
+                return EmptyIterators.unfilteredPartition(executionController.metadata());
+            }
 
-        public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
-        {
-            return QueryPager.EMPTY;
-        }
+            public DataLimits limits()
+            {
+                // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
+                // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
+                // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
+                return DataLimits.cqlLimits(0);
+            }
 
-        public boolean selectsKey(DecoratedKey key)
-        {
-            return false;
-        }
+            public QueryPager getPager(PagingState state, ProtocolVersion protocolVersion)
+            {
+                return QueryPager.EMPTY;
+            }
 
-        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-        {
-            return false;
-        }
+            public boolean selectsKey(DecoratedKey key)
+            {
+                return false;
+            }
 
-        @Override
-        public boolean selectsFullPartition()
-        {
-            return false;
-        }
-    };
+            public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+            {
+                return false;
+            }
+
+            @Override
+            public int nowInSec()
+            {
+                return FBUtilities.nowInSeconds();
+            }
+
+            @Override
+            public boolean selectsFullPartition()
+            {
+                return false;
+            }
+
+            @Override
+            public boolean isEmpty()
+            {
+                return true;
+            }
+
+            @Override
+            public RowFilter rowFilter()
+            {
+                return RowFilter.NONE;
+            }
+
+            @Override
+            public ColumnFilter columnFilter()
+            {
+                return ColumnFilter.NONE;
+            }
+        };
+    }
+
+    /**
+     * The metadata for the table this is a query on.
+     *
+     * @return the metadata for the table this is a query on.
+     */
+    public TableMetadata metadata();
 
     /**
      * Starts a new read operation.
@@ -156,8 +195,63 @@ public interface ReadQuery
     public boolean selectsClustering(DecoratedKey key, Clustering clustering);
 
     /**
+     * The time in seconds to use as "now" for this query.
+     * <p>
+     * We use the same time as "now" for the whole query to avoid considering different
+     * values as expired during the query, which would be buggy (would throw off counting amongst other
+     * things).
+     *
+     * @return the time (in seconds) to use as "now".
+     */
+    public int nowInSec();
+
+    /**
      * Checks if this {@code ReadQuery} selects full partitions, that is it 
has no filtering on clustering or regular columns.
      * @return {@code true} if this {@code ReadQuery} selects full partitions, 
{@code false} otherwise.
      */
     public boolean selectsFullPartition();
+
+    /**
+     * Filters/Restrictions on CQL rows.
+     * <p>
+     * This contains the restrictions that are not directly handled by the
+     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
+     * restrictions and can include some PK columns restrictions when those can't be
+     * satisfied entirely by the clustering index filter (because not all clustering columns
+     * have been restricted for instance). If there are 2ndary indexes on the table,
+     * one of these restrictions might be handled by a 2ndary index.
+     *
+     * @return the filter holding the expression that rows must satisfy.
+     */
+    public RowFilter rowFilter();
+
+    /**
+     * A filter on which (non-PK) columns must be returned by the query.
+     *
+     * @return which columns must be fetched by this query.
+     */
+    public ColumnFilter columnFilter();
+
+    /**
+     * Whether this query is known to return nothing upfront.
+     * <p>
+     * This is overridden by the {@code ReadQuery} created through {@link #empty(TableMetadata)}, and that's probably the
+     * only place that should override it.
+     *
+     * @return if this method is guaranteed to return no results whatsoever.
+     */
+    public default boolean isEmpty()
+    {
+        return false;
+    }
+
+    /**
+     * If the index manager for the table determines that there's an applicable
+     * 2i that can be used to execute this query, call its (optional)
+     * validation method to check that nothing in this query's parameters
+     * violates the implementation specific validation rules.
+     */
+    default void maybeValidateIndex()
+    {
+    }
 }
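
Usage sketch (not part of the patch): the per-table empty query that replaces the old shared ReadQuery.EMPTY constant; "table" stands for whatever TableMetadata the caller already has.

    ReadQuery none = ReadQuery.empty(table);
    assert none.isEmpty();                          // only the empty query reports true
    try (ReadExecutionController controller = none.executionController();
         PartitionIterator partitions = none.executeInternal(controller))
    {
        assert !partitions.hasNext();               // yields no partitions at all
    }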

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 64a25f8..7214106 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -20,14 +20,11 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
@@ -53,9 +50,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -63,7 +58,7 @@ import org.apache.cassandra.utils.btree.BTreeSet;
 /**
  * A read command that selects a (part of a) single partition.
  */
-public class SinglePartitionReadCommand extends ReadCommand
+public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
@@ -314,6 +309,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               indexMetadata());
     }
 
+    @Override
     public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
     {
         return new SinglePartitionReadCommand(isDigestQuery(),
@@ -328,11 +324,13 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               indexMetadata());
     }
 
+    @Override
     public DecoratedKey partitionKey()
     {
         return partitionKey;
     }
 
+    @Override
     public ClusteringIndexFilter clusteringIndexFilter()
     {
         return clusteringIndexFilter;
@@ -348,35 +346,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         return DatabaseDescriptor.getReadRpcTimeout();
     }
 
-    public boolean selectsKey(DecoratedKey key)
-    {
-        if (!this.partitionKey().equals(key))
-            return false;
-
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
-    }
-
-    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-    {
-        if (clustering == Clustering.STATIC_CLUSTERING)
-            return !columnFilter().fetchedColumns().statics.isEmpty();
-
-        if (!clusteringIndexFilter().selects(clustering))
-            return false;
-
-        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
-    }
-
-    /**
-     * Returns a new command suitable to paging from the last returned row.
-     *
-     * @param lastReturned the last row returned by the previous page. The newly created command
-     * will only query row that comes after this (in query order). This can be {@code null} if this
-     * is the first page.
-     * @param limits the limits to use for the page to query.
-     *
-     * @return the newly create command.
-     */
+    @Override
     public SinglePartitionReadCommand forPaging(Clustering lastReturned, 
DataLimits limits)
     {
         // We shouldn't have set digest yet when reaching that point
@@ -395,16 +365,6 @@ public class SinglePartitionReadCommand extends ReadCommand
         return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
     }
 
-    public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
-    {
-        return getPager(this, pagingState, protocolVersion);
-    }
-
-    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, ProtocolVersion protocolVersion)
-    {
-        return new SinglePartitionPager(command, pagingState, protocolVersion);
-    }
-
     protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.readLatency.addNano(latencyNanos);
@@ -1054,23 +1014,34 @@ public class SinglePartitionReadCommand extends ReadCommand
     /**
      * Groups multiple single partition read commands.
      */
-    public static class Group implements ReadQuery
+    public static class Group extends SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
     {
-        public final List<SinglePartitionReadCommand> commands;
-        private final DataLimits limits;
-        private final int nowInSec;
-        private final boolean selectsFullPartitions;
+        public static Group create(TableMetadata metadata,
+                                   int nowInSec,
+                                   ColumnFilter columnFilter,
+                                   RowFilter rowFilter,
+                                   DataLimits limits,
+                                   List<DecoratedKey> partitionKeys,
+                                   ClusteringIndexFilter clusteringIndexFilter)
+        {
+            List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+            for (DecoratedKey partitionKey : partitionKeys)
+            {
+                commands.add(SinglePartitionReadCommand.create(metadata,
+                                                               nowInSec,
+                                                               columnFilter,
+                                                               rowFilter,
+                                                               limits,
+                                                               partitionKey,
+                                                               clusteringIndexFilter));
+            }
+
+            return new Group(commands, limits);
+        }
 
         public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
         {
-            assert !commands.isEmpty();
-            this.commands = commands;
-            this.limits = limits;
-            SinglePartitionReadCommand firstCommand = commands.get(0);
-            this.nowInSec = firstCommand.nowInSec();
-            this.selectsFullPartitions = firstCommand.selectsFullPartition();
-            for (int i = 1; i < commands.size(); i++)
-                assert commands.get(i).nowInSec() == nowInSec;
+            super(commands, limits);
         }
 
         public static Group one(SinglePartitionReadCommand command)
@@ -1082,97 +1053,6 @@ public class SinglePartitionReadCommand extends ReadCommand
         {
             return StorageProxy.read(this, consistency, clientState, queryStartNanoTime);
         }
-
-        public int nowInSec()
-        {
-            return nowInSec;
-        }
-
-        public DataLimits limits()
-        {
-            return limits;
-        }
-
-        public TableMetadata metadata()
-        {
-            return commands.get(0).metadata();
-        }
-
-        @Override
-        public boolean selectsFullPartition()
-        {
-            return selectsFullPartitions;
-        }
-
-        public ReadExecutionController executionController()
-        {
-            // Note that the only difference between the command in a group must be the partition key on which
-            // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
-            return commands.get(0).executionController();
-        }
-
-        public PartitionIterator executeInternal(ReadExecutionController controller)
-        {
-            // Note that the only difference between the command in a group must be the partition key on which
-            // they applied.
-            boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness();
-            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
-                                 nowInSec,
-                                 selectsFullPartitions,
-                                 enforceStrictLiveness);
-        }
-
-        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
-        {
-            return executeLocally(executionController, true);
-        }
-
-        /**
-         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
-         *
-         * @param executionController - the {@code ReadExecutionController} protecting the read.
-         * @param sort - whether to sort the inner commands by partition key, required for merging the iterator
-         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
-         *               because in this case it is safe to do so as there is no merging involved and we don't want to
-         *               change the old behavior which was to not sort by partition.
-         *
-         * @return - the iterator that can be used to retrieve the query result.
-         */
-        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
-        {
-            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size());
-            for (SinglePartitionReadCommand cmd : commands)
-                partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController)));
-
-            if (sort)
-                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
-
-            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
-        }
-
-        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
-        {
-            if (commands.size() == 1)
-                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
-
-            return new MultiPartitionPager(this, pagingState, protocolVersion);
-        }
-
-        public boolean selectsKey(DecoratedKey key)
-        {
-            return Iterables.any(commands, c -> c.selectsKey(key));
-        }
-
-        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-        {
-            return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
-        }
-
-        @Override
-        public String toString()
-        {
-            return commands.toString();
-        }
     }
 
     private static class Deserializer extends SelectionDeserializer
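
Usage sketch (illustrative only): building a group of single-partition reads with the new Group.create factory. "table" and "keys" (a List<DecoratedKey>) are assumed to exist already; the slice filter reads every row of each partition.

    SinglePartitionReadCommand.Group group =
        SinglePartitionReadCommand.Group.create(table,
                                                FBUtilities.nowInSeconds(),
                                                ColumnFilter.all(table),
                                                RowFilter.NONE,
                                                DataLimits.NONE,
                                                keys,
                                                new ClusteringIndexSliceFilter(Slices.ALL, false));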

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
new file mode 100644
index 0000000..f9f0014
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.pager.MultiPartitionPager;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * A {@code ReadQuery} for a single partition.
+ */
+public interface SinglePartitionReadQuery extends ReadQuery
+{
+    public static Group<? extends SinglePartitionReadQuery> createGroup(TableMetadata metadata,
+                                                                        int nowInSec,
+                                                                        ColumnFilter columnFilter,
+                                                                        RowFilter rowFilter,
+                                                                        DataLimits limits,
+                                                                        List<DecoratedKey> partitionKeys,
+                                                                        ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return metadata.isVirtual()
+             ? VirtualTableSinglePartitionReadQuery.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter)
+             : SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
+    }
+
+
+    /**
+     * Creates a new read query on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param columnFilter the column filter to use for the query.
+     * @param filter the clustering index filter to use for the query.
+     *
+     * @return a newly created read query. The returned query will use no row filter and have no limits.
+     */
+    public static SinglePartitionReadQuery create(TableMetadata metadata,
+                                                  int nowInSec,
+                                                  DecoratedKey key,
+                                                  ColumnFilter columnFilter,
+                                                  ClusteringIndexFilter filter)
+    {
+        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+    }
+
+    /**
+     * Creates a new read query on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the column filter to use for the query.
+     * @param rowFilter the row filter to use for the query.
+     * @param limits the limits to use for the query.
+     * @param partitionKey the partition key for the partition to query.
+     * @param clusteringIndexFilter the clustering index filter to use for the query.
+     *
+     * @return a newly created read query.
+     */
+    public static SinglePartitionReadQuery create(TableMetadata metadata,
+                                                  int nowInSec,
+                                                  ColumnFilter columnFilter,
+                                                  RowFilter rowFilter,
+                                                  DataLimits limits,
+                                                  DecoratedKey partitionKey,
+                                                  ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return metadata.isVirtual()
+             ? VirtualTableSinglePartitionReadQuery.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter)
+             : SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    /**
+     * Returns the key of the partition queried by this {@code ReadQuery}
+     * @return the key of the partition queried
+     */
+    DecoratedKey partitionKey();
+
+    /**
+     * Creates a new {@code SinglePartitionReadQuery} with the specified limits.
+     *
+     * @param newLimits the new limits
+     * @return the new {@code SinglePartitionReadQuery}
+     */
+    SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits);
+
+    /**
+     * Returns a new {@code SinglePartitionReadQuery} suitable to paging from the last returned row.
+     *
+     * @param lastReturned the last row returned by the previous page. The newly created query
+     * will only query rows that come after this (in query order). This can be {@code null} if this
+     * is the first page.
+     * @param limits the limits to use for the page to query.
+     *
+     * @return the newly created query.
+     */
+    SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits);
+
+    @Override
+    default SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+    {
+        return new SinglePartitionPager(this, pagingState, protocolVersion);
+    }
+
+    ClusteringIndexFilter clusteringIndexFilter();
+
+    default boolean selectsKey(DecoratedKey key)
+    {
+        if (!this.partitionKey().equals(key))
+            return false;
+
+        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
+    }
+
+    default boolean selectsClustering(DecoratedKey key, Clustering clustering)
+    {
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        if (!clusteringIndexFilter().selects(clustering))
+            return false;
+
+        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+    }
+
+    /**
+     * Groups multiple single partition read queries.
+     */
+    abstract class Group<T extends SinglePartitionReadQuery> implements ReadQuery
+    {
+        public final List<T> queries;
+        private final DataLimits limits;
+        private final int nowInSec;
+        private final boolean selectsFullPartitions;
+
+        public Group(List<T> queries, DataLimits limits)
+        {
+            assert !queries.isEmpty();
+            this.queries = queries;
+            this.limits = limits;
+            T firstQuery = queries.get(0);
+            this.nowInSec = firstQuery.nowInSec();
+            this.selectsFullPartitions = firstQuery.selectsFullPartition();
+            for (int i = 1; i < queries.size(); i++)
+                assert queries.get(i).nowInSec() == nowInSec;
+        }
+
+        public int nowInSec()
+        {
+            return nowInSec;
+        }
+
+        public DataLimits limits()
+        {
+            return limits;
+        }
+
+        public TableMetadata metadata()
+        {
+            return queries.get(0).metadata();
+        }
+
+        @Override
+        public boolean selectsFullPartition()
+        {
+            return selectsFullPartitions;
+        }
+
+        public ReadExecutionController executionController()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied. So as far as ReadOrderGroup is concerned, we can use any of the queries to start one.
+            return queries.get(0).executionController();
+        }
+
+        public PartitionIterator executeInternal(ReadExecutionController controller)
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            boolean enforceStrictLiveness = queries.get(0).metadata().enforceStrictLiveness();
+            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
+                                 nowInSec,
+                                 selectsFullPartitions,
+                                 enforceStrictLiveness);
+        }
+
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+        {
+            return executeLocally(executionController, true);
+        }
+
+        /**
+         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
+         *
+         * @param executionController - the {@code ReadExecutionController} protecting the read.
+         * @param sort - whether to sort the inner queries by partition key, required for merging the iterator
+         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
+         *               because in this case it is safe to do so as there is no merging involved and we don't want to
+         *               change the old behavior which was to not sort by partition.
+         *
+         * @return - the iterator that can be used to retrieve the query result.
+         */
+        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
+        {
+            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(queries.size());
+            for (T query : queries)
+                partitions.add(Pair.of(query.partitionKey(), query.executeLocally(executionController)));
+
+            if (sort)
+                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
+
+            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
+        }
+
+        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
+        {
+            if (queries.size() == 1)
+                return new SinglePartitionPager(queries.get(0), pagingState, protocolVersion);
+
+            return new MultiPartitionPager<T>(this, pagingState, protocolVersion);
+        }
+
+        public boolean selectsKey(DecoratedKey key)
+        {
+            return Iterables.any(queries, c -> c.selectsKey(key));
+        }
+
+        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+        {
+            return Iterables.any(queries, c -> c.selectsClustering(key, clustering));
+        }
+
+        @Override
+        public RowFilter rowFilter()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            return queries.get(0).rowFilter();
+        }
+
+        @Override
+        public ColumnFilter columnFilter()
+        {
+            // Note that the only difference between the queries in a group must be the partition key on which
+            // they applied.
+            return queries.get(0).columnFilter();
+        }
+
+        @Override
+        public String toString()
+        {
+            return queries.toString();
+        }
+    }
+}
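
Paging sketch (not in the patch): Group.getPager hands a single-key group to SinglePartitionPager and wraps larger groups in MultiPartitionPager, so callers drive both the same way. The page size of 100 and the null starting PagingState are arbitrary choices for the example.

    QueryPager pager = group.getPager(null, ProtocolVersion.CURRENT);
    while (!pager.isExhausted())
    {
        try (ReadExecutionController controller = group.executionController();
             PartitionIterator page = pager.fetchPageInternal(100, controller))
        {
            // consume up to 100 rows of this page
        }
    }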

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
new file mode 100644
index 0000000..48cafa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTablePartitionRangeReadQuery.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A read query that selects a (part of a) range of partitions of a virtual table.
+ */
+public class VirtualTablePartitionRangeReadQuery extends VirtualTableReadQuery implements PartitionRangeReadQuery
+{
+    private final DataRange dataRange;
+
+    public static VirtualTablePartitionRangeReadQuery create(TableMetadata metadata,
+                                                             int nowInSec,
+                                                             ColumnFilter columnFilter,
+                                                             RowFilter rowFilter,
+                                                             DataLimits limits,
+                                                             DataRange dataRange)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata,
+                                                       nowInSec,
+                                                       columnFilter,
+                                                       rowFilter,
+                                                       limits,
+                                                       dataRange);
+    }
+
+    private VirtualTablePartitionRangeReadQuery(TableMetadata metadata,
+                                                int nowInSec,
+                                                ColumnFilter columnFilter,
+                                                RowFilter rowFilter,
+                                                DataLimits limits,
+                                                DataRange dataRange)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.dataRange = dataRange;
+    }
+
+    @Override
+    public DataRange dataRange()
+    {
+        return dataRange;
+    }
+
+    @Override
+    public PartitionRangeReadQuery withUpdatedLimit(DataLimits newLimits)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata(),
+                                                       nowInSec(),
+                                                       columnFilter(),
+                                                       rowFilter(),
+                                                       newLimits,
+                                                       dataRange());
+    }
+
+    @Override
+    public PartitionRangeReadQuery withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
+    {
+        return new VirtualTablePartitionRangeReadQuery(metadata(),
+                                                       nowInSec(),
+                                                       columnFilter(),
+                                                       rowFilter(),
+                                                       newLimits,
+                                                       newDataRange);
+    }
+
+    @Override
+    protected UnfilteredPartitionIterator queryVirtualTable()
+    {
+        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+        return view.select(dataRange, columnFilter());
+    }
+
+    @Override
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+            return;
+
+        sb.append(" WHERE ");
+        // We put the row filter first because the data range can end by "ORDER BY"
+        if (!rowFilter().isEmpty())
+        {
+            sb.append(rowFilter());
+            if (!dataRange.isUnrestricted())
+                sb.append(" AND ");
+        }
+        if (!dataRange.isUnrestricted())
+            sb.append(dataRange.toCQLString(metadata()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
new file mode 100644
index 0000000..f2c9a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableReadQuery.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * Base class for the {@code ReadQuery} implementations used to query virtual tables.
+ */
+public abstract class VirtualTableReadQuery extends AbstractReadQuery
+{
+    protected VirtualTableReadQuery(TableMetadata metadata,
+                                    int nowInSec,
+                                    ColumnFilter columnFilter,
+                                    RowFilter rowFilter,
+                                    DataLimits limits)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+    }
+
+    @Override
+    public ReadExecutionController executionController()
+    {
+        return ReadExecutionController.empty();
+    }
+
+    @Override
+    public PartitionIterator execute(ConsistencyLevel consistency,
+                                     ClientState clientState,
+                                     long queryStartNanoTime) throws RequestExecutionException
+    {
+        return executeInternal(executionController());
+    }
+
+    @Override
+    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+    {
+        UnfilteredPartitionIterator resultIterator = queryVirtualTable();
+        return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+    }
+
+    protected abstract UnfilteredPartitionIterator queryVirtualTable();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
new file mode 100644
index 0000000..11f1f77
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/VirtualTableSinglePartitionReadQuery.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A read query that selects a (part of a) single partition of a virtual table.
+ */
+public class VirtualTableSinglePartitionReadQuery extends VirtualTableReadQuery implements SinglePartitionReadQuery
+{
+    private final DecoratedKey partitionKey;
+    private final ClusteringIndexFilter clusteringIndexFilter;
+
+    public static VirtualTableSinglePartitionReadQuery create(TableMetadata metadata,
+                                                              int nowInSec,
+                                                              ColumnFilter columnFilter,
+                                                              RowFilter rowFilter,
+                                                              DataLimits limits,
+                                                              DecoratedKey partitionKey,
+                                                              ClusteringIndexFilter clusteringIndexFilter)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata,
+                                                        nowInSec,
+                                                        columnFilter,
+                                                        rowFilter,
+                                                        limits,
+                                                        partitionKey,
+                                                        clusteringIndexFilter);
+    }
+
+    private VirtualTableSinglePartitionReadQuery(TableMetadata metadata,
+                                                 int nowInSec,
+                                                 ColumnFilter columnFilter,
+                                                 RowFilter rowFilter,
+                                                 DataLimits limits,
+                                                 DecoratedKey partitionKey,
+                                                 ClusteringIndexFilter clusteringIndexFilter)
+    {
+        super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.partitionKey = partitionKey;
+        this.clusteringIndexFilter = clusteringIndexFilter;
+    }
+
+    @Override
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        sb.append(" WHERE ");
+
+        sb.append(ColumnMetadata.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+        DataRange.appendKeyString(sb, metadata().partitionKeyType, partitionKey().getKey());
+
+        // We put the row filter first because the clustering index filter can end with "ORDER BY"
+        if (!rowFilter().isEmpty())
+            sb.append(" AND ").append(rowFilter());
+
+        String filterString = clusteringIndexFilter().toCQLString(metadata());
+        if (!filterString.isEmpty())
+            sb.append(" AND ").append(filterString);
+    }
+
+    @Override
+    public ClusteringIndexFilter clusteringIndexFilter()
+    {
+        return clusteringIndexFilter;
+    }
+
+    @Override
+    public boolean selectsFullPartition()
+    {
+        return clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    @Override
+    public SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        newLimits,
+                                                        partitionKey(),
+                                                        clusteringIndexFilter);
+    }
+
+    @Override
+    public SinglePartitionReadQuery forPaging(Clustering lastReturned, DataLimits limits)
+    {
+        return new VirtualTableSinglePartitionReadQuery(metadata(),
+                                                        nowInSec(),
+                                                        columnFilter(),
+                                                        rowFilter(),
+                                                        limits,
+                                                        partitionKey(),
+                                                        lastReturned == null ? clusteringIndexFilter
+                                                                             : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+    }
+
+    @Override
+    protected UnfilteredPartitionIterator queryVirtualTable()
+    {
+        VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
+        return view.select(partitionKey, clusteringIndexFilter, columnFilter());
+    }
+
+    /**
+     * Groups multiple single partition read queries.
+     */
+    public static class Group extends SinglePartitionReadQuery.Group<VirtualTableSinglePartitionReadQuery>
+    {
+        public static Group create(TableMetadata metadata,
+                                   int nowInSec,
+                                   ColumnFilter columnFilter,
+                                   RowFilter rowFilter,
+                                   DataLimits limits,
+                                   List<DecoratedKey> partitionKeys,
+                                   ClusteringIndexFilter clusteringIndexFilter)
+        {
+            List<VirtualTableSinglePartitionReadQuery> queries = new ArrayList<>(partitionKeys.size());
+            for (DecoratedKey partitionKey : partitionKeys)
+            {
+                queries.add(VirtualTableSinglePartitionReadQuery.create(metadata,
+                                                                        nowInSec,
+                                                                        columnFilter,
+                                                                        rowFilter,
+                                                                        limits,
+                                                                        partitionKey,
+                                                                        clusteringIndexFilter));
+            }
+
+            return new Group(queries, limits);
+        }
+
+        public Group(List<VirtualTableSinglePartitionReadQuery> queries, DataLimits limits)
+        {
+            super(queries, limits);
+        }
+
+        public static Group one(VirtualTableSinglePartitionReadQuery query)
+        {
+            return new Group(Collections.singletonList(query), query.limits());
+        }
+
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
+        {
+            if (queries.size() == 1)
+                return queries.get(0).execute(consistency, clientState, queryStartNanoTime);
+
+            return PartitionIterators.concat(queries.stream()
+                                                    .map(q -> q.execute(consistency, clientState, queryStartNanoTime))
+                                                    .collect(Collectors.toList()));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 1d7d1c8..20a1656 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -65,6 +65,8 @@ import org.apache.cassandra.schema.TableMetadata;
  */
 public class ColumnFilter
 {
+    public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
+
     public static final Serializer serializer = new Serializer();
 
     // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 70759cf..9064b0f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.rows.*;
 
 public abstract class PartitionIterators
@@ -32,15 +32,15 @@ public abstract class PartitionIterators
     private PartitionIterators() {}
 
     @SuppressWarnings("resource") // The created resources are returned right away
-    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand command)
+    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadQuery query)
     {
         // If the query has no results, we'll get an empty iterator, but we still
         // want a RowIterator out of this method, so we return an empty one.
         RowIterator toReturn = iter.hasNext()
                              ? iter.next()
-                             : EmptyIterators.row(command.metadata(),
-                                                  command.partitionKey(),
-                                                  command.clusteringIndexFilter().isReversed());
+                             : EmptyIterators.row(query.metadata(),
+                                                  query.partitionKey(),
+                                                  query.clusteringIndexFilter().isReversed());
 
         // Note that in general, we should wrap the result so that its close method actually
         // closes the whole PartitionIterator.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index a549458..2dc566a 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -427,7 +428,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     public void validateIndexedColumns()
     {
-        Keyspace.openAndGetStore(metadata()).indexManager.validate(this);
+        IndexRegistry.obtain(metadata()).validate(this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
new file mode 100644
index 0000000..a776d01
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * An abstract virtual table implementation that builds the result set on demand.
+ */
+public abstract class AbstractVirtualTable implements VirtualTable
+{
+    private final TableMetadata metadata;
+
+    protected AbstractVirtualTable(TableMetadata metadata)
+    {
+        if (!metadata.isVirtual())
+            throw new IllegalArgumentException();
+
+        this.metadata = metadata;
+    }
+
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
+    /**
+     * Provides a {@link DataSet} that contains all of the virtual table's data.
+     */
+    public abstract DataSet data();
+
+    /**
+     * Provides a {@link DataSet} that is potentially restricted to the provided partition - but is allowed to contain
+     * other partitions.
+     */
+    public DataSet data(DecoratedKey partitionKey)
+    {
+        return data();
+    }
+
+    @Override
+    public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter)
+    {
+        Partition partition = data(partitionKey).getPartition(partitionKey);
+
+        if (null == partition)
+            return EmptyIterators.unfilteredPartition(metadata);
+
+        long now = System.currentTimeMillis();
+        UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now);
+        return new SingletonUnfilteredPartitionIterator(rowIterator);
+    }
+
+    @Override
+    public final UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter)
+    {
+        DataSet data = data();
+
+        if (data.isEmpty())
+            return EmptyIterators.unfilteredPartition(metadata);
+
+        Iterator<Partition> iterator = data.getPartitions(dataRange);
+
+        long now = System.currentTimeMillis();
+
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            @Override
+            public UnfilteredRowIterator next()
+            {
+                Partition partition = iterator.next();
+                return partition.toRowIterator(metadata, dataRange.clusteringIndexFilter(partition.key()), columnFilter, now);
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+        };
+    }
+
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        throw new InvalidRequestException("Modification is not supported by table " + metadata);
+    }
+
+    public interface DataSet
+    {
+        boolean isEmpty();
+        Partition getPartition(DecoratedKey partitionKey);
+        Iterator<Partition> getPartitions(DataRange range);
+    }
+
+    public interface Partition
+    {
+        DecoratedKey key();
+        UnfilteredRowIterator toRowIterator(TableMetadata metadata, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, long now);
+    }
+
+    /**
+     * An abstract, map-backed DataSet implementation. Can be backed by any {@link NavigableMap}, then either maintained
+     * persistently, or built on demand and thrown away after use, depending on the implementing class.
+     */
+    public static abstract class AbstractDataSet implements DataSet
+    {
+        protected final NavigableMap<DecoratedKey, Partition> partitions;
+
+        protected AbstractDataSet(NavigableMap<DecoratedKey, Partition> partitions)
+        {
+            this.partitions = partitions;
+        }
+
+        public boolean isEmpty()
+        {
+            return partitions.isEmpty();
+        }
+
+        public Partition getPartition(DecoratedKey key)
+        {
+            return partitions.get(key);
+        }
+
+        public Iterator<Partition> getPartitions(DataRange dataRange)
+        {
+            AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+            PartitionPosition startKey = keyRange.left;
+            PartitionPosition endKey = keyRange.right;
+
+            NavigableMap<DecoratedKey, Partition> selection = partitions;
+
+            if (startKey.isMinimum() && endKey.isMinimum())
+                return selection.values().iterator();
+
+            if (startKey.isMinimum() && endKey instanceof DecoratedKey)
+                return selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive()).values().iterator();
+
+            if (startKey instanceof DecoratedKey && endKey instanceof DecoratedKey)
+            {
+                return selection.subMap((DecoratedKey) startKey, keyRange.isStartInclusive(), (DecoratedKey) endKey, keyRange.isEndInclusive())
+                                .values()
+                                .iterator();
+            }
+
+            if (startKey instanceof DecoratedKey)
+                selection = selection.tailMap((DecoratedKey) startKey, keyRange.isStartInclusive());
+
+            if (endKey instanceof DecoratedKey)
+                selection = selection.headMap((DecoratedKey) endKey, keyRange.isEndInclusive());
+
+            // If we have reached this point it means that one of the PartitionPositions is a KeyBound and we have
+            // to use filtering to eliminate the unwanted partitions.
+            Iterator<Partition> iterator = selection.values().iterator();
+
+            return new AbstractIterator<Partition>()
+            {
+                private boolean encounteredPartitionsWithinRange;
+
+                @Override
+                protected Partition computeNext()
+                {
+                    while (iterator.hasNext())
+                    {
+                        Partition partition = iterator.next();
+                        if (dataRange.contains(partition.key()))
+                        {
+                            encounteredPartitionsWithinRange = true;
+                            return partition;
+                        }
+
+                        // we encountered some partitions within the range, but the last one is outside of the range: we are done
+                        if (encounteredPartitionsWithinRange)
+                            return endOfData();
+                    }
+
+                    return endOfData();
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java 
b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
new file mode 100644
index 0000000..2af3b6a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A DataSet implementation that is filled on demand and has an easy-to-use API for adding rows.
+ */
+public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet
+{
+    private final TableMetadata metadata;
+
+    private Row currentRow;
+
+    public SimpleDataSet(TableMetadata metadata)
+    {
+        super(new TreeMap<>(DecoratedKey.comparator));
+        this.metadata = metadata;
+    }
+
+    public SimpleDataSet row(Object... primaryKeyValues)
+    {
+        if (Iterables.size(metadata.primaryKeyColumns()) != primaryKeyValues.length)
+            throw new IllegalArgumentException();
+
+        Object[] partitionKeyValues = new Object[metadata.partitionKeyColumns().size()];
+        Object[]   clusteringValues = new Object[metadata.clusteringColumns().size()];
+
+        System.arraycopy(primaryKeyValues, 0, partitionKeyValues, 0, partitionKeyValues.length);
+        System.arraycopy(primaryKeyValues, partitionKeyValues.length, clusteringValues, 0, clusteringValues.length);
+
+        DecoratedKey partitionKey = makeDecoratedKey(partitionKeyValues);
+        Clustering clustering = makeClustering(clusteringValues);
+
+        currentRow = new Row(metadata, clustering);
+        SimplePartition partition = (SimplePartition) partitions.computeIfAbsent(partitionKey, pk -> new SimplePartition(metadata, pk));
+        partition.add(currentRow);
+
+        return this;
+    }
+
+    public SimpleDataSet column(String columnName, Object value)
+    {
+        if (null == currentRow)
+            throw new IllegalStateException();
+        currentRow.add(columnName, value);
+        return this;
+    }
+
+    private DecoratedKey makeDecoratedKey(Object... partitionKeyValues)
+    {
+        ByteBuffer partitionKey = partitionKeyValues.length == 1
+                                ? decompose(metadata.partitionKeyType, partitionKeyValues[0])
+                                : ((CompositeType) metadata.partitionKeyType).decompose(partitionKeyValues);
+        return metadata.partitioner.decorateKey(partitionKey);
+    }
+
+    private Clustering makeClustering(Object... clusteringValues)
+    {
+        if (clusteringValues.length == 0)
+            return Clustering.EMPTY;
+
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+        return Clustering.make(clusteringByteBuffers);
+    }
+
+    private static final class SimplePartition implements AbstractVirtualTable.Partition
+    {
+        private final DecoratedKey key;
+        private final NavigableMap<Clustering, Row> rows;
+
+        private SimplePartition(TableMetadata metadata, DecoratedKey key)
+        {
+            this.key = key;
+            this.rows = new TreeMap<>(metadata.comparator);
+        }
+
+        private void add(Row row)
+        {
+            rows.put(row.clustering, row);
+        }
+
+        public DecoratedKey key()
+        {
+            return key;
+        }
+
+        public UnfilteredRowIterator toRowIterator(TableMetadata metadata,
+                                                   ClusteringIndexFilter clusteringIndexFilter,
+                                                   ColumnFilter columnFilter,
+                                                   long now)
+        {
+            Iterator<Row> iterator = (clusteringIndexFilter.isReversed() ? rows.descendingMap() : rows).values().iterator();
+
+            return new AbstractUnfilteredRowIterator(metadata,
+                                                     key,
+                                                     DeletionTime.LIVE,
+                                                     columnFilter.queriedColumns(),
+                                                     Rows.EMPTY_STATIC_ROW,
+                                                     false,
+                                                     EncodingStats.NO_STATS)
+            {
+                protected Unfiltered computeNext()
+                {
+                    while (iterator.hasNext())
+                    {
+                        Row row = iterator.next();
+                        if (clusteringIndexFilter.selects(row.clustering))
+                            return row.toTableRow(columns, now);
+                    }
+                    return endOfData();
+                }
+            };
+        }
+    }
+
+    private static class Row
+    {
+        private final TableMetadata metadata;
+        private final Clustering clustering;
+
+        private final Map<ColumnMetadata, Object> values = new HashMap<>();
+
+        private Row(TableMetadata metadata, Clustering clustering)
+        {
+            this.metadata = metadata;
+            this.clustering = clustering;
+        }
+
+        private void add(String columnName, Object value)
+        {
+            ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName));
+            if (null == column || !column.isRegular())
+                throw new IllegalArgumentException();
+            values.put(column, value);
+        }
+
+        private org.apache.cassandra.db.rows.Row toTableRow(RegularAndStaticColumns columns, long now)
+        {
+            org.apache.cassandra.db.rows.Row.Builder builder = BTreeRow.unsortedBuilder((int) TimeUnit.MILLISECONDS.toSeconds(now));
+            builder.newRow(clustering);
+
+            columns.forEach(c ->
+            {
+                Object value = values.get(c);
+                if (null != value)
+                    builder.addCell(BufferCell.live(c, now, decompose(c.type, value)));
+            });
+
+            return builder.build();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+}
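
Taken together, AbstractVirtualTable and SimpleDataSet leave implementers a small surface to code against: extend the former and build a SimpleDataSet on demand in data(). The sketch below is illustrative only and is not part of this patch; the class name, the column names ("client_id" as a text partition key, "status" as a regular text column) and the row values are hypothetical, and the virtual TableMetadata (for which metadata.isVirtual() must return true, as the AbstractVirtualTable constructor enforces) is assumed to be built by the caller.

    // Hypothetical example (not in this commit): a read-only virtual table whose
    // rows are produced on demand through the SimpleDataSet API added above.
    import org.apache.cassandra.db.virtual.AbstractVirtualTable;
    import org.apache.cassandra.db.virtual.SimpleDataSet;
    import org.apache.cassandra.schema.TableMetadata;

    public final class ExampleClientsTable extends AbstractVirtualTable
    {
        ExampleClientsTable(TableMetadata metadata)
        {
            super(metadata); // metadata.isVirtual() must be true
        }

        @Override
        public DataSet data()
        {
            // One partition per client; "client_id" (partition key, text) and
            // "status" (regular, text) are assumed columns of the metadata.
            SimpleDataSet result = new SimpleDataSet(metadata());
            result.row("client-1").column("status", "connected");
            result.row("client-2").column("status", "idle");
            return result;
        }
    }

Range reads then come for free through AbstractDataSet.getPartitions(DataRange), since SimpleDataSet keeps its partitions in a TreeMap ordered by DecoratedKey.comparator.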

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
new file mode 100644
index 0000000..8d6f59b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import com.google.common.collect.ImmutableList;
+
+public final class SystemViewsKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_views";
+
+    public static SystemViewsKeyspace instance = new SystemViewsKeyspace();
+
+    private SystemViewsKeyspace()
+    {
+        super(NAME, ImmutableList.of());
+    }
+}
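
SystemViewsKeyspace is introduced empty here. Presumably, concrete tables would be registered by passing them to the VirtualKeyspace constructor; the fragment below is a hypothetical sketch only, since the element type accepted by VirtualKeyspace is not visible in this excerpt and ExampleClientsTable is the illustrative table from the earlier note, not part of the patch.

    private SystemViewsKeyspace()
    {
        // Hypothetical: once concrete tables exist, they would be listed here,
        // e.g. ImmutableList.of(new ExampleClientsTable(...)); the element type
        // accepted by VirtualKeyspace is an assumption, not shown in this excerpt.
        super(NAME, ImmutableList.of());
    }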

