Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. As a workaround,
access the fields of each Row by index instead of by name (the example is
written against release-1.12):

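    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import (DataTypes, EnvironmentSettings, Row,
                               StreamTableEnvironment)
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())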

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            if element[0] == 2:
                my_string = element[1]
        return my_string

    t = t_env.from_elements(
        [("1", [Row(3, "flink")]),
         ("3", [Row(2, "pyflink")]),
         ("2", [Row(2, "python")])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD(
                 "List_element",
                 DataTypes.ARRAY(DataTypes.ROW(
                     [DataTypes.FIELD("integer_element", DataTypes.INT()),
                      DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
    print(t.select(get_string_element(t.List_element)).to_pandas())
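
The loop inside the UDF can be checked without a Flink runtime. The following is only a sketch: plain tuples stand in for the Row objects (an assumption for illustration), and the `default` parameter is added here and is not part of the UDF above. Index 0 corresponds to integer_element and index 1 to string_element.

```python
# Plain-Python stand-in for the UDF body; no Flink installation is needed.
# Tuples are used in place of pyflink Row objects (illustrative assumption).

def get_string_element(my_list, default='xxx'):
    """Return the string field of the last element whose integer field is 2."""
    my_string = default
    for element in my_list:
        # Positional access: index 0 is integer_element, index 1 is string_element.
        if element[0] == 2:
            my_string = element[1]
    return my_string


# Same three List_element values as in the from_elements() call above.
rows = [[(3, "flink")], [(2, "pyflink")], [(2, "python")]]
results = [get_string_element(r) for r in rows]
print(results)  # ['xxx', 'pyflink', 'python']
```

If several elements match, the last one wins, and an empty list yields the default value.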



[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo

Barth, Torben <t.ba...@fraport.de> wrote on Fri, Dec 18, 2020 at 2:46 AM:

> Dear List,
>
> I have a table with the following structure:
>
> my_table
> -- Key: String
> -- List_element: ARRAY<ROW<`integer_element` INT, `string_element` STRING>>
>
> I want to define a UDF to extract information from "List_element", but I
> cannot access the contents of the array inside the UDF. I tried
> something like:
>
> @udf(result_type=DataTypes.STRING())
> def get_string_element(my_list):
>     my_string = 'xxx'
>     for element in my_list:
>         if element.integer_element == 2:
>             my_string = element.string_element
>     return my_string
>
> table_env.create_temporary_function("get_string_element",
>                                     get_string_element)
>
> # use the function in Python Table API
> my_table.select("get_string_element(List_element)")
>
> Unfortunately, I cannot get it to work. Does anybody have an idea what the
> correct way to extract the information is?
>
> Any comments or ideas are very welcome.
>
> Thanks
> Torben
>
