Repository: calcite Updated Branches: refs/heads/branch-avatica-1.8 d58bf9150 -> e934f1576
[CALCITE-1235] Fully push down limit+offset in Cassandra Close apache/calcite#229 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/4c89dce3 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/4c89dce3 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/4c89dce3 Branch: refs/heads/branch-avatica-1.8 Commit: 4c89dce303f8760e10bde3ab2b387780ae059e31 Parents: d58bf91 Author: Michael Mior <[email protected]> Authored: Fri May 20 09:08:54 2016 +0300 Committer: Michael Mior <[email protected]> Committed: Fri May 20 09:19:43 2016 +0300 ---------------------------------------------------------------------- .../adapter/cassandra/CassandraLimit.java | 71 ++++++++++++++++++++ .../adapter/cassandra/CassandraMethod.java | 2 +- .../calcite/adapter/cassandra/CassandraRel.java | 7 +- .../adapter/cassandra/CassandraRules.java | 39 +++++++++-- .../adapter/cassandra/CassandraSort.java | 20 ++---- .../adapter/cassandra/CassandraTable.java | 20 ++++-- .../CassandraToEnumerableConverter.java | 11 +-- .../apache/calcite/test/CassandraAdapterIT.java | 32 +++++++-- 8 files changed, 161 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java new file mode 100644 index 0000000..cca7e19 --- /dev/null +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraLimit.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.cassandra; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +/** + * Implementation of limits in Cassandra. + */ +public class CassandraLimit extends SingleRel implements CassandraRel { + public final RexNode offset; + public final RexNode fetch; + + public CassandraLimit(RelOptCluster cluster, RelTraitSet traitSet, + RelNode input, RexNode offset, RexNode fetch) { + super(cluster, traitSet, input); + this.offset = offset; + this.fetch = fetch; + assert getConvention() == input.getConvention(); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + // We do this so we get the limit for free + return planner.getCostFactory().makeZeroCost(); + } + + @Override public CassandraLimit copy(RelTraitSet traitSet, List<RelNode> newInputs) { + return new CassandraLimit(getCluster(), traitSet, sole(newInputs), offset, fetch); + } + + public void implement(Implementor implementor) { + implementor.visitChild(0, getInput()); + if (offset != null) { implementor.offset = RexLiteral.intValue(offset); } + if (fetch != null) { implementor.fetch = RexLiteral.intValue(fetch); } + } + + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + pw.itemIf("offset", offset, offset != null); + pw.itemIf("fetch", fetch, fetch != null); + return pw; + } +} + +// End CassandraLimit.java http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java index c7d0973..b2035e5 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java @@ -28,7 +28,7 @@ import java.util.List; */ public enum CassandraMethod { CASSANDRA_QUERYABLE_QUERY(CassandraTable.CassandraQueryable.class, "query", - List.class, List.class, List.class, List.class, String.class); + List.class, List.class, List.class, List.class, Integer.class, Integer.class); public final Method method; http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java index 0191fd0..b74919d 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java @@ -39,7 +39,8 @@ public interface CassandraRel extends RelNode { class Implementor { final Map<String, String> selectFields = new LinkedHashMap<String, String>(); final List<String> whereClause = new ArrayList<String>(); - String limitValue = null; + int offset = 0; + int fetch = -1; final List<String> order = new ArrayList<String>(); RelOptTable table; @@ -63,10 +64,6 @@ public interface CassandraRel extends RelNode { order.addAll(newOrder); } - public void setLimit(String limit) { - limitValue = limit; - } - public void visitChild(int ordinal, RelNode input) { assert ordinal == 0; ((CassandraRel) input).implement(this); http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java index 0e0bcb2..d67532d 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.adapter.cassandra; +import org.apache.calcite.adapter.enumerable.EnumerableLimit; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptRule; @@ -59,7 +60,8 @@ public class CassandraRules { public static final RelOptRule[] RULES = { CassandraFilterRule.INSTANCE, CassandraProjectRule.INSTANCE, - CassandraSortRule.INSTANCE + CassandraSortRule.INSTANCE, + CassandraLimitRule.INSTANCE }; static List<String> cassandraFieldNames(final RelDataType rowType) { @@ -277,8 +279,8 @@ public class CassandraRules { private static final Predicate<Sort> SORT_PREDICATE = new Predicate<Sort>() { public boolean apply(Sort input) { - // CQL has no support for offsets - return input.offset == null; + // Limits are handled by CassandraLimit + return input.offset == null && input.fetch == null; } }; private static final Predicate<CassandraFilter> FILTER_PREDICATE = @@ -304,7 +306,7 @@ public class CassandraRules { .replace(sort.getCollation()); return new CassandraSort(sort.getCluster(), traitSet, convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)), - sort.getCollation(), filter.getImplicitCollation(), sort.fetch); + sort.getCollation()); } public boolean matches(RelOptRuleCall call) { @@ -383,6 +385,35 @@ public class CassandraRules { } } } + + /** + * Rule to convert a {@link org.apache.calcite.adapter.enumerable.EnumerableLimit} to a + * {@link CassandraLimit}. + */ + private static class CassandraLimitRule extends RelOptRule { + private static final CassandraLimitRule INSTANCE = new CassandraLimitRule(); + + private CassandraLimitRule() { + super(operand(EnumerableLimit.class, operand(CassandraToEnumerableConverter.class, any())), + "CassandraLimitRule"); + } + + public RelNode convert(EnumerableLimit limit) { + final RelTraitSet traitSet = + limit.getTraitSet().replace(CassandraRel.CONVENTION); + return new CassandraLimit(limit.getCluster(), traitSet, + convert(limit.getInput(), CassandraRel.CONVENTION), limit.offset, limit.fetch); + } + + /** @see org.apache.calcite.rel.convert.ConverterRule */ + public void onMatch(RelOptRuleCall call) { + final EnumerableLimit limit = call.rel(0); + final RelNode converted = convert(limit); + if (converted != null) { + call.transformTo(converted); + } + } + } } // End CassandraRules.java http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java index 61d7b31..8487815 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java @@ -26,7 +26,6 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import java.util.ArrayList; @@ -37,13 +36,9 @@ import java.util.List; * relational expression in Cassandra. */ public class CassandraSort extends Sort implements CassandraRel { - private final RelCollation implicitCollation; - public CassandraSort(RelOptCluster cluster, RelTraitSet traitSet, - RelNode child, RelCollation collation, RelCollation implicitCollation, RexNode fetch) { - super(cluster, traitSet, child, collation, null, fetch); - - this.implicitCollation = implicitCollation; + RelNode child, RelCollation collation) { + super(cluster, traitSet, child, collation, null, null); assert getConvention() == CassandraRel.CONVENTION; assert getConvention() == child.getConvention(); @@ -54,18 +49,14 @@ public class CassandraSort extends Sort implements CassandraRel { RelOptCost cost = super.computeSelfCost(planner, mq); if (!collation.getFieldCollations().isEmpty()) { return cost.multiplyBy(0.05); - } else if (fetch == null) { - return cost; } else { - // We do this so we get the limit for free - return planner.getCostFactory().makeZeroCost(); + return cost; } } @Override public Sort copy(RelTraitSet traitSet, RelNode input, RelCollation newCollation, RexNode offset, RexNode fetch) { - return new CassandraSort(getCluster(), traitSet, input, collation, implicitCollation, - fetch); + return new CassandraSort(getCluster(), traitSet, input, collation); } public void implement(Implementor implementor) { @@ -88,9 +79,6 @@ public class CassandraSort extends Sort implements CassandraRel { implementor.addOrder(fieldOrder); } - if (fetch != null) { - implementor.setLimit(((RexLiteral) fetch).getValue().toString()); - } } } http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java index 1ff5502..ec2a636 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java @@ -99,7 +99,7 @@ public class CassandraTable extends AbstractQueryableTable public Enumerable<Object> query(final Session session) { return query(session, Collections.<Map.Entry<String, Class>>emptyList(), Collections.<Map.Entry<String, String>>emptyList(), - Collections.<String>emptyList(), Collections.<String>emptyList(), null); + Collections.<String>emptyList(), Collections.<String>emptyList(), 0, -1); } /** Executes a CQL query on the underlying table. @@ -111,7 +111,7 @@ public class CassandraTable extends AbstractQueryableTable */ public Enumerable<Object> query(final Session session, List<Map.Entry<String, Class>> fields, final List<Map.Entry<String, String>> selectFields, List<String> predicates, - List<String> order, String limit) { + List<String> order, final Integer offset, final Integer fetch) { // Build the type of the resulting row based on the provided fields final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); @@ -181,7 +181,10 @@ public class CassandraTable extends AbstractQueryableTable if (!order.isEmpty()) { queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", "")); } - if (limit != null) { + + int limit = offset; + if (fetch >= 0) { limit += fetch; } + if (limit > 0) { queryBuilder.append(" LIMIT " + limit); } queryBuilder.append(" ALLOW FILTERING"); @@ -190,7 +193,12 @@ public class CassandraTable extends AbstractQueryableTable return new AbstractEnumerable<Object>() { public Enumerator<Object> enumerator() { final ResultSet results = session.execute(query); - return new CassandraEnumerator(results, resultRowType); + // Skip results until we get to the right offset + int skip = 0; + Enumerator<Object> enumerator = new CassandraEnumerator(results, resultRowType); + while (skip < offset && enumerator.moveNext()) { skip++; } + + return enumerator; } }; } @@ -238,9 +246,9 @@ public class CassandraTable extends AbstractQueryableTable @SuppressWarnings("UnusedDeclaration") public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, List<Map.Entry<String, String>> selectFields, List<String> predicates, - List<String> order, String limit) { + List<String> order, Integer offset, Integer fetch) { return getTable().query(getSession(), fields, selectFields, predicates, - order, limit); + order, offset, fetch); } } } http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java index 31bfdcc..66db1ff 100644 --- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java +++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java @@ -112,14 +112,17 @@ public class CassandraToEnumerableConverter final Expression order = list.append("order", constantArrayList(cassandraImplementor.order, String.class)); - final Expression limit = - list.append("limit", - Expressions.constant(cassandraImplementor.limitValue)); + final Expression offset = + list.append("offset", + Expressions.constant(cassandraImplementor.offset)); + final Expression fetch = + list.append("fetch", + Expressions.constant(cassandraImplementor.fetch)); Expression enumerable = list.append("enumerable", Expressions.call(table, CassandraMethod.CASSANDRA_QUERYABLE_QUERY.method, fields, - selectFields, predicates, order, limit)); + selectFields, predicates, order, offset, fetch)); if (CalcitePrepareImpl.DEBUG) { System.out.println("Cassandra: " + predicates); } http://git-wip-us.apache.org/repos/asf/calcite/blob/4c89dce3/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java ---------------------------------------------------------------------- diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java index a2c132e..f1e87ff 100644 --- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java +++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java @@ -103,11 +103,12 @@ public class CassandraAdapterIT { CalciteAssert.that() .enable(enabled()) .with(TWISSANDRA) - .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 1") - .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n") + .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 2") + .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n" + + "tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n") .explainContains("PLAN=CassandraToEnumerableConverter\n" - + " CassandraProject(tweet_id=[$2])\n" - + " CassandraSort(fetch=[1])\n" + + " CassandraLimit(fetch=[2])\n" + + " CassandraProject(tweet_id=[$2])\n" + " CassandraFilter(condition=[=(CAST($0):VARCHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n"); } @@ -133,7 +134,28 @@ public class CassandraAdapterIT { .enable(enabled()) .with(TWISSANDRA) .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 8") - .explainContains("CassandraSort(fetch=[8])\n"); + .explainContains("CassandraLimit(fetch=[8])\n"); + } + + @Test public void testSortLimit() { + CalciteAssert.that() + .enable(enabled()) + .with(TWISSANDRA) + .query("select * from \"userline\" where \"username\"='!PUBLIC!' " + + "order by \"time\" desc limit 10") + .explainContains(" CassandraLimit(fetch=[10])\n" + + " CassandraSort(sort0=[$1], dir0=[DESC])"); + } + + @Test public void testSortOffset() { + CalciteAssert.that() + .enable(enabled()) + .with(TWISSANDRA) + .query("select \"tweet_id\" from \"userline\" where " + + "\"username\"='!PUBLIC!' limit 2 offset 1") + .explainContains("CassandraLimit(offset=[1], fetch=[2])") + .returns("tweet_id=f3dbb03a-d05b-11e5-b58b-90e2ba530b12\n" + + "tweet_id=f3e4182e-d05b-11e5-b58b-90e2ba530b12\n"); } @Test public void testMaterializedView() {
