Hi Pierre,
You can try replacing '@DataTypeHint("ROW<s STRING,t STRING>")' with
'@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.
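Applied to the dummyMap class quoted below, the change would look roughly like this. This is only a sketch, untested against 1.11; the import lines are an assumption about what the source file needs, and whether it resolves the mismatch may also depend on how the function is registered from PyFlink.

```scala
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap extends ScalarFunction {

  // FunctionHint declares the output type of eval(), taking the place
  // of the DataTypeHint that was previously on this method.
  @FunctionHint(output = new DataTypeHint("ROW<s STRING, t STRING>"))
  def eval(): Row = {
    // Two string fields, matching the ROW<s STRING, t STRING> declaration
    Row.of("foo", "bar")
  }
}
```

The sink column type in the DDL (ROW<s STRING,t STRING>) stays as it is, since the hint is meant to make the query result schema match it.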
Best,
Wei
> On 17 Nov 2020, at 15:45, Pierre Oberholzer <[email protected]> wrote:
>
> Hi Dian, Community,
>
> (bringing the thread back to a wider audience)
>
> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but
> even this simple case leads to a type mismatch between the UDF and the Table API.
> I've also tried other Map types from Flink (table.data.MapData,
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java's
> java.util.Map, in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
>
> Am I doing something wrong, or am I running into a limitation of the toolkit?
>
> Thanks in advance for your support!
>
> Best regards,
>
> Scala UDF
>
> class dummyMap() extends ScalarFunction {
>
>   @DataTypeHint("ROW<s STRING,t STRING>")
>   def eval(): Row = {
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
>
> Table DDL
>
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW<s STRING,t STRING>
> ) with (
> ...
> )
> """
>
> Error
>
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query result
> and registered TableSink `default_catalog`.`default_database`.`mySink` do not
> match.
> Query result schema: [output_of_my_scala_udf:
> GenericType<org.apache.flink.types.Row>]
> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>
>
>
> On Fri, 13 Nov 2020 at 11:59, Pierre Oberholzer <[email protected]> wrote:
> Thanks Dian, but same error when using explicit returned type:
>
> import scala.collection.JavaConverters._
>
> class dummyMap() extends ScalarFunction {
>
>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>     val states = Map("key1" -> "val1", "key2" -> "val2")
>     // A Scala Map is not a java.util.Map, so convert it instead of casting
>     states.asJava
>   }
> }
>
> On Fri, 13 Nov 2020 at 10:34, Dian Fu <[email protected]> wrote:
> You need to explicitly define the result type of the UDF. You can refer to
> [1] for more details if you are using Flink 1.11. If you are using another
> version of Flink, refer to the corresponding documentation.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>> On 13 Nov 2020, at 4:56 PM, Pierre Oberholzer <[email protected]> wrote:
>>
>> ScalarFunction
>
>
>
> --
> Pierre