This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 036c5492a9bfe7636d5e7d5eb664af5e77952707
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 1 15:25:04 2019 +0200

    [FLINK-13028][table-api-java] Merge flatten and call resolution rule
---
 .../expressions/resolver/ExpressionResolver.java   |   3 +-
 .../resolver/rules/ResolveCallByArgumentsRule.java |  94 ++++++++++++-------
 .../resolver/rules/ResolveFlattenCallRule.java     | 101 ---------------------
 .../expressions/resolver/rules/ResolverRules.java  |   5 -
 .../flink/table/api/batch/table/CalcTest.scala     |   1 +
 .../table/validation/CalcValidationTest.scala      |  12 ---
 6 files changed, 62 insertions(+), 154 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 4c1e62d..e4c07fa 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -90,8 +90,7 @@ public class ExpressionResolver {
                        ResolverRules.OVER_WINDOWS,
                        ResolverRules.FIELD_RESOLVE,
                        ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-                       ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
-                       ResolverRules.FLATTEN_CALL);
+                       ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
        }
 
        private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR 
= new VerifyResolutionVisitor();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 73429f4..1f184c8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -38,21 +40,26 @@ import 
org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
  * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
- * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * <p>This rule also resolves {@code flatten()} calls on composite types.
  *
  * <p>If the call expects different types of arguments, but the given 
arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
        @Override
        public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
                return expression.stream()
-                       .map(expr -> expr.accept(new 
CallArgumentsCastingVisitor(context)))
+                       .flatMap(expr -> expr.accept(new 
ResolvingCallVisitor(context)).stream())
                        .collect(Collectors.toList());
        }
 
