[
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674232#comment-15674232
]
ASF GitHub Bot commented on FLINK-4469:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2653#discussion_r88483859
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
}
/**
+ * Internal method of [[ScalarFunction#getResultType()]] that does some
pre-checking and uses
+ * [[TypeExtractor]] as default return type inference.
+ */
+ def getResultType(
+ tableFunction: TableFunction[_],
+ signature: Array[Class[_]])
+ : TypeInformation[_] = {
+ // find method for signature
+ val evalMethod = tableFunction.getEvalMethods
+ .find(m => signature.sameElements(m.getParameterTypes))
+ .getOrElse(throw new ValidationException("Given signature is
invalid."))
+
+ val userDefinedTypeInfo = tableFunction.getResultType
+ if (userDefinedTypeInfo != null) {
+ userDefinedTypeInfo
+ } else {
+ try {
+ TypeExtractor.getForClass(evalMethod.getReturnType)
+ } catch {
+ case ite: InvalidTypesException =>
+ throw new ValidationException(
+ s"Return type of table function '$this' cannot be " +
+ s"automatically determined. Please provide type information
manually.")
+ }
+ }
+ }
+
+ /**
* Returns the return type of the evaluation method matching the given
signature.
*/
def getResultTypeClass(
- scalarFunction: ScalarFunction,
+ function: EvaluableFunction,
signature: Array[Class[_]])
: Class[_] = {
// find method for signature
- val evalMethod = scalarFunction.getEvalMethods
+ val evalMethod = function.getEvalMethods
.find(m => signature.sameElements(m.getParameterTypes))
.getOrElse(throw new IllegalArgumentException("Given signature is
invalid."))
evalMethod.getReturnType
}
/**
- * Prints all signatures of a [[ScalarFunction]].
+ * Prints all signatures of a [[EvaluableFunction]].
*/
- def signaturesToString(scalarFunction: ScalarFunction): String = {
- scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+ def signaturesToString(function: EvaluableFunction): String = {
+ function.getSignatures.map(signatureToString).mkString(", ")
}
+ /**
+ * Returns field names and field positions for a given
[[TypeInformation]].
+ *
+ * Field names are automatically extracted for
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * @param inputType The TypeInformation extract the field names and
positions from.
+ * @return A tuple of two arrays holding the field names and
corresponding field positions.
+ */
+ def getFieldInfo(inputType: TypeInformation[_])
+ : (Array[String], Array[Int]) = {
+ val fieldNames: Array[String] = inputType match {
+ case t: TupleTypeInfo[_] => t.getFieldNames
--- End diff --
Why don't you use `CompositeType` here?
> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row to multiple output rows. This is very useful in
> some cases, such as looking up a rowkey in HBase and returning one or more rows.
> To add a user-defined table function, the user should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE:
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best-matching eval method
> to call according to the number and types of the parameters.
> {code}
> public class Word {
> public String word;
> public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
> .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
> .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
> .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
> .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)