Hi Wei,
Thanks for your suggestion. Same error.
*Scala UDF*
@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
class dummyMap() extends ScalarFunction {
def eval(): Row = {
Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
}
}
Best regards,
Le mar. 17 nov. 2020 à 10:04, Wei Zhong <[email protected]> a écrit :
> Hi Pierre,
>
> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with
> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>”))'
>
> Best,
> Wei
>
> 在 2020年11月17日,15:45,Pierre Oberholzer <[email protected]> 写道:
>
> Hi Dian, Community,
>
> (bringing the thread back to wider audience)
>
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData,
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
>
> Am I doing something wrong or am I facing limitations in the toolkit ?
>
> Thanks in advance for your support !
>
> Best regards,
>
> *Scala UDF*
>
> class dummyMap() extends ScalarFunction {
>
> @DataTypeHint("ROW<s STRING,t STRING>")
> def eval(): Row = {
>
> Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>
> }
> }
>
> *Table DDL*
>
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW<s STRING,t STRING>
> ) with (
> ...
> )
> """
>
> *Error*
>
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink
> `default_catalog`.`default_database`.`mySink` do not match.
> Query result schema: [output_of_my_scala_udf:
> GenericType<org.apache.flink.types.Row>]
> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>
>
>
> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
> [email protected]> a écrit :
>
>> Thanks Dian, but same error when using explicit returned type:
>>
>> class dummyMap() extends ScalarFunction {
>>
>> def eval() : util.Map[java.lang.String,java.lang.String] = {
>>
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>
>> }
>> }
>>
>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <[email protected]> a écrit :
>>
>>> You need to explicitly defined the result type the UDF. You could refer
>>> to [1] for more details if you are using Flink 1.11. If you are using other
>>> versions of Flink, you need to refer to the corresponding documentation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>
>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <[email protected]> 写道:
>>>
>>> ScalarFunction
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
>
> --
> Pierre
>
>
>
--
Pierre