Hi Pierre,

Both approaches work on my local machine. This is my code:

Scala UDF:
package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
  * The scala UDF.
  */
class dummyMap extends ScalarFunction {

  // If the UDF is registered via the SQL statement, you need to add this
  // type hint.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {

    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the UDF is registered via the method 'register_java_function', you
  // need to override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The type of the return values should be TypeInformation
    Types.ROW(Array("s", "t"),
      Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}
Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# Load the Scala UDF jar file; change the path to your own.
# You can also load the jar file via other approaches.
st_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# register the UDF via the SQL statement
st_env.execute_sql(
    "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register it via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
                         ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
        output_of_my_scala_udf ROW<s STRING,t STRING>
    ) with (
        'connector' = 'print'
    )""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
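
As a side note, the errors earlier in this thread all come from the sink column type and the UDF result type not matching. Here is a minimal sketch for keeping the two in sync by generating the ROW type string and the sink DDL from one field list. It is plain Python and is not part of the code I ran above; the names FIELDS, row_type and print_sink_ddl are hypothetical helpers, not PyFlink APIs.

```python
# Hypothetical helpers (not PyFlink APIs): build the ROW type string and the
# print-sink DDL from a single field list so the two cannot drift apart.
FIELDS = [("s", "STRING"), ("t", "STRING")]


def row_type(fields):
    """Render a list of (name, sql_type) pairs as a Flink SQL ROW type."""
    return "ROW<" + ",".join(f"{name} {sql_type}" for name, sql_type in fields) + ">"


def print_sink_ddl(table, column, fields):
    """Build a CREATE TABLE statement for a print sink with one ROW column."""
    return (
        f"create table {table} (\n"
        f"    {column} {row_type(fields)}\n"
        f") with (\n"
        f"    'connector' = 'print'\n"
        f")"
    )


ddl = print_sink_ddl("mySink", "output_of_my_scala_udf", FIELDS)
print(row_type(FIELDS))
print(ddl)
```

The resulting ddl string can be passed to st_env.execute_sql(ddl) in place of the hand-written statement. The string printed by row_type(FIELDS) still has to be copied by hand into the @DataTypeHint on the Scala side, since the annotation needs a constant.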

Best,
Wei

> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> 
> Hi Wei,
> 
> True, I'm using the method you mention, but glad to change. 
> I tried your suggestion instead, but got a similar error.
> 
> Thanks for your support. That is much more tedious than I thought.
> 
> Option 1 - SQL UDF
> 
> SQL UDF
> create_func_ddl = """
> CREATE FUNCTION dummyMap 
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>   
> t_env.execute_sql(create_func_ddl)
> 
> Error
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match 
> requested type. Requested: Row(s: String, t: String); Actual: 
> GenericType<org.apache.flink.types.Row>
> 
> Option 2 - Overriding getResultType
> 
> Back to the old registering method, but overriding getResultType:
> 
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
> 
> Scala UDF
> class dummyMap() extends ScalarFunction {
> 
>   def eval(): Row = {
> 
>       Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
> 
> Error (on compilation)
> 
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>  <and>
> [error]   ()org.apache.flink.table.types.DataType <and>
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]): 
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]                                                                       
>                    ^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
> 
> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
> Hi Pierre,
> 
> I guess your UDF is registered by the method 'register_java_function', which
> uses the old type system. In this situation you need to override the
> 'getResultType' method instead of adding a type hint.
> 
> You can also try to register your UDF via the "CREATE FUNCTION" SQL
> statement, which accepts the type hint.
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>> 
>> Hi Wei,
>> 
>> Thanks for your suggestion. Same error.
>> 
>> Scala UDF
>> 
>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>> class dummyMap() extends ScalarFunction {
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>> }
>> 
>> Best regards,
>> 
>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>> Hi Pierre,
>> 
>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with 
>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>> 
>> Best,
>> Wei
>> 
>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>> 
>>> Hi Dian, Community,
>>> 
>>> (bringing the thread back to wider audience)
>>> 
>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map,
>>> but even this simple case leads to a type mismatch between the UDF and the
>>> Table API.
>>> I've also tried other Map objects from Flink (table.data.MapData,
>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>> (java.util.Map) in combination with DataTypeHint, without success.
>>> N.B. I'm using version 1.11.
>>> 
>>> Am I doing something wrong, or am I facing limitations in the toolkit?
>>> 
>>> Thanks in advance for your support!
>>> 
>>> Best regards,
>>> 
>>> Scala UDF
>>> 
>>> class dummyMap() extends ScalarFunction {
>>> 
>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>  def eval(): Row = {
>>> 
>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>> 
>>>   }
>>> }
>>> 
>>> Table DDL
>>> 
>>> my_sink_ddl = f"""
>>>     create table mySink (
>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>     ) with (
>>>         ...
>>>     )
>>> """
>>> 
>>> Error
>>> 
>>> Py4JJavaError: An error occurred while calling o2.execute.
>>> : org.apache.flink.table.api.ValidationException: Field types of query 
>>> result and registered TableSink 
>>> `default_catalog`.`default_database`.`mySink` do not match.
>>> Query result schema: [output_of_my_scala_udf: 
>>> GenericType<org.apache.flink.types.Row>]
>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>> 
>>> 
>>> 
>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>> Thanks Dian, but I get the same error when using an explicit return type:
>>> 
>>> class dummyMap() extends ScalarFunction {
>>> 
>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>     
>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>> 
>>>   }
>>> }
>>> 
>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>> You need to explicitly define the result type of the UDF. You can refer to
>>> [1] for more details if you are using Flink 1.11. If you are using another
>>> version of Flink, refer to the corresponding documentation.
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>> 
>>>> ScalarFunction
>>> 
>>> 
>>> 
>>> -- 
>>> Pierre
>>> 
>> 
>> 
>> 
>> -- 
>> Pierre
> 
> 
> 
> -- 
> Pierre
