This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 036c5492a9bfe7636d5e7d5eb664af5e77952707 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 1 15:25:04 2019 +0200 [FLINK-13028][table-api-java] Merge flatten and call resolution rule --- .../expressions/resolver/ExpressionResolver.java | 3 +- .../resolver/rules/ResolveCallByArgumentsRule.java | 94 ++++++++++++------- .../resolver/rules/ResolveFlattenCallRule.java | 101 --------------------- .../expressions/resolver/rules/ResolverRules.java | 5 - .../flink/table/api/batch/table/CalcTest.scala | 1 + .../table/validation/CalcValidationTest.scala | 12 --- 6 files changed, 62 insertions(+), 154 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 4c1e62d..e4c07fa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -90,8 +90,7 @@ public class ExpressionResolver { ResolverRules.OVER_WINDOWS, ResolverRules.FIELD_RESOLVE, ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS, - ResolverRules.RESOLVE_CALL_BY_ARGUMENTS, - ResolverRules.FLATTEN_CALL); + ResolverRules.RESOLVE_CALL_BY_ARGUMENTS); } private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index 73429f4..1f184c8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -19,6 +19,8 @@ package org.apache.flink.table.expressions.resolver.rules; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -38,21 +40,26 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral; +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + /** * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers * the output data type. All function calls are resolved {@link CallExpression} after applying this - * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}. + * rule. + * + * <p>This rule also resolves {@code flatten()} calls on composite types. * * <p>If the call expects different types of arguments, but the given arguments have types that can * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted. - * - * @see ResolveFlattenCallRule */ @Internal final class ResolveCallByArgumentsRule implements ResolverRule { @@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements ResolverRule { @Override public List<Expression> apply(List<Expression> expression, ResolutionContext context) { return expression.stream() - .map(expr -> expr.accept(new CallArgumentsCastingVisitor(context))) + .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream()) .collect(Collectors.toList()); } - private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> { + // -------------------------------------------------------------------------------------------- + + private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> { - CallArgumentsCastingVisitor(ResolutionContext context) { + ResolvingCallVisitor(ResolutionContext context) { super(context); } @Override - public Expression visit(UnresolvedCallExpression unresolvedCall) { + public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) { final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream() - .map(c -> c.accept(this)) - .map(e -> { - // special case: FLATTEN - // a call chain `myFunc().flatten().flatten()` is not allowed - if (e instanceof UnresolvedCallExpression && - ((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) { - throw new ValidationException("Consecutive flattening calls are not allowed."); - } - if (e instanceof ResolvedExpression) { - return (ResolvedExpression) e; - } - throw new TableException("Unexpected unresolved expression: " + e); - }) + .flatMap(c -> c.accept(this).stream()) .collect(Collectors.toList()); - // FLATTEN is a special case and the only call that remains unresolved after this rule - // it will be resolved by ResolveFlattenCallRule if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) { - return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs)); + return executeFlatten(resolvedArgs); } if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) { @@ -100,13 +95,49 @@ final class ResolveCallByArgumentsRule implements ResolverRule { (BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition(); if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) { - return runTypeInference( - unresolvedCall, - definition.getTypeInference(), - resolvedArgs); + return Collections.singletonList( + runTypeInference( + unresolvedCall, + definition.getTypeInference(), + resolvedArgs)); } } - return runLegacyTypeInference(unresolvedCall, resolvedArgs); + return Collections.singletonList( + runLegacyTypeInference(unresolvedCall, resolvedArgs)); + } + + @Override + protected List<ResolvedExpression> defaultMethod(Expression expression) { + if (expression instanceof ResolvedExpression) { + return Collections.singletonList((ResolvedExpression) expression); + } + throw new TableException("Unexpected unresolved expression: " + expression); + } + + private List<ResolvedExpression> executeFlatten(List<ResolvedExpression> args) { + if (args.size() != 1) { + throw new ValidationException("Invalid number of arguments for flattening."); + } + final ResolvedExpression composite = args.get(0); + // TODO support the new type system with ROW and STRUCTURED_TYPE + final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(composite.getOutputDataType()); + if (resultType instanceof CompositeType) { + return flattenCompositeType(composite, (CompositeType<?>) resultType); + } else { + return singletonList(composite); + } + } + + private List<ResolvedExpression> flattenCompositeType(ResolvedExpression composite, CompositeType<?> resultType) { + return IntStream.range(0, resultType.getArity()) + .mapToObj(idx -> + resolutionContext.postResolutionFactory() + .get( + composite, + valueLiteral(resultType.getFieldNames()[idx]), + fromLegacyInfoToDataType(resultType.getTypeAt(idx))) + ) + .collect(Collectors.toList()); } private ResolvedExpression runTypeInference( @@ -163,11 +194,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule { }) .collect(Collectors.toList()); } - - @Override - protected Expression defaultMethod(Expression expression) { - return expression; - } } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java deleted file mode 100644 index 53d7750..0000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions.resolver.rules; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.expressions.UnresolvedCallExpression; -import org.apache.flink.table.functions.BuiltInFunctionDefinitions; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.util.Collections.singletonList; -import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral; -import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; -import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; - -/** - * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET} - * for all fields of underlying field of complex type. - * - * @see ResolveCallByArgumentsRule - */ -@Internal -final class ResolveFlattenCallRule implements ResolverRule { - - @Override - public List<Expression> apply(List<Expression> expression, ResolutionContext context) { - return expression.stream() - .flatMap(expr -> expr.accept(new FlatteningCallVisitor(context)).stream()) - .collect(Collectors.toList()); - } - - private class FlatteningCallVisitor extends RuleExpressionVisitor<List<Expression>> { - - FlatteningCallVisitor(ResolutionContext context) { - super(context); - } - - @Override - public List<Expression> visit(UnresolvedCallExpression unresolvedCall) { - if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) { - return executeFlatten(unresolvedCall); - } - - return singletonList(unresolvedCall); - } - - private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) { - final Expression composite = unresolvedCall.getChildren().get(0); - if (!(composite instanceof ResolvedExpression)) { - throw new TableException("Resolved expression expected for flattening."); - } - final ResolvedExpression resolvedComposite = (ResolvedExpression) composite; - final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType()); - if (resultType instanceof CompositeType) { - return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType); - } else { - return singletonList(composite); - } - } - - private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) { - return IntStream.range(0, resultType.getArity()) - .mapToObj(idx -> - resolutionContext.postResolutionFactory() - .get( - resolvedComposite, - valueLiteral(resultType.getFieldNames()[idx]), - fromLegacyInfoToDataType(resultType.getTypeAt(idx))) - ) - .collect(Collectors.toList()); - } - - @Override - protected List<Expression> defaultMethod(Expression expression) { - return singletonList(expression); - } - } -} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java index 915f6a5..671b1e1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java @@ -28,11 +28,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression; public final class ResolverRules { /** - * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details. - */ - public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule(); - - /** * Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details. */ public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala index 7f8bc81..b688d4a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass} import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.ScalarFunction diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala index c4216e1..b9e9f21 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala @@ -161,16 +161,4 @@ class CalcValidationTest extends TableTestBase { util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string) .select('int, 'long.log as 'long, 'string) } - - @Test - def testConsecutiveFlattening(): Unit = { - expectedException.expect(classOf[ValidationException]) - expectedException.expectMessage("Consecutive flattening calls are not allowed.") - - val util = streamTestUtil() - util.addTable[(Long, Int)]( - "MyTable", - 'tuple) - .select('tuple.flatten().flatten()) - } }