This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3561adf [FLINK-17523] Add call expression with a class of UDF as a parameter 3561adf is described below commit 3561adf03deb88bff540773b5a2037c8576c09f8 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon May 11 08:46:03 2020 +0200 [FLINK-17523] Add call expression with a class of UDF as a parameter --- .../org/apache/flink/table/api/Expressions.java | 12 +++++++++++ .../java/org/apache/flink/table/api/Table.java | 24 ++++++++-------------- .../apache/flink/table/api/WindowGroupedTable.java | 6 ++---- .../resolver/ExpressionResolverTest.java | 14 +++++++++++++ .../org/apache/flink/table/api/expressionDsl.scala | 9 ++++++++ 5 files changed, 45 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 6071764..37f53d4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.TimePointUnit; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.ValueDataTypeConverter; @@ -526,6 +527,17 @@ public final class Expressions { return apiCall(function, arguments); } + /** + * A call to an unregistered, inline function. + * + * <p>For functions that have been registered before and are identified by a name, use + * {@link #call(String, Object...)}. + */ + public static ApiExpression call(Class<? extends UserDefinedFunction> function, Object... arguments) { + final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(function); + return apiCall(functionInstance, arguments); + } + private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) { List<Expression> arguments = Stream.of(args) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index d1239e6..4d8219f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -595,8 +595,7 @@ public interface Table { * } * } * - * TableFunction<String> split = new MySplitUDTF(); - * table.joinLateral(call(split, $("c")).as("s")) + * table.joinLateral(call(MySplitUDTF.class, $("c")).as("s")) * .select($("a"), $("b"), $("c"), $("s")); * } * </pre> @@ -659,8 +658,7 @@ public interface Table { * } * } * - * TableFunction<String> split = new MySplitUDTF(); - * table.joinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s"))) + * table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s"))) * .select($("a"), $("b"), $("c"), $("s")); * } * </pre> @@ -725,8 +723,7 @@ public interface Table { * } * } * - * TableFunction<String> split = new MySplitUDTF(); - * table.leftOuterJoinLateral(call(split, $("c")).as("s")) + * table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s")) * .select($("a"), $("b"), $("c"), $("s")); * } * </pre> @@ -791,8 +788,7 @@ public interface Table { * } * } * - * TableFunction<String> split = new MySplitUDTF(); - * table.leftOuterJoinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s"))) + * table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s"))) * .select($("a"), $("b"), $("c"), $("s")); * } * </pre> @@ -1267,8 +1263,7 @@ public interface Table { * * <pre> * {@code - * ScalarFunction func = new MyMapFunction(); - * tab.map(call(func, $("c"))) + * tab.map(call(MyMapFunction.class, $("c"))) * } * </pre> * @@ -1309,8 +1304,7 @@ public interface Table { * * <pre> * {@code - * TableFunction func = new MyFlatMapFunction(); - * tab.flatMap(call(func, $("c"))) + * tab.flatMap(call(MyFlatMapFunction.class, $("c"))) * } * </pre> * @@ -1354,8 +1348,7 @@ public interface Table { * * <pre> * {@code - * AggregateFunction aggFunc = new MyAggregateFunction(); - * tab.aggregate(call(aggFunc, $("a"), $("b")).as("f0", "f1", "f2")) + * tab.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("f0", "f1", "f2")) * .select($("f0"), $("f1")); * } * </pre> @@ -1399,8 +1392,7 @@ public interface Table { * * <pre> * {@code - * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction(); - * tab.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z")) + * tab.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z")) * .select($("x"), $("y"), $("z")); * } * </pre> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java index 409c7b6..0dbfc01 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java @@ -95,8 +95,7 @@ public interface WindowGroupedTable { * * <pre> * {@code - * AggregateFunction aggFunc = new MyAggregateFunction(); - * windowGroupedTable.aggregate(call(aggFunc, $("a"), $("b")).as("x", "y", "z")) + * windowGroupedTable.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("x", "y", "z")) * .select($("key"), $("window").start(), $("x"), $("y"), $("z")); * } * </pre> @@ -142,8 +141,7 @@ public interface WindowGroupedTable { * * <pre> * {@code - * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction(); - * windowGroupedTable.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z")) + * windowGroupedTable.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z")) * .select($("key"), $("window").start(), $("x"), $("y"), $("z")); * } * </pre> diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java index f303e44..af5735f 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java @@ -175,6 +175,20 @@ public class ExpressionResolverTest { DataTypes.INT().notNull().bridgedTo(int.class) )), + TestSpec.test("Inline function call via a class") + .inputSchemas( + TableSchema.builder() + .field("f0", DataTypes.INT()) + .build() + ) + .select(call(ScalarFunc.class, 1, $("f0"))) + .equalTo( + new CallExpression( + new ScalarFunc(), + Arrays.asList(valueLiteral(1), new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)), + DataTypes.INT().notNull().bridgedTo(int.class) + )), + TestSpec.test("Lookup catalog function call") .inputSchemas( TableSchema.builder() diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala index 2a7577a..a1239a9 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala @@ -506,6 +506,15 @@ trait ImplicitExpressionConversions { function, params: _*) + /** + * A call to an unregistered, inline function. For functions that have been registered before and + * are identified by a name, use [[call(String, Object...)]]. + */ + def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression = + Expressions.call( + function, + params: _*) + // ---------------------------------------------------------------------------------------------- // Implicit expressions in prefix notation // ----------------------------------------------------------------------------------------------