This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b130aa1 [BEAM-9929] Support UNNEST(array_column) in ZetaSQL. new 46f42d8 Merge pull request #11636 from amaliujia/rw-support_unnest_column b130aa1 is described below commit b130aa15cb6d090603793c5429cb6a9c651c0b61 Author: amaliujia <amaliu...@163.com> AuthorDate: Thu May 7 17:18:34 2020 -0700 [BEAM-9929] Support UNNEST(array_column) in ZetaSQL. --- .../translation/ArrayScanColumnRefToUncollect.java | 90 ++++++++++++++++++++++ ...a => ArrayScanLiteralToUncollectConverter.java} | 4 +- .../translation/QueryStatementConverter.java | 3 +- .../sql/zetasql/ZetaSQLDialectSpecTest.java | 20 +++++ 4 files changed, 114 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java new file mode 100644 index 0000000..0a02a4a --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.zetasql.translation; + +import com.google.zetasql.resolvedast.ResolvedNode; +import com.google.zetasql.resolvedast.ResolvedNodes; +import java.util.Collections; +import java.util.List; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.CorrelationId; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.ImmutableBitSet; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** Converts an array scan whose array expression is a column reference to an uncollect. 
*/ +public class ArrayScanColumnRefToUncollect extends RelConverter<ResolvedNodes.ResolvedArrayScan> { + ArrayScanColumnRefToUncollect(ConversionContext context) { + super(context); + } + + @Override + public boolean canConvert(ResolvedNodes.ResolvedArrayScan zetaNode) { + return zetaNode.getInputScan() != null + && zetaNode.getArrayExpr() instanceof ResolvedNodes.ResolvedColumnRef + && zetaNode.getJoinExpr() == null; + } + + @Override + public List<ResolvedNode> getInputs(ResolvedNodes.ResolvedArrayScan zetaNode) { + return ImmutableList.of(zetaNode.getInputScan()); + } + + @Override + public RelNode convert(ResolvedNodes.ResolvedArrayScan zetaNode, List<RelNode> inputs) { + assert inputs.size() == 1; + RelNode input = inputs.get(0); + RexInputRef columnRef = + (RexInputRef) + getExpressionConverter() + .convertRexNodeFromResolvedExpr( + zetaNode.getArrayExpr(), + zetaNode.getInputScan().getColumnList(), + input.getRowType().getFieldList()); + + String fieldName = + String.format( + "%s%s", + zetaNode.getElementColumn().getTableName(), zetaNode.getElementColumn().getName()); + CorrelationId correlationId = new CorrelationId(0); + RelNode projectNode = + LogicalProject.create( + createOneRow(getCluster()), + Collections.singletonList( + getCluster() + .getRexBuilder() + .makeFieldAccess( + getCluster().getRexBuilder().makeCorrel(input.getRowType(), correlationId), + columnRef.getIndex())), + ImmutableList.of(fieldName)); + + boolean ordinality = (zetaNode.getArrayOffsetColumn() != null); + RelNode uncollect = Uncollect.create(projectNode.getTraitSet(), projectNode, ordinality); + + return LogicalCorrelate.create( + input, + uncollect, + correlationId, + ImmutableBitSet.of(columnRef.getIndex()), + JoinRelType.INNER); + } +} diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java similarity index 94% rename from sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java rename to sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java index 87d777ff..e138f60 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java @@ -27,9 +27,9 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; /** Converts array scan that represents an array literal to uncollect. 
*/ -class ArrayScanToUncollectConverter extends RelConverter<ResolvedArrayScan> { +class ArrayScanLiteralToUncollectConverter extends RelConverter<ResolvedArrayScan> { - ArrayScanToUncollectConverter(ConversionContext context) { + ArrayScanLiteralToUncollectConverter(ConversionContext context) { super(context); } diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java index 5513482..389eac9 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java @@ -57,7 +57,8 @@ public class QueryStatementConverter extends RelConverter<ResolvedQueryStmt> { ImmutableMultimap.<ResolvedNodeKind, RelConverter>builder() .put(RESOLVED_AGGREGATE_SCAN, new AggregateScanConverter(context)) .put(RESOLVED_ARRAY_SCAN, new ArrayScanToJoinConverter(context)) - .put(RESOLVED_ARRAY_SCAN, new ArrayScanToUncollectConverter(context)) + .put(RESOLVED_ARRAY_SCAN, new ArrayScanLiteralToUncollectConverter(context)) + .put(RESOLVED_ARRAY_SCAN, new ArrayScanColumnRefToUncollect(context)) .put(RESOLVED_FILTER_SCAN, new FilterScanConverter(context)) .put(RESOLVED_JOIN_SCAN, new JoinScanConverter(context)) .put(RESOLVED_JOIN_SCAN, new JoinScanWithRefConverter(context)) diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java index d02421b..5c382a4 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java +++ 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java @@ -3037,6 +3037,26 @@ public class ZetaSQLDialectSpecTest { } @Test + public void testUnnestArrayColumn() { + String sql = + "SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + Schema schema = Schema.builder().addInt64Field("int_field").build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema).addValue(14L).build(), + Row.withSchema(schema).addValue(18L).build(), + Row.withSchema(schema).addValue(22L).build(), + Row.withSchema(schema).addValue(24L).build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test @Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.") public void testNamedUNNESTJoin() { String sql =