Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 36a436ca0 -> f96f9f680


[BEAM-2255] Implement ORDER BY


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35abd097
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35abd097
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35abd097

Branch: refs/heads/DSL_SQL
Commit: 35abd097ea327eea0f6bcd13068ce62f7d2bcc31
Parents: 36a436c
Author: James Xu <xumingmi...@gmail.com>
Authored: Fri May 12 01:07:18 2017 +0800
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Wed May 17 04:03:49 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   3 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   | 242 +++++++++++++++++++
 .../apache/beam/dsls/sql/rule/BeamSortRule.java |  52 ++++
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java |   7 +
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  68 +++++-
 .../beam/dsls/sql/rel/BeamSortRelTest.java      | 231 ++++++++++++++++++
 6 files changed, 601 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index acbd43f..2cac5ae 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.rule.BeamFilterRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
 import org.apache.beam.dsls.sql.rule.BeamProjectRule;
+import org.apache.beam.dsls.sql.rule.BeamSortRule;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.RuleSet;
@@ -40,7 +41,7 @@ public class BeamRuleSets {
   private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
       .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
           BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
-          BeamAggregationRule.INSTANCE)
+          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE)
       .build();
 
   public static RuleSet[] getRuleSets() {

http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
new file mode 100644
index 0000000..3df2f34
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Sort} node.
+ *
+ * <p>Since Beam does not fully support a global sort, we use {@link Top} to implement
+ * the {@code Sort} algebra. The following types of ORDER BY are supported:
+
+ * <pre>{@code
+ *     select * from t order by id desc limit 10;
+ *     select * from t order by id desc limit 10, 5;
+ * }</pre>
+ *
+ * <p>but ORDER BY without a LIMIT is NOT supported:
+ *
+ * <pre>{@code
+ *   select * from t order by id desc
+ * }</pre>
+ *
+ * <h3>Constraints</h3>
+ * <ul>
+ *   <li>Due to the constraints of {@link Top}, the result of an `ORDER BY LIMIT`
+ *   must fit into the memory of a single machine.</li>
+ *   <li>Since a `WINDOW` (HOP, TUMBLE, SESSION, etc.) is always associated with a `GroupBy`,
+ *   it does not make much sense to use `ORDER BY` with a `WINDOW`.
+ *   </li>
+ * </ul>
+ */
+public class BeamSortRel extends Sort implements BeamRelNode {
+  private List<Integer> fieldIndices = new ArrayList<>();
+  private List<Boolean> orientation = new ArrayList<>();
+  private List<Boolean> nullsFirst = new ArrayList<>();
+
+  private int startIndex = 0;
+  private int count;
+
+  public BeamSortRel(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode child,
+      RelCollation collation,
+      RexNode offset,
+      RexNode fetch) {
+    super(cluster, traits, child, collation, offset, fetch);
+
+    List<RexNode> fieldExps = getChildExps();
+    RelCollationImpl collationImpl = (RelCollationImpl) collation;
+    List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+    for (int i = 0; i < fieldExps.size(); i++) {
+      RexNode fieldExp = fieldExps.get(i);
+      RexInputRef inputRef = (RexInputRef) fieldExp;
+      fieldIndices.add(inputRef.getIndex());
+      orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+      RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+      if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+        rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+      }
+      nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+    }
+
+    if (fetch == null) {
+      throw new BeamSqlUnsupportedException("ORDER BY without a LIMIT is not supported!");
+    }
+
+    RexLiteral fetchLiteral = (RexLiteral) fetch;
+    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+    if (offset != null) {
+      RexLiteral offsetLiteral = (RexLiteral) offset;
+      startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+    }
+  }
+
+  @Override public PCollection<BeamSQLRow> buildBeamPipeline(
+      BeamPipelineCreator planCreator) throws Exception {
+    RelNode input = getInput();
+    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
+        .buildBeamPipeline(planCreator);
+    Type windowType = upstream.getWindowingStrategy().getWindowFn()
+        .getWindowTypeDescriptor().getType();
+    if (!windowType.equals(GlobalWindow.class)) {
+      throw new BeamSqlUnsupportedException(
+          "`ORDER BY` is only supported for GlobalWindow, actual window: " + 
windowType);
+    }
+
+    BeamSQLRowComparator comparator = new BeamSQLRowComparator(fieldIndices, orientation,
+        nullsFirst);
+    // first find the top (offset + count)
+    PCollection<List<BeamSQLRow>> rawStream =
+        upstream.apply("extractTopOffsetAndFetch",
+            Top.of(startIndex + count, comparator).withoutDefaults());
+
+    // strip the `leading offset`
+    if (startIndex > 0) {
+      rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
+          new SubListFn<BeamSQLRow>(startIndex, startIndex + count)));
+    }
+
+    PCollection<BeamSQLRow> orderedStream = rawStream.apply(
+        "flatten", Flatten.<BeamSQLRow>iterables());
+    return orderedStream;
+  }
+
+  private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
+    private int startIndex;
+    private int endIndex;
+
+    public SubListFn(int startIndex, int endIndex) {
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().subList(startIndex, endIndex));
+    }
+  }
+
+  @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+      RexNode offset, RexNode fetch) {
+    return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+  }
+
+  private static class BeamSQLRowComparator implements Comparator<BeamSQLRow>, Serializable {
+    private List<Integer> fieldsIndices;
+    private List<Boolean> orientation;
+    private List<Boolean> nullsFirst;
+
+    public BeamSQLRowComparator(List<Integer> fieldsIndices,
+        List<Boolean> orientation,
+        List<Boolean> nullsFirst) {
+      this.fieldsIndices = fieldsIndices;
+      this.orientation = orientation;
+      this.nullsFirst = nullsFirst;
+    }
+
+    @Override public int compare(BeamSQLRow row1, BeamSQLRow row2) {
+      for (int i = 0; i < fieldsIndices.size(); i++) {
+        int fieldIndex = fieldsIndices.get(i);
+        int fieldRet = 0;
+        SqlTypeName fieldType = row1.getDataType().getFieldsType().get(fieldIndex);
+        // Whether NULL is ordered before or after non-null values depends on
+        // what the user specified in SQL (NULLS FIRST / NULLS LAST).
+        if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+          continue;
+        } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
+          fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
+        } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+          fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
+        } else {
+          switch (fieldType) {
+            case TINYINT:
+              fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
+              break;
+            case SMALLINT:
+              fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
+              break;
+            case INTEGER:
+              fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
+              break;
+            case BIGINT:
+              fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
+              break;
+            case FLOAT:
+              fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
+              break;
+            case DOUBLE:
+              fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
+              break;
+            case VARCHAR:
+              fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
+              break;
+            case DATE:
+              fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
+              break;
+            default:
+              throw new UnsupportedDataTypeException(fieldType);
+          }
+        }
+
+        fieldRet *= (orientation.get(i) ? -1 : 1);
+        if (fieldRet != 0) {
+          return fieldRet;
+        }
+      }
+      return 0;
+    }
+  }
+
+  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+    return a.compareTo(b);
+  }
+}
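
For readers of the BeamSortRel file above, here is a minimal standalone sketch (not part of this commit) of the same Top-based pattern: `ORDER BY x ASC LIMIT count OFFSET offset` is evaluated as Top.of(offset + count, comparator), with the comparator reversed for ascending fields (which is also why BeamSQLRowComparator multiplies by -1 when the orientation is ascending), after which the leading offset rows are dropped and the single list is flattened back into elements. The sketch uses plain Integer elements instead of BeamSQLRow; the names OrderByLimitSketch, AscendingComparator, SubListFn and orderByAscLimit are illustrative only.

    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Top;
    import org.apache.beam.sdk.values.PCollection;

    class OrderByLimitSketch {

      // Reversed natural order: Top's "largest" elements are then the smallest
      // values, and Top emits them smallest-first, i.e. in ascending order.
      static class AscendingComparator implements Comparator<Integer>, Serializable {
        @Override public int compare(Integer a, Integer b) {
          return b.compareTo(a);
        }
      }

      // Emits the sub-range [startIndex, endIndex) of the single top-N list.
      static class SubListFn extends DoFn<List<Integer>, List<Integer>> {
        private final int startIndex;
        private final int endIndex;

        SubListFn(int startIndex, int endIndex) {
          this.startIndex = startIndex;
          this.endIndex = endIndex;
        }

        @ProcessElement
        public void processElement(ProcessContext ctx) {
          List<Integer> top = ctx.element();
          ctx.output(top.subList(Math.min(startIndex, top.size()),
              Math.min(endIndex, top.size())));
        }
      }

      // ORDER BY x ASC LIMIT count OFFSET offset, evaluated in memory via Top.
      static PCollection<Integer> orderByAscLimit(
          PCollection<Integer> input, int offset, int count) {
        // 1. Keep only the top (offset + count) elements; the result must fit
        //    in the memory of a single worker, hence the mandatory LIMIT.
        PCollection<List<Integer>> top = input.apply("topOffsetPlusCount",
            Top.of(offset + count, new AscendingComparator()).withoutDefaults());
        // 2. Strip the leading `offset` rows.
        PCollection<List<Integer>> trimmed = top.apply("stripLeadingOffset",
            ParDo.of(new SubListFn(offset, offset + count)));
        // 3. Flatten the single list back into individual elements.
        return trimmed.apply("flatten", Flatten.<Integer>iterables());
      }
    }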

http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
new file mode 100644
index 0000000..d802e9d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamSortRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+
+/**
+ * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
+ */
+public class BeamSortRule extends ConverterRule {
+  public static final BeamSortRule INSTANCE = new BeamSortRule();
+  private BeamSortRule() {
+    super(LogicalSort.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamSortRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Sort sort = (Sort) rel;
+    final RelNode input = sort.getInput();
+    return new BeamSortRel(
+        sort.getCluster(),
+        sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index 5bdd5d2..7b6428e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -285,6 +285,13 @@ public class BeamSQLRow implements Serializable {
     return nullFields;
   }
 
+  /**
+   * Is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
   public Instant getWindowStart() {
     return windowStart;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 611bd73..8ccb332 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -19,8 +19,10 @@ package org.apache.beam.dsls.sql.planner;
 
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -29,7 +31,10 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
  * A mock table used to check input/output.
@@ -50,6 +55,64 @@ public class MockedBeamSQLTable extends BaseBeamTable {
     return this;
   }
 
+  /**
+   * A convenient way to build a mocked table with mock data:
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedBeamSQLTable
+   *   .of(SqlTypeName.BIGINT, "order_id",
+   *       SqlTypeName.INTEGER, "site_id",
+   *       SqlTypeName.DOUBLE, "price",
+   *       SqlTypeName.TIMESTAMP, "order_time",
+   *
+   *       1L, 2, 1.0, new Date(),
+   *       1L, 1, 2.0, new Date(),
+   *       2L, 4, 3.0, new Date(),
+   *       2L, 1, 4.0, new Date(),
+   *       5L, 5, 5.0, new Date(),
+   *       6L, 6, 6.0, new Date(),
+   *       7L, 7, 7.0, new Date(),
+   *       8L, 8888, 8.0, new Date(),
+   *       8L, 999, 9.0, new Date(),
+   *       10L, 100, 10.0, new Date())
+   * }</pre>
+   */
+  public static MockedBeamSQLTable of(final Object... args) {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        RelDataTypeFactory.FieldInfoBuilder builder = a0.builder();
+
+        int lastTypeIndex = 0;
+        for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
+          if (args[lastTypeIndex] instanceof SqlTypeName) {
+            builder.add(args[lastTypeIndex + 1].toString(),
+                (SqlTypeName) args[lastTypeIndex]);
+          } else {
+            break;
+          }
+        }
+        return builder.build();
+      }
+    };
+
+    List<BeamSQLRow> rows = new ArrayList<>();
+    BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(
+        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    int fieldCount = beamSQLRecordType.size();
+
+    for (int i = fieldCount * 2; i < args.length; i += fieldCount) {
+      BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
+      for (int j = 0; j < fieldCount; j++) {
+        row.addField(j, args[i + j]);
+      }
+      rows.add(row);
+    }
+    return new MockedBeamSQLTable(protoRowType).withInputRecords(rows);
+  }
+
   @Override
   public BeamIOType getSourceType() {
     return BeamIOType.UNBOUNDED;
@@ -65,6 +128,10 @@ public class MockedBeamSQLTable extends BaseBeamTable {
     return new OutputStore();
   }
 
+  public List<BeamSQLRow> getInputRecords() {
+    return inputRecords;
+  }
+
   /**
    * Keep output in {@code CONTENT} for validation.
    *
@@ -93,7 +160,6 @@ public class MockedBeamSQLTable extends BaseBeamTable {
       }));
       return PDone.in(input.getPipeline());
     }
-
   }
 
 }
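
A small usage sketch (mine, not part of the commit) of the new MockedBeamSQLTable.of(...) helper above: the leading (SqlTypeName, name) pairs define the schema until the first argument that is not a SqlTypeName, and the remaining arguments are consumed fieldCount at a time as rows. The class name MockedTableSketch is illustrative only; the imports reflect the test classpath shown in the diffs.

    import java.util.List;

    import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
    import org.apache.beam.dsls.sql.schema.BeamSQLRow;
    import org.apache.calcite.sql.type.SqlTypeName;

    class MockedTableSketch {
      static List<BeamSQLRow> twoRowTable() {
        // Two (type, name) pairs define a 2-field schema; the remaining four
        // values are consumed two at a time, producing two BeamSQLRow rows.
        return MockedBeamSQLTable.of(
            SqlTypeName.BIGINT, "order_id",
            SqlTypeName.DOUBLE, "price",
            1L, 1.0,
            2L, 2.0).getInputRecords();
      }
    }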

http://git-wip-us.apache.org/repos/asf/beam/blob/35abd097/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
new file mode 100644
index 0000000..11cec51
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.planner.BeamSqlRunner;
+import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSortRel}.
+ */
+public class BeamSortRelTest {
+  public static BeamSqlRunner runner = new BeamSqlRunner();
+  private static MockedBeamSQLTable subOrderRamTable = MockedBeamSQLTable.of(
+      SqlTypeName.BIGINT, "order_id",
+      SqlTypeName.INTEGER, "site_id",
+      SqlTypeName.DOUBLE, "price");
+
+  private static MockedBeamSQLTable orderDetailTable = MockedBeamSQLTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          SqlTypeName.TIMESTAMP, "order_time",
+
+          1L, 2, 1.0, new Date(),
+          1L, 1, 2.0, new Date(),
+          2L, 4, 3.0, new Date(),
+          2L, 1, 4.0, new Date(),
+          5L, 5, 5.0, new Date(),
+          6L, 6, 6.0, new Date(),
+          7L, 7, 7.0, new Date(),
+          8L, 8888, 8.0, new Date(),
+          8L, 999, 9.0, new Date(),
+          10L, 100, 10.0, new Date());
+
+  @Test
+  public void testOrderBy_basic() throws Exception {
+    prepare();
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 4";
+
+    System.out.println(sql);
+    runner.submitQuery(sql);
+
+    assertEquals(
+        MockedBeamSQLTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+            1L, 2, 1.0,
+            1L, 1, 2.0,
+            2L, 4, 3.0,
+            2L, 1, 4.0
+        ).getInputRecords(), MockedBeamSQLTable.CONTENT);
+  }
+
+  @Test
+  public void testOrderBy_nullsFirst() throws Exception {
+    runner.addTable("ORDER_DETAILS", MockedBeamSQLTable
+        .of(SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0,
+            5L, 5, 5.0));
+    runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable
+        .of(SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price"));
+
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
+
+    runner.submitQuery(sql);
+
+    assertEquals(
+        MockedBeamSQLTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, null, 2.0,
+            1L, 2, 1.0,
+            2L, null, 4.0,
+            2L, 1, 3.0
+        ).getInputRecords(), MockedBeamSQLTable.CONTENT);
+  }
+
+  @Test
+  public void testOrderBy_nullsLast() throws Exception {
+    runner.addTable("ORDER_DETAILS", MockedBeamSQLTable
+        .of(SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0,
+            5L, 5, 5.0));
+    runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable
+        .of(SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price"));
+
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
+
+    runner.submitQuery(sql);
+
+    assertEquals(
+        MockedBeamSQLTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0
+        ).getInputRecords(), MockedBeamSQLTable.CONTENT);
+  }
+
+  @Test
+  public void testOrderBy_with_offset() throws Exception {
+    prepare();
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
+
+    runner.submitQuery(sql);
+
+    assertEquals(
+        MockedBeamSQLTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            5L, 5, 5.0,
+            6L, 6, 6.0,
+            7L, 7, 7.0,
+            8L, 8888, 8.0
+        ).getInputRecords(), MockedBeamSQLTable.CONTENT);
+  }
+
+  @Test
+  public void testOrderBy_bigFetch() throws Exception {
+    prepare();
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 11";
+
+    runner.submitQuery(sql);
+
+    assertEquals(
+        MockedBeamSQLTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 2, 1.0,
+            1L, 1, 2.0,
+            2L, 4, 3.0,
+            2L, 1, 4.0,
+            5L, 5, 5.0,
+            6L, 6, 6.0,
+            7L, 7, 7.0,
+            8L, 8888, 8.0,
+            8L, 999, 9.0,
+            10L, 100, 10.0
+        ).getInputRecords(), MockedBeamSQLTable.CONTENT);
+  }
+
+  @Test(expected = BeamSqlUnsupportedException.class)
+  public void testOrderBy_exception() throws Exception {
+    prepare();
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id)  SELECT "
+        + " order_id, COUNT(*) "
+        + "FROM ORDER_DETAILS "
+        + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+        + "ORDER BY order_id asc limit 11";
+
+    runner.submitQuery(sql);
+  }
+
+  public static void prepare() {
+    runner.addTable("ORDER_DETAILS", orderDetailTable);
+    runner.addTable("SUB_ORDER_RAM", subOrderRamTable);
+  }
+
+  private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> rows2) {
+    Assert.assertEquals(rows1.size(), rows2.size());
+    for (int i = 0; i < rows1.size(); i++) {
+      Assert.assertEquals(rows1.get(i), rows2.get(i));
+    }
+  }
+}
