This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 272ac71328aac0583df859bf197e30afcec0981f
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 1 08:25:05 2019 +0200

    [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver
---
 .../table/expressions/ExpressionResolver.java      | 98 +++-------------------
 .../expressions/rules/OverWindowResolverRule.java  | 50 ++++++++++-
 .../rules/ResolveCallByArgumentsRule.java          | 26 ++++--
 ...enCallRule.java => ResolveFlattenCallRule.java} | 39 +++++----
 .../table/expressions/rules/ResolverRule.java      |  6 --
 .../table/expressions/rules/ResolverRules.java     |  4 +-
 .../table/validation/CalcValidationTest.scala      | 12 +++
 7 files changed, 119 insertions(+), 116 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 394180b..015751c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -19,14 +19,9 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
-import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
-import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
@@ -36,15 +31,9 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
-import org.apache.flink.table.plan.logical.LogicalWindow;
-import org.apache.flink.table.plan.logical.SessionGroupWindow;
-import org.apache.flink.table.plan.logical.SlidingGroupWindow;
-import org.apache.flink.table.plan.logical.TumblingGroupWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,11 +43,8 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import scala.Some;
-
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
@@ -97,15 +83,13 @@ public class ExpressionResolver {
                        ResolverRules.EXPAND_COLUMN_FUNCTIONS,
                        ResolverRules.OVER_WINDOWS,
                        ResolverRules.FIELD_RESOLVE,
-                       ResolverRules.FLATTEN_CALL,
                        ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-                       ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
+                       ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
+                       ResolverRules.FLATTEN_CALL);
        }
 
        private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR 
= new VerifyResolutionVisitor();
 
-       private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
-
        private final FieldReferenceLookup fieldLookup;
 
        private final TableReferenceLookup tableLookup;
