This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3561adf  [FLINK-17523] Add call expression with a class of UDF as a parameter
3561adf is described below

commit 3561adf03deb88bff540773b5a2037c8576c09f8
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon May 11 08:46:03 2020 +0200

    [FLINK-17523] Add call expression with a class of UDF as a parameter
---
 .../org/apache/flink/table/api/Expressions.java    | 12 +++++++++++
 .../java/org/apache/flink/table/api/Table.java     | 24 ++++++++--------------
 .../apache/flink/table/api/WindowGroupedTable.java |  6 ++----
 .../resolver/ExpressionResolverTest.java           | 14 +++++++++++++
 .../org/apache/flink/table/api/expressionDsl.scala |  9 ++++++++
 5 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 6071764..37f53d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.types.utils.ValueDataTypeConverter;
@@ -526,6 +527,17 @@ public final class Expressions {
                return apiCall(function, arguments);
        }
 
+       /**
+        * A call to an unregistered, inline function.
+        *
+        * <p>For functions that have been registered before and are identified by a name, use
+        * {@link #call(String, Object...)}.
+        */
+       public static ApiExpression call(Class<? extends UserDefinedFunction> 
function, Object... arguments) {
+               final UserDefinedFunction functionInstance = 
UserDefinedFunctionHelper.instantiateFunction(function);
+               return apiCall(functionInstance, arguments);
+       }
+
        private static ApiExpression apiCall(FunctionDefinition 
functionDefinition, Object... args) {
                List<Expression> arguments =
                        Stream.of(args)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index d1239e6..4d8219f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -595,8 +595,7 @@ public interface Table {
         *     }
         *   }
         *
-        *   TableFunction<String> split = new MySplitUDTF();
-        *   table.joinLateral(call(split, $("c")).as("s"))
+        *   table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"))
         *        .select($("a"), $("b"), $("c"), $("s"));
         * }
         * </pre>
@@ -659,8 +658,7 @@ public interface Table {
         *     }
         *   }
         *
-        *   TableFunction<String> split = new MySplitUDTF();
-        *   table.joinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+        *   table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
         *        .select($("a"), $("b"), $("c"), $("s"));
         * }
         * </pre>
@@ -725,8 +723,7 @@ public interface Table {
         *     }
         *   }
         *
-        *   TableFunction<String> split = new MySplitUDTF();
-        *   table.leftOuterJoinLateral(call(split, $("c")).as("s"))
+        *   table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"))
         *        .select($("a"), $("b"), $("c"), $("s"));
         * }
         * </pre>
@@ -791,8 +788,7 @@ public interface Table {
         *     }
         *   }
         *
-        *   TableFunction<String> split = new MySplitUDTF();
-        *   table.leftOuterJoinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+        *   table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
         *        .select($("a"), $("b"), $("c"), $("s"));
         * }
         * </pre>
@@ -1267,8 +1263,7 @@ public interface Table {
         *
         * <pre>
         * {@code
-        *   ScalarFunction func = new MyMapFunction();
-        *   tab.map(call(func, $("c")))
+        *   tab.map(call(MyMapFunction.class, $("c")))
         * }
         * </pre>
         *
@@ -1309,8 +1304,7 @@ public interface Table {
         *
         * <pre>
         * {@code
-        *   TableFunction func = new MyFlatMapFunction();
-        *   tab.flatMap(call(func, $("c")))
+        *   tab.flatMap(call(MyFlatMapFunction.class, $("c")))
         * }
         * </pre>
         *
@@ -1354,8 +1348,7 @@ public interface Table {
         *
         * <pre>
         * {@code
-        *   AggregateFunction aggFunc = new MyAggregateFunction();
-        *   tab.aggregate(call(aggFunc, $("a"), $("b")).as("f0", "f1", "f2"))
+        *   tab.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("f0", "f1", "f2"))
         *     .select($("f0"), $("f1"));
         * }
         * </pre>
@@ -1399,8 +1392,7 @@ public interface Table {
         *
         * <pre>
         * {@code
-        *   TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-        *   tab.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+        *   tab.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
         *     .select($("x"), $("y"), $("z"));
         * }
         * </pre>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
index 409c7b6..0dbfc01 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
@@ -95,8 +95,7 @@ public interface WindowGroupedTable {
         *
         * <pre>
         * {@code
-        *   AggregateFunction aggFunc = new MyAggregateFunction();
-        *   windowGroupedTable.aggregate(call(aggFunc, $("a"), $("b")).as("x", "y", "z"))
+        *   windowGroupedTable.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
         *     .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
         * }
         * </pre>
@@ -142,8 +141,7 @@ public interface WindowGroupedTable {
         *
         * <pre>
         * {@code
-        *   TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-        *   windowGroupedTable.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+        *   windowGroupedTable.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
         *     .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
         * }
         * </pre>
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index f303e44..af5735f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -175,6 +175,20 @@ public class ExpressionResolverTest {
                                                
DataTypes.INT().notNull().bridgedTo(int.class)
                                        )),
 
+                       TestSpec.test("Inline function call via a class")
+                               .inputSchemas(
+                                       TableSchema.builder()
+                                               .field("f0", DataTypes.INT())
+                                               .build()
+                               )
+                               .select(call(ScalarFunc.class, 1, $("f0")))
+                               .equalTo(
+                                       new CallExpression(
+                                               new ScalarFunc(),
+                                               Arrays.asList(valueLiteral(1), 
new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)),
+                                               
DataTypes.INT().notNull().bridgedTo(int.class)
+                                       )),
+
                        TestSpec.test("Lookup catalog function call")
                                .inputSchemas(
                                        TableSchema.builder()
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index 2a7577a..a1239a9 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -506,6 +506,15 @@ trait ImplicitExpressionConversions {
     function,
     params: _*)
 
+  /**
+   * A call to an unregistered, inline function. For functions that have been registered before and
+   * are identified by a name, use [[call(String, Object...)]].
+   */
+  def call(function: Class[_ <: UserDefinedFunction], params: Expression*): 
Expression =
+    Expressions.call(
+      function,
+      params: _*)
+
   // ----------------------------------------------------------------------------------------------
   // Implicit expressions in prefix notation
   // ----------------------------------------------------------------------------------------------

Reply via email to