Thank you, David, that's what I had been looking for!
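
For anyone who finds this thread later, here is a rough plain-Java sketch of the row expansion that CROSS JOIN UNNEST performs, with no Flink involved. The class and record names (UnnestSketch, Order, Entry, FlatRow) are made up for illustration, and only two of the four entry fields are modeled. Each order is paired with every element of its entries array, so an order with an empty array contributes no rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: mirrors the semantics of
//   SELECT ... FROM Orders CROSS JOIN UNNEST(entries) AS items (...)
// on in-memory lists. None of these types come from Flink.
public class UnnestSketch {
    // One element of the nested "entries" array (subset of the fields).
    record Entry(int productId, int quantity) {}

    // One order row with its nested array.
    record Order(int id, int customerId, List<Entry> entries) {}

    // One flattened output row: order columns plus entry columns.
    record FlatRow(int id, int customerId, int productId, int quantity) {}

    // Emits one FlatRow per (order, entry) pair. Orders whose entries
    // list is empty produce nothing, as with a cross join.
    static List<FlatRow> unnest(List<Order> orders) {
        List<FlatRow> rows = new ArrayList<>();
        for (Order o : orders) {
            for (Entry e : o.entries()) {
                rows.add(new FlatRow(o.id(), o.customerId(), e.productId(), e.quantity()));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1, 10, List.of(new Entry(100, 2), new Entry(101, 1))),
                new Order(2, 11, List.of()));
        // Prints the two rows that come from order 1; order 2 has no entries.
        unnest(orders).forEach(System.out::println);
    }
}
```

In Flink the query David posted does this expansion; the sketch only mirrors its behavior on small lists and handles variable-length arrays the same way.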

On Sun, Feb 20, 2022 at 3:52 AM David Anderson <dander...@apache.org> wrote:

> Matthias,
>
> You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].
>
> Something like this should work:
>
> SELECT
>   id, customerid, productid, quantity, ...
> FROM
>   Orders
> CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price, discount);
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion
>
> Regards,
> David
>
> On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler <matth...@dataeng.ai>
> wrote:
>
>> Hey Flinksters,
>>
>> I'm reading a nested JSON object into a table and would like to access
>> the nested rows inside an array. Is there a way to flatten them so that I
>> get a table with the nested rows?
>>
>> So far, I've only been able to figure out how to access a specific
>> element inside the array using the "at" method, but I'm trying to flatten
>> the nested rows into a table, and the arrays can have variable length. Below
>> is a code snippet of what I have thus far; note that it only accesses
>> the first element in each array.
>>
>> How do you do this in Flink? Apologies if this is obvious - I wasn't able
>> to find an example or documentation and would appreciate any help.
>>
>> Thank you,
>> Matthias
>>
>> ---------------
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>>
>>
>> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
>>         .schema(Schema.newBuilder()
>>                 .column("id", DataTypes.INT())
>>                 .column("customerid", DataTypes.INT())
>>                 .column("time", DataTypes.INT())
>>                 .column("entries", DataTypes.ARRAY(DataTypes.ROW(
>>                         DataTypes.FIELD("productid", DataTypes.INT()),
>>                         DataTypes.FIELD("quantity", DataTypes.INT()),
>>                         DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9,3)),
>>                         DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
>>                         )))
>>                 .build())
>>         .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + "/orders.json")
>>         .format("json")
>>         .build();
>>
>>
>>
>> tEnv.createTable("Orders",jsonTable);
>> Table orders = tEnv.from("Orders");
>>
>> Table flattenEntries = orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
>>
>>
