Hi Wei,

It works ! Thanks a lot for your support.
I hadn't tried this last combination for option 1, and I had wrong
syntax for option 2.

So to summarize..

Methods working:
- Current: DataTypeHint in UDF definition + SQL for UDF registering
- Outdated: override getResultType in UDF definition
+ t_env.register_java_function for UDF registering

Type conversions working:
- scala.collection.immutable.Map[String,String] =>
org.apache.flink.types.Row => ROW<STRING,STRING>
- scala.collection.immutable.Map[String,String] =>
java.util.Map[String,String] => MAP<STRING,STRING>

Any hint for Map[String,Any] ?

Best regards,

Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a écrit :

> Hi Pierre,
>
> Those 2 approaches all work in my local machine, this is my code:
>
> Scala UDF:
>
> package com.dummy
>
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.types.Row
>
> /**
>   * The scala UDF.
>   */
> class dummyMap extends ScalarFunction {
>
>   // If the udf would be registered by the SQL statement, you need add this 
> typehint
>   @DataTypeHint("ROW<s STRING,t STRING>")
>   def eval(): Row = {
>
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>
>   }
>
>   // If the udf would be registered by the method 'register_java_function', 
> you need override this
>   // method.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = {
>     // The type of the return values should be TypeInformation
>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
> Types.STRING()))
>   }
> }
>
> Python code:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> st_env = StreamTableEnvironment.create(s_env)
>
> # load the scala udf jar file, the path should be modified to yours
> # or your can also load the jar file via other approaches
> st_env.get_config().get_configuration().set_string("pipeline.jars", "
> file:///Users/zhongwei/the-dummy-udf.jar")
>
> # register the udf via
> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
> LANGUAGE SCALA")
> # or register via the method
> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>
> # prepare source and sink
> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
> 'b', 'c'])
> st_env.execute_sql("""create table mySink (
>         output_of_my_scala_udf ROW<s STRING,t STRING>
>     ) with (
>         'connector' = 'print'
>     )""")
>
> # execute query
>
> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>
> Best,
> Wei
>
> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>
> Hi Wei,
>
> True, I'm using the method you mention, but glad to change.
> I tried your suggestion instead, but got a similar error.
>
> Thanks for your support. That is much more tedious than I thought.
>
> *Option 1 - SQL UDF*
>
> *SQL UDF*
> create_func_ddl = """
> CREATE FUNCTION dummyMap
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>
> t_env.execute_sql(create_func_ddl)
>
> *Error*
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match
> requested type. Requested: Row(s: String, t: String); Actual:
> GenericType<org.apache.flink.types.Row>
>
> *Option 2 *- *Overriding getResultType*
>
> Back to the old registering method, but overriding getResultType:
>
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>
> *Scala UDF*
> class dummyMap() extends ScalarFunction {
>
>   def eval(): Row = {
>
>       Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>
>   }
>
>   override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
>
> *Error (on compilation)*
>
> [error] dummyMap.scala:66:90: overloaded method value ROW with
> alternatives:
> [error]   (x$1:
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
> <and>
> [error]   ()org.apache.flink.table.types.DataType <and>
> [error]   (x$1:
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType,
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]
>                         ^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>
> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a écrit :
>
>> Hi Pierre,
>>
>> I guess your UDF is registered by the method 'register_java_function'
>> which uses the old type system. In this situation you need to override the
>> 'getResultType' method instead of adding type hint.
>>
>> You can also try to register your UDF via the "CREATE FUNCTION" sql
>> statement, which accepts the type hint.
>>
>> Best,
>> Wei
>>
>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>
>> Hi Wei,
>>
>> Thanks for your suggestion. Same error.
>>
>> *Scala UDF*
>>
>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>> class dummyMap() extends ScalarFunction {
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"),
>> java.lang.String.valueOf("bar"))
>>   }
>> }
>>
>> Best regards,
>>
>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com> a
>> écrit :
>>
>>> Hi Pierre,
>>>
>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t
>>> STRING>”))'
>>>
>>> Best,
>>> Wei
>>>
>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>
>>> Hi Dian, Community,
>>>
>>> (bringing the thread back to wider audience)
>>>
>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
>>> also this simple case leads to a type mismatch between UDF and Table API.
>>> I've also tried other Map objects from Flink (table.data.MapData,
>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>> (java.util.Map) in combination with DataTypeHint, without success.
>>> N.B. I'm using version 1.11.
>>>
>>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>>
>>> Thanks in advance for your support !
>>>
>>> Best regards,
>>>
>>> *Scala UDF*
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>  def eval(): Row = {
>>>
>>>     Row.of(java.lang.String.valueOf("foo"),
>>> java.lang.String.valueOf("bar"))
>>>
>>>   }
>>> }
>>>
>>> *Table DDL*
>>>
>>> my_sink_ddl = f"""
>>>     create table mySink (
>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>     ) with (
>>>         ...
>>>     )
>>> """
>>>
>>> *Error*
>>>
>>> Py4JJavaError: An error occurred while calling o2.execute.
>>> : org.apache.flink.table.api.ValidationException: Field types of query
>>> result and registered TableSink
>>> `default_catalog`.`default_database`.`mySink` do not match.
>>> Query result schema: [output_of_my_scala_udf:
>>> GenericType<org.apache.flink.types.Row>]
>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>>
>>>
>>>
>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>> pierre.oberhol...@gmail.com> a écrit :
>>>
>>>> Thanks Dian, but same error when using explicit returned type:
>>>>
>>>> class dummyMap() extends ScalarFunction {
>>>>
>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>
>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>
>>>>   }
>>>> }
>>>>
>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a écrit :
>>>>
>>>>> You need to explicitly defined the result type the UDF. You could
>>>>> refer to [1] for more details if you are using Flink 1.11. If you are 
>>>>> using
>>>>> other versions of Flink, you need to refer to the corresponding
>>>>> documentation.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>
>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>> 写道:
>>>>>
>>>>> ScalarFunction
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>
>>> --
>>> Pierre
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
>>
>>
>
> --
> Pierre
>
>
>

-- 
Pierre

Reply via email to