Hi Wei,

Right, I was registering the UDF with the method you mention
('register_java_function'), but I'm glad to change.
I tried your suggestions instead, but still get errors (details below).

Thanks for your support. This is turning out to be much more tedious than I expected.

*Option 1 - SQL UDF*

*SQL UDF*
create_func_ddl = """
CREATE FUNCTION dummyMap
  AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""

t_env.execute_sql(create_func_ddl)

*Error*
Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match
requested type. Requested: Row(s: String, t: String); Actual:
GenericType<org.apache.flink.types.Row>

*Option 2 - Overriding getResultType*

Back to the old registering method, but overriding getResultType:

t_env.register_java_function("dummyMap","com.dummy.dummyMap")

*Scala UDF*
class dummyMap() extends ScalarFunction {

  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
}

*Error (on compilation)*

[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
[error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
[error]   ()org.apache.flink.table.types.DataType <and>
[error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
[error]  cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
[error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
[error]                                                                                          ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
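
A note on the compile error above: 'getResultType' belongs to the old type system and must return a TypeInformation, whereas DataTypes.ROW(...) builds a DataType and, per the alternatives listed by the compiler, expects DataTypes.Field arguments rather than bare types. Below is a minimal, untested sketch of what might work instead. It assumes the Java 'Types' helper from org.apache.flink.api.common.typeinfo (Flink 1.11) and takes the field names 's' and 't' from the sink DDL quoted further down.

```scala
package com.dummy

// Assumption: the Java Types helper, not the Scala API class of the same name.
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap() extends ScalarFunction {

  // Same body as before: a Row holding two strings.
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  // Old type system: describe the result as TypeInformation, not DataType.
  // ROW_NAMED attaches the field names (s, t) that the sink schema expects.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW_NAMED(Array("s", "t"), Types.STRING, Types.STRING)
}
```

Registration would stay as in Option 2, via t_env.register_java_function("dummyMap","com.dummy.dummyMap"). Whether the resulting type then matches the sink schema Row(s: String, t: String) still needs to be checked.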

On Tue, 17 Nov 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:

> Hi Pierre,
>
> I guess your UDF is registered by the method 'register_java_function'
> which uses the old type system. In this situation you need to override the
> 'getResultType' method instead of adding type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" sql
> statement, which accepts the type hint.
>
> Best,
> Wei
>
> On 17 Nov 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
> Hi Wei,
>
> Thanks for your suggestion. Same error.
>
> *Scala UDF*
>
> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
>
> Best regards,
>
> On Tue, 17 Nov 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with
>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>
>> Best,
>> Wei
>>
>> On 17 Nov 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>> Hi Dian, Community,
>>
>> (bringing the thread back to wider audience)
>>
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but
>> even this simple case leads to a type mismatch between the UDF and the Table API.
>> I've also tried other Map objects from Flink (table.data.MapData,
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>>
>> Am I doing something wrong, or am I hitting a limitation of the toolkit?
>>
>> Thanks in advance for your support!
>>
>> Best regards,
>>
>> *Scala UDF*
>>
>> class dummyMap() extends ScalarFunction {
>>
>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>  def eval(): Row = {
>>    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>  }
>> }
>>
>> *Table DDL*
>>
>> my_sink_ddl = f"""
>>     create table mySink (
>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>     ) with (
>>         ...
>>     )
>> """
>>
>> *Error*
>>
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query
>> result and registered TableSink
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf:
>> GenericType<org.apache.flink.types.Row>]
>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>
>>
>>
>> On Fri, 13 Nov 2020 at 11:59, Pierre Oberholzer <
>> pierre.oberhol...@gmail.com> wrote:
>>
>>> Thanks Dian, but same error when using explicit returned type:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>
>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>
>>>   }
>>> }
>>>
>>> On Fri, 13 Nov 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> You need to explicitly define the result type of the UDF. You could refer
>>>> to [1] for more details if you are using Flink 1.11. If you are using other
>>>> versions of Flink, you need to refer to the corresponding documentation.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>
>>>> On 13 Nov 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>> wrote:
>>>>
>>>> ScalarFunction
>>>>
>>>>
>>>>
>>>
>>> --
>>> Pierre
>>>
>>
>> --
>> Pierre
>>
>>
>>
>
> --
> Pierre
>
>
>

-- 
Pierre
