Merge branch cassandra-2.1 into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f497c13e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f497c13e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f497c13e

Branch: refs/heads/trunk
Commit: f497c13ee33cc76b7c7bd4c6d4d12caf475ca79d
Parents: def5803 f587397
Author: blerer <benjamin.le...@datastax.com>
Authored: Fri Oct 16 14:44:29 2015 +0200
Committer: blerer <benjamin.le...@datastax.com>
Committed: Fri Oct 16 14:45:08 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/cql3/ResultSet.java    |  5 ++
 .../apache/cassandra/cql3/UntypedResultSet.java |  6 +--
 .../cassandra/cql3/selection/Selection.java     | 57 +++++++++++++-------
 .../cassandra/cql3/selection/Selector.java      | 12 +++++
 .../cql3/selection/SelectorFactories.java       | 20 +++++++
 .../cql3/selection/SimpleSelector.java          |  6 +++
 .../cql3/statements/SelectStatement.java        |  2 +-
 .../operations/SelectOrderByTest.java           | 52 ++++++++++++++++++
 8 files changed, 138 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 49e0d86,a0b6ae7..e8d610d
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@@ -73,9 -74,9 +73,9 @@@ public abstract class UntypedResultSet 
  
          public Row one()
          {
 -            if (cqlRows.rows.size() != 1)
 -                throw new IllegalStateException("One row required, " + 
cqlRows.rows.size() + " found");
 +            if (cqlRows.size() != 1)
 +                throw new IllegalStateException("One row required, " + 
cqlRows.size() + " found");
-             return new Row(cqlRows.metadata.names, cqlRows.rows.get(0));
+             return new Row(cqlRows.metadata.requestNames(), 
cqlRows.rows.get(0));
          }
  
          public Iterator<Row> iterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selection.java
