Hi Wei,
True, I'm using the method you mention, but I'm glad to change.
I tried your suggestion, but got a similar error.
Thanks for your support. This is much more tedious than I thought.
*Option 1 - SQL UDF*
*SQL UDF*
create_func_ddl = """
CREATE FUNCTION dummyMap
AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""
t_env.execute_sql(create_func_ddl)
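For Option 1, here is a minimal sketch of building the registration statement in Python. It assumes the Flink 1.11 DDL grammar `CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]`. The helper `build_create_function_ddl` is hypothetical (not a PyFlink API). Its `TEMPORARY SYSTEM` and `IF NOT EXISTS` defaults are assumptions, chosen so that re-running a notebook cell does not fail on an already registered name.

```python
# Hypothetical helper (not a PyFlink API): builds a CREATE FUNCTION statement
# following the Flink 1.11 DDL grammar for user-defined functions.
_LANGUAGES = {"JAVA", "SCALA", "PYTHON"}


def build_create_function_ddl(name: str, class_name: str, language: str = "SCALA",
                              temporary: bool = True, system: bool = True,
                              if_not_exists: bool = True) -> str:
    language = language.upper()
    if language not in _LANGUAGES:
        raise ValueError(f"unsupported language: {language}")
    if system and not temporary:
        # The grammar only offers TEMPORARY SYSTEM functions, no persistent ones.
        raise ValueError("system functions must be temporary")
    if system:
        kind = "TEMPORARY SYSTEM FUNCTION"
    elif temporary:
        kind = "TEMPORARY FUNCTION"
    else:
        kind = "FUNCTION"
    guard = " IF NOT EXISTS" if if_not_exists else ""
    return f"CREATE {kind}{guard} {name} AS '{class_name}' LANGUAGE {language}"


ddl = build_create_function_ddl("dummyMap", "com.dummy.dummyMap")
print(ddl)
# → CREATE TEMPORARY SYSTEM FUNCTION IF NOT EXISTS dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA
```

The resulting string would be passed to `t_env.execute_sql(ddl)`, just like `create_func_ddl` above. The Scala class still needs its type hint, because DDL registration is what lets the planner read that hint (per Wei's reply further down).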
*Error*
Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>
*Option 2 - Overriding getResultType*
Back to the old registration method, but overriding getResultType:
t_env.register_java_function("dummyMap","com.dummy.dummyMap")
*Scala UDF*
class dummyMap() extends ScalarFunction {
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"),
      java.lang.String.valueOf("bar"))
  }
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
}
*Error (on compilation)*
[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
[error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
[error]   ()org.apache.flink.table.types.DataType <and>
[error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
[error]  cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
[error] override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
On Tue, 17 Nov 2020 at 14:01, Wei Zhong wrote:
> Hi Pierre,
>
> I guess your UDF is registered with the method 'register_java_function',
> which uses the old type system. In that case you need to override the
> 'getResultType' method instead of adding a type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" SQL
> statement, which accepts the type hint.
>
> Best,
> Wei
>
> On 17 Nov 2020, at 19:29, Pierre Oberholzer wrote:
>
> Hi Wei,
>
> Thanks for your suggestion. Same error.
>
> *Scala UDF*
>
> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
>     Row.of(java.lang.String.valueOf("foo"),
>       java.lang.String.valueOf("bar"))
>   }
> }
>
> Best regards,
>
> On Tue, 17 Nov 2020 at 10:04, Wei Zhong wrote:
>
>> Hi Pierre,
>>
>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with
>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>
>> Best,
>> Wei
>>
>> On 17 Nov 2020, at 15:45, Pierre Oberholzer wrote:
>>
>> Hi Dian, Community,
>>
>> (bringing the thread back to wider audience)
>>
>> As you suggested, I've tried using DataTypeHint with Row instead of Map, but
>> even this simple case leads to a type mismatch between the UDF and the Table API.
>> I've also tried other Map types from Flink (table.data.MapData,
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) as well as Java's
>> java.util.Map, in combination with DataTypeHint, without success.
>> N.B. I'm using Flink 1.11.
>>
>> Am I doing something wrong, or am I hitting limitations of the toolkit?
>>
>> Thanks in advance for your support!
>>
>> Best regards,
>>
>> *Scala UDF*
>>
>> class dummyMap() extends ScalarFunction {
>>
>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"),
>>       java.lang.String.valueOf("bar"))
>>   }
>> }
>>
>> *Table DDL*
>>
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW<s STRING,t STRING>
>> ) with (
>> ...
>> )
>> """
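Because the errors in this thread compare the type inferred for the UDF with the type declared for the sink, one low-tech safeguard is to generate both type strings from a single field list. Below is a sketch with hypothetical helpers `row_type` and `build_sink_ddl` (not PyFlink APIs). The `'connector' = 'print'` option is an assumption standing in for the elided connector options. This keeps the declarations consistent, but it does not by itself fix the `GenericType` result, which Wei attributes to how the UDF is registered.

```python
from typing import Dict, List, Tuple


def row_type(fields: List[Tuple[str, str]]) -> str:
    """Render Flink SQL ROW syntax, e.g. ROW<s STRING, t STRING>."""
    return "ROW<" + ", ".join(f"{name} {sql_type}" for name, sql_type in fields) + ">"


def build_sink_ddl(table: str, columns: Dict[str, str], options: Dict[str, str]) -> str:
    """Render a CREATE TABLE statement; dict insertion order gives column order."""
    cols = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns.items())
    opts = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE {table} (\n{cols}\n) WITH (\n{opts}\n)"


# Single source of truth for the UDF result type (hypothetical naming).
UDF_RESULT = row_type([("s", "STRING"), ("t", "STRING")])
sink_ddl = build_sink_ddl(
    "mySink",
    {"output_of_my_scala_udf": UDF_RESULT},
    {"connector": "print"},  # assumption: placeholder for the elided options
)
print(sink_ddl)
```

The `UDF_RESULT` string (`ROW<s STRING, t STRING>`) is what would go into the Scala `@DataTypeHint`, and `sink_ddl` would be passed to `t_env.execute_sql(...)`.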
>>
>> *Error*
>>
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
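In both errors the UDF side shows up as `GenericType<org.apache.flink.types.Row>`. In other words, Flink did not pick up the declared `ROW<s STRING,t STRING>` and treated the result as an opaque type. When trying several UDF variants, a small hypothetical helper can pull such fields out of the exception text, for example from `str(err)` of a caught `Py4JJavaError`. It is plain Python and not part of PyFlink, and its regex is an assumption based only on the message format quoted above.

```python
import re
from typing import List, Tuple

# "field: GenericType<some.Class>", tolerating a line break after the colon.
_GENERIC_FIELD = re.compile(r"(\w+):\s+GenericType<([\w.$]+)>")


def generic_fields(message: str) -> List[Tuple[str, str]]:
    """Return (field, class) pairs that the query side reports as GenericType.

    Only the text between 'Query result schema:' and 'TableSink schema:' is
    scanned, so the sink's declared types are ignored.
    """
    start = message.find("Query result schema:")
    if start == -1:
        return []
    end = message.find("TableSink schema:", start)
    section = message[start:end] if end != -1 else message[start:]
    return _GENERIC_FIELD.findall(section)


error_text = (
    "Query result schema: [output_of_my_scala_udf: "
    "GenericType<org.apache.flink.types.Row>]\n"
    "TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]"
)
print(generic_fields(error_text))
# → [('output_of_my_scala_udf', 'org.apache.flink.types.Row')]
```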
>>
>>
>>
>> On Fri, 13 Nov 2020 at 11:59, Pierre Oberholzer wrote:
>>
>>> Thanks Dian, but I get the same error when using an explicit return type:
>>>
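>>> import java.util
>>> import scala.collection.JavaConverters._
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>     // A Scala Map is not a java.util.Map, so asInstanceOf would throw a
>>>     // ClassCastException at runtime; convert it instead.
>>>     states.asJava
>>>   }
>>> }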
>>>
>>> On Fri, 13 Nov 2020 at 10:34, Dian Fu wrote:
>>>
>>>> You need to explicitly define the result type of the UDF. You can refer
>>>> to [1] for more details if you are using Flink 1.11. If you are using another
>>>> version of Flink, refer to the corresponding documentation.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>
>>>> On 13 Nov 2020, at 16:56, Pierre Oberholzer wrote:
>>>>
>>>> ScalarFunction
>>>>
>>>>
>>>>
>>>
>>> --
>>> Pierre
--
Pierre