Re: Pyflink UDF with ARRAY as input

2020-12-17 Thread Xingbo Huang
Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. The workaround is
to access the row fields by index instead of by name (written against
release-1.12):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, Row, StreamTableEnvironment
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
    .in_streaming_mode().use_blink_planner().build())


@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        # access the row fields by position instead of by name:
        # element[0] is integer_element, element[1] is string_element
        if element[0] == 2:
            my_string = element[1]
    return my_string


t = t_env.from_elements(
    [("1", [Row(3, "flink")]),
     ("3", [Row(2, "pyflink")]),
     ("2", [Row(2, "python")])],
    DataTypes.ROW(
        [DataTypes.FIELD("Key", DataTypes.STRING()),
         DataTypes.FIELD("List_element",
                         DataTypes.ARRAY(DataTypes.ROW(
                             [DataTypes.FIELD("integer_element", DataTypes.INT()),
                              DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
print(t.select(get_string_element(t.List_element)).to_pandas())
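Since, because of the bug, the nested rows can only be read by position, one option is to keep the loop in a plain Python function with named index constants, so the logic can be checked without starting a Flink job. This is a sketch: `INTEGER_IDX`, `STRING_IDX` and `extract_string_element` are hypothetical names, the indices assume the field order of the ROW type declared above, and plain tuples stand in for PyFlink Row objects.

```python
# Hypothetical positions; they must match the field order of the
# ROW type declared for List_element (integer_element, string_element).
INTEGER_IDX = 0
STRING_IDX = 1


def extract_string_element(my_list):
    """Return string_element of the last element whose integer_element is 2."""
    my_string = 'xxx'
    for element in my_list:
        # Positional access does not depend on the row's field names.
        if element[INTEGER_IDX] == 2:
            my_string = element[STRING_IDX]
    return my_string


# Plain tuples stand in for PyFlink Row objects here.
print(extract_string_element([(3, "flink")]))    # xxx
print(extract_string_element([(2, "pyflink")]))  # pyflink
```

In the job itself the same function could be wrapped without the decorator, e.g. `get_string_element = udf(extract_string_element, result_type=DataTypes.STRING())`, which leaves `extract_string_element` callable as ordinary Python.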



[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo

On Fri, Dec 18, 2020, 2:46 AM, Barth, Torben wrote:

> Dear List,
>
> I have a table with the following structure:
>
> my_table
> -- Key: String
> -- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>
>
> I want to define a UDF to extract information from “List_element”, but I
> cannot manage to access the contents of the array in the UDF. I tried
> something like:
>
> from pyflink.table import DataTypes
> from pyflink.table.udf import udf
>
> @udf(result_type=DataTypes.STRING())
> def get_string_element(my_list):
>     my_string = 'xxx'
>     for element in my_list:
>         if element.integer_element == 2:
>             my_string = element.string_element
>     return my_string
>
>
> table_env.create_temporary_function("get_string_element", get_string_element)
>
> # use the function in the Python Table API
> my_table.select("get_string_element(List_element)")
>
> Unfortunately, I cannot get it to work. Does anybody know the correct way
> to extract this information?
>
> Any comments or ideas are very welcome.
>
> Thanks
> Torben
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>


Pyflink UDF with ARRAY as input

2020-12-17 Thread Barth, Torben
Dear List,

I have a table with the following structure:

my_table
-- Key: String
-- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>

I want to define a UDF to extract information from “List_element”, but I cannot
manage to access the contents of the array in the UDF. I tried something like:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        if element.integer_element == 2:
            my_string = element.string_element
    return my_string


table_env.create_temporary_function("get_string_element", get_string_element)
# use the function in the Python Table API
my_table.select("get_string_element(List_element)")

Unfortunately, I cannot get it to work. Does anybody know the correct way to
extract this information?
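The attempt above relies on attribute access by field name, which, per the reply above, fails because of a bug (FLINK-20666). As a sketch, assuming each array element can still be unpacked like a tuple in the declared field order, the positional element can be rebuilt as a `collections.namedtuple` so the name-based style keeps working. `Element` and `get_string_element_by_name` are hypothetical names, not PyFlink APIs, and plain tuples stand in for PyFlink Row objects.

```python
from collections import namedtuple

# Hypothetical named view of one List_element entry; the field order
# must match the ROW type (integer_element, then string_element).
Element = namedtuple("Element", ["integer_element", "string_element"])


def get_string_element_by_name(my_list):
    my_string = 'xxx'
    for raw in my_list:
        # Unpack the positional row into named fields; raises TypeError
        # if the row does not have exactly two fields.
        element = Element(*raw)
        if element.integer_element == 2:
            my_string = element.string_element
    return my_string


print(get_string_element_by_name([(2, "python")]))  # python
```

The extra object per element costs a little, so the plain index-based version is the leaner choice when performance matters.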

Any comments or ideas are very welcome.

Thanks
Torben
