Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-03 Thread Pierre Oberholzer
Hi Xingbo, Wei, Dian,

Many thanks for this, and for the high-quality and prompt support overall.
Let’s close this thread here. Looking forward to trying your approach.
Community, feel free to reach out with additional remarks and experiences
on structured streaming over complex/sparse objects.

Best regards,

On Thu, Dec 3, 2020 at 08:47, Xingbo Huang  wrote:

> Hi Pierre,
>
> The serialization/deserialization of sparse Row in flink is specially
> optimized. The principle is that each Row will have a leading mask when
> serializing to identify whether the field at the specified position is
> NULL, and one field corresponds to one bit. For example, if you have 10k
> fields, then there will be a mask of 10k bit / 8 = 1250 byte. In this way,
> the serialization/deserialization overhead can be omitted for those field
> values that are NULL.
>
> For specific code optimization logic, you can refer to java logic[1], or
> python logic[2] and cython logic[3].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
> [2]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
> [3]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
>
> Best,
> Xingbo
>
> Pierre Oberholzer  于2020年12月3日周四 下午3:08写道:
>
>> Hi Xingbo, Community,
>>
>> Thanks a lot for your support.
>> May I finally ask to conclude this thread, including wider audience:
>> - Are serious performance issues to be expected with 100k fields per ROW
>> (i.e. due solely to metadata overhead and independently of queries logic) ?
>> - In sparse population (say 99% sparsity) already optimized in the ROW
>> object or are sparse types on your roadmap ?
>> Any experience with sparse Table from other users (including benchmarks
>> vs. other frameworks) are also highly welcome.
>>
>> Thanks !
>>
>> Best
>>
>>
>> Le jeu. 3 déc. 2020 à 02:53, Xingbo Huang  a écrit :
>>
>>> Hi Pierre,
>>>
>>> This example is written based on the syntax of release-1.12 that is
>>> about to be released, and the test passed. In release-1.12, input_type can
>>> be omitted and expression can be used directly. If you are using
>>> release-1.11, you only need to modify the grammar of udf used slightly
>>> according to the udf documentation[1].
>>>
>>> The flink table connector supports avro format, please refer to the
>>> document[2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>>
>>> Best,
>>> Xingbo
>>>
>>> Pierre Oberholzer  于2020年12月3日周四 上午2:57写道:
>>>
 Hi Xingbo,

 Nice ! This looks a bit hacky, but shows that it can be done ;)

 I just got an exception preventing me running your code, apparently
 from udf.py:

 TypeError: Invalid input_type: input_type should be DataType but
 contains None

 Can you pls check again ?
 If the schema is defined is a .avsc file, do we have to parse it and
 rebuild those syntax (ddl and udf) and or is there an existing component
 that could be used ?

 Thanks a lot !

 Best,


 Le mer. 2 déc. 2020 à 04:50, Xingbo Huang  a
 écrit :

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env,
>
> environment_settings=EnvironmentSettings.new_instance()
>
> .in_streaming_mode().use_blink_planner().build())
>
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>   '80m')
>
> # 10k nested columns
> num_field = 10_000
> fields = ['f%s INT' % i for i in range(num_field)]
> field_str = ','.join(fields)
> t_env.execute_sql(f"""
> CREATE TABLE source_table (
> f0 BIGINT,
> f1 DECIMAL(32,2),
> f2 ROW<${field_str}>,
> f3 TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen',
>   'number-of-rows' = '2'
> )
> """)
>
> t_env.execute_sql(f"""
> CREATE TABLE print_table (
>  f0 BIGINT,
>  f1 DECIMAL(32,2),
>  f2 ROW<${field_str}>,
>  f3 TIMESTAMP(3)
> ) WITH (

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

The serialization/deserialization of sparse Rows in Flink is specially
optimized. The principle is that each Row carries a leading mask when it is
serialized, identifying whether the field at a given position is NULL, with
one bit per field. For example, if you have 10k fields, the mask is
10,000 bits / 8 = 1,250 bytes. In this way, the serialization/deserialization
overhead can be skipped for the field values that are NULL.

For the specific optimization logic, you can refer to the Java implementation [1],
or the Python [2] and Cython [3] implementations.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
[3]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
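
For intuition, here is a toy sketch in plain Python (not Flink's actual serializer) of the leading-mask idea described above: one bit per field marks NULL, so NULL fields add nothing to the payload.

def serialize_row(values):
    """Toy serializer: a leading bit mask marks NULL fields, one bit per field."""
    n = len(values)
    mask = bytearray((n + 7) // 8)        # e.g. 10,000 fields -> 1,250 mask bytes
    payload = []
    for i, v in enumerate(values):
        if v is None:
            mask[i // 8] |= 1 << (i % 8)  # flag position i as NULL
        else:
            payload.append(v)             # only non-NULL values cost payload space
    return bytes(mask), payload

mask, payload = serialize_row([1, None, None, 42])  # sparse row: only 2 payload values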

Best,
Xingbo

Pierre Oberholzer  wrote on Thu, Dec 3, 2020 at 3:08 PM:

> Hi Xingbo, Community,
>
> Thanks a lot for your support.
> May I finally ask to conclude this thread, including wider audience:
> - Are serious performance issues to be expected with 100k fields per ROW
> (i.e. due solely to metadata overhead and independently of queries logic) ?
> - In sparse population (say 99% sparsity) already optimized in the ROW
> object or are sparse types on your roadmap ?
> Any experience with sparse Table from other users (including benchmarks
> vs. other frameworks) are also highly welcome.
>
> Thanks !
>
> Best
>
>
> Le jeu. 3 déc. 2020 à 02:53, Xingbo Huang  a écrit :
>
>> Hi Pierre,
>>
>> This example is written based on the syntax of release-1.12 that is about
>> to be released, and the test passed. In release-1.12, input_type can be
>> omitted and expression can be used directly. If you are using release-1.11,
>> you only need to modify the grammar of udf used slightly according to the
>> udf documentation[1].
>>
>> The flink table connector supports avro format, please refer to the
>> document[2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>
>> Best,
>> Xingbo
>>
>> Pierre Oberholzer  于2020年12月3日周四 上午2:57写道:
>>
>>> Hi Xingbo,
>>>
>>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>>
>>> I just got an exception preventing me running your code, apparently from
>>> udf.py:
>>>
>>> TypeError: Invalid input_type: input_type should be DataType but
>>> contains None
>>>
>>> Can you pls check again ?
>>> If the schema is defined is a .avsc file, do we have to parse it and
>>> rebuild those syntax (ddl and udf) and or is there an existing component
>>> that could be used ?
>>>
>>> Thanks a lot !
>>>
>>> Best,
>>>
>>>
>>> Le mer. 2 déc. 2020 à 04:50, Xingbo Huang  a écrit :
>>>
 Hi Pierre,

 I wrote a PyFlink implementation, you can see if it meets your needs:


 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
 DataTypes
 from pyflink.table.udf import udf


 def test():
 env = StreamExecutionEnvironment.get_execution_environment()
 env.set_parallelism(1)
 t_env = StreamTableEnvironment.create(env,

 environment_settings=EnvironmentSettings.new_instance()

 .in_streaming_mode().use_blink_planner().build())

 t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
   '80m')

 # 10k nested columns
 num_field = 10_000
 fields = ['f%s INT' % i for i in range(num_field)]
 field_str = ','.join(fields)
 t_env.execute_sql(f"""
 CREATE TABLE source_table (
 f0 BIGINT,
 f1 DECIMAL(32,2),
 f2 ROW<${field_str}>,
 f3 TIMESTAMP(3)
 ) WITH (
   'connector' = 'datagen',
   'number-of-rows' = '2'
 )
 """)

 t_env.execute_sql(f"""
 CREATE TABLE print_table (
  f0 BIGINT,
  f1 DECIMAL(32,2),
  f2 ROW<${field_str}>,
  f3 TIMESTAMP(3)
 ) WITH (
  'connector' = 'print'
 )
 """)
 result_type = DataTypes.ROW(
 [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
 range(num_field)])

 func = udf(lambda x: x, result_type=result_type)

 source = t_env.from_path("source_table")
 result = source.select(source.f0, source.f1, func(source.f2),
 source.f3)
 result.execute_insert("print_table")


 if __name__ == '__main__':
 test()



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo, Community,

Thanks a lot for your support.
To conclude this thread, may I ask a final question, also to the wider audience:
- Are serious performance issues to be expected with 100k fields per ROW
(i.e. due solely to metadata overhead, independently of the query logic)?
- Is sparse population (say 99% sparsity) already optimized in the ROW
object, or are sparse types on your roadmap?
Any experience with sparse Tables from other users (including benchmarks vs.
other frameworks) is also highly welcome.

Thanks !

Best


On Thu, Dec 3, 2020 at 02:53, Xingbo Huang  wrote:

> Hi Pierre,
>
> This example is written based on the syntax of release-1.12 that is about
> to be released, and the test passed. In release-1.12, input_type can be
> omitted and expression can be used directly. If you are using release-1.11,
> you only need to modify the grammar of udf used slightly according to the
> udf documentation[1].
>
> The flink table connector supports avro format, please refer to the
> document[2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>
> Best,
> Xingbo
>
> Pierre Oberholzer  于2020年12月3日周四 上午2:57写道:
>
>> Hi Xingbo,
>>
>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>
>> I just got an exception preventing me running your code, apparently from
>> udf.py:
>>
>> TypeError: Invalid input_type: input_type should be DataType but contains
>> None
>>
>> Can you pls check again ?
>> If the schema is defined is a .avsc file, do we have to parse it and
>> rebuild those syntax (ddl and udf) and or is there an existing component
>> that could be used ?
>>
>> Thanks a lot !
>>
>> Best,
>>
>>
>> Le mer. 2 déc. 2020 à 04:50, Xingbo Huang  a écrit :
>>
>>> Hi Pierre,
>>>
>>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>>> DataTypes
>>> from pyflink.table.udf import udf
>>>
>>>
>>> def test():
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env,
>>>
>>> environment_settings=EnvironmentSettings.new_instance()
>>>
>>> .in_streaming_mode().use_blink_planner().build())
>>>
>>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>>   '80m')
>>>
>>> # 10k nested columns
>>> num_field = 10_000
>>> fields = ['f%s INT' % i for i in range(num_field)]
>>> field_str = ','.join(fields)
>>> t_env.execute_sql(f"""
>>> CREATE TABLE source_table (
>>> f0 BIGINT,
>>> f1 DECIMAL(32,2),
>>> f2 ROW<${field_str}>,
>>> f3 TIMESTAMP(3)
>>> ) WITH (
>>>   'connector' = 'datagen',
>>>   'number-of-rows' = '2'
>>> )
>>> """)
>>>
>>> t_env.execute_sql(f"""
>>> CREATE TABLE print_table (
>>>  f0 BIGINT,
>>>  f1 DECIMAL(32,2),
>>>  f2 ROW<${field_str}>,
>>>  f3 TIMESTAMP(3)
>>> ) WITH (
>>>  'connector' = 'print'
>>> )
>>> """)
>>> result_type = DataTypes.ROW(
>>> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
>>> range(num_field)])
>>>
>>> func = udf(lambda x: x, result_type=result_type)
>>>
>>> source = t_env.from_path("source_table")
>>> result = source.select(source.f0, source.f1, func(source.f2),
>>> source.f3)
>>> result.execute_insert("print_table")
>>>
>>>
>>> if __name__ == '__main__':
>>> test()
>>>
>>>
>>>  Best,
>>>  Xingbo
>>>
>>> Pierre Oberholzer  于2020年12月1日周二 下午6:10写道:
>>>
 Hi Xingbo,

 That would mean giving up on using Flink (table) features on the
 content of the parsed JSON objects, so definitely a big loss. Let me know
 if I missed something.

 Thanks !

 Le mar. 1 déc. 2020 à 07:26, Xingbo Huang  a
 écrit :

> Hi Pierre,
>
> Have you ever thought of declaring your entire json as a string field
> in `Table` and putting the parsing work in UDF?
>
> Best,
> Xingbo
>
> Pierre Oberholzer  于2020年12月1日周二
> 上午4:13写道:
>
>> Hi Xingbo,
>>
>> Many thanks for your follow up. Yes you got it right.
>> So using Table API and a ROW object for the nested output of my UDF,
>> and since types are mandatory, I guess this boils down to:
>> - How to nicely specify the types for the 100k fields : shall I use
>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>> - Do I have to put NULL values for all the fields that don't have a
>> value in my JSON ?
>> - Will the resulting Table be "sparse" 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

This example is written against the syntax of release-1.12, which is about
to be released, and the test passed. In release-1.12, the input types can be
omitted and expressions can be used directly. If you are using release-1.11,
you only need to slightly adapt the UDF declaration according to the
UDF documentation [1]; a sketch of the difference follows below.
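
For illustration, a hedged sketch of the difference between the two declarations (reusing the 10k-field ROW type from the example above; not taken from the documentation):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

num_field = 10_000
row_type = DataTypes.ROW(
    [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

# release-1.11 style: input_types must be given explicitly
func_111 = udf(lambda x: x, input_types=[row_type], result_type=row_type)

# release-1.12 style: input types are inferred, only result_type is needed
func_112 = udf(lambda x: x, result_type=row_type)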

The Flink table connectors support the Avro format; please refer to the
documentation [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
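
As a rough, untested sketch of what an Avro-format sink table could look like (the Kafka connector options, topic name and bootstrap servers below are placeholders, and `t_env` / `field_str` are borrowed from the example earlier in this thread):

t_env.execute_sql(f"""
    CREATE TABLE avro_sink (
        f0 BIGINT,
        f2 ROW<{field_str}>
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro'
    )
""")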

Best,
Xingbo

Pierre Oberholzer  wrote on Thu, Dec 3, 2020 at 2:57 AM:

> Hi Xingbo,
>
> Nice ! This looks a bit hacky, but shows that it can be done ;)
>
> I just got an exception preventing me running your code, apparently from
> udf.py:
>
> TypeError: Invalid input_type: input_type should be DataType but contains
> None
>
> Can you pls check again ?
> If the schema is defined is a .avsc file, do we have to parse it and
> rebuild those syntax (ddl and udf) and or is there an existing component
> that could be used ?
>
> Thanks a lot !
>
> Best,
>
>
> Le mer. 2 déc. 2020 à 04:50, Xingbo Huang  a écrit :
>
>> Hi Pierre,
>>
>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>> DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> def test():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env,
>>
>> environment_settings=EnvironmentSettings.new_instance()
>>
>> .in_streaming_mode().use_blink_planner().build())
>>
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>   '80m')
>>
>> # 10k nested columns
>> num_field = 10_000
>> fields = ['f%s INT' % i for i in range(num_field)]
>> field_str = ','.join(fields)
>> t_env.execute_sql(f"""
>> CREATE TABLE source_table (
>> f0 BIGINT,
>> f1 DECIMAL(32,2),
>> f2 ROW<${field_str}>,
>> f3 TIMESTAMP(3)
>> ) WITH (
>>   'connector' = 'datagen',
>>   'number-of-rows' = '2'
>> )
>> """)
>>
>> t_env.execute_sql(f"""
>> CREATE TABLE print_table (
>>  f0 BIGINT,
>>  f1 DECIMAL(32,2),
>>  f2 ROW<${field_str}>,
>>  f3 TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'print'
>> )
>> """)
>> result_type = DataTypes.ROW(
>> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
>> range(num_field)])
>>
>> func = udf(lambda x: x, result_type=result_type)
>>
>> source = t_env.from_path("source_table")
>> result = source.select(source.f0, source.f1, func(source.f2),
>> source.f3)
>> result.execute_insert("print_table")
>>
>>
>> if __name__ == '__main__':
>> test()
>>
>>
>>  Best,
>>  Xingbo
>>
>> Pierre Oberholzer  于2020年12月1日周二 下午6:10写道:
>>
>>> Hi Xingbo,
>>>
>>> That would mean giving up on using Flink (table) features on the content
>>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>>> missed something.
>>>
>>> Thanks !
>>>
>>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang  a écrit :
>>>
 Hi Pierre,

 Have you ever thought of declaring your entire json as a string field
 in `Table` and putting the parsing work in UDF?

 Best,
 Xingbo

 Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:

> Hi Xingbo,
>
> Many thanks for your follow up. Yes you got it right.
> So using Table API and a ROW object for the nested output of my UDF,
> and since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields : shall I use
> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
> - Do I have to put NULL values for all the fields that don't have a
> value in my JSON ?
> - Will the resulting Table be "sparse" and suffer performance
> limitations ?
> Let me know if Table API and ROW are the right candidates here, or if
> other better alternatives exist.
> As said I'd be glad to apply some downstream transformations using
> key,value access (and possibly some Table <-> Pandas operations). Hope 
> that
> doesn't make it a too long wish list ;)
>
> Thanks a lot !
>
> Best regards,
>
> [1]
> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2]
> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a
> écrit :
>
>> Hi Pierre,
>>

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo,

Nice ! This looks a bit hacky, but shows that it can be done ;)

I just got an exception preventing me from running your code, apparently from
udf.py:

TypeError: Invalid input_type: input_type should be DataType but contains
None

Can you please check again?
If the schema is defined in a .avsc file, do we have to parse it and
rebuild that syntax (DDL and UDF), or is there an existing component
that could be used?
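
For reference, one possible direction, sketched here with a hypothetical helper (not an existing Flink component) that reads a flat .avsc file as plain JSON and derives the DDL field list; nested records and logical types would need extra handling:

import json

AVRO_TO_SQL = {"int": "INT", "long": "BIGINT", "float": "FLOAT",
               "double": "DOUBLE", "boolean": "BOOLEAN", "string": "STRING"}

def ddl_fields_from_avsc(path):
    """Read a flat .avsc (plain JSON) and build the 'name TYPE,...' DDL list."""
    with open(path) as f:
        schema = json.load(f)

    def sql_type(t):
        # nullable fields come as unions like ["null", "string"]
        if isinstance(t, list):
            t = next(x for x in t if x != "null")
        return AVRO_TO_SQL.get(t, "STRING")

    return ",".join("%s %s" % (fld["name"], sql_type(fld["type"]))
                    for fld in schema["fields"])

# field_str = ddl_fields_from_avsc("my_schema.avsc")  # then reuse in ROW<...> DDL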

Thanks a lot !

Best,


On Wed, Dec 2, 2020 at 04:50, Xingbo Huang  wrote:

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env,
>
> environment_settings=EnvironmentSettings.new_instance()
>
> .in_streaming_mode().use_blink_planner().build())
>
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>   '80m')
>
> # 10k nested columns
> num_field = 10_000
> fields = ['f%s INT' % i for i in range(num_field)]
> field_str = ','.join(fields)
> t_env.execute_sql(f"""
> CREATE TABLE source_table (
> f0 BIGINT,
> f1 DECIMAL(32,2),
> f2 ROW<${field_str}>,
> f3 TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen',
>   'number-of-rows' = '2'
> )
> """)
>
> t_env.execute_sql(f"""
> CREATE TABLE print_table (
>  f0 BIGINT,
>  f1 DECIMAL(32,2),
>  f2 ROW<${field_str}>,
>  f3 TIMESTAMP(3)
> ) WITH (
>  'connector' = 'print'
> )
> """)
> result_type = DataTypes.ROW(
> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
> range(num_field)])
>
> func = udf(lambda x: x, result_type=result_type)
>
> source = t_env.from_path("source_table")
> result = source.select(source.f0, source.f1, func(source.f2),
> source.f3)
> result.execute_insert("print_table")
>
>
> if __name__ == '__main__':
> test()
>
>
>  Best,
>  Xingbo
>
> Pierre Oberholzer  于2020年12月1日周二 下午6:10写道:
>
>> Hi Xingbo,
>>
>> That would mean giving up on using Flink (table) features on the content
>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>> missed something.
>>
>> Thanks !
>>
>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang  a écrit :
>>
>>> Hi Pierre,
>>>
>>> Have you ever thought of declaring your entire json as a string field in
>>> `Table` and putting the parsing work in UDF?
>>>
>>> Best,
>>> Xingbo
>>>
>>> Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:
>>>
 Hi Xingbo,

 Many thanks for your follow up. Yes you got it right.
 So using Table API and a ROW object for the nested output of my UDF,
 and since types are mandatory, I guess this boils down to:
 - How to nicely specify the types for the 100k fields : shall I use
 TypeInformation [1] or better retrieve it from Schema Registry [2] ?
 - Do I have to put NULL values for all the fields that don't have a
 value in my JSON ?
 - Will the resulting Table be "sparse" and suffer performance
 limitations ?
 Let me know if Table API and ROW are the right candidates here, or if
 other better alternatives exist.
 As said I'd be glad to apply some downstream transformations using
 key,value access (and possibly some Table <-> Pandas operations). Hope that
 doesn't make it a too long wish list ;)

 Thanks a lot !

 Best regards,

 [1]
 https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
 [2]
 https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html

 Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a
 écrit :

> Hi Pierre,
>
> Sorry for the late reply.
> Your requirement is that your `Table` has a `field` in `Json` format
> and its key has reached 100k, and then you want to use such a `field` as
> the input/output of `udf`, right? As to whether there is a limit on the
> number of nested key, I am not quite clear. Other contributors with
> experience in this area may have answers. On the part of `Python UDF`, if
> the type of key or value of your `Map` is `Any`, we do not support it now.
> You need to specify a specific type. For more information, please refer to
> the related document[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>
> Best,
> Xingbo
>
> 2020年11月28日 上午12:49,Pierre Oberholzer 
> 写道:
>
> Hello Wei, Dian, Xingbo,
>
> 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-01 Thread Xingbo Huang
Hi Pierre,

I wrote a PyFlink implementation; you can see if it meets your needs:


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)
    t_env.execute_sql(f"""
        CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '2'
        )
        """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
        """)

    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()


 Best,
 Xingbo

Pierre Oberholzer  wrote on Tue, Dec 1, 2020 at 6:10 PM:

> Hi Xingbo,
>
> That would mean giving up on using Flink (table) features on the content
> of the parsed JSON objects, so definitely a big loss. Let me know if I
> missed something.
>
> Thanks !
>
> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang  a écrit :
>
>> Hi Pierre,
>>
>> Have you ever thought of declaring your entire json as a string field in
>> `Table` and putting the parsing work in UDF?
>>
>> Best,
>> Xingbo
>>
>> Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:
>>
>>> Hi Xingbo,
>>>
>>> Many thanks for your follow up. Yes you got it right.
>>> So using Table API and a ROW object for the nested output of my UDF, and
>>> since types are mandatory, I guess this boils down to:
>>> - How to nicely specify the types for the 100k fields : shall I use
>>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>>> - Do I have to put NULL values for all the fields that don't have a
>>> value in my JSON ?
>>> - Will the resulting Table be "sparse" and suffer performance
>>> limitations ?
>>> Let me know if Table API and ROW are the right candidates here, or if
>>> other better alternatives exist.
>>> As said I'd be glad to apply some downstream transformations using
>>> key,value access (and possibly some Table <-> Pandas operations). Hope that
>>> doesn't make it a too long wish list ;)
>>>
>>> Thanks a lot !
>>>
>>> Best regards,
>>>
>>> [1]
>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>> [2]
>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>
>>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a
>>> écrit :
>>>
 Hi Pierre,

 Sorry for the late reply.
 Your requirement is that your `Table` has a `field` in `Json` format
 and its key has reached 100k, and then you want to use such a `field` as
 the input/output of `udf`, right? As to whether there is a limit on the
 number of nested key, I am not quite clear. Other contributors with
 experience in this area may have answers. On the part of `Python UDF`, if
 the type of key or value of your `Map` is `Any`, we do not support it now.
 You need to specify a specific type. For more information, please refer to
 the related document[1].

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html

 Best,
 Xingbo

 2020年11月28日 上午12:49,Pierre Oberholzer  写道:

 Hello Wei, Dian, Xingbo,

 Not really sure when it is appropriate to knock on the door of the
 community ;)
 I just wanted to mention that your feedback on the above topic will be
 highly appreciated as it will condition the choice of framework on our side
 for the months to come, and potentially help the community to cover sparse
 data with Flink.

 Thanks a lot !

 Have a great week-end

 Best,

 Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
 pierre.oberhol...@gmail.com> a écrit :

> Hi Wei,
>
> Thanks for the hint. May I please follow up by adding more context and
> ask for your guidance.
>
> In case the bespoken Map[String,Any] object 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-01 Thread Pierre Oberholzer
Hi Xingbo,

That would mean giving up on using Flink (Table) features on the content of
the parsed JSON objects, which would definitely be a big loss. Let me know if
I missed something.

Thanks !

On Tue, Dec 1, 2020 at 07:26, Xingbo Huang  wrote:

> Hi Pierre,
>
> Have you ever thought of declaring your entire json as a string field in
> `Table` and putting the parsing work in UDF?
>
> Best,
> Xingbo
>
> Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:
>
>> Hi Xingbo,
>>
>> Many thanks for your follow up. Yes you got it right.
>> So using Table API and a ROW object for the nested output of my UDF, and
>> since types are mandatory, I guess this boils down to:
>> - How to nicely specify the types for the 100k fields : shall I use
>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>> - Do I have to put NULL values for all the fields that don't have a value
>> in my JSON ?
>> - Will the resulting Table be "sparse" and suffer performance limitations
>> ?
>> Let me know if Table API and ROW are the right candidates here, or if
>> other better alternatives exist.
>> As said I'd be glad to apply some downstream transformations using
>> key,value access (and possibly some Table <-> Pandas operations). Hope that
>> doesn't make it a too long wish list ;)
>>
>> Thanks a lot !
>>
>> Best regards,
>>
>> [1]
>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>> [2]
>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>
>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a écrit :
>>
>>> Hi Pierre,
>>>
>>> Sorry for the late reply.
>>> Your requirement is that your `Table` has a `field` in `Json` format and
>>> its key has reached 100k, and then you want to use such a `field` as the
>>> input/output of `udf`, right? As to whether there is a limit on the number
>>> of nested key, I am not quite clear. Other contributors with experience in
>>> this area may have answers. On the part of `Python UDF`, if the type of key
>>> or value of your `Map` is `Any`, we do not support it now. You need to
>>> specify a specific type. For more information, please refer to the related
>>> document[1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> 2020年11月28日 上午12:49,Pierre Oberholzer  写道:
>>>
>>> Hello Wei, Dian, Xingbo,
>>>
>>> Not really sure when it is appropriate to knock on the door of the
>>> community ;)
>>> I just wanted to mention that your feedback on the above topic will be
>>> highly appreciated as it will condition the choice of framework on our side
>>> for the months to come, and potentially help the community to cover sparse
>>> data with Flink.
>>>
>>> Thanks a lot !
>>>
>>> Have a great week-end
>>>
>>> Best,
>>>
>>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>>> pierre.oberhol...@gmail.com> a écrit :
>>>
 Hi Wei,

 Thanks for the hint. May I please follow up by adding more context and
 ask for your guidance.

 In case the bespoken Map[String,Any] object returned by Scala:

 - Has a defined schema (incl. nested) with up to 100k (!) different
 possible keys
 - Has only some portion of the keys populated for each record
 - Is convertible to JSON
 - Has to undergo downstream processing in Flink and/or Python UDF with
 key value access
 - Has to be ultimately stored in a Kafka/AVRO sink

 How would you declare the types explicitly in such a case ?

 Thanks for your support !

 Pierre

 Le jeu. 19 nov. 2020 à 03:54, Wei Zhong  a
 écrit :

> Hi Pierre,
>
> Currently there is no type hint like ‘Map[String, Any]’. The
> recommended way is declaring your type more explicitly.
>
> If you insist on doing this, you can try to declaring a RAW data type
> for java.util.HashMap [1], but you may encounter some troubles [2] related
> to the kryo serializers.
>
> Best,
> Wei
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2]
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>
> 在 2020年11月19日,04:31,Pierre Oberholzer 
> 写道:
>
> Hi Wei,
>
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong
> syntax for option 2.
>
> So to summarize..
>
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition
> + t_env.register_java_function for UDF registering
>
> Type conversions working:
> - scala.collection.immutable.Map[String,String] =>
> org.apache.flink.types.Row => ROW
> - scala.collection.immutable.Map[String,String] =>
> 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Xingbo Huang
Hi Pierre,

Have you ever thought of declaring your entire JSON as a string field in the
`Table` and putting the parsing work in a UDF?
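
For instance, a minimal sketch of that idea (1.12-style decorator syntax; the JSON keys "price" and "label" are invented for illustration):

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.ROW([DataTypes.FIELD("price", DataTypes.DOUBLE()),
                                DataTypes.FIELD("label", DataTypes.STRING())]))
def parse_json(payload: str):
    # the whole JSON document arrives as a single STRING column
    d = json.loads(payload)
    return d.get("price"), d.get("label")  # missing keys simply become NULL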

Best,
Xingbo

Pierre Oberholzer  wrote on Tue, Dec 1, 2020 at 4:13 AM:

> Hi Xingbo,
>
> Many thanks for your follow up. Yes you got it right.
> So using Table API and a ROW object for the nested output of my UDF, and
> since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields : shall I use
> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
> - Do I have to put NULL values for all the fields that don't have a value
> in my JSON ?
> - Will the resulting Table be "sparse" and suffer performance limitations ?
> Let me know if Table API and ROW are the right candidates here, or if
> other better alternatives exist.
> As said I'd be glad to apply some downstream transformations using
> key,value access (and possibly some Table <-> Pandas operations). Hope that
> doesn't make it a too long wish list ;)
>
> Thanks a lot !
>
> Best regards,
>
> [1]
> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2]
> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a écrit :
>
>> Hi Pierre,
>>
>> Sorry for the late reply.
>> Your requirement is that your `Table` has a `field` in `Json` format and
>> its key has reached 100k, and then you want to use such a `field` as the
>> input/output of `udf`, right? As to whether there is a limit on the number
>> of nested key, I am not quite clear. Other contributors with experience in
>> this area may have answers. On the part of `Python UDF`, if the type of key
>> or value of your `Map` is `Any`, we do not support it now. You need to
>> specify a specific type. For more information, please refer to the related
>> document[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>
>> Best,
>> Xingbo
>>
>> 2020年11月28日 上午12:49,Pierre Oberholzer  写道:
>>
>> Hello Wei, Dian, Xingbo,
>>
>> Not really sure when it is appropriate to knock on the door of the
>> community ;)
>> I just wanted to mention that your feedback on the above topic will be
>> highly appreciated as it will condition the choice of framework on our side
>> for the months to come, and potentially help the community to cover sparse
>> data with Flink.
>>
>> Thanks a lot !
>>
>> Have a great week-end
>>
>> Best,
>>
>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>> pierre.oberhol...@gmail.com> a écrit :
>>
>>> Hi Wei,
>>>
>>> Thanks for the hint. May I please follow up by adding more context and
>>> ask for your guidance.
>>>
>>> In case the bespoken Map[String,Any] object returned by Scala:
>>>
>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>> possible keys
>>> - Has only some portion of the keys populated for each record
>>> - Is convertible to JSON
>>> - Has to undergo downstream processing in Flink and/or Python UDF with
>>> key value access
>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>
>>> How would you declare the types explicitly in such a case ?
>>>
>>> Thanks for your support !
>>>
>>> Pierre
>>>
>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong  a
>>> écrit :
>>>
 Hi Pierre,

 Currently there is no type hint like ‘Map[String, Any]’. The
 recommended way is declaring your type more explicitly.

 If you insist on doing this, you can try to declaring a RAW data type
 for java.util.HashMap [1], but you may encounter some troubles [2] related
 to the kryo serializers.

 Best,
 Wei

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
 [2]
 https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class


 在 2020年11月19日,04:31,Pierre Oberholzer  写道:

 Hi Wei,

 It works ! Thanks a lot for your support.
 I hadn't tried this last combination for option 1, and I had wrong
 syntax for option 2.

 So to summarize..

 Methods working:
 - Current: DataTypeHint in UDF definition + SQL for UDF registering
 - Outdated: override getResultType in UDF definition
 + t_env.register_java_function for UDF registering

 Type conversions working:
 - scala.collection.immutable.Map[String,String] =>
 org.apache.flink.types.Row => ROW
 - scala.collection.immutable.Map[String,String] =>
 java.util.Map[String,String] => MAP

 Any hint for Map[String,Any] ?

 Best regards,

 Le mer. 18 nov. 2020 à 03:26, Wei Zhong  a
 écrit :

> Hi Pierre,
>
> Those 2 approaches all work in my local machine, this is my code:
>
> Scala UDF:
>
> package com.dummy
>
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Pierre Oberholzer
Hi Xingbo,

Many thanks for your follow-up. Yes, you got it right.
So, using the Table API and a ROW object for the nested output of my UDF, and
since types are mandatory, I guess this boils down to:
- How to nicely specify the types for the 100k fields: shall I use
TypeInformation [1], or better, retrieve them from a Schema Registry [2]?
- Do I have to put NULL values for all the fields that don't have a value
in my JSON?
- Will the resulting Table be "sparse" and suffer performance limitations?
Let me know if the Table API and ROW are the right candidates here, or if
better alternatives exist.
As said, I'd be glad to apply some downstream transformations using key/value
access (and possibly some Table <-> Pandas operations, as sketched below). Hope
that doesn't make it too long a wish list ;)

Thanks a lot !

Best regards,

[1]
https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
[2]
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
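
A small sketch of the Table <-> Pandas round trip mentioned above (PyFlink 1.11+ API; it assumes a `t_env` and a Table `source` as in the examples elsewhere in this thread, and that the table is small enough to collect to the client):

pdf = source.to_pandas()          # Table -> pandas DataFrame
pdf["f0"] = pdf["f0"] * 2         # arbitrary local transformation
back = t_env.from_pandas(pdf)     # pandas DataFrame -> Table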

On Sat, Nov 28, 2020 at 04:04, Xingbo Huang  wrote:

> Hi Pierre,
>
> Sorry for the late reply.
> Your requirement is that your `Table` has a `field` in `Json` format and
> its key has reached 100k, and then you want to use such a `field` as the
> input/output of `udf`, right? As to whether there is a limit on the number
> of nested key, I am not quite clear. Other contributors with experience in
> this area may have answers. On the part of `Python UDF`, if the type of key
> or value of your `Map` is `Any`, we do not support it now. You need to
> specify a specific type. For more information, please refer to the related
> document[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>
> Best,
> Xingbo
>
> 2020年11月28日 上午12:49,Pierre Oberholzer  写道:
>
> Hello Wei, Dian, Xingbo,
>
> Not really sure when it is appropriate to knock on the door of the
> community ;)
> I just wanted to mention that your feedback on the above topic will be
> highly appreciated as it will condition the choice of framework on our side
> for the months to come, and potentially help the community to cover sparse
> data with Flink.
>
> Thanks a lot !
>
> Have a great week-end
>
> Best,
>
> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
> pierre.oberhol...@gmail.com> a écrit :
>
>> Hi Wei,
>>
>> Thanks for the hint. May I please follow up by adding more context and
>> ask for your guidance.
>>
>> In case the bespoken Map[String,Any] object returned by Scala:
>>
>> - Has a defined schema (incl. nested) with up to 100k (!) different
>> possible keys
>> - Has only some portion of the keys populated for each record
>> - Is convertible to JSON
>> - Has to undergo downstream processing in Flink and/or Python UDF with
>> key value access
>> - Has to be ultimately stored in a Kafka/AVRO sink
>>
>> How would you declare the types explicitly in such a case ?
>>
>> Thanks for your support !
>>
>> Pierre
>>
>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong  a
>> écrit :
>>
>>> Hi Pierre,
>>>
>>> Currently there is no type hint like ‘Map[String, Any]’. The recommended
>>> way is declaring your type more explicitly.
>>>
>>> If you insist on doing this, you can try to declaring a RAW data type
>>> for java.util.HashMap [1], but you may encounter some troubles [2] related
>>> to the kryo serializers.
>>>
>>> Best,
>>> Wei
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>> [2]
>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>
>>>
>>> 在 2020年11月19日,04:31,Pierre Oberholzer  写道:
>>>
>>> Hi Wei,
>>>
>>> It works ! Thanks a lot for your support.
>>> I hadn't tried this last combination for option 1, and I had wrong
>>> syntax for option 2.
>>>
>>> So to summarize..
>>>
>>> Methods working:
>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>> - Outdated: override getResultType in UDF definition
>>> + t_env.register_java_function for UDF registering
>>>
>>> Type conversions working:
>>> - scala.collection.immutable.Map[String,String] =>
>>> org.apache.flink.types.Row => ROW
>>> - scala.collection.immutable.Map[String,String] =>
>>> java.util.Map[String,String] => MAP
>>>
>>> Any hint for Map[String,Any] ?
>>>
>>> Best regards,
>>>
>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong  a
>>> écrit :
>>>
 Hi Pierre,

 Those 2 approaches all work in my local machine, this is my code:

 Scala UDF:

 package com.dummy

 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.annotation.DataTypeHint
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.types.Row

 /**
   * The scala UDF.
   */
 class dummyMap extends ScalarFunction {

   // If the udf would be registered by the SQL statement, you 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre,

Sorry for the late reply.
Your requirement is that your `Table` has a `field` in `Json` format whose keys 
can number up to 100k, and you want to use such a `field` as the input/output of 
a `udf`, right? As to whether there is a limit on the number of nested keys, I am 
not quite sure; other contributors with experience in this area may have answers. 
On the `Python UDF` side, if the type of the key or value of your `Map` is `Any`, 
we do not support that yet. You need to specify a concrete type. For more 
information, please refer to the related documentation[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
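
For example, a sketch of a fully typed MAP declaration for a Python UDF (1.12-style decorator syntax; the function and fields are invented for illustration):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# concrete key/value types instead of an unsupported Map[String, Any]
map_type = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())

@udf(result_type=map_type)
def to_tags(key: str, value: str):
    # values are kept as strings so the MAP value type stays uniform
    return {key: value}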

Best,
Xingbo

> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer  wrote:
> 
> Hello Wei, Dian, Xingbo,
> 
> Not really sure when it is appropriate to knock on the door of the community 
> ;)
> I just wanted to mention that your feedback on the above topic will be highly 
> appreciated as it will condition the choice of framework on our side for the 
> months to come, and potentially help the community to cover sparse data with 
> Flink.
> 
> Thanks a lot !
> 
> Have a great week-end
> 
> Best,
> 
> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer  > a écrit :
> Hi Wei,
> 
> Thanks for the hint. May I please follow up by adding more context and ask 
> for your guidance.
> 
> In case the bespoken Map[String,Any] object returned by Scala:
> 
> - Has a defined schema (incl. nested) with up to 100k (!) different possible 
> keys
> - Has only some portion of the keys populated for each record
> - Is convertible to JSON
> - Has to undergo downstream processing in Flink and/or Python UDF with key 
> value access
> - Has to be ultimately stored in a Kafka/AVRO sink
> 
> How would you declare the types explicitly in such a case ?
> 
> Thanks for your support !
> 
> Pierre
> 
> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong  > a écrit :
> Hi Pierre,
> 
> Currently there is no type hint like ‘Map[String, Any]’. The recommended way 
> is declaring your type more explicitly.
> 
> If you insist on doing this, you can try to declaring a RAW data type for 
> java.util.HashMap [1], but you may encounter some troubles [2] related to the 
> kryo serializers.
> 
> Best,
> Wei
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>  
> 
> [2] 
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>  
> 
> 
> 
>> 在 2020年11月19日,04:31,Pierre Oberholzer > > 写道:
>> 
>> Hi Wei,
>> 
>> It works ! Thanks a lot for your support.
>> I hadn't tried this last combination for option 1, and I had wrong syntax 
>> for option 2.
>> 
>> So to summarize..
>> 
>> Methods working:
>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>> - Outdated: override getResultType in UDF definition + 
>> t_env.register_java_function for UDF registering
>> 
>> Type conversions working:
>> - scala.collection.immutable.Map[String,String] => 
>> org.apache.flink.types.Row => ROW
>> - scala.collection.immutable.Map[String,String] => 
>> java.util.Map[String,String] => MAP
>> 
>> Any hint for Map[String,Any] ?
>> 
>> Best regards,
>> 
>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong > > a écrit :
>> Hi Pierre,
>> 
>> Those 2 approaches all work in my local machine, this is my code:
>> 
>> Scala UDF:
>> package com.dummy
>> 
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>> 
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>> 
>>   // If the udf would be registered by the SQL statement, you need add this 
>> typehint
>>   @DataTypeHint("ROW")
>>   def eval(): Row = {
>> 
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> 
>>   // If the udf would be registered by the method 'register_java_function', 
>> you need override this
>>   // method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = {
>> // The type of the return values should be TypeInformation
>> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>> Types.STRING()))
>>   }
>> }
>> Python code:
>> 
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>> 
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>> 
>> # load the scala udf jar file, the path should be 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-20 Thread Pierre Oberholzer
Hi Wei,

Thanks for the hint. May I please follow up by adding more context and asking
for your guidance.

In case the aforementioned Map[String,Any] object returned by Scala:

- Has a defined schema (incl. nested) with up to 100k (!) different
possible keys
- Has only some portion of the keys populated for each record
- Is convertible to JSON
- Has to undergo downstream processing in Flink and/or Python UDF with key
value access
- Has to be ultimately stored in a Kafka/AVRO sink

How would you declare the types explicitly in such a case ?
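
One hedged possibility, for illustration only (field names invented, and only the Python/Table API side is shown): build the nested ROW type programmatically from the known schema instead of writing it out by hand.

from pyflink.table import DataTypes

# a nested sub-structure of the payload
nested = DataTypes.ROW([
    DataTypes.FIELD("amount", DataTypes.DOUBLE()),
    DataTypes.FIELD("tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
])
# top-level fields, e.g. generated from the schema rather than hand-written
row_type = DataTypes.ROW([
    DataTypes.FIELD("id", DataTypes.BIGINT()),
    DataTypes.FIELD("payload", nested),
])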

Thanks for your support !

Pierre

On Thu, Nov 19, 2020 at 03:54, Wei Zhong  wrote:

> Hi Pierre,
>
> Currently there is no type hint like ‘Map[String, Any]’. The recommended
> way is declaring your type more explicitly.
>
> If you insist on doing this, you can try to declaring a RAW data type for
> java.util.HashMap [1], but you may encounter some troubles [2] related to
> the kryo serializers.
>
> Best,
> Wei
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2]
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>
> 在 2020年11月19日,04:31,Pierre Oberholzer  写道:
>
> Hi Wei,
>
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong
> syntax for option 2.
>
> So to summarize..
>
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition
> + t_env.register_java_function for UDF registering
>
> Type conversions working:
> - scala.collection.immutable.Map[String,String] =>
> org.apache.flink.types.Row => ROW
> - scala.collection.immutable.Map[String,String] =>
> java.util.Map[String,String] => MAP
>
> Any hint for Map[String,Any] ?
>
> Best regards,
>
> Le mer. 18 nov. 2020 à 03:26, Wei Zhong  a écrit :
>
>> Hi Pierre,
>>
>> Those 2 approaches all work in my local machine, this is my code:
>>
>> Scala UDF:
>>
>> package com.dummy
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>>
>>   // If the udf would be registered by the SQL statement, you need add this 
>> typehint
>>   @DataTypeHint("ROW")
>>   def eval(): Row = {
>>
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>
>>   }
>>
>>   // If the udf would be registered by the method 'register_java_function', 
>> you need override this
>>   // method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = {
>> // The type of the return values should be TypeInformation
>> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>> Types.STRING()))
>>   }
>> }
>>
>> Python code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>>
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>>
>> # load the scala udf jar file, the path should be modified to yours
>> # or your can also load the jar file via other approaches
>> st_env.get_config().get_configuration().set_string("pipeline.jars", "
>> file:///Users/zhongwei/the-dummy-udf.jar")
>>
>> # register the udf via
>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
>> LANGUAGE SCALA")
>> # or register via the method
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> # prepare source and sink
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
>> 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>> output_of_my_scala_udf ROW
>> ) with (
>> 'connector' = 'print'
>> )""")
>>
>> # execute query
>>
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>
>> Best,
>> Wei
>>
>> 在 2020年11月18日,03:28,Pierre Oberholzer  写道:
>>
>> Hi Wei,
>>
>> True, I'm using the method you mention, but glad to change.
>> I tried your suggestion instead, but got a similar error.
>>
>> Thanks for your support. That is much more tedious than I thought.
>>
>> *Option 1 - SQL UDF*
>>
>> *SQL UDF*
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>
>> t_env.execute_sql(create_func_ddl)
>>
>> *Error*
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match
>> requested type. Requested: Row(s: String, t: String); Actual:
>> GenericType
>>
>> *Option 2 *- *Overriding getResultType*
>>
>> Back to the old registering method, but overriding getResultType:
>>
>> 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Wei Zhong
Hi Pierre,

Currently there is no type hint like ‘Map[String, Any]’. The recommended way is 
to declare your type more explicitly.

If you insist on doing this, you can try declaring a RAW data type for 
java.util.HashMap [1], but you may encounter some trouble [2] related to the 
Kryo serializers.

Best,
Wei

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
 

[2] 
https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
 



> On Nov 19, 2020, at 04:31, Pierre Oberholzer  wrote:
> 
> Hi Wei,
> 
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong syntax for 
> option 2.
> 
> So to summarize..
> 
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition + 
> t_env.register_java_function for UDF registering
> 
> Type conversions working:
> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row 
> => ROW
> - scala.collection.immutable.Map[String,String] => 
> java.util.Map[String,String] => MAP
> 
> Any hint for Map[String,Any] ?
> 
> Best regards,
> 
> Le mer. 18 nov. 2020 à 03:26, Wei Zhong  > a écrit :
> Hi Pierre,
> 
> Those 2 approaches all work in my local machine, this is my code:
> 
> Scala UDF:
> package com.dummy
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.types.Row
> 
> /**
>   * The scala UDF.
>   */
> class dummyMap extends ScalarFunction {
> 
>   // If the udf would be registered by the SQL statement, you need add this 
> typehint
>   @DataTypeHint("ROW")
>   def eval(): Row = {
> 
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   // If the udf would be registered by the method 'register_java_function', 
> you need override this
>   // method.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = {
> // The type of the return values should be TypeInformation
> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
> Types.STRING()))
>   }
> }
> Python code:
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
> 
> s_env = StreamExecutionEnvironment.get_execution_environment()
> st_env = StreamTableEnvironment.create(s_env)
> 
> # load the scala udf jar file, the path should be modified to yours
> # or your can also load the jar file via other approaches
> st_env.get_config().get_configuration().set_string("pipeline.jars", 
> "file:///Users/zhongwei/the-dummy-udf.jar <>")
> 
> # register the udf via 
> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
> SCALA")
> # or register via the method
> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
> 
> # prepare source and sink
> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
> 'c'])
> st_env.execute_sql("""create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> 'connector' = 'print'
> )""")
> 
> # execute query
> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
> 
> Best,
> Wei
> 
>> 在 2020年11月18日,03:28,Pierre Oberholzer > > 写道:
>> 
>> Hi Wei,
>> 
>> True, I'm using the method you mention, but glad to change. 
>> I tried your suggestion instead, but got a similar error.
>> 
>> Thanks for your support. That is much more tedious than I thought.
>> 
>> Option 1 - SQL UDF
>> 
>> SQL UDF
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap 
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>   
>> t_env.execute_sql(create_func_ddl)
>> 
>> Error
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match 
>> requested type. Requested: Row(s: String, t: String); Actual: 
>> GenericType
>> 
>> Option 2 - Overriding getResultType
>> 
>> Back to the old registering method, but overriding getResultType:
>> 
>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>> 
>> Scala UDF
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval(): Row = {
>> 
>>   Row.of(java.lang.String.valueOf("foo"), 
>> java.lang.String.valueOf("bar"))
>> 
>>   }
>> 
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> }
>> 
>> Error (on compilation)
>> 
>> [error] dummyMap.scala:66:90: 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Pierre Oberholzer
Hi Wei,

It works ! Thanks a lot for your support.
I hadn't tried this last combination for option 1, and I had wrong
syntax for option 2.

So to summarize..

Methods working:
- Current: DataTypeHint in UDF definition + SQL for UDF registering
- Outdated: override getResultType in UDF definition
+ t_env.register_java_function for UDF registering

Type conversions working:
- scala.collection.immutable.Map[String,String] =>
org.apache.flink.types.Row => ROW
- scala.collection.immutable.Map[String,String] =>
java.util.Map[String,String] => MAP

Any hint for Map[String,Any] ?

Best regards,
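
A possible direction for the open Map[String,Any] question, sketched below purely as
an assumption (the class name anyToStringMap and the flatten-to-strings strategy are
illustrative, not something confirmed on this thread): stringify the values so the
result fits the MAP<STRING, STRING> conversion that already works above.

import scala.collection.JavaConverters._

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class anyToStringMap extends ScalarFunction {

  // Flatten Map[String, Any] into MAP<STRING, STRING> by stringifying each value;
  // nested values could be serialized to JSON strings instead if their structure
  // must be preserved.
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): java.util.Map[String, String] = {
    val nested: Map[String, Any] = Map("count" -> 42, "flag" -> true, "name" -> "foo")
    nested.map { case (k, v) => k -> String.valueOf(v.asInstanceOf[AnyRef]) }.asJava
  }
}

Registered via the same CREATE FUNCTION ... LANGUAGE SCALA statement as above, this
can be written to a sink column declared as MAP<STRING, STRING>.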

On Wed, Nov 18, 2020 at 03:26, Wei Zhong wrote:

> Hi Pierre,
>
> Those 2 approaches all work in my local machine, this is my code:
>
> Scala UDF:
>
> package com.dummy
>
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.api.Types
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.types.Row
>
> /**
>   * The scala UDF.
>   */
> class dummyMap extends ScalarFunction {
>
>   // If the udf would be registered by the SQL statement, you need add this 
> typehint
>   @DataTypeHint("ROW")
>   def eval(): Row = {
>
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>
>   }
>
>   // If the udf would be registered by the method 'register_java_function', 
> you need override this
>   // method.
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = {
> // The type of the return values should be TypeInformation
> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
> Types.STRING()))
>   }
> }
>
> Python code:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> st_env = StreamTableEnvironment.create(s_env)
>
> # load the scala udf jar file, the path should be modified to yours
> # or you can also load the jar file via other approaches
> st_env.get_config().get_configuration().set_string("pipeline.jars", "
> file:///Users/zhongwei/the-dummy-udf.jar")
>
> # register the udf via
> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
> LANGUAGE SCALA")
> # or register via the method
> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>
> # prepare source and sink
> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
> 'b', 'c'])
> st_env.execute_sql("""create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> 'connector' = 'print'
> )""")
>
> # execute query
>
> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>
> Best,
> Wei
>
> 在 2020年11月18日,03:28,Pierre Oberholzer  写道:
>
> Hi Wei,
>
> True, I'm using the method you mention, but glad to change.
> I tried your suggestion instead, but got a similar error.
>
> Thanks for your support. That is much more tedious than I thought.
>
> *Option 1 - SQL UDF*
>
> *SQL UDF*
> create_func_ddl = """
> CREATE FUNCTION dummyMap
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>
> t_env.execute_sql(create_func_ddl)
>
> *Error*
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match
> requested type. Requested: Row(s: String, t: String); Actual:
> GenericType
>
> *Option 2 *- *Overriding getResultType*
>
> Back to the old registering method, but overriding getResultType:
>
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>
> *Scala UDF*
> class dummyMap() extends ScalarFunction {
>
>   def eval(): Row = {
>
>   Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>
>   }
>
>   override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
>
> *Error (on compilation)*
>
> [error] dummyMap.scala:66:90: overloaded method value ROW with
> alternatives:
> [error]   (x$1:
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
> 
> [error]   ()org.apache.flink.table.types.DataType 
> [error]   (x$1:
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType,
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]
> ^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>
> Le mar. 17 nov. 2020 à 14:01, Wei Zhong  a écrit :
>
>> Hi Pierre,
>>
>> I guess your UDF is registered by the method 'register_java_function'
>> which uses the old type system. In this 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

Those 2 approaches all work in my local machine, this is my code:

Scala UDF:
package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
  * The scala UDF.
  */
class dummyMap extends ScalarFunction {

  // If the udf would be registered by the SQL statement, you need to add this
  // type hint
  @DataTypeHint("ROW<s STRING, t STRING>")
  def eval(): Row = {

Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the udf would be registered by the method 'register_java_function', you
  // need to override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
// The type of the return values should be TypeInformation
Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
Types.STRING()))
  }
}
Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or you can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via SQL DDL
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW<s STRING, t STRING>
) with (
'connector' = 'print'
)""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()

Best,
Wei

> 在 2020年11月18日,03:28,Pierre Oberholzer  写道:
> 
> Hi Wei,
> 
> True, I'm using the method you mention, but glad to change. 
> I tried your suggestion instead, but got a similar error.
> 
> Thanks for your support. That is much more tedious than I thought.
> 
> Option 1 - SQL UDF
> 
> SQL UDF
> create_func_ddl = """
> CREATE FUNCTION dummyMap 
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>   
> t_env.execute_sql(create_func_ddl)
> 
> Error
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match 
> requested type. Requested: Row(s: String, t: String); Actual: 
> GenericType
> 
> Option 2 - Overriding getResultType
> 
> Back to the old registering method, but overriding getResultType:
> 
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
> 
> Scala UDF
> class dummyMap() extends ScalarFunction {
> 
>   def eval(): Row = {
> 
>   Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
> 
> Error (on compilation)
> 
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>  
> [error]   ()org.apache.flink.table.types.DataType 
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]): 
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]   
>^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
> 
> On Tue, Nov 17, 2020 at 14:01, Wei Zhong wrote:
> Hi Pierre,
> 
> I guess your UDF is registered by the method 'register_java_function' which 
> uses the old type system. In this situation you need to override the 
> 'getResultType' method instead of adding type hint. 
> 
> You can also try to register your UDF via the "CREATE FUNCTION" sql 
> statement, which accepts the type hint.
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 19:29, Pierre Oberholzer wrote:
>> 
>> Hi Wei,
>> 
>> Thanks for your suggestion. Same error.
>> 
>> Scala UDF
>> 
>> @FunctionHint(output = new DataTypeHint("ROW"))
>> class dummyMap() extends ScalarFunction {
>>   def eval(): Row = {
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>> }
>> 
>> Best regards,
>> 
>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei,

True, I'm using the method you mention, but glad to change.
I tried your suggestion instead, but got a similar error.

Thanks for your support. That is much more tedious than I thought.

*Option 1 - SQL UDF*

*SQL UDF*
create_func_ddl = """
CREATE FUNCTION dummyMap
  AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""

t_env.execute_sql(create_func_ddl)

*Error*
Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match
requested type. Requested: Row(s: String, t: String); Actual:
GenericType

*Option 2 *- *Overriding getResultType*

Back to the old registering method, but overriding getResultType:

t_env.register_java_function("dummyMap","com.dummy.dummyMap")

*Scala UDF*
class dummyMap() extends ScalarFunction {

  def eval(): Row = {

  Row.of(java.lang.String.valueOf("foo"),
java.lang.String.valueOf("bar"))

  }

  override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
}

*Error (on compilation)*

[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:

[error]   (x$1:
org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType


[error]   ()org.apache.flink.table.types.DataType 

[error]   (x$1:
org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType

[error]  cannot be applied to (org.apache.flink.table.types.DataType,
org.apache.flink.table.types.DataType)

[error]   override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)

[error]
  ^

[error] one error found

[error] (Compile / compileIncremental) Compilation failed

[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
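
The overload error above is DataTypes.ROW rejecting plain DataType arguments: in the
new type system it expects DataTypes.FIELD(...) entries and produces a DataType,
while getResultType belongs to the old type system and must return a TypeInformation,
which is what Wei's Types.ROW version provides. A compiling sketch of both forms, for
reference only (assumed, not taken verbatim from the thread):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{DataTypes, Types}
import org.apache.flink.table.types.DataType

object RowTypeExamples {

  // New type system: named fields via DataTypes.FIELD, the result is a DataType.
  val rowDataType: DataType =
    DataTypes.ROW(DataTypes.FIELD("s", DataTypes.STRING()), DataTypes.FIELD("t", DataTypes.STRING()))

  // Old type system: getResultType must return a TypeInformation instead.
  val rowTypeInfo: TypeInformation[_] =
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
}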

On Tue, Nov 17, 2020 at 14:01, Wei Zhong wrote:

> Hi Pierre,
>
> I guess your UDF is registered by the method 'register_java_function'
> which uses the old type system. In this situation you need to override the
> 'getResultType' method instead of adding type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" sql
> statement, which accepts the type hint.
>
> Best,
> Wei
>
> 在 2020年11月17日,19:29,Pierre Oberholzer  写道:
>
> Hi Wei,
>
> Thanks for your suggestion. Same error.
>
> *Scala UDF*
>
> @FunctionHint(output = new DataTypeHint("ROW"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
> Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>   }
> }
>
> Best regards,
>
> Le mar. 17 nov. 2020 à 10:04, Wei Zhong  a écrit :
>
>> Hi Pierre,
>>
>> You can try to replace the '@DataTypeHint("ROW")' with
>> '@FunctionHint(output = new DataTypeHint("ROW"))'
>>
>> Best,
>> Wei
>>
>> 在 2020年11月17日,15:45,Pierre Oberholzer  写道:
>>
>> Hi Dian, Community,
>>
>> (bringing the thread back to wider audience)
>>
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
>> also this simple case leads to a type mismatch between UDF and Table API.
>> I've also tried other Map objects from Flink (table.data.MapData,
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>>
>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>
>> Thanks in advance for your support !
>>
>> Best regards,
>>
>> *Scala UDF*
>>
>> class dummyMap() extends ScalarFunction {
>>
>>  @DataTypeHint("ROW")
>>  def eval(): Row = {
>>
>> Row.of(java.lang.String.valueOf("foo"),
>> java.lang.String.valueOf("bar"))
>>
>>   }
>> }
>>
>> *Table DDL*
>>
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW
>> ) with (
>> ...
>> )
>> """
>>
>> *Error*
>>
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query
>> result and registered TableSink
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf:
>> GenericType]
>> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>>
>>
>>
>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>>> Thanks Dian, but same error when using explicit returned type:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>
>>> val states = Map("key1" -> "val1", "key2" -> "val2")
>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>
>>>   }
>>> }
>>>
>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu  a écrit :
>>>
 You need to explicitly defined the result type the UDF. You could refer
 to [1] for more details if you are using Flink 1.11. If you are using other
 versions of Flink, you need to refer to the corresponding documentation.

 [1]
 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

I guess your UDF is registered by the method 'register_java_function', which
uses the old type system. In this situation you need to override the
'getResultType' method instead of adding a type hint.

You can also try to register your UDF via the "CREATE FUNCTION" sql statement, 
which accepts the type hint.

Best,
Wei

> 在 2020年11月17日,19:29,Pierre Oberholzer  写道:
> 
> Hi Wei,
> 
> Thanks for your suggestion. Same error.
> 
> Scala UDF
> 
> @FunctionHint(output = new DataTypeHint("ROW"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
> 
> Best regards,
> 
> On Tue, Nov 17, 2020 at 10:04, Wei Zhong wrote:
> Hi Pierre,
> 
> You can try to replace the '@DataTypeHint("ROW")' with 
> '@FunctionHint(output = new DataTypeHint("ROW"))'
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote:
>> 
>> Hi Dian, Community,
>> 
>> (bringing the thread back to wider audience)
>> 
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
>> also this simple case leads to a type mismatch between UDF and Table API.
>> I've also tried other Map objects from Flink (table.data.MapData, 
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>> 
>> Am I doing something wrong or am I facing limitations in the toolkit ?
>> 
>> Thanks in advance for your support !
>> 
>> Best regards,
>> 
>> Scala UDF
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>  @DataTypeHint("ROW")
>>  def eval(): Row = {
>> 
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> }
>> 
>> Table DDL
>> 
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW
>> ) with (
>> ...
>> )
>> """
>> 
>> Error
>> 
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query 
>> result and registered TableSink 
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf: 
>> GenericType]
>> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>> 
>> 
>> 
>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer wrote:
>> Thanks Dian, but same error when using explicit returned type:
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>> 
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>> 
>>   }
>> }
>> 
>> On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:
>> You need to explicitly defined the result type the UDF. You could refer to 
>> [1] for more details if you are using Flink 1.11. If you are using other 
>> versions of Flink, you need to refer to the corresponding documentation.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>  
>> 
>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer wrote:
>>> 
>>> ScalarFunction
>> 
>> 
>> 
>> -- 
>> Pierre
>> 
>> -- 
>> Pierre
> 
> 
> 
> -- 
> Pierre



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei,

Thanks for your suggestion. Same error.

*Scala UDF*

@FunctionHint(output = new DataTypeHint("ROW"))
class dummyMap() extends ScalarFunction {
  def eval(): Row = {
Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Best regards,

On Tue, Nov 17, 2020 at 10:04, Wei Zhong wrote:

> Hi Pierre,
>
> You can try to replace the '@DataTypeHint("ROW")' with
> '@FunctionHint(output = new DataTypeHint("ROW"))'
>
> Best,
> Wei
>
> 在 2020年11月17日,15:45,Pierre Oberholzer  写道:
>
> Hi Dian, Community,
>
> (bringing the thread back to wider audience)
>
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData,
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
>
> Am I doing something wrong or am I facing limitations in the toolkit ?
>
> Thanks in advance for your support !
>
> Best regards,
>
> *Scala UDF*
>
> class dummyMap() extends ScalarFunction {
>
>  @DataTypeHint("ROW")
>  def eval(): Row = {
>
> Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>
>   }
> }
>
> *Table DDL*
>
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> ...
> )
> """
>
> *Error*
>
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink
> `default_catalog`.`default_database`.`mySink` do not match.
> Query result schema: [output_of_my_scala_udf:
> GenericType]
> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>
>
>
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
>> Thanks Dian, but same error when using explicit returned type:
>>
>> class dummyMap() extends ScalarFunction {
>>
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>
>>   }
>> }
>>
>> Le ven. 13 nov. 2020 à 10:34, Dian Fu  a écrit :
>>
>>> You need to explicitly defined the result type the UDF. You could refer
>>> to [1] for more details if you are using Flink 1.11. If you are using other
>>> versions of Flink, you need to refer to the corresponding documentation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>
>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer  写道:
>>>
>>> ScalarFunction
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
>
> --
> Pierre
>
>
>

-- 
Pierre


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

You can try to replace the '@DataTypeHint("ROW")' with 
'@FunctionHint(output = new DataTypeHint("ROW"))'

Best,
Wei

> 在 2020年11月17日,15:45,Pierre Oberholzer  写道:
> 
> Hi Dian, Community,
> 
> (bringing the thread back to wider audience)
> 
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData, 
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
> 
> Am I doing something wrong or am I facing limitations in the toolkit ?
> 
> Thanks in advance for your support !
> 
> Best regards,
> 
> Scala UDF
> 
> class dummyMap() extends ScalarFunction {
> 
>  @DataTypeHint("ROW")
>  def eval(): Row = {
> 
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> }
> 
> Table DDL
> 
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> ...
> )
> """
> 
> Error
> 
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query result 
> and registered TableSink `default_catalog`.`default_database`.`mySink` do not 
> match.
> Query result schema: [output_of_my_scala_udf: 
> GenericType]
> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
> 
> 
> 
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer wrote:
> Thanks Dian, but same error when using explicit returned type:
> 
> class dummyMap() extends ScalarFunction {
> 
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
> 
> val states = Map("key1" -> "val1", "key2" -> "val2")
> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
> 
>   }
> }
> 
> On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:
> You need to explicitly defined the result type the UDF. You could refer to 
> [1] for more details if you are using Flink 1.11. If you are using other 
> versions of Flink, you need to refer to the corresponding documentation.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>  
> 
>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer wrote:
>> 
>> ScalarFunction
> 
> 
> 
> -- 
> Pierre
> 
> -- 
> Pierre



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-16 Thread Pierre Oberholzer
Hi Dian, Community,

(bringing the thread back to wider audience)

As you suggested, I've tried to use DataTypeHint with Row instead of Map, but
this simple case also leads to a type mismatch between the UDF and the Table API.
I've also tried other Map objects from Flink (table.data.MapData,
flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
(java.util.Map) in combination with DataTypeHint, without success.
N.B. I'm using version 1.11.

Am I doing something wrong or am I facing limitations in the toolkit ?

Thanks in advance for your support !

Best regards,

*Scala UDF*

class dummyMap() extends ScalarFunction {

 @DataTypeHint("ROW")
 def eval(): Row = {

Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }
}

*Table DDL*

my_sink_ddl = f"""
create table mySink (
output_of_my_scala_udf ROW<s STRING, t STRING>
) with (
...
)
"""

*Error*

Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.ValidationException: Field types of query
result and registered TableSink
`default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf:
GenericType]
TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]



On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer wrote:

> Thanks Dian, but same error when using explicit returned type:
>
> class dummyMap() extends ScalarFunction {
>
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>
> val states = Map("key1" -> "val1", "key2" -> "val2")
> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>
>   }
> }
>
> Le ven. 13 nov. 2020 à 10:34, Dian Fu  a écrit :
>
>> You need to explicitly defined the result type the UDF. You could refer
>> to [1] for more details if you are using Flink 1.11. If you are using other
>> versions of Flink, you need to refer to the corresponding documentation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>
>> 在 2020年11月13日,下午4:56,Pierre Oberholzer  写道:
>>
>> ScalarFunction
>>
>>
>>
>
> --
> Pierre
>

-- 
Pierre


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Thanks Dian, but same error when using explicit returned type:

class dummyMap() extends ScalarFunction {

  def eval() : util.Map[java.lang.String,java.lang.String] = {

val states = Map("key1" -> "val1", "key2" -> "val2")
states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]

  }
}

On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:

> You need to explicitly define the result type of the UDF. You could refer to
> [1] for more details if you are using Flink 1.11. If you are using other
> versions of Flink, you need to refer to the corresponding documentation.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>
> 在 2020年11月13日,下午4:56,Pierre Oberholzer  写道:
>
> ScalarFunction
>
>
>

-- 
Pierre Oberholzer
Hohlstrasse 507
CH - 8048 Zürich
+41 77 402 17 07


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Dian Fu
You need to explicitly define the result type of the UDF. You could refer to [1]
for more details if you are using Flink 1.11. If you are using other versions
of Flink, you need to refer to the corresponding documentation.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide

> 在 2020年11月13日,下午4:56,Pierre Oberholzer  写道:
> 
> ScalarFunction



PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Hi,

I'm trying to use a Map[String,String] object (scala.collection.immutable.Map)
returned by a Scala UDF as a valid data type in the Table API, namely via the
Java type (java.util.Map) as recommended.

However, I get the error below.

Any idea about the right way to proceed ?
If yes, is there a way to generalize the conversion to a (nested) Scala
object of type Map[String,Any] ?


*Code*
*Scala UDF*

class dummyMap() extends ScalarFunction {
  def eval() = {
val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
whatevermap.asInstanceOf[java.util.Map[java.lang.String,java.lang.String]]
  }
}

*Sink*

my_sink_ddl = f"""
create table mySink (
output_of_dummyMap_udf MAP<STRING, STRING>
) with (
...
)
"""

*Error*

Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query
result and registered TableSink
`default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType]
TableSink schema:[output_of_my_scala_udf: Map]
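
For what it's worth, two things would have to change in the UDF above for the MAP
sink to line up. The sketch below is an assumption based on the rest of the thread
(Flink 1.11, registration via CREATE FUNCTION ... LANGUAGE SCALA), not code posted
in this message: the Scala map needs a real conversion to java.util.Map, and the
result type needs to be declared to the planner.

import scala.collection.JavaConverters._

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class dummyMap() extends ScalarFunction {

  // Declare the result as MAP<STRING, STRING> for the planner and convert the
  // Scala map with asJava; asInstanceOf would fail, because a
  // scala.collection.immutable.Map is not a java.util.Map.
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): java.util.Map[String, String] = {
    Map("key1" -> "val1", "key2" -> "val2").asJava
  }
}

With that shape, registration via CREATE FUNCTION and a sink column of type
MAP<STRING, STRING> matches what the summary at the top of the thread reports as
working.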

Thanks for your support !

Best regards,

-- 
Pierre