Hi Pierre,

I guess your UDF is registered via the 'register_java_function' method, which 
uses the old type system. In that case you need to override the 
'getResultType' method instead of adding a type hint. 

You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, 
which accepts the type hint.
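
A minimal sketch of that second option from the PyFlink side, assuming a
TableEnvironment that exposes 'execute_sql' (available from Flink 1.11). The
helper names 'build_create_function_ddl' and 'register_udf', and the class
path 'com.example.dummyMap', are hypothetical placeholders, not part of
Flink or of the original code.

```python
def build_create_function_ddl(name: str, class_name: str, language: str = "SCALA") -> str:
    """Build a Flink SQL CREATE FUNCTION statement for a JVM-based UDF."""
    return f"CREATE TEMPORARY FUNCTION {name} AS '{class_name}' LANGUAGE {language}"


def register_udf(t_env, name: str, class_name: str) -> str:
    """Register the UDF through SQL DDL instead of register_java_function.

    t_env is expected to be a PyFlink TableEnvironment; only its
    execute_sql method is used here.
    """
    ddl = build_create_function_ddl(name, class_name)
    t_env.execute_sql(ddl)
    return ddl


# Usage with a real TableEnvironment (the class path is a placeholder):
# register_udf(t_env, "dummyMap", "com.example.dummyMap")
```

The jar containing the UDF class still has to be on the classpath of the
job, as with 'register_java_function'.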

Best,
Wei

> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> 
> Hi Wei,
> 
> Thanks for your suggestion. Same error.
> 
> Scala UDF
> 
> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
> 
> Best regards,
> 
> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
> Hi Pierre,
> 
> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with 
> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>> 
>> Hi Dian, Community,
>> 
>> (bringing the thread back to wider audience)
>> 
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but 
>> even this simple case leads to a type mismatch between the UDF and the Table API.
>> I've also tried other Map objects from Flink (table.data.MapData, 
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>> 
>> Am I doing something wrong, or am I facing limitations in the toolkit?
>> 
>> Thanks in advance for your support!
>> 
>> Best regards,
>> 
>> Scala UDF
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>  def eval(): Row = {
>> 
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> }
>> 
>> Table DDL
>> 
>> my_sink_ddl = f"""
>>     create table mySink (
>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>     ) with (
>>         ...
>>     )
>> """
>> 
>> Error
>> 
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query 
>> result and registered TableSink 
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf: 
>> GenericType<org.apache.flink.types.Row>]
>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>> 
>> 
>> 
>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>> Thanks Dian, but I get the same error when using an explicit return type:
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>     
>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>> 
>>   }
>> }
>> 
>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>> You need to explicitly define the result type of the UDF. You can refer to 
>> [1] for more details if you are using Flink 1.11. If you are using another 
>> version of Flink, refer to the corresponding documentation.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>> 
>>> ScalarFunction
>> 
>> 
>> 
>> -- 
>> Pierre
>> 
> 
> 
> 
> -- 
> Pierre