@@ -187,54 +171,6 @@ public class ExpressionResolver {
        }
 
        /**
-        * Converts an API class to a logical window for planning with 
expressions already resolved.
-        *
-        * @param window window to resolve
-        * @return logical window with expressions resolved
-        */
-       public LogicalWindow resolveGroupWindow(GroupWindow window) {
-               Expression alias = window.getAlias();
-
-               if (!(alias instanceof UnresolvedReferenceExpression)) {
-                       throw new ValidationException("Alias of group window 
should be an UnresolvedFieldReference");
-               }
-
-               final String windowName = ((UnresolvedReferenceExpression) 
alias).getName();
-               List<Expression> resolvedTimeFieldExpression =
-                       
prepareExpressions(Collections.singletonList(window.getTimeField()));
-               if (resolvedTimeFieldExpression.size() != 1) {
-                       throw new ValidationException("Group Window only 
supports a single time field column.");
-               }
-               PlannerExpression timeField = 
resolvedTimeFieldExpression.get(0).accept(bridgeConverter);
-
-               //TODO replace with LocalReferenceExpression
-               WindowReference resolvedAlias = new WindowReference(windowName, 
new Some<>(timeField.resultType()));
-
-               if (window instanceof TumbleWithSizeOnTimeWithAlias) {
-                       TumbleWithSizeOnTimeWithAlias tw = 
(TumbleWithSizeOnTimeWithAlias) window;
-                       return new TumblingGroupWindow(
-                               resolvedAlias,
-                               timeField,
-                               
resolveFieldsInSingleExpression(tw.getSize()).accept(bridgeConverter));
-               } else if (window instanceof 
SlideWithSizeAndSlideOnTimeWithAlias) {
-                       SlideWithSizeAndSlideOnTimeWithAlias sw = 
(SlideWithSizeAndSlideOnTimeWithAlias) window;
-                       return new SlidingGroupWindow(
-                               resolvedAlias,
-                               timeField,
-                               
resolveFieldsInSingleExpression(sw.getSize()).accept(bridgeConverter),
-                               
resolveFieldsInSingleExpression(sw.getSlide()).accept(bridgeConverter));
-               } else if (window instanceof SessionWithGapOnTimeWithAlias) {
-                       SessionWithGapOnTimeWithAlias sw = 
(SessionWithGapOnTimeWithAlias) window;
-                       return new SessionGroupWindow(
-                               resolvedAlias,
-                               timeField,
-                               
resolveFieldsInSingleExpression(sw.getGap()).accept(bridgeConverter));
-               } else {
-                       throw new TableException("Unknown window type");
-               }
-       }
-
-       /**
         * Enables the creation of resolved expressions for transformations 
after the actual resolution.
         */
        public PostResolverFactory postResolverFactory() {
@@ -251,20 +187,6 @@ public class ExpressionResolver {
                        );
        }
 
-       private void prepareLocalReferencesFromGroupWindows(@Nullable 
GroupWindow groupWindow) {
-               if (groupWindow != null) {
-                       String windowName = ((UnresolvedReferenceExpression) 
groupWindow.getAlias()).getName();
-                       TypeInformation<?> windowType =
-                               
prepareExpressions(Collections.singletonList(groupWindow.getTimeField())).get(0)
-                                       .accept(bridgeConverter)
-                                       .resultType();
-
-                       localReferences.put(
-                               windowName,
-                               new LocalReferenceExpression(windowName, 
fromLegacyInfoToDataType(windowType)));
-               }
-       }
-
        private Map<Expression, LogicalOverWindow> 
prepareOverWindows(List<OverWindow> overWindows) {
                return overWindows.stream()
                        .map(this::resolveOverWindow)
@@ -343,11 +265,6 @@ public class ExpressionResolver {
                public Optional<LogicalOverWindow> getOverWindow(Expression 
alias) {
                        return Optional.ofNullable(overWindows.get(alias));
                }
-
-               @Override
-               public PlannerExpression bridge(Expression expression) {
-                       return expression.accept(bridgeConverter);
-               }
        }
 
        private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
@@ -401,6 +318,17 @@ public class ExpressionResolver {
                                Collections.singletonList(expression),
                                expression.getOutputDataType()); // the output 
type is equal to the input type
                }
+
+               public CallExpression get(ResolvedExpression composite, 
ValueLiteralExpression key, DataType dataType) {
+                       final FunctionLookup.Result lookupOfGet = functionLookup
+                               
.lookupBuiltInFunction(BuiltInFunctionDefinitions.GET);
+
+                       return new CallExpression(
+                               lookupOfGet.getObjectIdentifier(),
+                               lookupOfGet.getFunctionDefinition(),
+                               Arrays.asList(composite, key),
+                               dataType);
+               }
        }
 
        /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index 92fc177..fcec1ac 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -19,13 +19,15 @@
 package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,6 +35,9 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
 
 /**
  * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding 
over window
@@ -41,6 +46,8 @@ import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCa
 @Internal
 final class OverWindowResolverRule implements ResolverRule {
 
+       private static final WindowKindExtractor OVER_WINDOW_KIND_EXTRACTOR = 
new WindowKindExtractor();
+
        @Override
        public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
                return expression.stream()
@@ -85,8 +92,8 @@ final class OverWindowResolverRule implements ResolverRule {
 
                private Expression 
calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
                        return referenceWindow.following().orElseGet(() -> {
-                                       PlannerExpression preceding = 
resolutionContext.bridge(referenceWindow.preceding());
-                                       if (preceding.resultType() == 
BasicTypeInfo.LONG_TYPE_INFO) {
+                                       WindowKind kind = 
referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+                                       if (kind == WindowKind.ROW) {
                                                return 
unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
                                        } else {
                                                return 
unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
@@ -100,4 +107,39 @@ final class OverWindowResolverRule implements ResolverRule 
{
                        return expression;
                }
        }
+
+       private enum WindowKind {
+               ROW,
+               RANGE
+       }
+
+       private static class WindowKindExtractor extends 
ApiExpressionDefaultVisitor<WindowKind> {
+
+               @Override
+               public WindowKind visit(ValueLiteralExpression valueLiteral) {
+                       final LogicalType literalType = 
valueLiteral.getOutputDataType().getLogicalType();
+                       if (hasRoot(literalType, BIGINT)) {
+                               return WindowKind.ROW;
+                       } else if (hasRoot(literalType, INTERVAL_DAY_TIME)) {
+                               return WindowKind.RANGE;
+                       }
+                       return defaultMethod(valueLiteral);
+               }
+
+               @Override
+               public WindowKind visit(UnresolvedCallExpression 
unresolvedCall) {
+                       final FunctionDefinition definition = 
unresolvedCall.getFunctionDefinition();
+                       if (definition == 
BuiltInFunctionDefinitions.UNBOUNDED_ROW) {
+                               return WindowKind.ROW;
+                       } else if (definition == 
BuiltInFunctionDefinitions.UNBOUNDED_RANGE) {
+                               return WindowKind.RANGE;
+                       }
+                       return defaultMethod(unresolvedCall);
+               }
+
+               @Override
+               protected WindowKind defaultMethod(Expression expression) {
+                       throw new ValidationException("An over window expects 
literal or unbounded bounds for preceding.");
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index d2ad89c..23aa7d4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.expressions.CallExpression;
@@ -37,6 +38,7 @@ import 
org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -44,11 +46,13 @@ import java.util.stream.IntStream;
 
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
- * the output data type. All function calls are resolved {@link 
CallExpression} after applying this rule.
+ * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
+ * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
  *
- * <p>If the call expects different types of arguments, but the given arguments
- * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST}
- * expression is inserted.
+ * <p>If the call expects different types of arguments, but the given 
arguments have types that can
+ * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
+ *
+ * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -67,11 +71,17 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                }
 
                @Override
-               public ResolvedExpression visit(UnresolvedCallExpression 
unresolvedCall) {
+               public Expression visit(UnresolvedCallExpression 
unresolvedCall) {
 
                        final List<ResolvedExpression> resolvedArgs = 
unresolvedCall.getChildren().stream()
                                .map(c -> c.accept(this))
                                .map(e -> {
+                                       // special case: FLATTEN
+                                       // a call chain 
`myFunc().flatten().flatten()` is not allowed
+                                       if (e instanceof 
UnresolvedCallExpression &&
+                                                       
((UnresolvedCallExpression) e).getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
+                                               throw new 
ValidationException("Consecutive flattening calls are not allowed.");
+                                       }
                                        if (e instanceof ResolvedExpression) {
                                                return (ResolvedExpression) e;
                                        }
@@ -79,6 +89,12 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                                })
                                .collect(Collectors.toList());
 
+                       // FLATTEN is a special case and the only call that 
remains unresolved after this rule
+                       // it will be resolved by ResolveFlattenCallRule
+                       if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
+                               return unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
+                       }
+
                        if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
                                final BuiltInFunctionDefinition definition =
                                        (BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
similarity index 63%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
index 7a07b43..1d2a394 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
@@ -21,8 +21,9 @@ package org.apache.flink.table.expressions.rules;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
@@ -31,16 +32,18 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
-import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.GET;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with calls to {@link 
BuiltInFunctionDefinitions#GET} for all
- * fields of underlying field of complex type.
+ * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to 
{@link BuiltInFunctionDefinitions#GET}
+ * for all fields of underlying field of complex type.
+ *
+ * @see ResolveCallByArgumentsRule
  */
 @Internal
-final class FlattenCallRule implements ResolverRule {
+final class ResolveFlattenCallRule implements ResolverRule {
 
        @Override
        public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
@@ -65,20 +68,28 @@ final class FlattenCallRule implements ResolverRule {
                }
 
                private List<Expression> 
executeFlatten(UnresolvedCallExpression unresolvedCall) {
-                       Expression arg = unresolvedCall.getChildren().get(0);
-                       PlannerExpression plannerExpression = 
resolutionContext.bridge(arg);
-                       plannerExpression.validateInput();
-                       TypeInformation<?> resultType = 
plannerExpression.resultType();
+                       final Expression composite = 
unresolvedCall.getChildren().get(0);
+                       if (!(composite instanceof ResolvedExpression)) {
+                               throw new TableException("Resolved expression 
expected for flattening.");
+                       }
+                       final ResolvedExpression resolvedComposite = 
(ResolvedExpression) composite;
+                       final TypeInformation<?> resultType = 
fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
                        if (resultType instanceof CompositeType) {
-                               return flattenCompositeType(arg, 
(CompositeType<?>) resultType);
+                               return flattenCompositeType(resolvedComposite, 
(CompositeType<?>) resultType);
                        } else {
-                               return singletonList(arg);
+                               return singletonList(composite);
                        }
                }
 
-               private List<Expression> flattenCompositeType(Expression arg, 
CompositeType<?> resultType) {
+               private List<Expression> 
flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> 
resultType) {
                        return IntStream.range(0, resultType.getArity())
-                               .mapToObj(idx -> unresolvedCall(GET, arg, 
valueLiteral(resultType.getFieldNames()[idx])))
+                               .mapToObj(idx ->
+                                       
resolutionContext.postResolutionFactory()
+                                               .get(
+                                                       resolvedComposite,
+                                                       
valueLiteral(resultType.getFieldNames()[idx]),
+                                                       
fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+                               )
                                .collect(Collectors.toList());
                }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index e5e43ef..af51499 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionResolver;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -77,10 +76,5 @@ public interface ResolverRule {
                 * Lookup for over windows.
                 */
                Optional<LogicalOverWindow> getOverWindow(Expression alias);
-
-               /**
-                * Temporary way to convert expression to PlannerExpression to 
evaluate type.
-                */
-               PlannerExpression bridge(Expression expression);
        }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
index 84ea332..d68899e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
@@ -28,9 +28,9 @@ import 
org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 public final class ResolverRules {
 
        /**
-        * Rule that resolves flatten call. See {@link FlattenCallRule} for 
details.
+        * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} 
for details.
         */
-       public static final ResolverRule FLATTEN_CALL = new FlattenCallRule();
+       public static final ResolverRule FLATTEN_CALL = new 
ResolveFlattenCallRule();
 
        /**
         * Resolves {@link UnresolvedReferenceExpression}. See {@link 
ReferenceResolverRule} for details.
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index b9e9f21..c4216e1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,4 +161,16 @@ class CalcValidationTest extends TableTestBase {
     util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
       .select('int, 'long.log as 'long, 'string)
   }
+
+  @Test
+  def testConsecutiveFlattening(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("Consecutive flattening calls are not 
allowed.")
+
+    val util = streamTestUtil()
+    util.addTable[(Long, Int)](
+      "MyTable",
+      'tuple)
+    .select('tuple.flatten().flatten())
+  }
 }

Reply via email to