This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 272ac71328aac0583df859bf197e30afcec0981f Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 1 08:25:05 2019 +0200 [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver --- .../table/expressions/ExpressionResolver.java | 98 +++------------------- .../expressions/rules/OverWindowResolverRule.java | 50 ++++++++++- .../rules/ResolveCallByArgumentsRule.java | 26 ++++-- ...enCallRule.java => ResolveFlattenCallRule.java} | 39 +++++---- .../table/expressions/rules/ResolverRule.java | 6 -- .../table/expressions/rules/ResolverRules.java | 4 +- .../table/validation/CalcValidationTest.scala | 12 +++ 7 files changed, 119 insertions(+), 116 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java index 394180b..015751c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java @@ -19,14 +19,9 @@ package org.apache.flink.table.expressions; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.GroupWindow; import org.apache.flink.table.api.OverWindow; -import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; -import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; import org.apache.flink.table.expressions.lookups.TableReferenceLookup; @@ -36,15 +31,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.plan.logical.LogicalOverWindow; -import org.apache.flink.table.plan.logical.LogicalWindow; -import org.apache.flink.table.plan.logical.SessionGroupWindow; -import org.apache.flink.table.plan.logical.SlidingGroupWindow; -import org.apache.flink.table.plan.logical.TumblingGroupWindow; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,11 +43,8 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import scala.Some; - import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral; import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; -import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} @@ -97,15 +83,13 @@ public class ExpressionResolver { ResolverRules.EXPAND_COLUMN_FUNCTIONS, ResolverRules.OVER_WINDOWS, ResolverRules.FIELD_RESOLVE, - ResolverRules.FLATTEN_CALL, ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS, - ResolverRules.RESOLVE_CALL_BY_ARGUMENTS); + ResolverRules.RESOLVE_CALL_BY_ARGUMENTS, + ResolverRules.FLATTEN_CALL); } private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor(); - private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); - private final FieldReferenceLookup fieldLookup; private final TableReferenceLookup tableLookup; @@ -187,54 +171,6 @@ public class ExpressionResolver { } /** - * Converts an API class to a logical window for planning with expressions already resolved. - * - * @param window window to resolve - * @return logical window with expressions resolved - */ - public LogicalWindow resolveGroupWindow(GroupWindow window) { - Expression alias = window.getAlias(); - - if (!(alias instanceof UnresolvedReferenceExpression)) { - throw new ValidationException("Alias of group window should be an UnresolvedFieldReference"); - } - - final String windowName = ((UnresolvedReferenceExpression) alias).getName(); - List<Expression> resolvedTimeFieldExpression = - prepareExpressions(Collections.singletonList(window.getTimeField())); - if (resolvedTimeFieldExpression.size() != 1) { - throw new ValidationException("Group Window only supports a single time field column."); - } - PlannerExpression timeField = resolvedTimeFieldExpression.get(0).accept(bridgeConverter); - - //TODO replace with LocalReferenceExpression - WindowReference resolvedAlias = new WindowReference(windowName, new Some<>(timeField.resultType())); - - if (window instanceof TumbleWithSizeOnTimeWithAlias) { - TumbleWithSizeOnTimeWithAlias tw = (TumbleWithSizeOnTimeWithAlias) window; - return new TumblingGroupWindow( - resolvedAlias, - timeField, - resolveFieldsInSingleExpression(tw.getSize()).accept(bridgeConverter)); - } else if (window instanceof SlideWithSizeAndSlideOnTimeWithAlias) { - SlideWithSizeAndSlideOnTimeWithAlias sw = (SlideWithSizeAndSlideOnTimeWithAlias) window; - return new SlidingGroupWindow( - resolvedAlias, - timeField, - resolveFieldsInSingleExpression(sw.getSize()).accept(bridgeConverter), - resolveFieldsInSingleExpression(sw.getSlide()).accept(bridgeConverter)); - } else if (window instanceof SessionWithGapOnTimeWithAlias) { - SessionWithGapOnTimeWithAlias sw = (SessionWithGapOnTimeWithAlias) window; - return new SessionGroupWindow( - resolvedAlias, - timeField, - resolveFieldsInSingleExpression(sw.getGap()).accept(bridgeConverter)); - } else { - throw new TableException("Unknown window type"); - } - } - - /** * Enables the creation of resolved expressions for transformations after the actual resolution. */ public PostResolverFactory postResolverFactory() { @@ -251,20 +187,6 @@ public class ExpressionResolver { ); } - private void prepareLocalReferencesFromGroupWindows(@Nullable GroupWindow groupWindow) { - if (groupWindow != null) { - String windowName = ((UnresolvedReferenceExpression) groupWindow.getAlias()).getName(); - TypeInformation<?> windowType = - prepareExpressions(Collections.singletonList(groupWindow.getTimeField())).get(0) - .accept(bridgeConverter) - .resultType(); - - localReferences.put( - windowName, - new LocalReferenceExpression(windowName, fromLegacyInfoToDataType(windowType))); - } - } - private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) { return overWindows.stream() .map(this::resolveOverWindow) @@ -343,11 +265,6 @@ public class ExpressionResolver { public Optional<LogicalOverWindow> getOverWindow(Expression alias) { return Optional.ofNullable(overWindows.get(alias)); } - - @Override - public PlannerExpression bridge(Expression expression) { - return expression.accept(bridgeConverter); - } } private LogicalOverWindow resolveOverWindow(OverWindow overWindow) { @@ -401,6 +318,17 @@ public class ExpressionResolver { Collections.singletonList(expression), expression.getOutputDataType()); // the output type is equal to the input type } + + public CallExpression get(ResolvedExpression composite, ValueLiteralExpression key, DataType dataType) { + final FunctionLookup.Result lookupOfGet = functionLookup + .lookupBuiltInFunction(BuiltInFunctionDefinitions.GET); + + return new CallExpression( + lookupOfGet.getObjectIdentifier(), + lookupOfGet.getFunctionDefinition(), + Arrays.asList(composite, key), + dataType); + } } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java index 92fc177..fcec1ac 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java @@ -19,13 +19,15 @@ package org.apache.flink.table.expressions.rules; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.PlannerExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.plan.logical.LogicalOverWindow; +import org.apache.flink.table.types.logical.LogicalType; import java.util.ArrayList; import java.util.List; @@ -33,6 +35,9 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; /** * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding over window @@ -41,6 +46,8 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCa @Internal final class OverWindowResolverRule implements ResolverRule { + private static final WindowKindExtractor OVER_WINDOW_KIND_EXTRACTOR = new WindowKindExtractor(); + @Override public List<Expression> apply(List<Expression> expression, ResolutionContext context) { return expression.stream() @@ -85,8 +92,8 @@ final class OverWindowResolverRule implements ResolverRule { private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) { return referenceWindow.following().orElseGet(() -> { - PlannerExpression preceding = resolutionContext.bridge(referenceWindow.preceding()); - if (preceding.resultType() == BasicTypeInfo.LONG_TYPE_INFO) { + WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR); + if (kind == WindowKind.ROW) { return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW); } else { return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE); @@ -100,4 +107,39 @@ final class OverWindowResolverRule implements ResolverRule { return expression; } } + + private enum WindowKind { + ROW, + RANGE + } + + private static class WindowKindExtractor extends ApiExpressionDefaultVisitor<WindowKind> { + + @Override + public WindowKind visit(ValueLiteralExpression valueLiteral) { + final LogicalType literalType = valueLiteral.getOutputDataType().getLogicalType(); + if (hasRoot(literalType, BIGINT)) { + return WindowKind.ROW; + } else if (hasRoot(literalType, INTERVAL_DAY_TIME)) { + return WindowKind.RANGE; + } + return defaultMethod(valueLiteral); + } + + @Override + public WindowKind visit(UnresolvedCallExpression unresolvedCall) { + final FunctionDefinition definition = unresolvedCall.getFunctionDefinition(); + if (definition == BuiltInFunctionDefinitions.UNBOUNDED_ROW) { + return WindowKind.ROW; + } else if (definition == BuiltInFunctionDefinitions.UNBOUNDED_RANGE) { + return WindowKind.RANGE; + } + return defaultMethod(unresolvedCall); + } + + @Override + protected WindowKind defaultMethod(Expression expression) { + throw new ValidationException("An over window expects literal or unbounded bounds for preceding."); + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java index d2ad89c..23aa7d4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java @@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.rules; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.expressions.CallExpression; @@ -37,6 +38,7 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -44,11 +46,13 @@ import java.util.stream.IntStream; /** * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers - * the output data type. All function calls are resolved {@link CallExpression} after applying this rule. + * the output data type. All function calls are resolved {@link CallExpression} after applying this + * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}. * - * <p>If the call expects different types of arguments, but the given arguments - * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST} - * expression is inserted. + * <p>If the call expects different types of arguments, but the given arguments have types that can + * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted. + * + * @see ResolveFlattenCallRule */ @Internal final class ResolveCallByArgumentsRule implements ResolverRule { @@ -67,11 +71,17 @@ final class ResolveCallByArgumentsRule implements ResolverRule { } @Override - public ResolvedExpression visit(UnresolvedCallExpression unresolvedCall) { + public Expression visit(UnresolvedCallExpression unresolvedCall) { final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream() .map(c -> c.accept(this)) .map(e -> { + // special case: FLATTEN + // a call chain `myFunc().flatten().flatten()` is not allowed + if (e instanceof UnresolvedCallExpression && + ((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) { + throw new ValidationException("Consecutive flattening calls are not allowed."); + } if (e instanceof ResolvedExpression) { return (ResolvedExpression) e; } @@ -79,6 +89,12 @@ final class ResolveCallByArgumentsRule implements ResolverRule { }) .collect(Collectors.toList()); + // FLATTEN is a special case and the only call that remains unresolved after this rule + // it will be resolved by ResolveFlattenCallRule + if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) { + return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs)); + } + if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) { final BuiltInFunctionDefinition definition = (BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java similarity index 63% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java index 7a07b43..1d2a394 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java @@ -21,8 +21,9 @@ package org.apache.flink.table.expressions.rules; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -31,16 +32,18 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.Collections.singletonList; -import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; -import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.GET; +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** - * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with calls to {@link BuiltInFunctionDefinitions#GET} for all - * fields of underlying field of complex type. + * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET} + * for all fields of underlying field of complex type. + * + * @see ResolveCallByArgumentsRule */ @Internal -final class FlattenCallRule implements ResolverRule { +final class ResolveFlattenCallRule implements ResolverRule { @Override public List<Expression> apply(List<Expression> expression, ResolutionContext context) { @@ -65,20 +68,28 @@ final class FlattenCallRule implements ResolverRule { } private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) { - Expression arg = unresolvedCall.getChildren().get(0); - PlannerExpression plannerExpression = resolutionContext.bridge(arg); - plannerExpression.validateInput(); - TypeInformation<?> resultType = plannerExpression.resultType(); + final Expression composite = unresolvedCall.getChildren().get(0); + if (!(composite instanceof ResolvedExpression)) { + throw new TableException("Resolved expression expected for flattening."); + } + final ResolvedExpression resolvedComposite = (ResolvedExpression) composite; + final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType()); if (resultType instanceof CompositeType) { - return flattenCompositeType(arg, (CompositeType<?>) resultType); + return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType); } else { - return singletonList(arg); + return singletonList(composite); } } - private List<Expression> flattenCompositeType(Expression arg, CompositeType<?> resultType) { + private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) { return IntStream.range(0, resultType.getArity()) - .mapToObj(idx -> unresolvedCall(GET, arg, valueLiteral(resultType.getFieldNames()[idx]))) + .mapToObj(idx -> + resolutionContext.postResolutionFactory() + .get( + resolvedComposite, + valueLiteral(resultType.getFieldNames()[idx]), + fromLegacyInfoToDataType(resultType.getTypeAt(idx))) + ) .collect(Collectors.toList()); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java index e5e43ef..af51499 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java @@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionResolver; import org.apache.flink.table.expressions.LocalReferenceExpression; -import org.apache.flink.table.expressions.PlannerExpression; import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; import org.apache.flink.table.expressions.lookups.TableReferenceLookup; import org.apache.flink.table.functions.FunctionDefinition; @@ -77,10 +76,5 @@ public interface ResolverRule { * Lookup for over windows. */ Optional<LogicalOverWindow> getOverWindow(Expression alias); - - /** - * Temporary way to convert expression to PlannerExpression to evaluate type. - */ - PlannerExpression bridge(Expression expression); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java index 84ea332..d68899e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java @@ -28,9 +28,9 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression; public final class ResolverRules { /** - * Rule that resolves flatten call. See {@link FlattenCallRule} for details. + * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details. */ - public static final ResolverRule FLATTEN_CALL = new FlattenCallRule(); + public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule(); /** * Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details. diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala index b9e9f21..c4216e1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala @@ -161,4 +161,16 @@ class CalcValidationTest extends TableTestBase { util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string) .select('int, 'long.log as 'long, 'string) } + + @Test + def testConsecutiveFlattening(): Unit = { + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage("Consecutive flattening calls are not allowed.") + + val util = streamTestUtil() + util.addTable[(Long, Int)]( + "MyTable", + 'tuple) + .select('tuple.flatten().flatten()) + } }