Hi Pierre,

Currently there is no type hint like ‘Map[String, Any]’. The recommended way is 
declaring your type more explicitly.

If you insist on doing this, you can try to declaring a RAW data type for 
java.util.HashMap [1], but you may encounter some troubles [2] related to the 
kryo serializers.

Best,
Wei

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw>
[2] 
https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
 
<https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class>


> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
> 
> Hi Wei,
> 
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong syntax for 
> option 2.
> 
> So to summarize..
> 
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition + 
> t_env.register_java_function for UDF registering
> 
> Type conversions working:
> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row 
> => ROW<STRING,STRING>
> - scala.collection.immutable.Map[String,String] => 
> java.util.Map[String,String] => MAP<STRING,STRING>
> 
> Any hint for Map[String,Any] ?
> 
> Best regards,
> 
> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com 
> <mailto:weizhong0...@gmail.com>> a écrit :
> Hi Pierre,
> 
> Those 2 approaches all work in my local machine, this is my code:
> 
> Scala UDF:
> package com.dummy
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.types.Row
> 
> /**
>   * The scala UDF.
>   */
> class dummyMap extends ScalarFunction {
> 
>   // If the udf would be registered by the SQL statement, you need add this 
> typehint
>   @DataTypeHint("ROW<s STRING,t STRING>")
>   def eval(): Row = {
> 
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   // If the udf would be registered by the method 'register_java_function', 
> you need override this
>   // method.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = {
>     // The type of the return values should be TypeInformation
>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
> Types.STRING()))
>   }
> }
> Python code:
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
> 
> s_env = StreamExecutionEnvironment.get_execution_environment()
> st_env = StreamTableEnvironment.create(s_env)
> 
> # load the scala udf jar file, the path should be modified to yours
> # or your can also load the jar file via other approaches
> st_env.get_config().get_configuration().set_string("pipeline.jars", 
> "file:///Users/zhongwei/the-dummy-udf.jar <>")
> 
> # register the udf via 
> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
> SCALA")
> # or register via the method
> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
> 
> # prepare source and sink
> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
> 'c'])
> st_env.execute_sql("""create table mySink (
>         output_of_my_scala_udf ROW<s STRING,t STRING>
>     ) with (
>         'connector' = 'print'
>     )""")
> 
> # execute query
> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
> 
> Best,
> Wei
> 
>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com 
>> <mailto:pierre.oberhol...@gmail.com>> 写道:
>> 
>> Hi Wei,
>> 
>> True, I'm using the method you mention, but glad to change. 
>> I tried your suggestion instead, but got a similar error.
>> 
>> Thanks for your support. That is much more tedious than I thought.
>> 
>> Option 1 - SQL UDF
>> 
>> SQL UDF
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap 
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>   
>> t_env.execute_sql(create_func_ddl)
>> 
>> Error
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match 
>> requested type. Requested: Row(s: String, t: String); Actual: 
>> GenericType<org.apache.flink.types.Row>
>> 
>> Option 2 - Overriding getResultType
>> 
>> Back to the old registering method, but overriding getResultType:
>> 
>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>> 
>> Scala UDF
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval(): Row = {
>> 
>>       Row.of(java.lang.String.valueOf("foo"), 
>> java.lang.String.valueOf("bar"))
>> 
>>   }
>> 
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> }
>> 
>> Error (on compilation)
>> 
>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>> [error]   (x$1: 
>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>  <and>
>> [error]   ()org.apache.flink.table.types.DataType <and>
>> [error]   (x$1: 
>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
>> org.apache.flink.table.types.DataType)
>> [error]   override def getResultType(signature: Array[Class[_]]): 
>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> [error]                                                                      
>>                     ^
>> [error] one error found
>> [error] (Compile / compileIncremental) Compilation failed
>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>> 
>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com 
>> <mailto:weizhong0...@gmail.com>> a écrit :
>> Hi Pierre,
>> 
>> I guess your UDF is registered by the method 'register_java_function' which 
>> uses the old type system. In this situation you need to override the 
>> 'getResultType' method instead of adding type hint. 
>> 
>> You can also try to register your UDF via the "CREATE FUNCTION" sql 
>> statement, which accepts the type hint.
>> 
>> Best,
>> Wei
>> 
>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com 
>>> <mailto:pierre.oberhol...@gmail.com>> 写道:
>>> 
>>> Hi Wei,
>>> 
>>> Thanks for your suggestion. Same error.
>>> 
>>> Scala UDF
>>> 
>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>> class dummyMap() extends ScalarFunction {
>>>   def eval(): Row = {
>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>   }
>>> }
>>> 
>>> Best regards,
>>> 
>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com 
>>> <mailto:weizhong0...@gmail.com>> a écrit :
>>> Hi Pierre,
>>> 
>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with 
>>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>”))'
>>> 
>>> Best,
>>> Wei
>>> 
>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com 
>>>> <mailto:pierre.oberhol...@gmail.com>> 写道:
>>>> 
>>>> Hi Dian, Community,
>>>> 
>>>> (bringing the thread back to wider audience)
>>>> 
>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map 
>>>> but also this simple case leads to a type mismatch between UDF and Table 
>>>> API.
>>>> I've also tried other Map objects from Flink (table.data.MapData, 
>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>> N.B. I'm using version 1.11.
>>>> 
>>>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>>> 
>>>> Thanks in advance for your support !
>>>> 
>>>> Best regards,
>>>> 
>>>> Scala UDF
>>>> 
>>>> class dummyMap() extends ScalarFunction {
>>>> 
>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>  def eval(): Row = {
>>>> 
>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>> java.lang.String.valueOf("bar"))
>>>> 
>>>>   }
>>>> }
>>>> 
>>>> Table DDL
>>>> 
>>>> my_sink_ddl = f"""
>>>>     create table mySink (
>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>     ) with (
>>>>         ...
>>>>     )
>>>> """
>>>> 
>>>> Error
>>>> 
>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>> : org.apache.flink.table.api.ValidationException: Field types of query 
>>>> result and registered TableSink 
>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>> Query result schema: [output_of_my_scala_udf: 
>>>> GenericType<org.apache.flink.types.Row>]
>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>>> 
>>>> 
>>>> 
>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer 
>>>> <pierre.oberhol...@gmail.com <mailto:pierre.oberhol...@gmail.com>> a écrit 
>>>> :
>>>> Thanks Dian, but same error when using explicit returned type:
>>>> 
>>>> class dummyMap() extends ScalarFunction {
>>>> 
>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>     
>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>> 
>>>>   }
>>>> }
>>>> 
>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com 
>>>> <mailto:dian0511...@gmail.com>> a écrit :
>>>> You need to explicitly defined the result type the UDF. You could refer to 
>>>> [1] for more details if you are using Flink 1.11. If you are using other 
>>>> versions of Flink, you need to refer to the corresponding documentation.
>>>> 
>>>> [1] 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide>
>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <pierre.oberhol...@gmail.com 
>>>>> <mailto:pierre.oberhol...@gmail.com>> 写道:
>>>>> 
>>>>> ScalarFunction
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Pierre
>>>> 
>>>> -- 
>>>> Pierre
>>> 
>>> 
>>> 
>>> -- 
>>> Pierre
>> 
>> 
>> 
>> -- 
>> Pierre
> 
> 
> 
> -- 
> Pierre

Reply via email to