Merge branch 'cassandra-2.1' into trunk Conflicts: src/java/org/apache/cassandra/cql3/statements/Selection.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45084f18 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45084f18 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45084f18 Branch: refs/heads/trunk Commit: 45084f182a46234243b94059fd1b6b53e927ead8 Parents: 708c6ba 1d285ea Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Oct 29 10:49:20 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Oct 29 10:49:20 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v3.spec | 8 +++-- .../org/apache/cassandra/cql3/QueryOptions.java | 10 +++--- .../apache/cassandra/cql3/UpdateParameters.java | 6 ++++ .../cassandra/cql3/selection/Selection.java | 2 +- .../cql3/selection/WritetimeOrTTLSelector.java | 4 +-- .../apache/cassandra/cql3/TimestampTest.java | 36 ++++++++++++++++++++ 7 files changed, 56 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/src/java/org/apache/cassandra/cql3/selection/Selection.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/selection/Selection.java index 67cce72,0000000..cd5e2a8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/selection/Selection.java +++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java @@@ -1,390 -1,0 +1,390 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.selection; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.ResultSet; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.CounterCell; +import org.apache.cassandra.db.ExpiringCell; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.utils.ByteBufferUtil; + +import com.google.common.collect.Iterators; + +public abstract class Selection +{ + private final Collection<ColumnDefinition> columns; + private final ResultSet.Metadata metadata; + private final boolean collectTimestamps; + private final boolean collectTTLs; + + protected Selection(Collection<ColumnDefinition> columns, List<ColumnSpecification> metadata, boolean collectTimestamps, boolean collectTTLs) + { + this.columns = columns; + this.metadata = new ResultSet.Metadata(metadata); + this.collectTimestamps = collectTimestamps; + this.collectTTLs = collectTTLs; + } + + // Overriden by SimpleSelection when appropriate. + public boolean isWildcard() + { + return false; + } + + public ResultSet.Metadata getResultMetadata() + { + return metadata; + } + + public static Selection wildcard(CFMetaData cfm) + { + List<ColumnDefinition> all = new ArrayList<ColumnDefinition>(cfm.allColumns().size()); + Iterators.addAll(all, cfm.allColumnsInSelectOrder()); + return new SimpleSelection(all, true); + } + + public static Selection forColumns(Collection<ColumnDefinition> columns) + { + return new SimpleSelection(columns, false); + } + + public int addColumnForOrdering(ColumnDefinition c) + { + columns.add(c); + metadata.addNonSerializedColumn(c); + return columns.size() - 1; + } + + private static boolean isUsingFunction(List<RawSelector> rawSelectors) + { + for (RawSelector rawSelector : rawSelectors) + { + if (!(rawSelector.selectable instanceof ColumnIdentifier)) + return true; + } + return false; + } + + public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors) throws InvalidRequestException + { + List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>(); + + SelectorFactories factories = + SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors), cfm, defs); + List<ColumnSpecification> metadata = collectMetadata(cfm, rawSelectors, factories); + + return isUsingFunction(rawSelectors) ? new SelectionWithFunctions(defs, metadata, factories) + : new SimpleSelection(defs, metadata, false); + } + + private static List<ColumnSpecification> collectMetadata(CFMetaData cfm, + List<RawSelector> rawSelectors, + SelectorFactories factories) + { + List<ColumnSpecification> metadata = new ArrayList<ColumnSpecification>(rawSelectors.size()); + Iterator<RawSelector> iter = rawSelectors.iterator(); + for (Selector.Factory factory : factories) + { + ColumnSpecification colSpec = factory.getColumnSpecification(cfm); + ColumnIdentifier alias = iter.next().alias; + metadata.add(alias == null ? colSpec : colSpec.withAlias(alias)); + } + return metadata; + } + + protected abstract Selectors newSelectors(); + + /** + * @return the list of CQL3 columns value this SelectionClause needs. + */ + public Collection<ColumnDefinition> getColumns() + { + return columns; + } + + public ResultSetBuilder resultSetBuilder(long now) + { + return new ResultSetBuilder(now); + } + + public abstract boolean isAggregate(); + + /** + * Checks that selectors are either all aggregates or that none of them is. + * + * @param selectors the selectors to test. + * @param msgTemplate the error message template + * @param messageArgs the error message arguments + * @throws InvalidRequestException if some of the selectors are aggregate but not all of them + */ + static void validateSelectors(List<Selector> selectors, String messageTemplate, Object... messageArgs) + throws InvalidRequestException + { + int aggregates = 0; + for (Selector s : selectors) + if (s.isAggregate()) + ++aggregates; + + if (aggregates != 0 && aggregates != selectors.size()) + throw new InvalidRequestException(String.format(messageTemplate, messageArgs)); + } + + public class ResultSetBuilder + { + private final ResultSet resultSet; + + /** + * As multiple thread can access a <code>Selection</code> instance each <code>ResultSetBuilder</code> will use + * its own <code>Selectors</code> instance. + */ + private final Selectors selectors; + + /* + * We'll build CQL3 row one by one. + * The currentRow is the values for the (CQL3) columns we've fetched. + * We also collect timestamps and ttls for the case where the writetime and + * ttl functions are used. Note that we might collect timestamp and/or ttls + * we don't care about, but since the array below are allocated just once, + * it doesn't matter performance wise. + */ + List<ByteBuffer> current; + final long[] timestamps; + final int[] ttls; + final long now; + + private ResultSetBuilder(long now) + { + this.resultSet = new ResultSet(getResultMetadata().copy(), new ArrayList<List<ByteBuffer>>()); + this.selectors = newSelectors(); + this.timestamps = collectTimestamps ? new long[columns.size()] : null; + this.ttls = collectTTLs ? new int[columns.size()] : null; + this.now = now; + } + + public void add(ByteBuffer v) + { + current.add(v); + } + + public void add(Cell c) + { + current.add(isDead(c) ? null : value(c)); + if (timestamps != null) + { - timestamps[current.size() - 1] = isDead(c) ? -1 : c.timestamp(); ++ timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE : c.timestamp(); + } + if (ttls != null) + { + int ttl = -1; + if (!isDead(c) && c instanceof ExpiringCell) + ttl = c.getLocalDeletionTime() - (int) (now / 1000); + ttls[current.size() - 1] = ttl; + } + } + + private boolean isDead(Cell c) + { + return c == null || !c.isLive(now); + } + + public void newRow() throws InvalidRequestException + { + if (current != null) + { + selectors.addInputRow(this); + if (!selectors.isAggregate()) + { + resultSet.addRow(selectors.getOutputRow()); + selectors.reset(); + } + } + current = new ArrayList<ByteBuffer>(columns.size()); + } + + public ResultSet build() throws InvalidRequestException + { + if (current != null) + { + selectors.addInputRow(this); + resultSet.addRow(selectors.getOutputRow()); + selectors.reset(); + current = null; + } + return resultSet; + } + + private ByteBuffer value(Cell c) + { + return (c instanceof CounterCell) + ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value())) + : c.value(); + } + } + + private static interface Selectors + { + public boolean isAggregate(); + + /** + * Adds the current row of the specified <code>ResultSetBuilder</code>. + * + * @param rs the <code>ResultSetBuilder</code> + * @throws InvalidRequestException + */ + public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException; + + public List<ByteBuffer> getOutputRow() throws InvalidRequestException; + + public void reset(); + } + + // Special cased selection for when no function is used (this save some allocations). + private static class SimpleSelection extends Selection + { + private final boolean isWildcard; + + public SimpleSelection(Collection<ColumnDefinition> columns, boolean isWildcard) + { + this(columns, new ArrayList<ColumnSpecification>(columns), isWildcard); + } + + public SimpleSelection(Collection<ColumnDefinition> columns, List<ColumnSpecification> metadata, boolean isWildcard) + { + /* + * In theory, even a simple selection could have multiple time the same column, so we + * could filter those duplicate out of columns. But since we're very unlikely to + * get much duplicate in practice, it's more efficient not to bother. + */ + super(columns, metadata, false, false); + this.isWildcard = isWildcard; + } + + @Override + public boolean isWildcard() + { + return isWildcard; + } + + public boolean isAggregate() + { + return false; + } + + protected Selectors newSelectors() + { + return new Selectors() + { + private List<ByteBuffer> current; + + public void reset() + { + current = null; + } + + public List<ByteBuffer> getOutputRow() + { + return current; + } + + public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException + { + current = rs.current; + } + + public boolean isAggregate() + { + return false; + } + }; + } + } + + private static class SelectionWithFunctions extends Selection + { + private final SelectorFactories factories; + + public SelectionWithFunctions(Collection<ColumnDefinition> columns, + List<ColumnSpecification> metadata, + SelectorFactories factories) throws InvalidRequestException + { + super(columns, metadata, factories.containsWritetimeSelectorFactory(), factories.containsTTLSelectorFactory()); + this.factories = factories; + + if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions()) + throw new InvalidRequestException("the select clause must either contains only aggregates or none"); + } + + public boolean isAggregate() + { + return factories.containsOnlyAggregateFunctions(); + } + + protected Selectors newSelectors() + { + return new Selectors() + { + private final List<Selector> selectors = factories.newInstances(); + + public void reset() + { + for (int i = 0, m = selectors.size(); i < m; i++) + { + selectors.get(i).reset(); + } + } + + public boolean isAggregate() + { + return factories.containsOnlyAggregateFunctions(); + } + + public List<ByteBuffer> getOutputRow() throws InvalidRequestException + { + List<ByteBuffer> outputRow = new ArrayList<>(selectors.size()); + + for (int i = 0, m = selectors.size(); i < m; i++) + { + outputRow.add(selectors.get(i).getOutput()); + } + return outputRow; + } + + public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException + { + for (int i = 0, m = selectors.size(); i < m; i++) + { + selectors.get(i).addInput(rs); + } + } + }; + } + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/45084f18/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java index 6d6edd3,0000000..a57a3ca mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java @@@ -1,110 -1,0 +1,110 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.selection; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.utils.ByteBufferUtil; + +final class WritetimeOrTTLSelector extends Selector +{ + private final String columnName; + private final int idx; + private final boolean isWritetime; + private ByteBuffer current; + + public static Factory newFactory(final String columnName, final int idx, final boolean isWritetime) + { + return new Factory() + { + public ColumnSpecification getColumnSpecification(CFMetaData cfm) + { + String text = String.format("%s(%s)", isWritetime ? "writetime" : "ttl", columnName); + return new ColumnSpecification(cfm.ksName, + cfm.cfName, + new ColumnIdentifier(text, true), + isWritetime ? LongType.instance : Int32Type.instance); + } + + public Selector newInstance() + { + return new WritetimeOrTTLSelector(columnName, idx, isWritetime); + } + + public boolean isWritetimeSelectorFactory() + { + return isWritetime; + } + + public boolean isTTLSelectorFactory() + { + return !isWritetime; + } + }; + } + + public void addInput(ResultSetBuilder rs) + { + if (isWritetime) + { + long ts = rs.timestamps[idx]; - current = ts >= 0 ? ByteBufferUtil.bytes(ts) : null; ++ current = ts != Long.MIN_VALUE ? ByteBufferUtil.bytes(ts) : null; + } + else + { + int ttl = rs.ttls[idx]; + current = ttl > 0 ? ByteBufferUtil.bytes(ttl) : null; + } + } + + public ByteBuffer getOutput() + { + return current; + } + + public void reset() + { + current = null; + } + + public AbstractType<?> getType() + { + return isWritetime ? LongType.instance : Int32Type.instance; + } + + @Override + public String toString() + { + return columnName; + } + + private WritetimeOrTTLSelector(String columnName, int idx, boolean isWritetime) + { + this.columnName = columnName; + this.idx = idx; + this.isWritetime = isWritetime; + } + - } ++}