index 13e030f,0000000..f6925b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@@ -1,545 -1,0 +1,566 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Objects;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.db.Cell;
 +import org.apache.cassandra.db.CounterCell;
 +import org.apache.cassandra.db.ExpiringCell;
 +import org.apache.cassandra.db.context.CounterContext;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public abstract class Selection
 +{
 +    /**
 +     * A predicate that returns <code>true</code> for static columns.
 +     */
 +    private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = 
new Predicate<ColumnDefinition>()
 +    {
 +        public boolean apply(ColumnDefinition def)
 +        {
 +            return def.isStatic();
 +        }
 +    };
 +
 +    private final CFMetaData cfm;
 +    private final List<ColumnDefinition> columns;
 +    private final SelectionColumnMapping columnMapping;
 +    private final ResultSet.ResultMetadata metadata;
 +    private final boolean collectTimestamps;
 +    private final boolean collectTTLs;
 +
 +    protected Selection(CFMetaData cfm,
 +                        List<ColumnDefinition> columns,
 +                        SelectionColumnMapping columnMapping,
 +                        boolean collectTimestamps,
 +                        boolean collectTTLs)
 +    {
 +        this.cfm = cfm;
 +        this.columns = columns;
 +        this.columnMapping = columnMapping;
 +        this.metadata = new 
ResultSet.ResultMetadata(columnMapping.getColumnSpecifications());
 +        this.collectTimestamps = collectTimestamps;
 +        this.collectTTLs = collectTTLs;
 +    }
 +
 +    // Overriden by SimpleSelection when appropriate.
 +    public boolean isWildcard()
 +    {
 +        return false;
 +    }    
 +
 +    /**
 +     * Checks if this selection contains static columns.
 +     * @return <code>true</code> if this selection contains static columns, 
<code>false</code> otherwise;
 +     */
 +    public boolean containsStaticColumns()
 +    {
 +        if (!cfm.hasStaticColumns())
 +            return false;
 +
 +        if (isWildcard())
 +            return true;
 +
 +        return !Iterables.isEmpty(Iterables.filter(columns, 
STATIC_COLUMN_FILTER));
 +    }
 +
 +    /**
 +     * Checks if this selection contains only static columns.
 +     * @return <code>true</code> if this selection contains only static 
columns, <code>false</code> otherwise;
 +     */
 +    public boolean containsOnlyStaticColumns()
 +    {
 +        if (!containsStaticColumns())
 +            return false;
 +
 +        if (isWildcard())
 +            return false;
 +
 +        for (ColumnDefinition def : getColumns())
 +        {
 +            if (!def.isPartitionKey() && !def.isStatic())
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Checks if this selection contains a collection.
 +     *
 +     * @return <code>true</code> if this selection contains a collection, 
<code>false</code> otherwise.
 +     */
 +    public boolean containsACollection()
 +    {
 +        if (!cfm.comparator.hasCollections())
 +            return false;
 +
 +        for (ColumnDefinition def : getColumns())
 +            if (def.type.isCollection() && def.type.isMultiCell())
 +                return true;
 +
 +        return false;
 +    }
 +
-     /**
-      * Returns the index of the specified column.
-      *
-      * @param def the column definition
-      * @return the index of the specified column
-      */
-     public int indexOf(final ColumnDefinition def)
-     {
-         return Iterators.indexOf(getColumns().iterator(), new 
Predicate<ColumnDefinition>()
-            {
-                public boolean apply(ColumnDefinition n)
-                {
-                    return def.name.equals(n.name);
-                }
-            });
-     }
- 
 +    public ResultSet.ResultMetadata getResultMetadata(boolean isJson)
 +    {
 +        if (!isJson)
 +            return metadata;
 +
 +        ColumnSpecification firstColumn = metadata.names.get(0);
 +        ColumnSpecification jsonSpec = new 
ColumnSpecification(firstColumn.ksName, firstColumn.cfName, 
Json.JSON_COLUMN_ID, UTF8Type.instance);
 +        return new ResultSet.ResultMetadata(Arrays.asList(jsonSpec));
 +    }
 +
 +    public static Selection wildcard(CFMetaData cfm)
 +    {
 +        List<ColumnDefinition> all = new ArrayList<>(cfm.allColumns().size());
 +        Iterators.addAll(all, cfm.allColumnsInSelectOrder());
 +        return new SimpleSelection(cfm, all, true);
 +    }
 +
 +    public static Selection forColumns(CFMetaData cfm, List<ColumnDefinition> 
columns)
 +    {
 +        return new SimpleSelection(cfm, columns, false);
 +    }
 +
 +    public int addColumnForOrdering(ColumnDefinition c)
 +    {
 +        columns.add(c);
 +        metadata.addNonSerializedColumn(c);
 +        return columns.size() - 1;
 +    }
 +
 +    public Iterable<Function> getFunctions()
 +    {
 +        return Collections.emptySet();
 +    }
 +
 +    private static boolean processesSelection(List<RawSelector> rawSelectors)
 +    {
 +        for (RawSelector rawSelector : rawSelectors)
 +        {
 +            if (rawSelector.processesSelection())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> 
rawSelectors) throws InvalidRequestException
 +    {
 +        List<ColumnDefinition> defs = new ArrayList<>();
 +
 +        SelectorFactories factories =
 +                
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors,
 cfm), cfm, defs);
 +        SelectionColumnMapping mapping = collectColumnMappings(cfm, 
rawSelectors, factories);
 +
 +        return (processesSelection(rawSelectors) || rawSelectors.size() != 
defs.size())
 +               ? new SelectionWithProcessing(cfm, defs, mapping, factories)
 +               : new SimpleSelection(cfm, defs, mapping, false);
 +    }
 +
++    /**
++     * Returns the index of the specified column within the resultset
++     * @param c the column
++     * @return the index of the specified column within the resultset or -1
++     */
++    public int getResultSetIndex(ColumnDefinition c)
++    {
++        return getColumnIndex(c);
++    }
++
++    /**
++     * Returns the index of the specified column
++     * @param c the column
++     * @return the index of the specified column or -1
++     */
++    protected final int getColumnIndex(ColumnDefinition c)
++    {
++        for (int i = 0, m = columns.size(); i < m; i++)
++            if (columns.get(i).name.equals(c.name))
++                return i;
++        return -1;
++    }
++
 +    private static SelectionColumnMapping collectColumnMappings(CFMetaData 
cfm,
 +                                                                
List<RawSelector> rawSelectors,
 +                                                                
SelectorFactories factories)
 +    {
 +        SelectionColumnMapping selectionColumns = 
SelectionColumnMapping.newMapping();
 +        Iterator<RawSelector> iter = rawSelectors.iterator();
 +        for (Selector.Factory factory : factories)
 +        {
 +            ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
 +            ColumnIdentifier alias = iter.next().alias;
 +            factory.addColumnMapping(selectionColumns,
 +                                     alias == null ? colSpec : 
colSpec.withAlias(alias));
 +        }
 +        return selectionColumns;
 +    }
 +
 +    protected abstract Selectors newSelectors() throws 
InvalidRequestException;
 +
 +    /**
 +     * @return the list of CQL3 columns value this SelectionClause needs.
 +     */
 +    public List<ColumnDefinition> getColumns()
 +    {
 +        return columns;
 +    }
 +
 +    /**
 +     * @return the mappings between resultset columns and the underlying 
columns
 +     */
 +    public SelectionColumns getColumnMapping()
 +    {
 +        return columnMapping;
 +    }
 +
 +    public ResultSetBuilder resultSetBuilder(long now, boolean isJson) throws 
InvalidRequestException
 +    {
 +        return new ResultSetBuilder(now, isJson);
 +    }
 +
 +    public abstract boolean isAggregate();
 +
 +    @Override
 +    public String toString()
 +    {
 +        return Objects.toStringHelper(this)
 +                .add("columns", columns)
 +                .add("columnMapping", columnMapping)
 +                .add("metadata", metadata)
 +                .add("collectTimestamps", collectTimestamps)
 +                .add("collectTTLs", collectTTLs)
 +                .toString();
 +    }
 +
 +    public class ResultSetBuilder
 +    {
 +        private final ResultSet resultSet;
 +
 +        /**
 +         * As multiple thread can access a <code>Selection</code> instance 
each <code>ResultSetBuilder</code> will use
 +         * its own <code>Selectors</code> instance.
 +         */
 +        private final Selectors selectors;
 +
 +        /*
 +         * We'll build CQL3 row one by one.
 +         * The currentRow is the values for the (CQL3) columns we've fetched.
 +         * We also collect timestamps and ttls for the case where the 
writetime and
 +         * ttl functions are used. Note that we might collect timestamp 
and/or ttls
 +         * we don't care about, but since the array below are allocated just 
once,
 +         * it doesn't matter performance wise.
 +         */
 +        List<ByteBuffer> current;
 +        final long[] timestamps;
 +        final int[] ttls;
 +        final long now;
 +
 +        private final boolean isJson;
 +
 +        private ResultSetBuilder(long now, boolean isJson) throws 
InvalidRequestException
 +        {
 +            this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), 
new ArrayList<List<ByteBuffer>>());
 +            this.selectors = newSelectors();
 +            this.timestamps = collectTimestamps ? new long[columns.size()] : 
null;
 +            this.ttls = collectTTLs ? new int[columns.size()] : null;
 +            this.now = now;
 +            this.isJson = isJson;
 +        }
 +
 +        public void add(ByteBuffer v)
 +        {
 +            current.add(v);
 +        }
 +
 +        public void add(Cell c)
 +        {
 +            current.add(isDead(c) ? null : value(c));
 +            if (timestamps != null)
 +            {
 +                timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE : 
c.timestamp();
 +            }
 +            if (ttls != null)
 +            {
 +                int ttl = -1;
 +                if (!isDead(c) && c instanceof ExpiringCell)
 +                    ttl = c.getLocalDeletionTime() - (int) (now / 1000);
 +                ttls[current.size() - 1] = ttl;
 +            }
 +        }
 +
 +        private boolean isDead(Cell c)
 +        {
 +            return c == null || !c.isLive(now);
 +        }
 +
 +        public void newRow(int protocolVersion) throws InvalidRequestException
 +        {
 +            if (current != null)
 +            {
 +                selectors.addInputRow(protocolVersion, this);
 +                if (!selectors.isAggregate())
 +                {
 +                    resultSet.addRow(getOutputRow(protocolVersion));
 +                    selectors.reset();
 +                }
 +            }
 +            current = new ArrayList<>(columns.size());
 +        }
 +
 +        public ResultSet build(int protocolVersion) throws 
InvalidRequestException
 +        {
 +            if (current != null)
 +            {
 +                selectors.addInputRow(protocolVersion, this);
 +                resultSet.addRow(getOutputRow(protocolVersion));
 +                selectors.reset();
 +                current = null;
 +            }
 +
 +            if (resultSet.isEmpty() && selectors.isAggregate())
 +                resultSet.addRow(getOutputRow(protocolVersion));
 +            return resultSet;
 +        }
 +
 +        private List<ByteBuffer> getOutputRow(int protocolVersion)
 +        {
 +            List<ByteBuffer> outputRow = 
selectors.getOutputRow(protocolVersion);
 +            return isJson ? rowToJson(outputRow, protocolVersion)
 +                          : outputRow;
 +        }
 +
 +        private List<ByteBuffer> rowToJson(List<ByteBuffer> row, int 
protocolVersion)
 +        {
 +            StringBuilder sb = new StringBuilder("{");
 +            for (int i = 0; i < metadata.names.size(); i++)
 +            {
 +                if (i > 0)
 +                    sb.append(", ");
 +
 +                ColumnSpecification spec = metadata.names.get(i);
 +                String columnName = spec.name.toString();
 +                if (!columnName.equals(columnName.toLowerCase(Locale.US)))
 +                    columnName = "\"" + columnName + "\"";
 +
 +                ByteBuffer buffer = row.get(i);
 +                sb.append('"');
 +                sb.append(Json.JSON_STRING_ENCODER.quoteAsString(columnName));
 +                sb.append("\": ");
 +                if (buffer == null)
 +                    sb.append("null");
 +                else
 +                    sb.append(spec.type.toJSONString(buffer, 
protocolVersion));
 +            }
 +            sb.append("}");
 +            return 
Collections.singletonList(UTF8Type.instance.getSerializer().serialize(sb.toString()));
 +        }
 +
 +        private ByteBuffer value(Cell c)
 +        {
 +            return (c instanceof CounterCell)
 +                ? 
ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
 +                : c.value();
 +        }
 +    }
 +
 +    private static interface Selectors
 +    {
 +        public boolean isAggregate();
 +
 +        /**
 +         * Adds the current row of the specified 
<code>ResultSetBuilder</code>.
 +         *
 +         * @param rs the <code>ResultSetBuilder</code>
 +         * @throws InvalidRequestException
 +         */
 +        public void addInputRow(int protocolVersion, ResultSetBuilder rs) 
throws InvalidRequestException;
 +
 +        public List<ByteBuffer> getOutputRow(int protocolVersion) throws 
InvalidRequestException;
 +
 +        public void reset();
 +    }
 +
 +    // Special cased selection for when no function is used (this save some 
