Repository: beam Updated Branches: refs/heads/DSL_SQL 36a436ca0 -> f96f9f680
[BEAM-2255] Implement ORDER BY Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35abd097 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35abd097 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35abd097 Branch: refs/heads/DSL_SQL Commit: 35abd097ea327eea0f6bcd13068ce62f7d2bcc31 Parents: 36a436c Author: James Xu <xumingmi...@gmail.com> Authored: Fri May 12 01:07:18 2017 +0800 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Wed May 17 04:03:49 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/planner/BeamRuleSets.java | 3 +- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 242 +++++++++++++++++++ .../apache/beam/dsls/sql/rule/BeamSortRule.java | 52 ++++ .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 7 + .../dsls/sql/planner/MockedBeamSQLTable.java | 68 +++++- .../beam/dsls/sql/rel/BeamSortRelTest.java | 231 ++++++++++++++++++ 6 files changed, 601 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index acbd43f..2cac5ae 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.rule.BeamFilterRule; import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; +import org.apache.beam.dsls.sql.rule.BeamSortRule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.RelNode; import org.apache.calcite.tools.RuleSet; @@ -40,7 +41,7 @@ public class BeamRuleSets { private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE) + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java new file mode 100644 index 0000000..3df2f34 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.io.Serializable; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Top; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamRelNode} to replace a {@code Sort} node. + * + * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement + * the {@code Sort} algebra. The following types of ORDER BY are supported: + + * <pre>{@code + * select * from t order by id desc limit 10; + * select * from t order by id desc limit 10, 5; + * }</pre> + * + * <p>but Order BY without a limit is NOT supported: + * + * <pre>{@code + * select * from t order by id desc + * }</pre> + * + * <h3>Constraints</h3> + * <ul> + * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT` + * must fit into the memory of a single machine.</li> + * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`, + * it does not make much sense to use `ORDER BY` with `WINDOW`. + * </li> + * </ul> + */ +public class BeamSortRel extends Sort implements BeamRelNode { + private List<Integer> fieldIndices = new ArrayList<>(); + private List<Boolean> orientation = new ArrayList<>(); + private List<Boolean> nullsFirst = new ArrayList<>(); + + private int startIndex = 0; + private int count; + + public BeamSortRel( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + RelCollation collation, + RexNode offset, + RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + + List<RexNode> fieldExps = getChildExps(); + RelCollationImpl collationImpl = (RelCollationImpl) collation; + List<RelFieldCollation> collations = collationImpl.getFieldCollations(); + for (int i = 0; i < fieldExps.size(); i++) { + RexNode fieldExp = fieldExps.get(i); + RexInputRef inputRef = (RexInputRef) fieldExp; + fieldIndices.add(inputRef.getIndex()); + orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); + + RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; + if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { + rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); + } + nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); + } + + if (fetch == null) { + throw new BeamSqlUnsupportedException("ORDER BY without a LIMIT is not supported!"); + } + + RexLiteral fetchLiteral = (RexLiteral) fetch; + count = ((BigDecimal) fetchLiteral.getValue()).intValue(); + + if (offset != null) { + RexLiteral offsetLiteral = (RexLiteral) offset; + startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); + } + } + + @Override public PCollection<BeamSQLRow> buildBeamPipeline( + BeamPipelineCreator planCreator) throws Exception { + RelNode input = getInput(); + PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input) + .buildBeamPipeline(planCreator); + Type windowType = upstream.getWindowingStrategy().getWindowFn() + .getWindowTypeDescriptor().getType(); + if (!windowType.equals(GlobalWindow.class)) { + throw new BeamSqlUnsupportedException( + "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType); + } + + BeamSQLRowComparator comparator = new BeamSQLRowComparator(fieldIndices, orientation, + nullsFirst); + // first find the top (offset + count) + PCollection<List<BeamSQLRow>> rawStream = + upstream.apply("extractTopOffsetAndFetch", + Top.of(startIndex + count, comparator).withoutDefaults()); + + // strip the `leading offset` + if (startIndex > 0) { + rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( + new SubListFn<BeamSQLRow>(startIndex, startIndex + count))); + } + + PCollection<BeamSQLRow> orderedStream = rawStream.apply( + "flatten", Flatten.<BeamSQLRow>iterables()); + return orderedStream; + } + + private static class SubListFn<T> extends DoFn<List<T>, List<T>> { + private int startIndex; + private int endIndex; + + public SubListFn(int startIndex, int endIndex) { + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().subList(startIndex, endIndex)); + } + } + + @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, + RexNode offset, RexNode fetch) { + return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + private static class BeamSQLRowComparator implements Comparator<BeamSQLRow>, Serializable { + private List<Integer> fieldsIndices; + private List<Boolean> orientation; + private List<Boolean> nullsFirst; + + public BeamSQLRowComparator(List<Integer> fieldsIndices, + List<Boolean> orientation, + List<Boolean> nullsFirst) { + this.fieldsIndices = fieldsIndices; + this.orientation = orientation; + this.nullsFirst = nullsFirst; + } + + @Override public int compare(BeamSQLRow row1, BeamSQLRow row2) { + for (int i = 0; i < fieldsIndices.size(); i++) { + int fieldIndex = fieldsIndices.get(i); + int fieldRet = 0; + SqlTypeName fieldType = row1.getDataType().getFieldsType().get(fieldIndex); + // whether NULL should be ordered first or last(compared to non-null values) depends on + // what user specified in SQL(NULLS FIRST/NULLS LAST) + if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + continue; + } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) { + fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1); + } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1); + } else { + switch (fieldType) { + case TINYINT: + fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); + break; + case SMALLINT: + fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); + break; + case INTEGER: + fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); + break; + case BIGINT: + fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); + break; + case FLOAT: + fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); + break; + case DOUBLE: + fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); + break; + case VARCHAR: + fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); + break; + case DATE: + fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); + break; + default: + throw new UnsupportedDataTypeException(fieldType); + } + } + + fieldRet *= (orientation.get(i) ? -1 : 1); + if (fieldRet != 0) { + return fieldRet; + } + } + return 0; + } + } + + public static <T extends Number & Comparable> int numberCompare(T a, T b) { + return a.compareTo(b); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java new file mode 100644 index 0000000..d802e9d --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; + +import org.apache.beam.dsls.sql.rel.BeamSortRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; + +/** + * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}. + */ +public class BeamSortRule extends ConverterRule { + public static final BeamSortRule INSTANCE = new BeamSortRule(); + private BeamSortRule() { + super(LogicalSort.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamSortRule"); + } + + @Override public RelNode convert(RelNode rel) { + Sort sort = (Sort) rel; + final RelNode input = sort.getInput(); + return new BeamSortRel( + sort.getCluster(), + sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + sort.getCollation(), + sort.offset, + sort.fetch + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index 5bdd5d2..7b6428e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -285,6 +285,13 @@ public class BeamSQLRow implements Serializable { return nullFields; } + /** + * is the specified field NULL? + */ + public boolean isNull(int idx) { + return nullFields.contains(idx); + } + public Instant getWindowStart() { return windowStart; } http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java index 611bd73..8ccb332 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -19,8 +19,10 @@ package org.apache.beam.dsls.sql.planner; import java.util.ArrayList; import java.util.List; + import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -29,7 +31,10 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; /** * A mock table use to check input/output. @@ -50,6 +55,64 @@ public class MockedBeamSQLTable extends BaseBeamTable { return this; } + /** + * Convenient way to build a mocked table with mock data: + * + * <p>e.g. + * + * <pre>{@code + * MockedBeamSQLTable + * .of(SqlTypeName.BIGINT, "order_id", + * SqlTypeName.INTEGER, "site_id", + * SqlTypeName.DOUBLE, "price", + * SqlTypeName.TIMESTAMP, "order_time", + * + * 1L, 2, 1.0, new Date(), + * 1L, 1, 2.0, new Date(), + * 2L, 4, 3.0, new Date(), + * 2L, 1, 4.0, new Date(), + * 5L, 5, 5.0, new Date(), + * 6L, 6, 6.0, new Date(), + * 7L, 7, 7.0, new Date(), + * 8L, 8888, 8.0, new Date(), + * 8L, 999, 9.0, new Date(), + * 10L, 100, 10.0, new Date()) + * }</pre> + */ + public static MockedBeamSQLTable of(final Object... args){ + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); + + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + if (args[lastTypeIndex] instanceof SqlTypeName) { + builder.add(args[lastTypeIndex + 1].toString(), + (SqlTypeName) args[lastTypeIndex]); + } else { + break; + } + } + return builder.build(); + } + }; + + List<BeamSQLRow> rows = new ArrayList<>(); + BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from( + protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + int fieldCount = beamSQLRecordType.size(); + + for (int i = fieldCount * 2; i < args.length; i += fieldCount) { + BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + return new MockedBeamSQLTable(protoRowType).withInputRecords(rows); + } + @Override public BeamIOType getSourceType() { return BeamIOType.UNBOUNDED; @@ -65,6 +128,10 @@ public class MockedBeamSQLTable extends BaseBeamTable { return new OutputStore(); } + public List<BeamSQLRow> getInputRecords() { + return inputRecords; + } + /** * Keep output in {@code CONTENT} for validation. * @@ -93,7 +160,6 @@ public class MockedBeamSQLTable extends BaseBeamTable { })); return PDone.in(input.getPipeline()); } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java new file mode 100644 index 0000000..11cec51 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.planner.BeamSqlRunner; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@code BeamSortRel}. + */ +public class BeamSortRelTest { + public static BeamSqlRunner runner = new BeamSqlRunner(); + private static MockedBeamSQLTable subOrderRamTable = MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price"); + + private static MockedBeamSQLTable orderDetailTable = MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + SqlTypeName.TIMESTAMP, "order_time", + + 1L, 2, 1.0, new Date(), + 1L, 1, 2.0, new Date(), + 2L, 4, 3.0, new Date(), + 2L, 1, 4.0, new Date(), + 5L, 5, 5.0, new Date(), + 6L, 6, 6.0, new Date(), + 7L, 7, 7.0, new Date(), + 8L, 8888, 8.0, new Date(), + 8L, 999, 9.0, new Date(), + 10L, 100, 10.0, new Date()); + + @Test + public void testOrderBy_basic() throws Exception { + prepare(); + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4"; + + System.out.println(sql); + runner.submitQuery(sql); + + assertEquals( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0 + ).getInputRecords(), MockedBeamSQLTable.CONTENT); + } + + @Test + public void testOrderBy_nullsFirst() throws Exception { + runner.addTable("ORDER_DETAILS", MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0)); + runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; + + runner.submitQuery(sql); + + assertEquals( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, null, 2.0, + 1L, 2, 1.0, + 2L, null, 4.0, + 2L, 1, 3.0 + ).getInputRecords(), MockedBeamSQLTable.CONTENT); + } + + @Test + public void testOrderBy_nullsLast() throws Exception { + runner.addTable("ORDER_DETAILS", MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0)); + runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; + + runner.submitQuery(sql); + + assertEquals( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0 + ).getInputRecords(), MockedBeamSQLTable.CONTENT); + } + + @Test + public void testOrderBy_with_offset() throws Exception { + prepare(); + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; + + runner.submitQuery(sql); + + assertEquals( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 5L, 5, 5.0, + 6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0 + ).getInputRecords(), MockedBeamSQLTable.CONTENT); + } + + @Test + public void testOrderBy_bigFetch() throws Exception { + prepare(); + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 11"; + + runner.submitQuery(sql); + + assertEquals( + MockedBeamSQLTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0, + 5L, 5, 5.0, + 6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0, + 8L, 999, 9.0, + 10L, 100, 10.0 + ).getInputRecords(), MockedBeamSQLTable.CONTENT); + } + + @Test(expected = BeamSqlUnsupportedException.class) + public void testOrderBy_exception() throws Exception { + prepare(); + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " + + " order_id, COUNT(*) " + + "FROM ORDER_DETAILS " + + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + + "ORDER BY order_id asc limit 11"; + + runner.submitQuery(sql); + } + + public static void prepare() { + runner.addTable("ORDER_DETAILS", orderDetailTable); + runner.addTable("SUB_ORDER_RAM", subOrderRamTable); + } + + private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> rows2) { + Assert.assertEquals(rows1.size(), rows2.size()); + for (int i = 0; i < rows1.size(); i++) { + Assert.assertEquals(rows1.get(i), rows2.get(i)); + } + } +}