Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-03 Thread Pierre Oberholzer
Hi Xingbo, Wei, Dian, Many thanks for this, and for the high-quality and prompt support overall. Let's close this thread here. Looking forward to trying your approach. Community, feel free to reach out with additional remarks and experiences on structured streaming over complex/sparse objects. Best

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre, The serialization/deserialization of sparse Rows in Flink is specially optimized. The principle is that each Row carries a leading mask when serialized, identifying whether the field at a given position is NULL; one field corresponds to one bit. For example, if you have 10k
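To make that concrete, a back-of-the-envelope sketch in Python (illustrative arithmetic only, not Flink's actual serializer):

    import math

    def null_mask_bytes(num_fields: int) -> int:
        # One bit per field in the leading null mask, rounded up to whole bytes.
        return math.ceil(num_fields / 8)

    print(null_mask_bytes(10_000))   # 1250 bytes of mask per serialized Row
    print(null_mask_bytes(100_000))  # 12500 bytes (~12 KiB) per Row, before any field data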

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo, Community, Thanks a lot for your support. May I finally ask, to conclude this thread and for the wider audience: - Are serious performance issues to be expected with 100k fields per ROW (i.e. due solely to metadata overhead and independently of query logic)? - In sparse population

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre, This example is written against the syntax of release-1.12, which is about to be released, and the test passes. In release-1.12, input_type can be omitted and expressions can be used directly. If you are using release-1.11, you only need to adjust the udf syntax slightly
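A minimal sketch of the syntax difference, assuming a trivial string-to-string function:

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # Flink 1.12+: input types are inferred; only the result type is declared.
    to_upper = udf(lambda s: s.upper(), result_type=DataTypes.STRING())

    # Flink 1.11: input_types must be given explicitly; omitting them triggers
    # the "Invalid input_type" error quoted in the message below.
    to_upper_111 = udf(lambda s: s.upper(),
                       input_types=[DataTypes.STRING()],
                       result_type=DataTypes.STRING())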

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo, Nice! This looks a bit hacky, but shows that it can be done ;) I just got an exception preventing me from running your code, apparently from udf.py: TypeError: Invalid input_type: input_type should be DataType but contains None. Can you please check again? If the schema is defined in a

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-01 Thread Xingbo Huang
Hi Pierre, I wrote a PyFlink implementation; you can see if it meets your needs: from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import udf def test(): env =
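The preview cuts the code off; a self-contained sketch of the same shape, against the 1.12 API (the two-field schema and the sample payloads are placeholders, not Xingbo's originals):

    import json

    from pyflink.common import Row
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
    from pyflink.table.expressions import col
    from pyflink.table.udf import udf

    def test():
        env = StreamExecutionEnvironment.get_execution_environment()
        t_env = StreamTableEnvironment.create(
            env,
            environment_settings=EnvironmentSettings.new_instance()
                .in_streaming_mode().use_blink_planner().build())

        # Declare the (normally much wider) output schema once.
        fields = ["a", "b"]
        row_type = DataTypes.ROW(
            [DataTypes.FIELD(f, DataTypes.STRING()) for f in fields])

        # Parse the JSON string inside the udf and emit a typed Row;
        # missing keys come out as NULL fields.
        parse = udf(lambda s: Row(*[json.loads(s).get(f) for f in fields]),
                    result_type=row_type)

        t = t_env.from_elements([('{"a": "foo"}',), ('{"a": "x", "b": "y"}',)],
                                ["payload"])
        t.select(parse(col("payload"))).execute().print()

    if __name__ == '__main__':
        test()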

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-01 Thread Pierre Oberholzer
Hi Xingbo, That would mean giving up on using Flink (Table) features on the content of the parsed JSON objects, so definitely a big loss. Let me know if I missed something. Thanks! On Tue, Dec 1, 2020 at 07:26, Xingbo Huang wrote: > Hi Pierre, > > Have you ever thought of declaring your

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Xingbo Huang
Hi Pierre, Have you ever thought of declaring your entire JSON as a string field in the `Table` and putting the parsing work in a UDF? Best, Xingbo Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM: > Hi Xingbo, > > Many thanks for your follow up. Yes you got it right. > So using Table API and a ROW object
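A minimal sketch of that suggestion (the function and key names are illustrative): the table keeps one STRING column holding the raw JSON document, and keys are extracted on demand.

    import json

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.STRING())
    def get_key(payload: str, key: str) -> str:
        # Returns NULL (None) when the key is absent from the document.
        return json.loads(payload).get(key)

As Pierre notes in the reply above, the cost is that the parsed content stays opaque to the Table API until it is extracted.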

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Pierre Oberholzer
Hi Xingbo, Many thanks for your follow up. Yes, you got it right. So, using Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to: - How to nicely specify the types for the 100k fields: shall I use TypeInformation [1] or better
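One way to avoid spelling out 100k type annotations by hand is to build the DataType programmatically; a sketch, assuming the schema is available as a plain dict (the dict here is hypothetical, with every field typed as STRING for brevity):

    from pyflink.table import DataTypes

    schema = {"field_%d" % i: DataTypes.STRING() for i in range(100_000)}

    row_type = DataTypes.ROW(
        [DataTypes.FIELD(name, dtype) for name, dtype in schema.items()])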

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre, Sorry for the late reply. Your requirement is that your `Table` has a `field` in `Json` format with up to 100k keys, and you want to use such a `field` as the input/output of a `udf`, right? As to whether there is a limit on the number of nested keys, I am not quite sure.

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-20 Thread Pierre Oberholzer
Hi Wei, Thanks for the hint. May I please follow up by adding more context and asking for your guidance. In case the aforementioned Map[String,Any] object returned by Scala: - Has a defined schema (incl. nested) with up to 100k (!) different possible keys - Has only some portion of the keys populated

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Wei Zhong
Hi Pierre, Currently there is no type hint like 'Map[String, Any]'. The recommended way is to declare your type more explicitly. If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some troubles [2] related to the Kryo

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Pierre Oberholzer
Hi Wei, It works! Thanks a lot for your support. I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2. So to summarize, methods working: - Current: DataTypeHint in UDF definition + SQL for UDF registration - Outdated: override getResultType in UDF definition +

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, Both approaches work on my local machine; this is my code: Scala UDF: package com.dummy import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.api.Types import

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei, True, I'm using the method you mention, but glad to change. I tried your suggestion instead, but got a similar error. Thanks for your support. This is much more tedious than I thought. *Option 1 - SQL UDF* *SQL UDF* create_func_ddl = """ CREATE FUNCTION dummyMap AS
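For reference, a plausible completion of the truncated DDL (the class path follows the com.dummy package in Wei's example above; t_env is assumed to be an existing TableEnvironment):

    create_func_ddl = """
    CREATE FUNCTION dummyMap
      AS 'com.dummy.dummyMap'
      LANGUAGE SCALA
    """
    t_env.execute_sql(create_func_ddl)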

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding a type hint. You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei, Thanks for your suggestion. Same error. *Scala UDF* @FunctionHint(output = new DataTypeHint("ROW")) class dummyMap() extends ScalarFunction { def eval(): Row = { Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar")) } } Best regards, On Tue, Nov 17, 2020 at

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, You can try to replace the '@DataTypeHint("ROW")' with '@FunctionHint(output = new DataTypeHint("ROW"))' Best, Wei > On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote: > > Hi Dian, Community, > > (bringing the thread back to a wider audience) > > As you suggested, I've tried to use

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-16 Thread Pierre Oberholzer
Hi Dian, Community, (bringing the thread back to a wider audience) As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API. I've also tried other Map objects from Flink (table.data.MapData,

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Thanks Dian, but same error when using an explicit return type: class dummyMap() extends ScalarFunction { def eval(): util.Map[java.lang.String,java.lang.String] = { val states = Map("key1" -> "val1", "key2" -> "val2") states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Dian Fu
You need to explicitly define the result type of the UDF. You could refer to [1] for more details if you are using Flink 1.11. If you are using another version of Flink, you need to refer to the corresponding documentation. [1]

PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Hi, I'm trying to use a Map[String,String] object output by a Scala UDF (scala.collection.immutable.Map) as a valid data type in the Table API, namely via the Java type (java.util.Map) as recommended here,