This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9646760621aadedefb6d9ff806b70111ee64d318 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Mar 2 16:00:44 2020 +0100 [hotfix][tabe, tests] Extract FunctionLookupMock to be reusable. --- .../resolver/ExpressionResolverTest.java | 97 +------------- .../flink/table/utils/FunctionLookupMock.java | 149 +++++++++++++++++++++ 2 files changed, 153 insertions(+), 93 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java index 4d4a809..f303e44 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java @@ -26,16 +26,12 @@ import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; -import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; @@ -43,9 +39,8 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.operations.CatalogQueryOperation; import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.utils.DataTypeFactoryMock; -import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.table.utils.FunctionLookupMock; import org.junit.Test; import org.junit.runner.RunWith; @@ -58,7 +53,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; @@ -69,21 +63,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; /** - * This test supports only a subset of builtin functions because those functions still depend on - * planner expressions for argument validation and type inference. Supported builtin functions are: - * - * <p>- BuiltinFunctionDefinitions.EQUALS - * - BuiltinFunctionDefinitions.IS_NULL - * - * <p>Pseudo functions that are executed during expression resolution e.g.: - * - BuiltinFunctionDefinitions.WITH_COLUMNS - * - BuiltinFunctionDefinitions.WITHOUT_COLUMNS - * - BuiltinFunctionDefinitions.RANGE_TO - * - BuiltinFunctionDefinitions.FLATTEN - * - * <p>This test supports only a simplified identifier parsing logic. It does not support escaping. - * It just naively splits on dots. The proper logic comes with a planner implementation which is not - * available in the API module. + * Tests for resolving expressions with {@link ExpressionResolver} created with Expression DSL. + * See also {@link FunctionLookupMock} for a set of supported functions. */ @RunWith(Parameterized.class) public class ExpressionResolverTest { @@ -340,80 +321,10 @@ public class ExpressionResolverTest { } public ExpressionResolver getResolver() { - FunctionLookup functionLookup = new FunctionLookup() { - @Override - public Optional<Result> lookupFunction(String stringIdentifier) { - // this is a simplified version for the test - return lookupFunction(UnresolvedIdentifier.of(stringIdentifier.split("\\."))); - } - - @Override - public Optional<Result> lookupFunction(UnresolvedIdentifier identifier) { - final FunctionIdentifier functionIdentifier; - if (identifier.getCatalogName().isPresent() && identifier.getDatabaseName().isPresent()) { - functionIdentifier = FunctionIdentifier.of( - ObjectIdentifier.of( - identifier.getCatalogName().get(), - identifier.getDatabaseName().get(), - identifier.getObjectName())); - } else { - functionIdentifier = FunctionIdentifier.of(identifier.getObjectName()); - } - - return Optional.ofNullable(functions.get(functionIdentifier)) - .map(func -> new Result(functionIdentifier, func)); - } - - @Override - public Result lookupBuiltInFunction(BuiltInFunctionDefinition definition) { - return new Result( - FunctionIdentifier.of(definition.getName()), - definition - ); - } - - @Override - public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { - return (unresolvedCall, resolvedArgs) -> { - FunctionDefinition functionDefinition = unresolvedCall.getFunctionDefinition(); - if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) { - return new TypeInferenceUtil.Result( - resolvedArgs.stream() - .map(ResolvedExpression::getOutputDataType) - .collect(Collectors.toList()), - null, - DataTypes.BOOLEAN() - ); - } else if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) { - return new TypeInferenceUtil.Result( - resolvedArgs.stream() - .map(ResolvedExpression::getOutputDataType) - .collect(Collectors.toList()), - null, - DataTypes.BOOLEAN() - ); - } else if (functionDefinition instanceof ScalarFunctionDefinition) { - return new TypeInferenceUtil.Result( - resolvedArgs.stream() - .map(ResolvedExpression::getOutputDataType) - .collect(Collectors.toList()), - null, - // We do not support a full legacy type inference here. We support only a static result - // type - TypeConversions.fromLegacyInfoToDataType(((ScalarFunctionDefinition) functionDefinition) - .getScalarFunction() - .getResultType(null))); - } - - throw new IllegalArgumentException( - "Unsupported builtin function in the test: " + unresolvedCall); - }; - } - }; return ExpressionResolver.resolverFor( new TableConfig(), name -> Optional.empty(), - functionLookup, + new FunctionLookupMock(functions), new DataTypeFactoryMock(), Arrays.stream(schemas) .map(schema -> (QueryOperation) new CatalogQueryOperation(ObjectIdentifier.of("", "", ""), schema)) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java new file mode 100644 index 0000000..c05686a --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.FunctionLookup; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinition; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.functions.ScalarFunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInferenceUtil; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A test implementation for a {@link FunctionLookup}. It mocks away a few features of a + * {@link org.apache.flink.table.catalog.FunctionCatalog}. This class supports only a subset + * of builtin functions because those functions still depend on planner expressions for argument + * validation and type inference. Supported builtin functions are: + * + * <ul> + * <li>BuiltinFunctionDefinitions.EQUALS</li> + * <li>BuiltinFunctionDefinitions.IS_NULL</li> + * <li>BuiltinFunctionDefinitions.ROW</li> + * </ul> + * + * <p>Pseudo functions that are executed during expression resolution e.g.: + * <ul> + * <li>BuiltinFunctionDefinitions.WITH_COLUMNS</li> + * <li>BuiltinFunctionDefinitions.WITHOUT_COLUMNS</li> + * <li>BuiltinFunctionDefinitions.RANGE_TO</li> + * <li>BuiltinFunctionDefinitions.FLATTEN</li> + * </ul> + * + * <p>This class supports only a simplified identifier parsing logic. It does not support escaping. + * It just naively splits on dots. The proper logic comes with a planner implementation which is not + * available in the API module. + */ +public final class FunctionLookupMock implements FunctionLookup { + + private final Map<FunctionIdentifier, FunctionDefinition> functions; + + public FunctionLookupMock(Map<FunctionIdentifier, FunctionDefinition> functions) { + this.functions = functions; + } + + @Override + public Optional<Result> lookupFunction(String stringIdentifier) { + // this is a simplified version for the test + return lookupFunction(UnresolvedIdentifier.of(stringIdentifier.split("\\."))); + } + + @Override + public Optional<Result> lookupFunction(UnresolvedIdentifier identifier) { + final FunctionIdentifier functionIdentifier; + if (identifier.getCatalogName().isPresent() && identifier.getDatabaseName().isPresent()) { + functionIdentifier = FunctionIdentifier.of( + ObjectIdentifier.of( + identifier.getCatalogName().get(), + identifier.getDatabaseName().get(), + identifier.getObjectName())); + } else { + functionIdentifier = FunctionIdentifier.of(identifier.getObjectName()); + } + + return Optional.ofNullable(functions.get(functionIdentifier)) + .map(func -> new Result(functionIdentifier, func)); + } + + @Override + public Result lookupBuiltInFunction(BuiltInFunctionDefinition definition) { + return new Result( + FunctionIdentifier.of(definition.getName()), + definition + ); + } + + @Override + public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { + return (unresolvedCall, resolvedArgs) -> { + FunctionDefinition functionDefinition = unresolvedCall.getFunctionDefinition(); + List<DataType> argumentTypes = resolvedArgs.stream() + .map(ResolvedExpression::getOutputDataType) + .collect(Collectors.toList()); + if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) { + return new TypeInferenceUtil.Result( + argumentTypes, + null, + DataTypes.BOOLEAN() + ); + } else if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) { + return new TypeInferenceUtil.Result( + argumentTypes, + null, + DataTypes.BOOLEAN() + ); + } else if (functionDefinition.equals(BuiltInFunctionDefinitions.ROW)) { + DataTypes.Field[] fields = IntStream.range(0, argumentTypes.size()) + .mapToObj(idx -> DataTypes.FIELD("f" + idx, argumentTypes.get(idx))) + .toArray(DataTypes.Field[]::new); + + return new TypeInferenceUtil.Result( + argumentTypes, + null, + DataTypes.ROW(fields) + ); + } else if (functionDefinition instanceof ScalarFunctionDefinition) { + return new TypeInferenceUtil.Result( + argumentTypes, + null, + // We do not support a full legacy type inference here. We support only a static result + // type + TypeConversions.fromLegacyInfoToDataType(((ScalarFunctionDefinition) functionDefinition) + .getScalarFunction() + .getResultType(null))); + } + + throw new IllegalArgumentException( + "Unsupported builtin function in the test: " + unresolvedCall); + }; + } +}