Repository: beam Updated Branches: refs/heads/DSL_SQL ab4b11886 -> 2096da25e
[BEAM-2193] Implement FULL, INNER, and OUTER JOIN: - FULL and INNER supported on all variations of unbounded/bounded joins. - OUTER JOIN supported when outer side is unbounded. - Unbounded/bounded joins implemented via side inputs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/928cec59 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/928cec59 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/928cec59 Branch: refs/heads/DSL_SQL Commit: 928cec597175c363d444331b35ac8793297a242b Parents: ab4b118 Author: James Xu <xumingmi...@gmail.com> Authored: Mon May 29 11:11:34 2017 +0800 Committer: Tyler Akidau <taki...@apache.org> Committed: Thu Jun 29 16:32:23 2017 -0700 ---------------------------------------------------------------------- dsls/pom.xml | 2 +- dsls/sql/pom.xml | 16 +- .../beam/dsls/sql/planner/BeamRuleSets.java | 6 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 19 +- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 305 +++++++++++++++++++ .../apache/beam/dsls/sql/rule/BeamJoinRule.java | 53 ++++ .../beam/dsls/sql/schema/BeamSqlRecordType.java | 2 +- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 2 +- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 3 - .../dsls/sql/transform/BeamJoinTransforms.java | 166 ++++++++++ .../org/apache/beam/dsls/sql/TestUtils.java | 125 ++++++++ .../dsls/sql/planner/MockedBeamSqlTable.java | 5 +- .../beam/dsls/sql/planner/MockedTable.java | 33 ++ .../dsls/sql/planner/MockedUnboundedTable.java | 120 ++++++++ .../rel/BeamJoinRelBoundedVsBoundedTest.java | 195 ++++++++++++ .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 242 +++++++++++++++ .../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++++++ .../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +- 18 files changed, 1486 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/pom.xml 
---------------------------------------------------------------------- diff --git a/dsls/pom.xml b/dsls/pom.xml index d932698..a518d03 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -66,7 +66,7 @@ </plugin> </plugins> </pluginManagement> - + <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index a2279d5..54f590e 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -157,6 +157,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.9.0.1</version> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -193,21 +198,18 @@ <artifactId>joda-time</artifactId> </dependency> <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.1.0</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>com.google.auto.value</groupId> <artifactId>auto-value</artifactId> <scope>provided</scope> </dependency> - <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-join-library</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 6c73558..552ff8f 100644 --- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -19,15 +19,14 @@ package org.apache.beam.dsls.sql.planner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - import java.util.Iterator; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.rule.BeamAggregationRule; import org.apache.beam.dsls.sql.rule.BeamFilterRule; import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; import org.apache.beam.dsls.sql.rule.BeamIntersectRule; +import org.apache.beam.dsls.sql.rule.BeamJoinRule; import org.apache.beam.dsls.sql.rule.BeamMinusRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; @@ -47,7 +46,8 @@ public class BeamRuleSets { .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, - BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE) + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, + BeamJoinRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 701f620..9ec9e9f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -74,40 +74,41 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode { public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); + String stageName = BeamSqlRelUtils.getStageName(this) + "_"; PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { - upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps - .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) .setCoder(upstream.getCoder()); } - PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window", - Window.<BeamSqlRow>into(windowFn) + PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window", + Window.into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( - stageName + "_exCombineBy", + stageName + "exCombineBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder())); + .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); + BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( - stageName + "_combineBy", + stageName + "combineBy", Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), CalciteUtils.toBeamRecordType(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - 
PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", + PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( CalciteUtils.toBeamRecordType(getRowType()), getAggCallList()))); mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java new file mode 100644 index 0000000..e85368e --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.transform.BeamJoinTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; + +/** + * {@code BeamRelNode} to replace a {@code Join} node. + * + * <p>Support for join can be categorized into 3 cases: + * <ul> + * <li>BoundedTable JOIN BoundedTable</li> + * <li>UnboundedTable JOIN UnboundedTable</li> + * <li>BoundedTable JOIN UnboundedTable</li> + * </ul> + * + * <p>For the first two cases, a standard join is utilized as long as the windowFns of both + * sides match.
+ * + * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some + * constraints: + * + * <ul> + * <li>{@code FULL OUTER JOIN} is not supported.</li> + * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.</li> + * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side.</li> + * </ul> + * + * + * <p>There are also some general constraints: + * + * <ul> + * <li>Only equi-join is supported.</li> + * <li>CROSS JOIN is not supported.</li> + * </ul> + */ +public class BeamJoinRel extends Join implements BeamRelNode { + public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, + BeamSqlEnv sqlEnv) + throws Exception { + BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); + BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType()); + PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + leftRows.setCoder(new BeamSqlRowCoder(leftRowType)); + + final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); + BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType()); + PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + rightRows.setCoder(new BeamSqlRowCoder(rightRowType)); + + String stageName = BeamSqlRelUtils.getStageName(this); + WindowFn leftWinFn =
leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); + + // extract the join fields + List<Pair<Integer, Integer>> pairs = extractJoinColumns( + leftRelNode.getRowType().getFieldCount()); + + // build the extract key type + // the name of the join field is not important + List<String> names = new ArrayList<>(pairs.size()); + List<Integer> types = new ArrayList<>(pairs.size()); + for (int i = 0; i < pairs.size(); i++) { + names.add("c" + i); + types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); + } + BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types); + + Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); + + // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow> + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows + .apply(stageName + "_left_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); + + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows + .apply(stageName + "_right_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); + + // prepare the NullRows + BeamSqlRow leftNullRow = buildNullRow(leftRelNode); + BeamSqlRow rightNullRow = buildNullRow(rightRelNode); + + // a regular join + if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) { + try { + leftWinFn.verifyCompatibility(rightWinFn); + } catch (IncompatibleWindowException e) { + throw new IllegalArgumentException( + "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); + } + + return standardJoin(extractedLeftRows, 
extractedRightRows, + leftNullRow, rightNullRow, stageName); + } else if ( + (leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + ) { + // if one of the sides is Bounded & the other is Unbounded + // then do a sideInput join + // when doing a sideInput join, the windowFn does not need to match + // Only INNER JOIN and LEFT/RIGHT OUTER JOIN are supported; the outer side of + // an OUTER JOIN must be the unbounded one + if (joinType == JoinRelType.FULL) { + throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join " + + "a bounded table with an unbounded table."); + } + + if ((joinType == JoinRelType.LEFT + && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (joinType == JoinRelType.RIGHT + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) { + throw new UnsupportedOperationException( + "LEFT side of an OUTER JOIN must be Unbounded table."); + } + + return sideInputJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow); + } else { + throw new UnsupportedOperationException( + "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn); + } + } + + private PCollection<BeamSqlRow> standardJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { + PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null; + switch (joinType) { + case LEFT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow); + break; + case RIGHT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow); + break; + case FULL: + joinedRows =
org.apache.beam.sdk.extensions.joinlibrary.Join + .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, + rightNullRow); + break; + case INNER: + default: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .innerJoin(extractedLeftRows, extractedRightRows); + break; + } + + PCollection<BeamSqlRow> ret = joinedRows + .apply(stageName + "_JoinParts2WholeRow", + MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + return ret; + } + + public PCollection<BeamSqlRow> sideInputJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { + // we always make the Unbounded table on the left to do the sideInput join + // (will convert the result accordingly before return) + boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); + JoinRelType realJoinType = + (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; + + PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows = + swapped ? extractedRightRows : extractedLeftRows; + PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows = + swapped ? extractedLeftRows : extractedRightRows; + BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; + + // swapped still needs to be passed down because we need to swap the result back.
+ return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, + realRightNullRow, swapped); + } + + private PCollection<BeamSqlRow> sideInputJoinHelper( + JoinRelType joinType, + PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows, + BeamSqlRow rightNullRow, boolean swapped) { + final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows + .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap()); + + PCollection<BeamSqlRow> ret = leftRows + .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( + joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + + return ret; + } + + private BeamSqlRow buildNullRow(BeamRelNode relNode) { + BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType()); + BeamSqlRow nullRow = new BeamSqlRow(leftType); + for (int i = 0; i < leftType.size(); i++) { + nullRow.addField(i, null); + } + return nullRow; + } + + private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + + RexCall call = (RexCall) condition; + List<Pair<Integer, Integer>> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List<RexNode> operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private 
Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { + List<RexNode> operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - leftRowColumnCount; + + return new Pair<>(leftIndex, rightIndex); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java new file mode 100644 index 0000000..78253fe --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamJoinRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; + +/** + * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. + */ +public class BeamJoinRule extends ConverterRule { + public static final BeamJoinRule INSTANCE = new BeamJoinRule(); + private BeamJoinRule() { + super(LogicalJoin.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamJoinRule"); + } + + @Override public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new BeamJoinRel( + join.getCluster(), + join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(join.getLeft(), + join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + convert(join.getRight(), + join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + join.getCondition(), + join.getVariablesSet(), + join.getJoinType() + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java index 9fc3945..52bd652 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java @@ -31,7 +31,7 @@ public abstract class BeamSqlRecordType implements Serializable { public abstract List<Integer> getFieldsType(); public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) { - return 
new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); + return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); } public int size() { http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 213dcd5..2d7e350 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -365,6 +365,6 @@ public class BeamSqlRow implements Serializable { } @Override public int hashCode() { - return toString().hashCode(); + return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index e86fb3f..d53ba8d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; - import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; @@ -58,7 +57,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { @Override public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException { 
listCoder.encode(value.getNullFields(), outStream); - for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { continue; @@ -113,7 +111,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { BeamSqlRow record = new BeamSqlRow(tableSchema); record.setNullFields(nullFields); - for (int idx = 0; idx < tableSchema.size(); ++idx) { if (nullFields.contains(idx)) { continue; http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java new file mode 100644 index 0000000..8169b83 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.transform; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.util.Pair; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. + */ +public class BeamJoinTransforms { + + /** + * A {@code SimpleFunction} to extract join fields from the specified row. + */ + public static class ExtractJoinFields + extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { + private final boolean isLeft; + private final List<Pair<Integer, Integer>> joinColumns; + + public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) { + this.isLeft = isLeft; + this.joinColumns = joinColumns; + } + + @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { + // build the type + // the name of the join field is not important + List<String> names = new ArrayList<>(joinColumns.size()); + List<Integer> types = new ArrayList<>(joinColumns.size()); + for (int i = 0; i < joinColumns.size(); i++) { + names.add("c" + i); + types.add(isLeft + ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : + input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); + } + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + + // build the row + BeamSqlRow row = new BeamSqlRow(type); + for (int i = 0; i < joinColumns.size(); i++) { + row.addField(i, input + .getFieldValue(isLeft ? 
joinColumns.get(i).getKey() : joinColumns.get(i).getValue())); + } + return KV.of(row, input); + } + } + + + /** + * A {@code DoFn} which implements the sideInput-JOIN. + */ + public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { + private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView; + private final JoinRelType joinType; + private final BeamSqlRow rightNullRow; + private final boolean swap; + + public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, + PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView, + boolean swap) { + this.joinType = joinType; + this.rightNullRow = rightNullRow; + this.sideInputView = sideInputView; + this.swap = swap; + } + + @ProcessElement public void processElement(ProcessContext context) { + BeamSqlRow key = context.element().getKey(); + BeamSqlRow leftRow = context.element().getValue(); + Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView); + Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key); + + if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { + Iterator<BeamSqlRow> it = rightRowsIterable.iterator(); + while (it.hasNext()) { + context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap)); + } + } else { + if (joinType == JoinRelType.LEFT) { + context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap)); + } + } + } + } + + + /** + * A {@code SimpleFunction} to combine two rows into one.
+ */ + public static class JoinParts2WholeRow + extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> { + @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) { + KV<BeamSqlRow, BeamSqlRow> parts = input.getValue(); + BeamSqlRow leftRow = parts.getKey(); + BeamSqlRow rightRow = parts.getValue(); + return combineTwoRowsIntoOne(leftRow, rightRow, false); + } + } + + /** + * As the method name suggests: combine two rows into one wide row. + */ + private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, + BeamSqlRow rightRow, boolean swap) { + if (swap) { + return combineTwoRowsIntoOneHelper(rightRow, leftRow); + } else { + return combineTwoRowsIntoOneHelper(leftRow, rightRow); + } + } + + /** + * As the method name suggests: combine two rows into one wide row. + */ + private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow, + BeamSqlRow rightRow) { + // build the type + List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); + names.addAll(leftRow.getDataType().getFieldsName()); + names.addAll(rightRow.getDataType().getFieldsName()); + + List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); + types.addAll(leftRow.getDataType().getFieldsType()); + types.addAll(rightRow.getDataType().getFieldsType()); + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + + BeamSqlRow row = new BeamSqlRow(type); + // build the row + for (int i = 0; i < leftRow.size(); i++) { + row.addField(i, leftRow.getFieldValue(i)); + } + + for (int i = 0; i < rightRow.size(); i++) { + row.addField(i + leftRow.size(), rightRow.getFieldValue(i)); + } + + return row; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java new file mode 100644 index 0000000..375027a --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Test utilities. + */ +public class TestUtils { + + /** + * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. + */ + public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().valueInString()); + } + } + + /** + * Convert list of {@code BeamSqlRow} to list of {@code String}. + */ + public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) { + List<String> strs = new ArrayList<>(); + for (BeamSqlRow row : rows) { + strs.add(row.valueInString()); + } + + return strs; + } + + /** + * Convenient way to build a list of {@code BeamSqlRow}s. 
+ * + * <p>You can use it like this: + * + * <pre>{@code + * TestUtils.RowsBuilder.of( + * Types.INTEGER, "order_id", + * Types.INTEGER, "sum_site_id", + * Types.VARCHAR, "buyer" + * ).values( + * 1, 3, "james", + * 2, 5, "bond" + * ).getStringRows() + * }</pre> + * {@code} + */ + public static class RowsBuilder { + private BeamSqlRecordType type; + private List<BeamSqlRow> rows = new ArrayList<>(); + + /** + * Create a RowsBuilder with the specified row type info. + * + * <p>Note: check the class javadoc for for detailed example. + * + * @args pairs of column type and column names. + */ + public static RowsBuilder of(final Object... args) { + List<Integer> types = new ArrayList<>(); + List<String> names = new ArrayList<>(); + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + types.add((int) args[lastTypeIndex]); + names.add((String) args[lastTypeIndex + 1]); + } + + BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types); + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLRecordType; + + return builder; + } + + /** + * Add values to the builder. + * + * <p>Note: check the class javadoc for for detailed example. + */ + public RowsBuilder values(final Object... 
args) { + int fieldCount = type.size(); + for (int i = 0; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(type); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + this.rows.add(row); + } + + return this; + } + + public List<BeamSqlRow> getRows() { + return rows; + } + + public List<String> getStringRows() { + return beamSqlRows2Strings(rows); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index f651f6a..fa80cc1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; @@ -49,7 +48,6 @@ public class MockedBeamSqlTable extends BaseBeamTable { public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); private List<BeamSqlRow> inputRecords; - public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); } @@ -119,10 +117,11 @@ public class MockedBeamSqlTable extends BaseBeamTable { @Override public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; + return BeamIOType.BOUNDED; } @Override + public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), 
Create.of(inputRecords)); http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java new file mode 100644 index 0000000..d096a61 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.planner; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; + +/** + * Base class for mocked table. 
+ */ +public abstract class MockedTable extends BaseBeamTable { + public static final AtomicInteger COUNTER = new AtomicInteger(); + public MockedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java new file mode 100644 index 0000000..3f22df3 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.calcite.util.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A mocked unbounded table. + */ +public class MockedUnboundedTable extends MockedTable { + private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); + private int timestampField; + private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } + + /** + * Convenient way to build a mocked table. + * + * <p>e.g. + * + * <pre>{@code + * MockedUnboundedTable + * .of(Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time") + * }</pre> + */ + public static MockedUnboundedTable of(final Object... args){ + List<Integer> types = new ArrayList<>(); + List<String> names = new ArrayList<>(); + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + types.add((int) args[lastTypeIndex]); + names.add((String) args[lastTypeIndex + 1]); + } + + return new MockedUnboundedTable( + BeamSqlRecordType.create(names, types) + ); + } + + public MockedUnboundedTable timestampColumnIndex(int idx) { + this.timestampField = idx; + return this; + } + + public MockedUnboundedTable addRows(Duration duration, Object... 
args) { + List<BeamSqlRow> rows = new ArrayList<>(); + int fieldCount = getRecordType().size(); + + for (int i = 0; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(getRecordType()); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + + // record the watermark + rows + this.timestampedRows.add(Pair.of(duration, rows)); + return this; + } + + @Override public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + TestStream.Builder<BeamSqlRow> values = TestStream.create( + new BeamSqlRowCoder(beamSqlRecordType)); + + for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { + values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); + for (int i = 0; i < pair.getValue().size(); i++) { + values = values.addElements(TimestampedValue.of(pair.getValue().get(i), + new Instant(pair.getValue().get(i).getDate(timestampField)))); + } + } + + return pipeline.begin().apply( + "MockedUnboundedTable_" + COUNTER.incrementAndGet(), + values.advanceWatermarkToInfinity()); + } + + @Override public PTransform<? 
super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java new file mode 100644 index 0000000..505b742 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Bounded + Bounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelBoundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); + + beamSqlEnv.registerTable("ORDER_DETAILS0", + MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 1, 2, 3, + 2, 3, 3, + 3, 4, 5)); + + } + + @Test + public void testInnerJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3 + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS 
o1" + + " LEFT OUTER JOIN ORDER_DETAILS0 o2" + + " on " + + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.enableAbandonedNodeEnforcement(false); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " RIGHT OUTER JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " FULL OUTER JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.INTEGER, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.INTEGER, "price", + SqlTypeName.INTEGER, "order_id0", + SqlTypeName.INTEGER, "site_id0", + SqlTypeName.INTEGER, "price0", + + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, 
null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getInputRecords()); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testException_nonEqualJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1" + + " JOIN ORDER_DETAILS o2" + + " on " + + " o1.order_id>o2.site_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testException_crossJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2"; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java new file mode 100644 index 0000000..2ddb00b --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.Duration; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. 
+ */ +public class BeamJoinRelUnboundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final Date FIRST_DATE = new Date(1); + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, + 1, 1, 1, FIRST_DATE, + 1, 2, 2, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardSeconds(1)), + 2, 2, 3, SECOND_DATE, + 2, 3, 3, SECOND_DATE, + // this late data is omitted + 1, 2, 3, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), + 3, 3, 3, THIRD_DATE, + // this late data is omitted + 2, 2, 3, SECOND_DATE + ) + ); + + beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable + .of(SqlTypeName.INTEGER, "order_id", + SqlTypeName.VARCHAR, "buyer", + + 1, "james", + 2, "bond" + )); + } + + @Test + public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, 
"buyer" + ).values( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLeftOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as 
sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " RIGHT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).values( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRightOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testFullOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " 
o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java new file mode 100644 index 0000000..18a5f60 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ *
+ * <p>Both inputs of every join are aggregations over the same unbounded {@code ORDER_DETAILS}
+ * table in one-hour tumbling windows. The expected results only pair rows that fall in the same
+ * window, and joining inputs with different window sizes is expected to be rejected.
+ */
+public class BeamJoinRelUnboundedVsUnboundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  /** Event time (epoch + 1 ms) of the rows that belong to the first one-hour window. */
+  public static final Date FIRST_DATE = new Date(1);
+  /**
+   * Event time of the rows that belong to the second window, one hour after {@link #FIRST_DATE}.
+   * NOTE(review): {@code java.util.Date} is mutable; callers must not modify these constants.
+   */
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  /**
+   * Registers the unbounded {@code ORDER_DETAILS} mock table with columns
+   * (order_id, site_id, price, order_time), using {@code order_time} (column 3) as event time.
+   *
+   * <p>Rows arrive in three batches. Each batch contains one late row for an earlier window, and
+   * none of the expected results below include those late rows.
+   */
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+        .of(Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.TIMESTAMP, "order_time"
+        )
+        .timestampColumnIndex(3)
+        .addRows(
+            Duration.ZERO,
+            1, 1, 1, FIRST_DATE,
+            1, 2, 6, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+            2, 2, 7, SECOND_DATE,
+            2, 3, 8, SECOND_DATE,
+            // Late row for the first window: dropped, so order 1 sums to 3 rather than 6.
+            1, 3, 3, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+            // Late row for the second window: dropped, so order 2 sums to 5 rather than 8.
+            2, 3, 3, SECOND_DATE
+        )
+    );
+  }
+
+  /**
+   * Joins the per-window {@code sum(site_id)} by {@code order_id} with itself. Each order id
+   * occurs in exactly one window, so every row matches only itself.
+   */
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0").values(
+                1, 3, 1, 3,
+                2, 5, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Left-joins per-window sums keyed by {@code site_id} against per-window sums keyed by
+   * {@code order_id}. Left rows without a same-window match are padded with nulls.
+   */
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    // Expected rows as (o1 | o2); the dashed line separates the first and second window:
+    // 1, 1 | 1, 3
+    // 2, 2 | NULL, NULL
+    // ---- | -----
+    // 2, 2 | 2, 5
+    // 3, 3 | NULL, NULL
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 1, 1, 3,
+                2, 2, null, null,
+                2, 2, 2, 5,
+                3, 3, null, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Mirror image of {@link #testLeftOuterJoin()}: the {@code site_id}-keyed side is now on the
+   * right, so the null padding appears in the left-hand columns.
+   */
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 3, 1, 1,
+                null, null, 2, 2,
+                2, 5, 2, 2,
+                null, null, 3, 3
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Full-outer-joins per-window sums keyed by {@code price} against per-window sums keyed by
+   * {@code order_id}. Only price 1 / order 1 match; unmatched rows from either side are padded
+   * with nulls.
+   */
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id1=o2.order_id";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id1",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 1, 1, 3,
+                6, 2, null, null,
+                7, 2, null, null,
+                8, 3, null, null,
+                null, null, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Joining an input in 2-hour windows with an input in 1-hour windows is expected to fail with
+   * {@link IllegalArgumentException}.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testWindowsMismatch() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+    // NOTE(review): enforcement is presumably disabled because the expected exception leaves the
+    // partially built pipeline un-run; confirm against TestPipeline's abandoned-node check.
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index b358fe1..f8eaa51 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -59,7 +59,7 @@ public class BeamSqlRowCoderTest { BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType( protoRowType.apply(new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT))); + RelDataTypeSystem.DEFAULT))); BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1"));