[
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636647#comment-15636647
]
ASF GitHub Bot commented on FLINK-4469:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2653#discussion_r86560516
--- Diff:
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java
---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.functions.TableFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class UserDefinedTableFunctionITCase extends TableProgramsTestBase {
+
+ public UserDefinedTableFunctionITCase(TestExecutionMode mode,
TableConfigMode configMode){
+ super(mode, configMode);
+ }
+
+
+ @Test
+ public void testUDTF() throws Exception {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 =
+ CollectionDataSets.getSmall5TupleDataSet(env);
+
+ Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+ tableEnv.registerFunction("stack", new TableFunc0());
+
+ Table result = table.crossApply("stack(a,c) as (f)")
+ .select("b,f");
+
+ // with overloading
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1\n" + "1,0\n" + "2,2\n" + "2,1\n" +
"3,2\n" + "3,2\n";
+ compareResultAsText(results, expected);
+
+ Table result2 = table.crossApply("stack(a,c,e) as (f)")
+ .select("b,f");
+
+ DataSet<Row> ds2 = tableEnv.toDataSet(result2, Row.class);
+ List<Row> results2 = ds2.collect();
+ String expected2 = "1,1\n" + "1,1\n" + "1,0\n" + "2,2\n" +
"2,2\n" + "2,1\n" +
+ "3,1\n" + "3,2\n" + "3,2\n";
+ compareResultAsText(results2, expected2);
+ }
+
+ @Test
+ public void testUDTFWithOuterApply() throws Exception {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 =
+ CollectionDataSets.getSmall5TupleDataSet(env);
+
+ Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+ tableEnv.registerFunction("func1", new TableFunc1());
+
+ Table result = table.crossApply("func1(d) as (s,l)")
+ .select("d,s,l");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "Hallo Welt,Welt,4\n" + "Hallo Welt
wie,Welt,4\n" +
+ "Hallo Welt wie,wie,3\n";
+ compareResultAsText(results, expected);
+
+
+ Table result2 = table.outerApply("func1(d) as (s,l)")
+ .select("d,s,l");
+
+ DataSet<Row> ds2 = tableEnv.toDataSet(result2, Row.class);
+ List<Row> results2 = ds2.collect();
+ String expected2 = "Hallo,null,null\n" + "Hallo Welt,Welt,4\n"
+ "Hallo Welt wie,Welt,4\n" +
+ "Hallo Welt wie,wie,3\n";
+ compareResultAsText(results2, expected2);
+ }
+
+ @Test
+ public void testUDTFWithScalarFunction() throws Exception {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 =
+ CollectionDataSets.getSmall5TupleDataSet(env);
+
+ Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+ tableEnv.registerFunction("func0", new TableFunc0());
+
+ Table result = table.crossApply("func0(c, charLength(d)) as
(l)")
+ .select("d,l");
+
+ DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "Hallo,0\n" + "Hallo,5\n" + "Hallo Welt,1\n"
+ "Hallo Welt,10\n" +
+ "Hallo Welt wie,2\n" + "Hallo Welt wie,14\n";
+ compareResultAsText(results, expected);
+ }
+
+
+ public static class TableFunc0 extends TableFunction<Integer> {
+ public void eval(int a, int b) {
+ collect(a);
+ collect(b);
+ }
+
+ public void eval(int a, int b, long c) {
+ collect(a);
+ collect(b);
+ collect((int) c);
+ }
+ }
+
+ public static class TableFunc1 extends TableFunction<Tuple2<String,
Integer>> {
--- End diff --
Does this currently support POJO types?
> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row into multiple output rows. This is very useful in
> some cases, such as looking up a rowkey in HBase and returning one or more rows.
> To add a user-defined table function, one should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE:
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by table function. Because of
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best-matching eval method
> to call according to the number and types of the parameters.
> {code}
> public class Word {
> public String word;
> public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
> .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
> .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
> .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
> .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)