Thank you, David, that's what I had been looking for!

On Sun, Feb 20, 2022 at 3:52 AM David Anderson <dander...@apache.org> wrote:
> Matthias,
>
> You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].
>
> Something like this should work:
>
> SELECT
>   id, customerid, productid, quantity, ...
> FROM
>   orders
> CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price, discount);
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion
>
> Regards,
> David
>
> On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler <matth...@dataeng.ai> wrote:
>
>> Hey Flinksters,
>>
>> I'm reading a nested JSON object into a table and would like to access
>> the nested rows inside an array. Is there a way to flatten them so that I
>> get a table with the nested rows?
>>
>> So far, I've only been able to figure out how to access a specific
>> element inside the array using the "at" method but I'm trying to flatten
>> the nested rows into a table and the arrays can have variable length. Below
>> is a code snippet of what I have thus far but notice how I'm only accessing
>> the first element in each array.
>>
>> How do you do this in Flink? Apologies if this is obvious - I wasn't able
>> to find an example or documentation and would appreciate any help.
>>
>> Thank you,
>> Matthias
>>
>> ---------------
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>>
>> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.INT())
>>         .column("customerid", DataTypes.INT())
>>         .column("time", DataTypes.INT())
>>         .column("entries", DataTypes.ARRAY(DataTypes.ROW(
>>             DataTypes.FIELD("productid", DataTypes.INT()),
>>             DataTypes.FIELD("quantity", DataTypes.INT()),
>>             DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9,3)),
>>             DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
>>         )))
>>         .build())
>>     .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + "/orders.json")
>>     .format("json")
>>     .build();
>>
>> tEnv.createTable("Orders", jsonTable);
>> Table orders = tEnv.from("Orders");
>>
>> Table flattenEntries =
>>     orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
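
For anyone finding this thread later, here is a rough plain-Java sketch of what the CROSS JOIN UNNEST suggested above computes on the data from the question: one output row per element of "entries", with the order's own columns repeated next to it. This is only an illustration of the semantics, not Flink API. The class names UnnestSketch, Order, Entry, and FlatRow, and the sample values, are made up for the example, and unit_price and discount are left out for brevity. As far as I understand the array expansion join, an order whose array is empty contributes no rows, which the sketch mirrors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnnestSketch {

    // Hypothetical stand-in for one ROW inside the "entries" array
    // (unit_price and discount omitted for brevity).
    static final class Entry {
        final int productid;
        final int quantity;

        Entry(int productid, int quantity) {
            this.productid = productid;
            this.quantity = quantity;
        }
    }

    // Hypothetical stand-in for one record of the Orders table.
    static final class Order {
        final int id;
        final int customerid;
        final List<Entry> entries;

        Order(int id, int customerid, List<Entry> entries) {
            this.id = id;
            this.customerid = customerid;
            this.entries = entries;
        }
    }

    // One flattened output row: order columns plus one array element.
    static final class FlatRow {
        final int id;
        final int customerid;
        final int productid;
        final int quantity;

        FlatRow(int id, int customerid, int productid, int quantity) {
            this.id = id;
            this.customerid = customerid;
            this.productid = productid;
            this.quantity = quantity;
        }

        @Override
        public String toString() {
            return "FlatRow{id=" + id + ", customerid=" + customerid
                    + ", productid=" + productid + ", quantity=" + quantity + "}";
        }
    }

    // Pairs every order with each element of its own array. Arrays of any
    // length are handled; an empty array simply yields no rows.
    static List<FlatRow> flatten(List<Order> orders) {
        List<FlatRow> out = new ArrayList<>();
        for (Order o : orders) {
            for (Entry e : o.entries) {
                out.add(new FlatRow(o.id, o.customerid, e.productid, e.quantity));
            }
        }
        return out;
    }

    // Sums quantity over all flattened rows, i.e. over every array element
    // rather than only the first one.
    static int totalQuantity(List<FlatRow> rows) {
        int total = 0;
        for (FlatRow r : rows) {
            total += r.quantity;
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up sample data: arrays of length 2, 0, and 1.
        List<Order> orders = Arrays.asList(
                new Order(1, 10, Arrays.asList(new Entry(100, 2), new Entry(101, 1))),
                new Order(2, 11, Collections.<Entry>emptyList()),
                new Order(3, 12, Arrays.asList(new Entry(100, 5))));

        List<FlatRow> rows = flatten(orders);
        for (FlatRow r : rows) {
            System.out.println(r);
        }
        System.out.println("totalquant = " + totalQuantity(rows));
    }
}
```

With the sample data this prints three flattened rows (two for order 1, none for order 2, one for order 3) and a total quantity of 8, whereas the at(1) version in the question would only ever look at the first entry of each order.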