-       private class CallArgumentsCastingVisitor extends 
RuleExpressionVisitor<Expression> {
+       // 
--------------------------------------------------------------------------------------------
+
+       private class ResolvingCallVisitor extends 
RuleExpressionVisitor<List<ResolvedExpression>> {
 
-               CallArgumentsCastingVisitor(ResolutionContext context) {
+               ResolvingCallVisitor(ResolutionContext context) {
                        super(context);
                }
 
                @Override
-               public Expression visit(UnresolvedCallExpression 
unresolvedCall) {
+               public List<ResolvedExpression> visit(UnresolvedCallExpression 
unresolvedCall) {
 
                        final List<ResolvedExpression> resolvedArgs = 
unresolvedCall.getChildren().stream()
-                               .map(c -> c.accept(this))
-                               .map(e -> {
-                                       // special case: FLATTEN
-                                       // a call chain 
`myFunc().flatten().flatten()` is not allowed
-                                       if (e instanceof 
UnresolvedCallExpression &&
-                                                       
((UnresolvedCallExpression) e).getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-                                               throw new 
ValidationException("Consecutive flattening calls are not allowed.");
-                                       }
-                                       if (e instanceof ResolvedExpression) {
-                                               return (ResolvedExpression) e;
-                                       }
-                                       throw new TableException("Unexpected 
unresolved expression: " + e);
-                               })
+                               .flatMap(c -> c.accept(this).stream())
                                .collect(Collectors.toList());
 
-                       // FLATTEN is a special case and the only call that 
remains unresolved after this rule
-                       // it will be resolved by ResolveFlattenCallRule
                        if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-                               return unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
+                               return executeFlatten(resolvedArgs);
                        }
 
                        if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
@@ -100,13 +95,49 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                                        (BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
 
                                if 
(definition.getTypeInference().getOutputTypeStrategy() != 
TypeStrategies.MISSING) {
-                                       return runTypeInference(
-                                               unresolvedCall,
-                                               definition.getTypeInference(),
-                                               resolvedArgs);
+                                       return Collections.singletonList(
+                                               runTypeInference(
+                                                       unresolvedCall,
+                                                       
definition.getTypeInference(),
+                                                       resolvedArgs));
                                }
                        }
-                       return runLegacyTypeInference(unresolvedCall, 
resolvedArgs);
+                       return Collections.singletonList(
+                               runLegacyTypeInference(unresolvedCall, 
resolvedArgs));
+               }
+
+               @Override
+               protected List<ResolvedExpression> defaultMethod(Expression 
expression) {
+                       if (expression instanceof ResolvedExpression) {
+                               return 
Collections.singletonList((ResolvedExpression) expression);
+                       }
+                       throw new TableException("Unexpected unresolved 
expression: " + expression);
+               }
+
+               private List<ResolvedExpression> 
executeFlatten(List<ResolvedExpression> args) {
+                       if (args.size() != 1) {
+                               throw new ValidationException("Invalid number 
of arguments for flattening.");
+                       }
+                       final ResolvedExpression composite = args.get(0);
+                       // TODO support the new type system with ROW and 
STRUCTURED_TYPE
+                       final TypeInformation<?> resultType = 
fromDataTypeToLegacyInfo(composite.getOutputDataType());
+                       if (resultType instanceof CompositeType) {
+                               return flattenCompositeType(composite, 
(CompositeType<?>) resultType);
+                       } else {
+                               return singletonList(composite);
+                       }
+               }
+
+               private List<ResolvedExpression> 
flattenCompositeType(ResolvedExpression composite, CompositeType<?> resultType) 
{
+                       return IntStream.range(0, resultType.getArity())
+                               .mapToObj(idx ->
+                                       
resolutionContext.postResolutionFactory()
+                                               .get(
+                                                       composite,
+                                                       
valueLiteral(resultType.getFieldNames()[idx]),
+                                                       
fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+                               )
+                               .collect(Collectors.toList());
                }
 
                private ResolvedExpression runTypeInference(
@@ -163,11 +194,6 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                                })
                                .collect(Collectors.toList());
                }
-
-               @Override
-               protected Expression defaultMethod(Expression expression) {
-                       return expression;
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
deleted file mode 100644
index 53d7750..0000000
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.resolver.rules;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to 
{@link BuiltInFunctionDefinitions#GET}
- * for all fields of underlying field of complex type.
- *
- * @see ResolveCallByArgumentsRule
- */
-@Internal
-final class ResolveFlattenCallRule implements ResolverRule {
-
-       @Override
-       public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
-               return expression.stream()
-                       .flatMap(expr -> expr.accept(new 
FlatteningCallVisitor(context)).stream())
-                       .collect(Collectors.toList());
-       }
-
-       private class FlatteningCallVisitor extends 
RuleExpressionVisitor<List<Expression>> {
-
-               FlatteningCallVisitor(ResolutionContext context) {
-                       super(context);
-               }
-
-               @Override
-               public List<Expression> visit(UnresolvedCallExpression 
unresolvedCall) {
-                       if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-                               return executeFlatten(unresolvedCall);
-                       }
-
-                       return singletonList(unresolvedCall);
-               }
-
-               private List<Expression> 
executeFlatten(UnresolvedCallExpression unresolvedCall) {
-                       final Expression composite = 
unresolvedCall.getChildren().get(0);
-                       if (!(composite instanceof ResolvedExpression)) {
-                               throw new TableException("Resolved expression 
expected for flattening.");
-                       }
-                       final ResolvedExpression resolvedComposite = 
(ResolvedExpression) composite;
-                       final TypeInformation<?> resultType = 
fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
-                       if (resultType instanceof CompositeType) {
-                               return flattenCompositeType(resolvedComposite, 
(CompositeType<?>) resultType);
-                       } else {
-                               return singletonList(composite);
-                       }
-               }
-
-               private List<Expression> 
flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> 
resultType) {
-                       return IntStream.range(0, resultType.getArity())
-                               .mapToObj(idx ->
-                                       
resolutionContext.postResolutionFactory()
-                                               .get(
-                                                       resolvedComposite,
-                                                       
valueLiteral(resultType.getFieldNames()[idx]),
-                                                       
fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
-                               )
-                               .collect(Collectors.toList());
-               }
-
-               @Override
-               protected List<Expression> defaultMethod(Expression expression) 
{
-                       return singletonList(expression);
-               }
-       }
-}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 915f6a5..671b1e1 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -28,11 +28,6 @@ import 
org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 public final class ResolverRules {
 
        /**
-        * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} 
for details.
-        */
-       public static final ResolverRule FLATTEN_CALL = new 
ResolveFlattenCallRule();
-
-       /**
         * Resolves {@link UnresolvedReferenceExpression}. See {@link 
ReferenceResolverRule} for details.
         */
        public static final ResolverRule FIELD_RESOLVE = new 
ReferenceResolverRule();
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index 7f8bc81..b688d4a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, 
TestCaseClass, WC, giveMeCaseClass}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index c4216e1..b9e9f21 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,16 +161,4 @@ class CalcValidationTest extends TableTestBase {
     util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
       .select('int, 'long.log as 'long, 'string)
   }
-
-  @Test
-  def testConsecutiveFlattening(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage("Consecutive flattening calls are not 
allowed.")
-
-    val util = streamTestUtil()
-    util.addTable[(Long, Int)](
-      "MyTable",
-      'tuple)
-    .select('tuple.flatten().flatten())
-  }
 }

Reply via email to