Repository: beam Updated Branches: refs/heads/DSL_SQL c0171593b -> 315f266a6
[BEAM-2325] Support Set operator: intersect & except Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81d699e4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81d699e4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81d699e4 Branch: refs/heads/DSL_SQL Commit: 81d699e4069856827bf33782c024671b48578bf4 Parents: c017159 Author: James Xu <xumingmi...@gmail.com> Authored: Fri May 19 21:47:10 2017 +0800 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Tue Jun 13 14:13:58 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/planner/BeamRuleSets.java | 7 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 15 ++- .../beam/dsls/sql/rel/BeamIntersectRel.java | 58 +++++++++ .../apache/beam/dsls/sql/rel/BeamMinusRel.java | 56 +++++++++ .../dsls/sql/rel/BeamSetOperatorRelBase.java | 99 +++++++++++++++ .../apache/beam/dsls/sql/rel/BeamUnionRel.java | 88 +++++++++++++ .../beam/dsls/sql/rule/BeamIntersectRule.java | 51 ++++++++ .../beam/dsls/sql/rule/BeamMinusRule.java | 51 ++++++++ .../beam/dsls/sql/rule/BeamUnionRule.java | 50 ++++++++ .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 3 +- .../transform/BeamSetOperatorsTransforms.java | 113 +++++++++++++++++ .../dsls/sql/planner/MockedBeamSqlTable.java | 6 +- .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 111 +++++++++++++++++ .../beam/dsls/sql/rel/BeamMinusRelTest.java | 110 +++++++++++++++++ .../sql/rel/BeamSetOperatorRelBaseTest.java | 122 +++++++++++++++++++ .../beam/dsls/sql/rel/BeamUnionRelTest.java | 99 +++++++++++++++ .../org/apache/beam/dsls/sql/rel/CheckSize.java | 41 +++++++ 17 files changed, 1069 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 1ad62bc..6c73558 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.planner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; + import java.util.Iterator; import org.apache.beam.dsls.sql.rel.BeamRelNode; @@ -26,8 +27,11 @@ import org.apache.beam.dsls.sql.rule.BeamAggregationRule; import org.apache.beam.dsls.sql.rule.BeamFilterRule; import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; +import org.apache.beam.dsls.sql.rule.BeamIntersectRule; +import org.apache.beam.dsls.sql.rule.BeamMinusRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; +import org.apache.beam.dsls.sql.rule.BeamUnionRule; import org.apache.beam.dsls.sql.rule.BeamValuesRule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.RelNode; @@ -42,7 +46,8 @@ public class BeamRuleSets { private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE) + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index c0d2783..9951536 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; + import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -79,37 +80,39 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); if (windowFieldIdx != -1) { - upstream = upstream.apply("assignEventTimestamp", WithTimestamps + upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) .setCoder(upstream.getCoder()); } - PCollection<BeamSqlRow> windowStream = upstream.apply("window", + PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window", Window.<BeamSqlRow>into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply("exGroupBy", + PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply( + stageName + "_exGroupBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder())); PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream - 
.apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) + .apply(stageName + "_groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, IterableCoder.<BeamSqlRow>of(upstream.getCoder()))); BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation", + PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply( + stageName + "_aggregation", Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), BeamSqlRecordType.from(input.getRowType())))) .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", + PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSqlRecordType.from(getRowType()), getAggCallList()))); mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java new file mode 100644 index 0000000..01e1c33 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Intersect} node. + * + * <p>This is used to combine two SELECT statements, but returns rows only from the + * first SELECT statement that are identical to a row in the second SELECT statement. 
+ */ +public class BeamIntersectRel extends Intersect implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { + return delegate.buildBeamPipeline(inputPCollections); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java new file mode 100644 index 0000000..bee6c11 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Minus} node. + * + * <p>Corresponds to the SQL {@code EXCEPT} operator. + */ +public class BeamMinusRel extends Minus implements BeamRelNode { + + private BeamSetOperatorRelBase delegate; + + public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.MINUS, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamMinusRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { + return delegate.buildBeamPipeline(inputPCollections); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java new file mode 100644 index 0000000..271e98f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.io.Serializable; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.rel.RelNode; + +/** + * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} + * and {@code BeamMinusRel}. + */ +public class BeamSetOperatorRelBase { + /** + * Set operator type. 
+ */ + public enum OpType implements Serializable { + UNION, + INTERSECT, + MINUS + } + + private BeamRelNode beamRelNode; + private List<RelNode> inputs; + private boolean all; + private OpType opType; + + public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, + List<RelNode> inputs, boolean all) { + this.beamRelNode = beamRelNode; + this.opType = opType; + this.inputs = inputs; + this.all = all; + } + + public PCollection<BeamSqlRow> buildBeamPipeline( + PCollectionTuple inputPCollections) throws Exception { + PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) + .buildBeamPipeline(inputPCollections); + PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) + .buildBeamPipeline(inputPCollections); + + WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); + if (!leftWindow.isCompatible(rightWindow)) { + throw new IllegalArgumentException( + "inputs of " + opType + " have different window strategy: " + + leftWindow + " VS " + rightWindow); + } + + final TupleTag<BeamSqlRow> leftTag = new TupleTag<>(); + final TupleTag<BeamSqlRow> rightTag = new TupleTag<>(); + + // co-group + String stageName = BeamSqlRelUtils.getStageName(beamRelNode); + PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple + .of(leftTag, leftRows.apply( + stageName + "_CreateLeftIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) + .and(rightTag, rightRows.apply( + stageName + "_CreateRightIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) + .apply(CoGroupByKey.<BeamSqlRow>create()); + PCollection<BeamSqlRow> ret = coGbkResultCollection + .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, + opType, all))); + return ret; + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java new file mode 100644 index 0000000..63cf11a --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Union; + +/** + * {@link BeamRelNode} to replace a {@link Union}. 
+ * + * <p>{@code BeamUnionRel} requires all of its inputs to have the same {@link WindowFn}. From the SQL + * perspective, two cases are supported: + * + * <p>1) Do not use {@code grouped window function}: + * + * <pre>{@code + * select * from person UNION select * from person + * }</pre> + * + * <p>2) Use the same {@code grouped window function}, with the same param: + * <pre>{@code + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * UNION + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * }</pre> + * + * <p>Inputs with different group functions are NOT supported: + * <pre>{@code + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * UNION + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '2' HOUR) + * }</pre> + */ +public class BeamUnionRel extends Union implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamUnionRel(RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + this.delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.UNION, + inputs, all); + } + + public BeamUnionRel(RelInput input) { + super(input); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { + return delegate.buildBeamPipeline(inputPCollections); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java 
new file mode 100644 index 0000000..70716c5 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamIntersectRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; + +/** + * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
+ */ +public class BeamIntersectRule extends ConverterRule { + public static final BeamIntersectRule INSTANCE = new BeamIntersectRule(); + private BeamIntersectRule() { + super(LogicalIntersect.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamIntersectRule"); + } + + @Override public RelNode convert(RelNode rel) { + Intersect intersect = (Intersect) rel; + final List<RelNode> inputs = intersect.getInputs(); + return new BeamIntersectRel( + intersect.getCluster(), + intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + intersect.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java new file mode 100644 index 0000000..ca93c71 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamMinusRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; + +/** + * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. + */ +public class BeamMinusRule extends ConverterRule { + public static final BeamMinusRule INSTANCE = new BeamMinusRule(); + private BeamMinusRule() { + super(LogicalMinus.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamMinusRule"); + } + + @Override public RelNode convert(RelNode rel) { + Minus minus = (Minus) rel; + final List<RelNode> inputs = minus.getInputs(); + return new BeamMinusRel( + minus.getCluster(), + minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + minus.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java new file mode 100644 index 0000000..b8430b9 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamUnionRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; + +/** + * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with + * {@link BeamUnionRel}. + */ +public class BeamUnionRule extends ConverterRule { + public static final BeamUnionRule INSTANCE = new BeamUnionRule(); + private BeamUnionRule() { + super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE, + "BeamUnionRule"); + } + + @Override public RelNode convert(RelNode rel) { + Union union = (Union) rel; + + return new BeamUnionRel( + union.getCluster(), + union.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(union.getInputs(), BeamLogicalConvention.INSTANCE), + union.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index f885aaf..a7e9f4b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -32,11 +32,10 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.joda.time.Instant; /** - * Repersent a generic ROW record in Beam SQL. + * Represent a generic ROW record in Beam SQL. * */ public class BeamSqlRow implements Serializable { - private List<Integer> nullFields = new ArrayList<>(); private List<Object> dataValues; private BeamSqlRecordType dataType; http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java new file mode 100644 index 0000000..56b3e14 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.transform; + +import java.util.Iterator; + +import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. + */ +public abstract class BeamSetOperatorsTransforms { + /** + * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}. + */ + public static class BeamSqlRow2KvFn extends + SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { + @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { + return KV.of(input, input); + } + } + + /** + * Filter function used for Set operators. + */ + public static class SetOperatorFilteringDoFn extends + DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> { + private TupleTag<BeamSqlRow> leftTag; + private TupleTag<BeamSqlRow> rightTag; + private BeamSetOperatorRelBase.OpType opType; + // ALL? 
+ private boolean all; + + public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag, + BeamSetOperatorRelBase.OpType opType, boolean all) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.opType = opType; + this.all = all; + } + + @ProcessElement public void processElement(ProcessContext ctx) { + CoGbkResult coGbkResult = ctx.element().getValue(); + Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag); + Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag); + switch (opType) { + case UNION: + if (all) { + // output both left & right + Iterator<BeamSqlRow> iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + iter = rightRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output the key + ctx.output(ctx.element().getKey()); + } + break; + case INTERSECT: + if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { + if (all) { + Iterator<BeamSqlRow> iter = leftRows.iterator(); + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + ctx.output(ctx.element().getKey()); + } + } + break; + case MINUS: + if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { + Iterator<BeamSqlRow> iter = leftRows.iterator(); + if (all) { + // output all + while (iter.hasNext()) { + ctx.output(iter.next()); + } + } else { + // only output one + ctx.output(iter.next()); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index 2ff042d..185e95a 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -20,6 +20,7 @@ package org.apache.beam.dsls.sql.planner; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; @@ -43,7 +44,7 @@ import org.apache.calcite.sql.type.SqlTypeName; * */ public class MockedBeamSqlTable extends BaseBeamTable { - + public static final AtomicInteger COUNTER = new AtomicInteger(); public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); private List<BeamSqlRow> inputRecords; @@ -122,7 +123,8 @@ public class MockedBeamSqlTable extends BaseBeamTable { @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply(Create.of(inputRecords)); + return PBegin.in(pipeline).apply( + "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java new file mode 100644 index 0000000..02223c2 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamIntersectRel}. 
+ */ +public class BeamIntersectRelTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0 + ); + + private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ); + + @BeforeClass + public static void setUp() { + BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + } + + @Test + public void testIntersect() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testIntersectAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).satisfies(new CheckSize(3)); + + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords()); + + pipeline.run(); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java new file mode 100644 index 0000000..cd6ba16 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamMinusRel}. 
+ */ +public class BeamMinusRelTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0, + 4L, 4, 4.0 + ); + + private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ); + + @Before + public void setUp() { + BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + MockedBeamSqlTable.CONTENT.clear(); + } + + @Test + public void testExcept() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 4L, 4, 4.0 + ).getInputRecords()); + + pipeline.run(); + } + + @Test + public void testExceptAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).satisfies(new CheckSize(2)); + + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 4L, 4, 4.0, + 4L, 4, 4.0 + ).getInputRecords()); + + pipeline.run(); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java new file mode 100644 index 0000000..4936062 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSetOperatorRelBase}. + */ +public class BeamSetOperatorRelBaseTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + public static final Date THE_DATE = new Date(); + private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + SqlTypeName.TIMESTAMP, "order_time", + + 1L, 1, 1.0, THE_DATE, + 2L, 2, 2.0, THE_DATE); + + @BeforeClass + public static void prepare() { + THE_DATE.setTime(100000); + BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + } + + @Test + public void testSameWindow() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + List<BeamSqlRow> expRows = + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + 
SqlTypeName.INTEGER, "site_id", + SqlTypeName.BIGINT, "cnt", + + 1L, 1, 1L, + 2L, 2, 1L + ).getInputRecords(); + // compare valueInString to ignore the windowStart & windowEnd + PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows)); + pipeline.run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDifferentWindows() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; + + // use a real pipeline rather than the TestPipeline because we are + // testing exceptions, the pipeline will not actually run. + Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); + BeamSqlCli.compilePipeline(sql, pipeline1); + pipeline.run(); + } + + static class ToString extends DoFn<BeamSqlRow, String> { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().valueInString()); + } + } + + static List<String> toString (List<BeamSqlRow> rows) { + List<String> strs = new ArrayList<>(); + for (BeamSqlRow row : rows) { + strs.add(row.valueInString()); + } + + return strs; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java new file mode 100644 index 0000000..c2a0597 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamUnionRel}. 
+ */ +public class BeamUnionRelTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable + .of(SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 2L, 2, 2.0); + + @BeforeClass + public static void prepare() { + BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + } + + @Test + public void testUnion() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getInputRecords() + ); + pipeline.run(); + } + + @Test + public void testUnionAll() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS" + + " UNION ALL " + + " SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder( + MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 2L, 2, 2.0 + ).getInputRecords() + ); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java new file mode 100644 index 0000000..ce532df --- /dev/null +++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.junit.Assert; + +/** + * Utility class to check size of BeamSQLRow iterable. + */ +public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> { + private int size; + public CheckSize(int size) { + this.size = size; + } + @Override public Void apply(Iterable<BeamSqlRow> input) { + int count = 0; + for (BeamSqlRow row : input) { + count++; + } + Assert.assertEquals(size, count); + return null; + } +}