Hi Pierre,

You can try replacing '@DataTypeHint("ROW<s STRING,t STRING>")' with 
'@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.
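
Applied to your dummyMap class, that would look roughly like this (untested sketch):

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap() extends ScalarFunction {

  // The method-level FunctionHint declares the output type explicitly,
  // so the planner should no longer fall back to GenericType<Row>.
  @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
  def eval(): Row = {
    Row.of("foo", "bar")
  }
}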

Best,
Wei

> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> 
> Hi Dian, Community,
> 
> (bringing the thread back to wider audience)
> 
> As you suggested, I've tried using DataTypeHint with Row instead of Map, but 
> even this simple case leads to a type mismatch between the UDF and the Table API.
> I've also tried other Map objects from Flink (table.data.MapData, 
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
> 
> Am I doing something wrong, or am I facing limitations in the toolkit?
> 
> Thanks in advance for your support!
> 
> Best regards,
> 
> Scala UDF
> 
> class dummyMap() extends ScalarFunction {
> 
>  @DataTypeHint("ROW<s STRING,t STRING>")
>  def eval(): Row = {
> 
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> }
> 
> Table DDL
> 
> my_sink_ddl = f"""
>     create table mySink (
>         output_of_my_scala_udf ROW<s STRING,t STRING>
>     ) with (
>         ...
>     )
> """
> 
> Error
> 
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query result 
> and registered TableSink `default_catalog`.`default_database`.`mySink` do not 
> match.
> Query result schema: [output_of_my_scala_udf: 
> GenericType<org.apache.flink.types.Row>]
> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
> 
> 
> 
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> Thanks Dian, but I get the same error when using an explicit return type:
> 
> class dummyMap() extends ScalarFunction {
> 
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>     
>     val states = Map("key1" -> "val1", "key2" -> "val2")
>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
> 
>   }
> }
> 
> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
> You need to explicitly define the result type of the UDF. You can refer to 
> [1] for more details if you are using Flink 1.11. If you are using another 
> version of Flink, please refer to the corresponding documentation.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
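> 
> For example (just a rough sketch on my side, untested), with the legacy type system you could declare the result type by overriding getResultType:
> 
> import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
> import org.apache.flink.table.functions.ScalarFunction
> 
> class dummyMap() extends ScalarFunction {
> 
>   def eval(): java.util.Map[String, String] = {
>     val m = new java.util.HashMap[String, String]()
>     m.put("key1", "val1")
>     m.put("key2", "val2")
>     m
>   }
> 
>   // Tell the planner the result type explicitly instead of letting it
>   // fall back to a generic/raw type.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     Types.MAP(Types.STRING, Types.STRING)
> }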
>  
>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>> 
>> ScalarFunction
> 
> 
> 
> -- 
> Pierre
