Hi Pierre,

Sorry for the late reply.
If I understand correctly, your `Table` has a `field` in JSON format with up 
to 100k distinct keys, and you want to use that `field` as the input/output of 
a `udf`, right? I am not sure whether there is a limit on the number of nested 
keys; other contributors with experience in this area may be able to answer. 
As for `Python UDF`: if the key or value type of your `Map` is `Any`, that is 
not supported at the moment. You need to specify a concrete type. For more 
information, please refer to the related documentation [1].
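
To illustrate what specifying a concrete type could look like for sparse JSON data, here is a minimal sketch. It assumes (this is my assumption, not something confirmed in this thread) that flattening the nested object into string keys and JSON-encoded string values is acceptable for your downstream processing, so that the column can be typed as MAP<STRING, STRING>. The helper name `flatten_to_string_map` and the dotted-key convention are hypothetical, and the code uses only the Python standard library:

```python
import json
from typing import Any, Dict


def flatten_to_string_map(record: Dict[str, Any], prefix: str = "") -> Dict[str, str]:
    """Flatten a nested dict into {dotted_key: json_encoded_value}.

    Hypothetical helper: non-empty nested dicts become dotted keys, and every
    leaf value is JSON-encoded, so the result only contains str -> str entries.
    Keys that are absent from the record are simply absent from the output.
    """
    flat: Dict[str, str] = {}
    for key, value in record.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into nested objects, extending the dotted key path.
            flat.update(flatten_to_string_map(value, full_key))
        else:
            # Leaves (including empty dicts and lists) are stored as JSON text.
            flat[full_key] = json.dumps(value)
    return flat


# Example record with only a few of the possible keys populated.
sparse_record = {"user": {"id": 42, "tags": ["a", "b"]}, "active": True}
flat_record = flatten_to_string_map(sparse_record)
print(flat_record)
```

With a shape like this, the result type of a Python UDF could be declared as `DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())`, and individual values can be decoded again with `json.loads` where needed. Whether this scales to 100k possible keys is something I have not verified.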

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html

Best,
Xingbo

> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> 
> Hello Wei, Dian, Xingbo,
> 
> Not really sure when it is appropriate to knock on the door of the community 
> ;)
> I just wanted to mention that your feedback on the above topic will be highly 
> appreciated as it will condition the choice of framework on our side for the 
> months to come, and potentially help the community to cover sparse data with 
> Flink.
> 
> Thanks a lot !
> 
> Have a great week-end
> 
> Best,
> 
> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> Hi Wei,
> 
> Thanks for the hint. May I please follow up by adding more context and ask 
> for your guidance.
> 
> Suppose the aforementioned Map[String,Any] object returned by Scala:
> 
> - Has a defined schema (incl. nested) with up to 100k (!) different possible 
> keys
> - Has only some portion of the keys populated for each record
> - Is convertible to JSON
> - Has to undergo downstream processing in Flink and/or Python UDF with key 
> value access
> - Has to be ultimately stored in a Kafka/AVRO sink
> 
> How would you declare the types explicitly in such a case ?
> 
> Thanks for your support !
> 
> Pierre
> 
> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
> Hi Pierre,
> 
> Currently there is no type hint like 'Map[String, Any]'. The recommended way 
> is to declare your type more explicitly.
> 
> If you insist on doing this, you can try declaring a RAW data type for 
> java.util.HashMap [1], but you may run into problems [2] related to the 
> Kryo serializers.
> 
> Best,
> Wei
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
> 
> 
>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>> 
>> Hi Wei,
>> 
>> It works ! Thanks a lot for your support.
>> I hadn't tried this last combination for option 1, and I had wrong syntax 
>> for option 2.
>> 
>> So to summarize..
>> 
>> Methods working:
>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>> - Outdated: override getResultType in UDF definition + 
>> t_env.register_java_function for UDF registering
>> 
>> Type conversions working:
>> - scala.collection.immutable.Map[String,String] => 
>> org.apache.flink.types.Row => ROW<STRING,STRING>
>> - scala.collection.immutable.Map[String,String] => 
>> java.util.Map[String,String] => MAP<STRING,STRING>
>> 
>> Any hint for Map[String,Any] ?
>> 
>> Best regards,
>> 
>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>> Hi Pierre,
>> 
>> Both approaches work on my local machine. Here is my code:
>> 
>> Scala UDF:
>> package com.dummy
>> 
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>> 
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>> 
>>   // If the UDF is registered via a SQL statement, you need to add this type hint.
>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>   def eval(): Row = {
>> 
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> 
>>   // If the UDF is registered via the method 'register_java_function',
>>   // you need to override this method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>     // The type of the return values should be TypeInformation
>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>   }
>> }
>> Python code:
>> 
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>> 
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>> 
>> # load the scala udf jar file, the path should be modified to yours
>> # or you can also load the jar file via other approaches
>> st_env.get_config().get_configuration().set_string(
>>     "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>> 
>> # register the udf via a SQL statement
>> st_env.execute_sql(
>>     "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>> # or register via the method
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>> 
>> # prepare source and sink
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
>> 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>     ) with (
>>         'connector' = 'print'
>>     )""")
>> 
>> # execute query
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>> 
>> Best,
>> Wei
>> 
>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>> 
>>> Hi Wei,
>>> 
>>> True, I'm using the method you mention, but glad to change. 
>>> I tried your suggestion instead, but got a similar error.
>>> 
>>> Thanks for your support. That is much more tedious than I thought.
>>> 
>>> Option 1 - SQL UDF
>>> 
>>> SQL UDF
>>> create_func_ddl = """
>>> CREATE FUNCTION dummyMap 
>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>> """
>>>   
>>> t_env.execute_sql(create_func_ddl)
>>> 
>>> Error
>>> Py4JJavaError: An error occurred while calling o672.execute.
>>> : org.apache.flink.table.api.TableException: Result field does not match 
>>> requested type. Requested: Row(s: String, t: String); Actual: 
>>> GenericType<org.apache.flink.types.Row>
>>> 
>>> Option 2 - Overriding getResultType
>>> 
>>> Back to the old registering method, but overriding getResultType:
>>> 
>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>> 
>>> Scala UDF
>>> class dummyMap() extends ScalarFunction {
>>> 
>>>   def eval(): Row = {
>>> 
>>>       Row.of(java.lang.String.valueOf("foo"), 
>>> java.lang.String.valueOf("bar"))
>>> 
>>>   }
>>> 
>>>   override def getResultType(signature: Array[Class[_]]): 
>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>> }
>>> 
>>> Error (on compilation)
>>> 
>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>> [error]   (x$1: 
>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>  <and>
>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>> [error]   (x$1: 
>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
>>> org.apache.flink.table.types.DataType)
>>> [error]   override def getResultType(signature: Array[Class[_]]): 
>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>> [error]                                                                     
>>>                      ^
>>> [error] one error found
>>> [error] (Compile / compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>> 
>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>> Hi Pierre,
>>> 
>>> I guess your UDF is registered via the method 'register_java_function', which 
>>> uses the old type system. In that case you need to override the 
>>> 'getResultType' method instead of adding a type hint. 
>>> 
>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL 
>>> statement, which accepts the type hint.
>>> 
>>> Best,
>>> Wei
>>> 
>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>> 
>>>> Hi Wei,
>>>> 
>>>> Thanks for your suggestion. Same error.
>>>> 
>>>> Scala UDF
>>>> 
>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>> class dummyMap() extends ScalarFunction {
>>>>   def eval(): Row = {
>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>> java.lang.String.valueOf("bar"))
>>>>   }
>>>> }
>>>> 
>>>> Best regards,
>>>> 
>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>> Hi Pierre,
>>>> 
>>>> You can try to replace '@DataTypeHint("ROW<s STRING,t STRING>")' with 
>>>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>> 
>>>> Best,
>>>> Wei
>>>> 
>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>> 
>>>>> Hi Dian, Community,
>>>>> 
>>>>> (bringing the thread back to wider audience)
>>>>> 
>>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, 
>>>>> but even this simple case leads to a type mismatch between the UDF and 
>>>>> the Table API.
>>>>> I've also tried other Map objects from Flink (table.data.MapData, 
>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>>> N.B. I'm using version 1.11.
>>>>> 
>>>>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>>>> 
>>>>> Thanks in advance for your support !
>>>>> 
>>>>> Best regards,
>>>>> 
>>>>> Scala UDF
>>>>> 
>>>>> class dummyMap() extends ScalarFunction {
>>>>> 
>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>  def eval(): Row = {
>>>>> 
>>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>>> java.lang.String.valueOf("bar"))
>>>>> 
>>>>>   }
>>>>> }
>>>>> 
>>>>> Table DDL
>>>>> 
>>>>> my_sink_ddl = f"""
>>>>>     create table mySink (
>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>     ) with (
>>>>>         ...
>>>>>     )
>>>>> """
>>>>> 
>>>>> Error
>>>>> 
>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>> : org.apache.flink.table.api.ValidationException: Field types of query 
>>>>> result and registered TableSink 
>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>> Query result schema: [output_of_my_scala_udf: 
>>>>> GenericType<org.apache.flink.types.Row>]
>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>> 
>>>>> class dummyMap() extends ScalarFunction {
>>>>> 
>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>     
>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>> 
>>>>>   }
>>>>> }
>>>>> 
>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>> You need to explicitly define the result type of the UDF. You can refer 
>>>>> to [1] for more details if you are using Flink 1.11. If you are using 
>>>>> another version of Flink, refer to the corresponding documentation.
>>>>> 
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>> 
>>>>>> ScalarFunction
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Pierre
>>>>> 