allocations).
 +    private static class SimpleSelection extends Selection
 +    {
 +        private final boolean isWildcard;
 +
 +        public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> 
columns, boolean isWildcard)
 +        {
 +            this(cfm, columns, SelectionColumnMapping.simpleMapping(columns), 
isWildcard);
 +        }
 +
 +        public SimpleSelection(CFMetaData cfm,
 +                               List<ColumnDefinition> columns,
 +                               SelectionColumnMapping metadata,
 +                               boolean isWildcard)
 +        {
 +            /*
 +             * In theory, even a simple selection could have multiple time 
the same column, so we
 +             * could filter those duplicate out of columns. But since we're 
very unlikely to
 +             * get much duplicate in practice, it's more efficient not to 
bother.
 +             */
 +            super(cfm, columns, metadata, false, false);
 +            this.isWildcard = isWildcard;
 +        }
 +
 +        @Override
 +        public boolean isWildcard()
 +        {
 +            return isWildcard;
 +        }
 +
 +        public boolean isAggregate()
 +        {
 +            return false;
 +        }
 +
 +        protected Selectors newSelectors()
 +        {
 +            return new Selectors()
 +            {
 +                private List<ByteBuffer> current;
 +
 +                public void reset()
 +                {
 +                    current = null;
 +                }
 +
 +                public List<ByteBuffer> getOutputRow(int protocolVersion)
 +                {
 +                    return current;
 +                }
 +
 +                public void addInputRow(int protocolVersion, ResultSetBuilder 
rs) throws InvalidRequestException
 +                {
 +                    current = rs.current;
 +                }
 +
 +                public boolean isAggregate()
 +                {
 +                    return false;
 +                }
 +            };
 +        }
 +    }
 +
 +    private static class SelectionWithProcessing extends Selection
 +    {
 +        private final SelectorFactories factories;
 +
 +        public SelectionWithProcessing(CFMetaData cfm,
 +                                       List<ColumnDefinition> columns,
 +                                       SelectionColumnMapping metadata,
 +                                       SelectorFactories factories) throws 
InvalidRequestException
 +        {
 +            super(cfm,
 +                  columns,
 +                  metadata,
 +                  factories.containsWritetimeSelectorFactory(),
 +                  factories.containsTTLSelectorFactory());
 +
 +            this.factories = factories;
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return factories.getFunctions();
 +        }
 +
 +        @Override
++        public int getResultSetIndex(ColumnDefinition c)
++        {
++            int index = getColumnIndex(c);
++
++            if (index < 0)
++                return -1;
++
++            for (int i = 0, m = factories.size(); i < m; i++)
++                if (factories.get(i).isSimpleSelectorFactory(index))
++                    return i;
++
++            return -1;
++        }
++
++        @Override
 +        public int addColumnForOrdering(ColumnDefinition c)
 +        {
 +            int index = super.addColumnForOrdering(c);
 +            factories.addSelectorForOrdering(c, index);
-             return index;
++            return factories.size() - 1;
 +        }
 +
 +        public boolean isAggregate()
 +        {
 +            return factories.doesAggregation();
 +        }
 +
 +        protected Selectors newSelectors() throws InvalidRequestException
 +        {
 +            return new Selectors()
 +            {
 +                private final List<Selector> selectors = 
factories.newInstances();
 +
 +                public void reset()
 +                {
 +                    for (Selector selector : selectors)
 +                        selector.reset();
 +                }
 +
 +                public boolean isAggregate()
 +                {
 +                    return factories.doesAggregation();
 +                }
 +
 +                public List<ByteBuffer> getOutputRow(int protocolVersion) 
throws InvalidRequestException
 +                {
 +                    List<ByteBuffer> outputRow = new 
ArrayList<>(selectors.size());
 +
 +                    for (Selector selector: selectors)
 +                        outputRow.add(selector.getOutput(protocolVersion));
 +
 +                    return outputRow;
 +                }
 +
 +                public void addInputRow(int protocolVersion, ResultSetBuilder 
rs) throws InvalidRequestException
 +                {
 +                    for (Selector selector : selectors)
 +                        selector.addInput(protocolVersion, rs);
 +                }
 +            };
 +        }
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/Selector.java
index 1bddcc8,0000000..7b818b5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@@ -1,195 -1,0 +1,207 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.cql3.AssignmentTestable;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.ReversedType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A <code>Selector</code> is used to convert the data returned by the 
storage engine into the data requested by the 
 + * user. They correspond to the &lt;selector&gt; elements from the select 
clause.
 + * <p>Since the introduction of aggregation, <code>Selector</code>s cannot be 
called anymore by multiple threads 
 + * as they have an internal state.</p>
 + */
 +public abstract class Selector implements AssignmentTestable
 +{
 +    /**
 +     * A factory for <code>Selector</code> instances.
 +     */
 +    public static abstract class Factory
 +    {
 +        public Iterable<Function> getFunctions()
 +        {
 +            return Collections.emptySet();
 +        }
 +
 +        /**
 +         * Returns the column specification corresponding to the output value 
of the selector instances created by
 +         * this factory.
 +         *
 +         * @param cfm the column family meta data
 +         * @return a column specification
 +         */
 +        public final ColumnSpecification getColumnSpecification(CFMetaData 
cfm)
 +        {
 +            return new ColumnSpecification(cfm.ksName,
 +                                           cfm.cfName,
 +                                           new 
ColumnIdentifier(getColumnName(), true),
 +                                           getReturnType());
 +        }
 +
 +        /**
 +         * Creates a new <code>Selector</code> instance.
 +         *
 +         * @return a new <code>Selector</code> instance
 +         */
 +        public abstract Selector newInstance() throws InvalidRequestException;
 +
 +        /**
 +         * Checks if this factory creates selectors instances that creates 
aggregates.
 +         *
 +         * @return <code>true</code> if this factory creates selectors 
instances that creates aggregates,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isAggregateSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Checks if this factory creates <code>writetime</code> selectors 
instances.
 +         *
 +         * @return <code>true</code> if this factory creates 
<code>writetime</code> selectors instances,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isWritetimeSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Checks if this factory creates <code>TTL</code> selectors 
instances.
 +         *
 +         * @return <code>true</code> if this factory creates <code>TTL</code> 
selectors instances,
 +         * <code>false</code> otherwise
 +         */
 +        public boolean isTTLSelectorFactory()
 +        {
 +            return false;
 +        }
 +
 +        /**
++         * Checks if this factory creates <code>Selector</code>s that simply 
return the specified column.
++         *
++         * @param index the column index
++         * @return <code>true</code> if this factory creates 
<code>Selector</code>s that simply return
++         * the specified column, <code>false</code> otherwise.
++         */
++        public boolean isSimpleSelectorFactory(int index)
++        {
++            return false;
++        }
++
++        /**
 +         * Returns the name of the column corresponding to the output value 
of the selector instances created by
 +         * this factory.
 +         *
 +         * @return a column name
 +         */
 +        protected abstract String getColumnName();
 +
 +        /**
 +         * Returns the type of the values returned by the selector instances 
created by this factory.
 +         *
 +         * @return the selector output type
 +         */
 +        protected abstract AbstractType<?> getReturnType();
 +
 +        /**
 +         * Record a mapping between the ColumnDefinitions that are used by 
the selector
 +         * instances created by this factory and a column in the 
ResultSet.Metadata
 +         * returned with a query. In most cases, this is likely to be a 1:1 
mapping,
 +         * but some selector instances may utilise multiple columns (or none 
at all)
 +         * to produce a value (i.e. functions).
 +         *
 +         * @param mapping the instance of the column mapping belonging to the 
current query's Selection
 +         * @param resultsColumn the column in the ResultSet.Metadata to which 
the ColumnDefinitions used
 +         *                      by the Selector are to be mapped
 +         */
 +        protected abstract void addColumnMapping(SelectionColumnMapping 
mapping, ColumnSpecification resultsColumn);
 +    }
 +
 +    /**
 +     * Add the current value from the specified <code>ResultSetBuilder</code>.
 +     *
 +     * @param protocolVersion protocol version used for serialization
 +     * @param rs the <code>ResultSetBuilder</code>
 +     * @throws InvalidRequestException if a problem occurs while add the 
input value
 +     */
 +    public abstract void addInput(int protocolVersion, ResultSetBuilder rs) 
throws InvalidRequestException;
 +
 +    /**
 +     * Returns the selector output.
 +     *
 +     * @param protocolVersion protocol version used for serialization
 +     * @return the selector output
 +     * @throws InvalidRequestException if a problem occurs while computing 
the output value
 +     */
 +    public abstract ByteBuffer getOutput(int protocolVersion) throws 
InvalidRequestException;
 +
 +    /**
 +     * Returns the <code>Selector</code> output type.
 +     *
 +     * @return the <code>Selector</code> output type.
 +     */
 +    public abstract AbstractType<?> getType();
 +
 +    /**
 +     * Checks if this <code>Selector</code> is creating aggregates.
 +     *
 +     * @return <code>true</code> if this <code>Selector</code> is creating 
aggregates <code>false</code>
 +     * otherwise.
 +     */
 +    public boolean isAggregate()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * Reset the internal state of this <code>Selector</code>.
 +     */
 +    public abstract void reset();
 +
 +    public final AssignmentTestable.TestResult testAssignment(String 
keyspace, ColumnSpecification receiver)
 +    {
 +        // We should ignore the fact that the output type is frozen in our 
comparison as functions do not support
 +        // frozen types for arguments
 +        AbstractType<?> receiverType = receiver.type;
 +        if (getType().isFrozenCollection())
 +            receiverType = receiverType.freeze();
 +
 +        if (getType().isReversed())
 +            receiverType = ReversedType.getInstance(receiverType);
 +
 +        if (receiverType.equals(getType()))
 +            return AssignmentTestable.TestResult.EXACT_MATCH;
 +
 +        if (receiverType.isValueCompatibleWith(getType()))
 +            return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
 +
 +        return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index 5ea2957,0000000..fbbfbb5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@@ -1,194 -1,0 +1,214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.selection.Selector.Factory;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A set of <code>Selector</code> factories.
 + */
 +final class SelectorFactories implements Iterable<Selector.Factory>
 +{
 +    /**
 +     * The <code>Selector</code> factories.
 +     */
 +    private final List<Selector.Factory> factories;
 +
 +    /**
 +     * <code>true</code> if one of the factory creates writetime selectors.
 +     */
 +    private boolean containsWritetimeFactory;
 +
 +    /**
 +     * <code>true</code> if one of the factory creates TTL selectors.
 +     */
 +    private boolean containsTTLFactory;
 +
 +    /**
 +     * The number of factories creating aggregates.
 +     */
 +    private int numberOfAggregateFactories;
 +
 +    /**
 +     * Creates a new <code>SelectorFactories</code> instance and collect the 
column definitions.
 +     *
 +     * @param selectables the <code>Selectable</code>s for which the 
factories must be created
 +     * @param cfm the Column Family Definition
 +     * @param defs the collector parameter for the column definitions
 +     * @return a new <code>SelectorFactories</code> instance
 +     * @throws InvalidRequestException if a problem occurs while creating the 
factories
 +     */
 +    public static SelectorFactories 
createFactoriesAndCollectColumnDefinitions(List<Selectable> selectables,
 +                                                                              
 CFMetaData cfm,
 +                                                                              
 List<ColumnDefinition> defs)
 +                                                                              
 throws InvalidRequestException
 +    {
 +        return new SelectorFactories(selectables, cfm, defs);
 +    }
 +
 +    private SelectorFactories(List<Selectable> selectables,
 +                              CFMetaData cfm,
 +                              List<ColumnDefinition> defs)
 +                              throws InvalidRequestException
 +    {
 +        factories = new ArrayList<>(selectables.size());
 +
 +        for (Selectable selectable : selectables)
 +        {
 +            Factory factory = selectable.newSelectorFactory(cfm, defs);
 +            containsWritetimeFactory |= factory.isWritetimeSelectorFactory();
 +            containsTTLFactory |= factory.isTTLSelectorFactory();
 +            if (factory.isAggregateSelectorFactory())
 +                ++numberOfAggregateFactories;
 +            factories.add(factory);
 +        }
 +    }
 +
 +    public Iterable<Function> getFunctions()
 +    {
 +        Iterable<Function> functions = Collections.emptySet();
 +        for (Factory factory : factories)
 +            if (factory != null)
 +                functions = Iterables.concat(functions, 
factory.getFunctions());
 +        return functions;
 +    }
 +
 +    /**
++     * Returns the factory with the specified index.
++     *
++     * @param i the factory index
++     * @return the factory with the specified index
++     */
++    public Selector.Factory get(int i)
++    {
++        return factories.get(i);
++    }
++
++    /**
 +     * Adds a new <code>Selector.Factory</code> for a column that is needed 
only for ORDER BY purposes.
 +     * @param def the column that is needed for ordering
 +     * @param index the index of the column definition in the Selection's 
list of columns
 +     */
 +    public void addSelectorForOrdering(ColumnDefinition def, int index)
 +    {
 +        factories.add(SimpleSelector.newFactory(def, index));
 +    }
 +
 +    /**
 +     * Whether the selector built by this factory does aggregation or not 
(either directly or in a sub-selector).
 +     *
 +     * @return <code>true</code> if the selector built by this factor does 
aggregation, <code>false</code> otherwise.
 +     */
 +    public boolean doesAggregation()
 +    {
 +        return numberOfAggregateFactories > 0;
 +    }
 +
 +    /**
 +     * Checks if this <code>SelectorFactories</code> contains at least one 
factory for writetime selectors.
 +     *
 +     * @return <code>true</code> if this <code>SelectorFactories</code> 
contains at least one factory for writetime
 +     * selectors, <code>false</code> otherwise.
 +     */
 +    public boolean containsWritetimeSelectorFactory()
 +    {
 +        return containsWritetimeFactory;
 +    }
 +
 +    /**
 +     * Checks if this <code>SelectorFactories</code> contains at least one 
factory for TTL selectors.
 +     *
 +     * @return <code>true</code> if this <code>SelectorFactories</code> 
contains at least one factory for TTL
 +     * selectors, <code>false</code> otherwise.
 +     */
 +    public boolean containsTTLSelectorFactory()
 +    {
 +        return containsTTLFactory;
 +    }
 +
 +    /**
 +     * Creates a list of new <code>Selector</code> instances.
 +     * @return a list of new <code>Selector</code> instances.
 +     */
 +    public List<Selector> newInstances() throws InvalidRequestException
 +    {
 +        List<Selector> selectors = new ArrayList<>(factories.size());
 +        for (Selector.Factory factory : factories)
 +        {
 +            selectors.add(factory.newInstance());
 +        }
 +        return selectors;
 +    }
 +
 +    public Iterator<Factory> iterator()
 +    {
 +        return factories.iterator();
 +    }
 +
 +    /**
 +     * Returns the names of the columns corresponding to the output values of 
the selector instances created by
 +     * these factories.
 +     *
 +     * @return a list of column names
 +     */
 +    public List<String> getColumnNames()
 +    {
 +        return Lists.transform(factories, new 
com.google.common.base.Function<Selector.Factory, String>()
 +        {
 +            public String apply(Selector.Factory factory)
 +            {
 +                return factory.getColumnName();
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Returns a list of the return types of the selector instances created 
by these factories.
 +     *
 +     * @return a list of types
 +     */
 +    public List<AbstractType<?>> getReturnTypes()
 +    {
 +        return Lists.transform(factories, new 
com.google.common.base.Function<Selector.Factory, AbstractType<?>>()
 +        {
 +            public AbstractType<?> apply(Selector.Factory factory)
 +            {
 +                return factory.getReturnType();
 +            }
 +        });
 +    }
++
++    /**
++     * Returns the number of factories.
++     * @return the number of factories
++     */
++    public int size()
++    {
++        return factories.size();
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index 2e0514a,0000000..e4040fa
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@@ -1,106 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.selection;
 +
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +public final class SimpleSelector extends Selector
 +{
 +    private final String columnName;
 +    private final int idx;
 +    private final AbstractType<?> type;
 +    private ByteBuffer current;
 +    private boolean isSet;
 +
 +    public static Factory newFactory(final ColumnDefinition def, final int 
idx)
 +    {
 +        return new Factory()
 +        {
 +            @Override
 +            protected String getColumnName()
 +            {
 +                return def.name.toString();
 +            }
 +
 +            @Override
 +            protected AbstractType<?> getReturnType()
 +            {
 +                return def.type;
 +            }
 +
 +            protected void addColumnMapping(SelectionColumnMapping mapping, 
ColumnSpecification resultColumn)
 +            {
 +               mapping.addMapping(resultColumn, def);
 +            }
 +
 +            @Override
 +            public Selector newInstance()
 +            {
 +                return new SimpleSelector(def.name.toString(), idx, def.type);
 +            }
++
++            @Override
++            public boolean isSimpleSelectorFactory(int index)
++            {
++                return index == idx;
++            }
 +        };
 +    }
 +
 +    @Override
 +    public void addInput(int protocolVersion, ResultSetBuilder rs) throws 
InvalidRequestException
 +    {
 +        if (!isSet)
 +        {
 +            isSet = true;
 +            current = rs.current.get(idx);
 +        }
 +    }
 +
 +    @Override
 +    public ByteBuffer getOutput(int protocolVersion) throws 
InvalidRequestException
 +    {
 +        return current;
 +    }
 +
 +    @Override
 +    public void reset()
 +    {
 +        isSet = false;
 +        current = null;
 +    }
 +
 +    @Override
 +    public AbstractType<?> getType()
 +    {
 +        return type;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return columnName;
 +    }
 +
 +    private SimpleSelector(String columnName, int idx, AbstractType<?> type)
 +    {
 +        this.columnName = columnName;
 +        this.idx = idx;
 +        this.type = type;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index e6ac6c0,8a6d704..5f142ce
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -874,85 -1647,576 +874,85 @@@ public class SelectStatement implement
              return prepLimit;
          }
  
 -        private void updateRestrictionsForRelation(SelectStatement stmt, 
List<ColumnDefinition> defs, MultiColumnRelation relation, 
VariableSpecifications boundNames) throws InvalidRequestException
 +        private static void verifyOrderingIsAllowed(StatementRestrictions 
restrictions) throws InvalidRequestException
          {
 -            List<ColumnDefinition> restrictedColumns = new ArrayList<>();
 -            Set<ColumnDefinition> seen = new HashSet<>();
 -            Restriction existing = null;
 -
 -            int previousPosition = defs.get(0).position() - 1;
 -            for (int i = 0, m = defs.size(); i < m; i++)
 -            {
 -                ColumnDefinition def = defs.get(i);
 -
 -                // ensure multi-column restriction only applies to clustering 
columns
 -                if (def.kind != ColumnDefinition.Kind.CLUSTERING_COLUMN)
 -                    throw new 
InvalidRequestException(String.format("Multi-column relations can only be 
applied to clustering columns: %s", def.name));
 -
 -                if (seen.contains(def))
 -                    throw new InvalidRequestException(String.format("Column 
\"%s\" appeared twice in a relation: %s", def.name, relation));
 -                seen.add(def);
 -
 -                // check that no clustering columns were skipped
 -                if (def.position() != previousPosition + 1)
 -                {
 -                    if (previousPosition == -1)
 -                        throw new InvalidRequestException(String.format(
 -                                "Clustering columns may not be skipped in 
multi-column relations. " +
 -                                "They should appear in the PRIMARY KEY order. 
Got %s", relation));
 -
 -                    throw new InvalidRequestException(String.format(
 -                                "Clustering columns must appear in the 
PRIMARY KEY order in multi-column relations: %s",
 -                                 relation));
 -                }
 -                previousPosition++;
 -
 -                Restriction previous = existing;
 -                existing = getExistingRestriction(stmt, def);
 -                Operator operator = relation.operator();
 -                if (existing != null)
 -                {
 -                    if (operator == Operator.EQ || operator == Operator.IN)
 -                    {
 -                        throw new InvalidRequestException(String.format(
 -                                "Column \"%s\" cannot be restricted by more 
than one relation if it is in an %s relation",
 -                                def.name, operator));
 -                    }
 -                    else if (!existing.isSlice())
 -                    {
 -                        throw new InvalidRequestException(String.format(
 -                                "Column \"%s\" cannot be restricted by an 
equality relation and an inequality relation",
 -                                def.name));
 -                    }
 -                    else
 -                    {
 -                        boolean existingRestrictionStartBefore =
 -                            (i == 0 && def.position() != 0 && 
stmt.columnRestrictions[def.position() - 1] == existing);
 -
 -                        boolean existingRestrictionStartAfter = (i != 0 && 
previous != existing);
 -
 -                        if (existingRestrictionStartBefore || 
existingRestrictionStartAfter)
 -                        {
 -                            throw new InvalidRequestException(String.format(
 -                                    "Column \"%s\" cannot be restricted by 
two inequalities not starting with the same column: %s",
 -                                    def.name, relation));
 -                        }
 -
 -                        checkBound(existing, def, operator);
 -                    }
 -                }
 -                restrictedColumns.add(def);
 -            }
 -
 -            switch (relation.operator())
 -            {
 -                case EQ:
 -                {
 -                    Term t = relation.getValue().prepare(keyspace(), defs);
 -                    t.collectMarkerSpecification(boundNames);
 -                    Restriction restriction = new 
MultiColumnRestriction.EQ(t, false);
 -                    for (ColumnDefinition def : restrictedColumns)
 -                        stmt.columnRestrictions[def.position()] = restriction;
 -                    break;
 -                }
 -                case IN:
 -                {
 -                    Restriction restriction;
 -                    List<? extends Term.MultiColumnRaw> inValues = 
relation.getInValues();
 -                    if (inValues != null)
 -                    {
 -                        // we have something like "(a, b, c) IN ((1, 2, 3), 
(4, 5, 6), ...) or
 -                        // "(a, b, c) IN (?, ?, ?)
 -                        List<Term> terms = new ArrayList<>(inValues.size());
 -                        for (Term.MultiColumnRaw tuple : inValues)
 -                        {
 -                            Term t = tuple.prepare(keyspace(), defs);
 -                            t.collectMarkerSpecification(boundNames);
 -                            terms.add(t);
 -                        }
 -                         restriction = new 
MultiColumnRestriction.InWithValues(terms);
 -                    }
 -                    else
 -                    {
 -                        Tuples.INRaw rawMarker = relation.getInMarker();
 -                        AbstractMarker t = rawMarker.prepare(keyspace(), 
defs);
 -                        t.collectMarkerSpecification(boundNames);
 -                        restriction = new 
MultiColumnRestriction.InWithMarker(t);
 -                    }
 -                    for (ColumnDefinition def : restrictedColumns)
 -                        stmt.columnRestrictions[def.position()] = restriction;
 -
 -                    break;
 -                }
 -                case LT:
 -                case LTE:
 -                case GT:
 -                case GTE:
 -                {
 -                    Term t = relation.getValue().prepare(keyspace(), defs);
 -                    t.collectMarkerSpecification(boundNames);
 -
 -                    Restriction.Slice existingRestriction = 
(Restriction.Slice) getExistingRestriction(stmt, defs.get(0));
 -                    Restriction.Slice restriction;
 -                    if (existingRestriction == null)
 -                    {
 -                        restriction = new MultiColumnRestriction.Slice(false);
 -                    }
 -                    else if (!existingRestriction.isMultiColumn())
 -                    {
 -                        restriction = new MultiColumnRestriction.Slice(false);
 -                        restriction.setBound(existingRestriction);
 -                    }
 -                    else
 -                    {
 -                        restriction = existingRestriction;
 -                    }
 -                    restriction.setBound(relation.operator(), t);
 -
 -                    for (ColumnDefinition def : defs)
 -                    {
 -                        stmt.columnRestrictions[def.position()] = restriction;
 -                    }
 -                    break;
 -                }
 -                case NEQ:
 -                    throw new 
InvalidRequestException(String.format("Unsupported \"!=\" relation: %s", 
relation));
 -            }
 +            checkFalse(restrictions.usesSecondaryIndexing(), "ORDER BY with 
2ndary indexes is not supported.");
 +            checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported 
when the partition key is restricted by an EQ or an IN.");
          }
  
 -        /**
 -         * Checks that the operator for the specified column is compatible 
with the bounds of the existing restriction.
 -         *
 -         * @param existing the existing restriction
 -         * @param def the column definition
 -         * @param operator the operator
 -         * @throws InvalidRequestException if the operator is not compatible 
with the bounds of the existing restriction
 -         */
 -        private static void checkBound(Restriction existing, ColumnDefinition 
def, Operator operator) throws InvalidRequestException
 +        private static void validateDistinctSelection(CFMetaData cfm,
 +                                                      Selection selection,
 +                                                      StatementRestrictions 
restrictions)
 +                                                      throws 
InvalidRequestException
          {
 -            Restriction.Slice existingSlice = (Restriction.Slice) existing;
 +            Collection<ColumnDefinition> requestedColumns = 
selection.getColumns();
 +            for (ColumnDefinition def : requestedColumns)
 +                checkFalse(!def.isPartitionKey() && !def.isStatic(),
 +                           "SELECT DISTINCT queries must only request 
partition key columns and/or static columns (not %s)",
 +                           def.name);
  
 -            if (existingSlice.hasBound(Bound.START) && (operator == 
Operator.GT || operator == Operator.GTE))
 -                throw new InvalidRequestException(String.format(
 -                            "More than one restriction was found for the 
start bound on %s", def.name));
 +            // If it's a key range, we require that all partition key columns 
are selected so we don't have to bother
 +            // with post-query grouping.
 +            if (!restrictions.isKeyRange())
 +                return;
  
 -            if (existingSlice.hasBound(Bound.END) && (operator == Operator.LT 
|| operator == Operator.LTE))
 -                throw new InvalidRequestException(String.format(
 -                            "More than one restriction was found for the end 
bound on %s", def.name));
 -        }
 -
 -        private static Restriction getExistingRestriction(SelectStatement 
stmt, ColumnDefinition def)
 -        {
 -            switch (def.kind)
 -            {
 -                case PARTITION_KEY:
 -                    return stmt.keyRestrictions[def.position()];
 -                case CLUSTERING_COLUMN:
 -                    return stmt.columnRestrictions[def.position()];
 -                case REGULAR:
 -                case STATIC:
 -                    return stmt.metadataRestrictions.get(def.name);
 -                default:
 -                    throw new AssertionError();
 -            }
 +            for (ColumnDefinition def : cfm.partitionKeyColumns())
 +                checkTrue(requestedColumns.contains(def),
 +                          "SELECT DISTINCT queries must request all the 
partition key columns (missing %s)", def.name);
          }
  
 -        private void updateRestrictionsForRelation(SelectStatement stmt, 
ColumnDefinition def, SingleColumnRelation relation, VariableSpecifications 
names) throws InvalidRequestException
 +        private void handleUnrecognizedOrderingColumn(ColumnIdentifier 
column) throws InvalidRequestException
          {
 -            switch (def.kind)
 -            {
 -                case PARTITION_KEY:
 -                {
 -                    Restriction existingRestriction = 
stmt.keyRestrictions[def.position()];
 -                    Restriction previousRestriction = def.position() == 0 ? 
null : stmt.keyRestrictions[def.position() - 1];
 -                    stmt.keyRestrictions[def.position()] = 
updateSingleColumnRestriction(def, existingRestriction, previousRestriction, 
relation, names);
 -                    break;
 -                }
 -                case CLUSTERING_COLUMN:
 -                {
 -                    Restriction existingRestriction = 
stmt.columnRestrictions[def.position()];
 -                    Restriction previousRestriction = def.position() == 0 ? 
null : stmt.columnRestrictions[def.position() - 1];
 -                    stmt.columnRestrictions[def.position()] = 
updateSingleColumnRestriction(def, existingRestriction, previousRestriction, 
relation, names);
 -                    break;
 -                }
 -                case COMPACT_VALUE:
 -                {
 -                    throw new 
InvalidRequestException(String.format("Predicates on the non-primary-key column 
(%s) of a COMPACT table are not yet supported", def.name));
 -                }
 -                case REGULAR:
 -                case STATIC:
 -                {
 -                    // We only all IN on the row key and last clustering key 
so far, never on non-PK columns, and this even if there's an index
 -                    Restriction r = updateSingleColumnRestriction(def, 
stmt.metadataRestrictions.get(def.name), null, relation, names);
 -
 -                    if (r.isIN() && 
!((Restriction.IN)r).canHaveOnlyOneValue())
 -                        // Note: for backward compatibility reason, we 
conside a IN of 1 value the same as a EQ, so we let that slide.
 -                        throw new InvalidRequestException(String.format("IN 
predicates on non-primary-key columns (%s) is not yet supported", def.name));
 -                    stmt.metadataRestrictions.put(def.name, r);
 -                    break;
 -                }
 -            }
 +            checkFalse(containsAlias(column), "Aliases are not allowed in 
order by clause ('%s')", column);
 +            checkFalse(true, "Order by on unknown column %s", column);
          }
  
 -        Restriction updateSingleColumnRestriction(ColumnDefinition def, 
Restriction existingRestriction, Restriction previousRestriction, 
SingleColumnRelation newRel, VariableSpecifications boundNames) throws 
InvalidRequestException
 +        private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData 
cfm,
 +                                                                   Selection 
selection,
 +                                                                   
StatementRestrictions restrictions)
 +                                                                   throws 
InvalidRequestException
          {
 -            ColumnSpecification receiver = def;
 -            if (newRel.onToken)
 -            {
 -                if (def.kind != ColumnDefinition.Kind.PARTITION_KEY)
 -                    throw new InvalidRequestException(String.format("The 
token() function is only supported on the partition key, found on %s", 
def.name));
 -
 -                receiver = new ColumnSpecification(def.ksName,
 -                                                   def.cfName,
 -                                                   new 
ColumnIdentifier("partition key token", true),
 -                                                   
StorageService.getPartitioner().getTokenValidator());
 -            }
 -
 -            // We don't support relations against entire collections (unless 
they're frozen), like "numbers = {1, 2, 3}"
 -            if (receiver.type.isCollection() && receiver.type.isMultiCell() 
&& !(newRel.operator() == Operator.CONTAINS_KEY || newRel.operator() == 
Operator.CONTAINS))
 -            {
 -                throw new InvalidRequestException(String.format("Collection 
column '%s' (%s) cannot be restricted by a '%s' relation",
 -                                                                def.name, 
receiver.type.asCQL3Type(), newRel.operator()));
 -            }
 -
 -            switch (newRel.operator())
 -            {
 -                case EQ:
 -                {
 -                    if (existingRestriction != null)
 -                        throw new InvalidRequestException(String.format("%s 
cannot be restricted by more than one relation if it includes an Equal", 
def.name));
 -                    Term t = newRel.getValue().prepare(keyspace(), receiver);
 -                    t.collectMarkerSpecification(boundNames);
 -                    existingRestriction = new SingleColumnRestriction.EQ(t, 
newRel.onToken);
 -                }
 -                break;
 -                case IN:
 -                    if (existingRestriction != null)
 -                        throw new InvalidRequestException(String.format("%s 
cannot be restricted by more than one relation if it includes a IN", def.name));
 -
 -                    if (newRel.getInValues() == null)
 -                    {
 -                        // Means we have a "SELECT ... IN ?"
 -                        assert newRel.getValue() != null;
 -                        Term t = newRel.getValue().prepare(keyspace(), 
receiver);
 -                        t.collectMarkerSpecification(boundNames);
 -                        existingRestriction = new 
SingleColumnRestriction.InWithMarker((Lists.Marker)t);
 -                    }
 -                    else
 -                    {
 -                        List<Term> inValues = new 
ArrayList<>(newRel.getInValues().size());
 -                        for (Term.Raw raw : newRel.getInValues())
 -                        {
 -                            Term t = raw.prepare(keyspace(), receiver);
 -                            t.collectMarkerSpecification(boundNames);
 -                            inValues.add(t);
 -                        }
 -                        existingRestriction = new 
SingleColumnRestriction.InWithValues(inValues);
 -                    }
 -                    break;
 -                case NEQ:
 -                    throw new 
InvalidRequestException(String.format("Unsupported \"!=\" relation on column 
\"%s\"", def.name));
 -                case GT:
 -                case GTE:
 -                case LT:
 -                case LTE:
 -                    {
 -                        // A slice restriction can be merged with another one 
under some conditions:
 -                        // 1) both restrictions are on a token function or 
non of them are
 -                        // (e.g. token(partitionKey) > token(?) AND 
token(partitionKey) <= token(?) or clustering1 > 1 AND clustering1 <= 2).
 -                        // 2) both restrictions needs to start with the same 
column (e.g clustering1 > 0 AND (clustering1, clustering2) <= (2, 1)).
 -                        if (existingRestriction == null)
 -                            existingRestriction = new 
SingleColumnRestriction.Slice(newRel.onToken);
 -                        else if (!existingRestriction.isSlice())
 -                            throw new 
InvalidRequestException(String.format("Column \"%s\" cannot be restricted by 
both an equality and an inequality relation", def.name));
 -                        else if (existingRestriction.isOnToken() != 
newRel.onToken)
 -                            // For partition keys, we shouldn't have slice 
restrictions without token(). And while this is rejected later by
 -                            // processPartitionKeysRestrictions, we shouldn't 
update the existing restriction by the new one if the old one was using token()
 -                            // and the new one isn't since that would bypass 
that later test.
 -                            throw new InvalidRequestException("Only EQ and IN 
relation are supported on the partition key (unless you use the token() 
function)");
 -
 -                        checkBound(existingRestriction, def, 
newRel.operator());
 -
 -                        if (def.position() != 0 && previousRestriction == 
existingRestriction)
 -                            throw new 
InvalidRequestException(String.format("Column \"%s\" cannot be restricted by 
two inequalities not starting with the same column: %s",
 -                                                                            
def.name,
 -                                                                            
newRel));
 -
 -                        Term t = newRel.getValue().prepare(keyspace(), 
receiver);
 -                        t.collectMarkerSpecification(boundNames);
 -                        ((SingleColumnRestriction.Slice) 
existingRestriction).setBound(newRel.operator(), t);
 -                    }
 -                    break;
 -                case CONTAINS_KEY:
 -                    if (!(receiver.type instanceof MapType))
 -                        throw new 
InvalidRequestException(String.format("Cannot use CONTAINS KEY on non-map 
column %s", def.name));
 -                    // Fallthrough on purpose
 -                case CONTAINS:
 -                {
 -                    if (!receiver.type.isCollection())
 -                        throw new 
InvalidRequestException(String.format("Cannot use %s relation on non collection 
column %s", newRel.operator(), def.name));
 -
 -                    if (existingRestriction == null)
 -                        existingRestriction = new 
SingleColumnRestriction.Contains();
 -                    else if (!existingRestriction.isContains())
 -                        throw new 
InvalidRequestException(String.format("Collection column %s can only be 
restricted by CONTAINS or CONTAINS KEY", def.name));
 -
 -                    boolean isKey = newRel.operator() == 
Operator.CONTAINS_KEY;
 -                    receiver = makeCollectionReceiver(receiver, isKey);
 -                    Term t = newRel.getValue().prepare(keyspace(), receiver);
 -
 -                    t.collectMarkerSpecification(boundNames);
 -                    
((SingleColumnRestriction.Contains)existingRestriction).add(t, isKey);
 -                    break;
 -                }
 -            }
 -            return existingRestriction;
 -        }
 -
 -        private void processPartitionKeyRestrictions(SelectStatement stmt, 
boolean hasQueriableIndex, CFMetaData cfm) throws InvalidRequestException
 -        {
 -            // If there is a queriable index, no special condition are 
required on the other restrictions.
 -            // But we still need to know 2 things:
 -            //   - If we don't have a queriable index, is the query ok
 -            //   - Is it queriable without 2ndary index, which is always more 
efficient
 -            // If a component of the partition key is restricted by a 
relation, all preceding
 -            // components must have a EQ. Only the last partition key 
component can be in IN relation.
 -            boolean canRestrictFurtherComponents = true;
 -            ColumnDefinition previous = null;
 -            stmt.keyIsInRelation = false;
 -            Iterator<ColumnDefinition> iter = 
cfm.partitionKeyColumns().iterator();
 -            for (int i = 0; i < stmt.keyRestrictions.length; i++)
 -            {
 -                ColumnDefinition cdef = iter.next();
 -                Restriction restriction = stmt.keyRestrictions[i];
 +            if (!restrictions.keyIsInRelation())
 +                return null;
  
 -                if (restriction == null)
 -                {
 -                    if (stmt.onToken)
 -                        throw new InvalidRequestException("The token() 
function must be applied to all partition key components or none of them");
 -
 -                    // The only time not restricting a key part is allowed is 
if none are restricted or an index is used.
 -                    if (i > 0 && stmt.keyRestrictions[i - 1] != null)
 -                    {
 -                        if (hasQueriableIndex)
 -                        {
 -                            stmt.usesSecondaryIndexing = true;
 -                            stmt.isKeyRange = true;
 -                            break;
 -                        }
 -                        throw new 
InvalidRequestException(String.format("Partition key part %s must be restricted 
since preceding part is", cdef.name));
 -                    }
 -
 -                    stmt.isKeyRange = true;
 -                    canRestrictFurtherComponents = false;
 -                }
 -                else if (!canRestrictFurtherComponents)
 -                {
 -                    if (hasQueriableIndex)
 -                    {
 -                        stmt.usesSecondaryIndexing = true;
 -                        break;
 -                    }
 -                    throw new InvalidRequestException(String.format(
 -                            "Partitioning column \"%s\" cannot be restricted 
because the preceding column (\"%s\") is " +
 -                            "either not restricted or is restricted by a 
non-EQ relation", cdef.name, previous));
 -                }
 -                else if (restriction.isOnToken())
 -                {
 -                    // If this is a query on tokens, it's necessarily a range 
query (there can be more than one key per token).
 -                    stmt.isKeyRange = true;
 -                    stmt.onToken = true;
 -                }
 -                else if (stmt.onToken)
 -                {
 -                    throw new InvalidRequestException(String.format("The 
token() function must be applied to all partition key components or none of 
them"));
 -                }
 -                else if (!restriction.isSlice())
 -                {
 -                    if (restriction.isIN())
 -                    {
 -                        // We only support IN for the last name so far
 -                        if (i != stmt.keyRestrictions.length - 1)
 -                            throw new 
InvalidRequestException(String.format("Partition KEY part %s cannot be 
restricted by IN relation (only the last part of the partition key can)", 
cdef.name));
 -                        stmt.keyIsInRelation = true;
 -                    }
 -                }
 -                else
 -                {
 -                    // Non EQ relation is not supported without token(), even 
if we have a 2ndary index (since even those are ordered by partitioner).
 -                    // Note: In theory we could allow it for 2ndary index 
queries with ALLOW FILTERING, but that would probably require some special 
casing
 -                    // Note bis: This is also why we don't bother handling 
the 'tuple' notation of #4851 for keys. If we lift the limitation for 2ndary
 -                    // index with filtering, we'll need to handle it though.
 -                    throw new InvalidRequestException("Only EQ and IN 
relation are supported on the partition key (unless you use the token() 
function)");
 -                }
 -                previous = cdef;
 -            }
 +            Map<ColumnIdentifier, Integer> orderingIndexes = 
getOrderingIndex(cfm, selection);
  
 -            if (stmt.onToken)
 -                checkTokenFunctionArgumentsOrder(cfm);
 -        }
 +            List<Integer> idToSort = new ArrayList<Integer>();
 +            List<Comparator<ByteBuffer>> sorters = new 
ArrayList<Comparator<ByteBuffer>>();
  
 -        /**
 -         * Checks that the column identifiers used as argument for the token 
function have been specified in the
 -         * partition key order.
 -         * @param cfm the Column Family MetaData
 -         * @throws InvalidRequestException if the arguments have not been 
provided in the proper order.
 -         */
 -        private void checkTokenFunctionArgumentsOrder(CFMetaData cfm) throws 
InvalidRequestException
 -        {
 -            Iterator<ColumnDefinition> iter = 
Iterators.cycle(cfm.partitionKeyColumns());
 -            for (Relation relation : whereClause)
 +            for (ColumnIdentifier.Raw raw : parameters.orderings.keySet())
              {
 -                if (!relation.isOnToken())
 -                    continue;
 -
 -                assert !relation.isMultiColumn() : "Unexpectedly got 
multi-column token relation";
 -                SingleColumnRelation singleColumnRelation = 
(SingleColumnRelation) relation;
 -                if 
(!cfm.getColumnDefinition(singleColumnRelation.getEntity().prepare(cfm)).equals(iter.next()))
 -                    throw new InvalidRequestException(String.format("The 
token function arguments must be in the partition key order: %s",
 -                                                                    
Joiner.on(',').join(cfm.partitionKeyColumns())));
 +                ColumnIdentifier identifier = raw.prepare(cfm);
 +                ColumnDefinition orderingColumn = 
cfm.getColumnDefinition(identifier);
 +                idToSort.add(orderingIndexes.get(orderingColumn.name));
 +                sorters.add(orderingColumn.type);
              }
 +            return idToSort.size() == 1 ? new 
SingleColumnComparator(idToSort.get(0), sorters.get(0))
 +                    : new CompositeComparator(sorters, idToSort);
          }
  
 -        private void processColumnRestrictions(SelectStatement stmt, boolean 
hasQueriableIndex, CFMetaData cfm) throws InvalidRequestException
 +        private Map<ColumnIdentifier, Integer> getOrderingIndex(CFMetaData 
cfm, Selection selection)
 +                throws InvalidRequestException
          {
 -            // If a clustering key column is restricted by a non-EQ relation, 
all preceding
 -            // columns must have a EQ, and all following must have no 
restriction. Unless
 -            // the column is indexed that is.
 -            boolean canRestrictFurtherComponents = true;
 -            ColumnDefinition previous = null;
 -            Restriction previousRestriction = null;
 -            Iterator<ColumnDefinition> iter = 
cfm.clusteringColumns().iterator();
 -            for (int i = 0; i < stmt.columnRestrictions.length; i++)
 -            {
 -                ColumnDefinition cdef = iter.next();
 -                Restriction restriction = stmt.columnRestrictions[i];
 -
 -                if (restriction == null)
 -                {
 -                    canRestrictFurtherComponents = false;
 -                }
 -                else if (!canRestrictFurtherComponents)
 -                {
 -                    // We're here if the previous clustering column was 
either not restricted, was a slice or an IN tulpe-notation.
 -
 -                    // we can continue if we are in the special case of a 
slice 'tuple' notation from #4851
 -                    if (restriction != previousRestriction)
 -                    {
 -                        // if we have a 2ndary index, we need to use it
 -                        if (hasQueriableIndex)
 -                        {
 -                            stmt.usesSecondaryIndexing = true;
 -                            break;
 -                        }
 -
 -                        if (previousRestriction == null)
 -                            throw new InvalidRequestException(String.format(
 -                                "PRIMARY KEY column \"%s\" cannot be 
restricted (preceding column \"%s\" is not restricted)", cdef.name, 
previous.name));
 -
 -                        if (previousRestriction.isMultiColumn() && 
previousRestriction.isIN())
 -                            throw new InvalidRequestException(String.format(
 -                                     "PRIMARY KEY column \"%s\" cannot be 
restricted (preceding column \"%s\" is restricted by an IN tuple notation)", 
cdef.name, previous.name));
 -
 -                        throw new InvalidRequestException(String.format(
 -                                "PRIMARY KEY column \"%s\" cannot be 
restricted (preceding column \"%s\" is restricted by a non-EQ relation)", 
cdef.name, previous.name));
 -                    }
 -                }
 -                else if (restriction.isSlice())
 -                {
 -                    canRestrictFurtherComponents = false;
 -                    Restriction.Slice slice = (Restriction.Slice)restriction;
 -                    // For non-composite slices, we don't support internally 
the difference between exclusive and
 -                    // inclusive bounds, so we deal with it manually.
 -                    if (!cfm.comparator.isCompound() && 
(!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
 -                        stmt.sliceRestriction = slice;
 -                }
 -                else if (restriction.isIN())
 -                {
 -                    if (!restriction.isMultiColumn() && i != 
stmt.columnRestrictions.length - 1)
 -                        throw new 
InvalidRequestException(String.format("Clustering column \"%s\" cannot be 
restricted by an IN relation", cdef.name));
 -
 -                    if (stmt.selectACollection())
 -                        throw new 
InvalidRequestException(String.format("Cannot restrict column \"%s\" by IN 
relation as a collection is selected by the query", cdef.name));
 -
 -                    if (restriction.isMultiColumn())
 -                        canRestrictFurtherComponents = false;
 -                }
 -                else if (restriction.isContains())
 -                {
 -                    if (!hasQueriableIndex)
 -                        throw new 
InvalidRequestException(String.format("Cannot restrict column \"%s\" by a 
CONTAINS relation without a secondary index", cdef.name));
 -                    stmt.usesSecondaryIndexing = true;
 -                }
 -
 -                previous = cdef;
 -                previousRestriction = restriction;
 -            }
 -        }
 -
 -        private void validateSecondaryIndexSelections(SelectStatement stmt) 
throws InvalidRequestException
 -        {
 -            if (stmt.keyIsInRelation)
 -                throw new InvalidRequestException("Select on indexed columns 
and with IN clause for the PRIMARY KEY are not supported");
 -            // When the user only select static columns, the intent is that 
we don't query the whole partition but just
 -            // the static parts. But 1) we don't have an easy way to do that 
with 2i and 2) since we don't support index on static columns
 -            // so far, 2i means that you've restricted a non static column, 
so the query is somewhat non-sensical.
 -            if (stmt.selectsOnlyStaticColumns)
 -                throw new InvalidRequestException("Queries using 2ndary 
indexes don't support selecting only static columns");            
 -        }
 -
 -        private void verifyOrderingIsAllowed(SelectStatement stmt) throws 
InvalidRequestException
 -        {
 -            if (stmt.usesSecondaryIndexing)
 -                throw new InvalidRequestException("ORDER BY with 2ndary 
indexes is not supported.");
 -
 -            if (stmt.isKeyRange)
 -                throw new InvalidRequestException("ORDER BY is only supported 
when the partition key is restricted by an EQ or an IN.");
 -        }
 -
 -        private void handleUnrecognizedOrderingColumn(ColumnIdentifier 
column) throws InvalidRequestException
 -        {
 -            if (containsAlias(column))
 -                throw new InvalidRequestException(String.format("Aliases are 
not allowed in order by clause ('%s')", column));
 -            else
 -                throw new InvalidRequestException(String.format("Order by on 
unknown column %s", column));
 -        }
 -
 -        private void processOrderingClause(SelectStatement stmt, CFMetaData 
cfm) throws InvalidRequestException
 -        {
 -            verifyOrderingIsAllowed(stmt);
 -
 -            // If we order post-query (see orderResults), the sorted column 
needs to be in the ResultSet for sorting, even if we don't
 +            // If we order post-query (see orderResults), the sorted column 
needs to be in the ResultSet for sorting,
 +            // even if we don't
              // ultimately ship them to the client (CASSANDRA-4911).
 -            if (stmt.keyIsInRelation)
 +            Map<ColumnIdentifier, Integer> orderingIndexes = new HashMap<>();
 +            for (ColumnIdentifier.Raw raw : parameters.orderings.keySet())
              {
 -                stmt.orderingIndexes = new HashMap<>();
 -                for (ColumnIdentifier.Raw rawColumn : 
stmt.parameters.orderings.keySet())
 -                {
 -                    ColumnIdentifier column = rawColumn.prepare(cfm);
 -                    final ColumnDefinition def = 
cfm.getColumnDefinition(column);
 -                    if (def == null)
 -                        handleUnrecognizedOrderingColumn(column);
 -
 -                    int index = stmt.selection.getResultSetIndex(def);
 -                    if (index < 0)
 -                        index = stmt.selection.addColumnForOrdering(def);
 -                    stmt.orderingIndexes.put(def.name, index);
 -                }
 +                ColumnIdentifier column = raw.prepare(cfm);
 +                final ColumnDefinition def = cfm.getColumnDefinition(column);
 +                if (def == null)
 +                    handleUnrecognizedOrderingColumn(column);
-                 int index = selection.indexOf(def);
++                int index = selection.getResultSetIndex(def);
 +                if (index < 0)
 +                    index = selection.addColumnForOrdering(def);
 +                orderingIndexes.put(def.name, index);
              }
 -            stmt.isReversed = isReversed(stmt, cfm);
 +            return orderingIndexes;
          }
  
 -        private boolean isReversed(SelectStatement stmt, CFMetaData cfm) 
throws InvalidRequestException
 +        private boolean isReversed(CFMetaData cfm) throws 
InvalidRequestException
          {
              Boolean[] reversedMap = new 
Boolean[cfm.clusteringColumns().size()];
              int i = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f497c13e/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java
----------------------------------------------------------------------

Reply via email to