Hi Torben,
It is indeed a bug, and I have created a JIRA issue [1]. As a workaround,
access the Row fields by index instead of by name (written against
release-1.12):

# Imports assumed for release-1.12.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.types import Row
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
    .in_streaming_mode().use_blink_planner().build())


@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        # Access the Row fields by position instead of by name.
        if element[0] == 2:
            my_string = element[1]
    return my_string


t = t_env.from_elements(
    [("1", [Row(3, "flink")]), ("3", [Row(2, "pyflink")]),
     ("2", [Row(2, "python")])],
    DataTypes.ROW(
        [DataTypes.FIELD("Key", DataTypes.STRING()),
         DataTypes.FIELD("List_element",
                         DataTypes.ARRAY(DataTypes.ROW(
                             [DataTypes.FIELD("integer_element", DataTypes.INT()),
                              DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
print(t.select(get_string_element(t.List_element)).to_pandas())
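
If it helps, the indexing logic of the UDF can also be checked without
starting a Flink job. The snippet below is only a sketch for that purpose:
plain tuples stand in for the Row values (an assumption, not PyFlink's Row
class), the function is left undecorated, and the `default` parameter is a
hypothetical addition for this check.

```python
# Plain tuples stand in for Row values (assumption for illustration only);
# like Row, they support positional indexing, which the workaround relies on.


def get_string_element(my_list, default="xxx"):
    """Return the string field of the last element whose integer field is 2."""
    my_string = default
    for element in my_list:
        if element[0] == 2:         # index 0 corresponds to integer_element
            my_string = element[1]  # index 1 corresponds to string_element
    return my_string


# Same data as in the example above, with tuples instead of Row objects.
rows = [
    ("1", [(3, "flink")]),
    ("3", [(2, "pyflink")]),
    ("2", [(2, "python")]),
]

results = [get_string_element(list_element) for _key, list_element in rows]
print(results)  # ['xxx', 'pyflink', 'python']
```

The first row has no element with integer field 2, so it falls back to the
default 'xxx'.
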
[1] https://issues.apache.org/jira/browse/FLINK-20666
Best,
Xingbo
On Fri, Dec 18, 2020 at 2:46 AM, Barth, Torben wrote:
> Dear List,
>
> I have a table with the following structure:
>
> my_table
> -- Key: String
> -- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>
>
> I want to define a UDF to extract information from "List_element", but I
> cannot access the contents of the array inside the UDF. I tried
> something like:
>
> @udf(result_type=DataTypes.STRING())
> def get_string_element(my_list):
>     my_string = 'xxx'
>     for element in my_list:
>         if element.integer_element == 2:
>             my_string = element.string_element
>     return my_string
>
> table_env.create_temporary_function("get_string_element",
>                                     get_string_element)
> # use the function in Python Table API
> my_table.select("get_string_element(List_element)")
>
> Unfortunately, I cannot get it to work. Does anybody have an idea what the
> correct way to extract the information is?
>
> Any comments or ideas are very welcome.
>
> Thanks
> Torben