Hi Pierre,
Both approaches work on my local machine. Here is my code:
Scala UDF:
package com.dummy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
/**
* The scala UDF.
*/
class dummyMap extends ScalarFunction {
  // If the UDF is registered via a SQL statement (CREATE FUNCTION), you need to add
  // this type hint.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  // If the UDF is registered via the method 'register_java_function', you need to
  // override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The return type must be expressed as a TypeInformation
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(),
      Types.STRING()))
  }
}
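A side note on the Option 2 compile error quoted below: `getResultType` must return a `TypeInformation`, whereas `DataTypes.ROW(...)` builds a `DataType` and only accepts `DataTypes.FIELD(...)` arguments, which is why no overload matched. As an untested sketch, assuming Flink 1.11 and the same `com.dummy.dummyMap` class name, the row type can also be built with `ROW_NAMED` from `org.apache.flink.api.common.typeinfo.Types` instead of the deprecated `org.apache.flink.table.api.Types` used above:

```scala
package com.dummy

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/** Variant of dummyMap for registration via 'register_java_function' (old type system). */
class dummyMap extends ScalarFunction {
  def eval(): Row = Row.of("foo", "bar")

  // Must return a TypeInformation (not a DataType): a Row with STRING fields "s" and "t".
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW_NAMED(Array("s", "t"), Types.STRING, Types.STRING)
}
```

If the same row type is needed as a `DataType` instead, the `DataTypes` form is `DataTypes.ROW(DataTypes.FIELD("s", DataTypes.STRING()), DataTypes.FIELD("t", DataTypes.STRING()))`.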
Python code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)
# Load the Scala UDF jar file; change the path to point to your own jar.
# You can also load the jar file in other ways.
st_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///Users/zhongwei/the-dummy-udf.jar")
# Register the UDF via a SQL statement
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register it via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b',
'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
'connector' = 'print'
)""")
# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
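Regarding the earlier `java.util.Map` attempt further down the thread: `states.asInstanceOf[util.Map[...]]` cannot work, because a Scala `Map` does not implement `java.util.Map`, so the cast throws a `ClassCastException` at runtime. A minimal, untested sketch, assuming Scala 2.11/2.12 (as used by Flink 1.11) and registration via `CREATE FUNCTION`; the class name `dummyMapAsMap` is hypothetical:

```scala
package com.dummy

import java.util

import scala.collection.JavaConverters._

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

/** Hypothetical UDF returning MAP<STRING, STRING>; register it with CREATE FUNCTION. */
class dummyMapAsMap extends ScalarFunction {
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): util.Map[String, String] = {
    val states = Map("key1" -> "val1", "key2" -> "val2")
    // asJava wraps the Scala map in a java.util.Map view instead of casting it
    states.asJava
  }
}
```

The matching sink column would then be declared as `MAP<STRING, STRING>`.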
Best,
Wei
> On 18 Nov 2020, at 03:28, Pierre Oberholzer <[email protected]> wrote:
>
> Hi Wei,
>
> True, I'm using the method you mention, but I'm glad to change.
> I tried your suggestion instead, but got a similar error.
>
> Thanks for your support. That is much more tedious than I thought.
>
> Option 1 - SQL UDF
>
> SQL UDF
> create_func_ddl = """
> CREATE FUNCTION dummyMap
> AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>
> t_env.execute_sql(create_func_ddl)
>
> Error
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match
> requested type. Requested: Row(s: String, t: String); Actual:
> GenericType<org.apache.flink.types.Row>
>
> Option 2 - Overriding getResultType
>
> Back to the old registering method, but overriding getResultType:
>
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>
> Scala UDF
> class dummyMap() extends ScalarFunction {
>
> def eval(): Row = {
>
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>
> }
>
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
>
> Error (on compilation)
>
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error] (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
> [error] ()org.apache.flink.table.types.DataType <and>
> [error] (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
> [error] override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]
> ^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>
> On Tue, 17 Nov 2020 at 14:01, Wei Zhong <[email protected]> wrote:
> Hi Pierre,
>
> I guess your UDF is registered with the method 'register_java_function', which
> uses the old type system. In this case you need to override the
> 'getResultType' method instead of adding a type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" SQL
> statement, which supports the type hint.
>
> Best,
> Wei
>
>> On 17 Nov 2020, at 19:29, Pierre Oberholzer <[email protected]> wrote:
>>
>> Hi Wei,
>>
>> Thanks for your suggestion. Same error.
>>
>> Scala UDF
>>
>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>> class dummyMap() extends ScalarFunction {
>> def eval(): Row = {
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> }
>> }
>>
>> Best regards,
>>
>> On Tue, 17 Nov 2020 at 10:04, Wei Zhong <[email protected]> wrote:
>> Hi Pierre,
>>
>> You can try replacing '@DataTypeHint("ROW<s STRING,t STRING>")' with
>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.
>>
>> Best,
>> Wei
>>
>>> On 17 Nov 2020, at 15:45, Pierre Oberholzer <[email protected]> wrote:
>>>
>>> Hi Dian, Community,
>>>
>>> (bringing the thread back to a wider audience)
>>>
>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map,
>>> but even this simple case leads to a type mismatch between the UDF and the
>>> Table API.
>>> I've also tried other Map objects from Flink (table.data.MapData,
>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>> (java.util.Map) in combination with DataTypeHint, without success.
>>> N.B. I'm using version 1.11.
>>>
>>> Am I doing something wrong, or am I facing limitations in the toolkit?
>>>
>>> Thanks in advance for your support!
>>>
>>> Best regards,
>>>
>>> Scala UDF
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>> @DataTypeHint("ROW<s STRING,t STRING>")
>>> def eval(): Row = {
>>>
>>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>
>>> }
>>> }
>>>
>>> Table DDL
>>>
>>> my_sink_ddl = f"""
>>> create table mySink (
>>> output_of_my_scala_udf ROW<s STRING,t STRING>
>>> ) with (
>>> ...
>>> )
>>> """
>>>
>>> Error
>>>
>>> Py4JJavaError: An error occurred while calling o2.execute.
>>> : org.apache.flink.table.api.ValidationException: Field types of query
>>> result and registered TableSink
>>> `default_catalog`.`default_database`.`mySink` do not match.
>>> Query result schema: [output_of_my_scala_udf:
>>> GenericType<org.apache.flink.types.Row>]
>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>
>>>
>>>
>>> On Fri, 13 Nov 2020 at 11:59, Pierre Oberholzer <[email protected]> wrote:
>>> Thanks Dian, but I get the same error when using an explicit return type:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>> def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>
>>> val states = Map("key1" -> "val1", "key2" -> "val2")
>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>
>>> }
>>> }
>>>
>>> On Fri, 13 Nov 2020 at 10:34, Dian Fu <[email protected]> wrote:
>>> You need to explicitly define the result type of the UDF. You can refer to
>>> [1] for more details if you are using Flink 1.11. If you are using another
>>> version of Flink, refer to the corresponding documentation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>
>>>> On 13 Nov 2020, at 4:56 PM, Pierre Oberholzer <[email protected]> wrote:
>>>>
>>>> ScalarFunction
>>>
>>>
>>>
>>> --
>>> Pierre
>>>
>>> --
>>> Pierre
>>
>>
>>
>> --
>> Pierre
>
>
>
> --
> Pierre