Some recent work to improve unnest <https://github.com/apache/beam/pull/12843> went into Beam 2.25.0+ and might cover your use case. That said, it looks like we have no support for the Collect operator, which is your problem here. I verified that 'SELECT ARRAY(SELECT f_int FROM PCOLLECTION)' doesn't work and filed https://issues.apache.org/jira/browse/BEAM-11872
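
Until Collect is supported, one possible workaround is to do the projection outside of SQL, with a plain MapElements over the schema Rows. Below is a rough sketch, assuming the input is a PCollection<Row> with the schema from the original message. The class name NestedFieldProjection, the output field name row_field1_values, and the output schema are made up for illustration and are not part of Beam.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Hypothetical helper: flattens array_of_nested_rows.row_field1 into an array of ints. */
public class NestedFieldProjection {

  // Assumed output schema; the field name row_field1_values is illustrative only.
  static final Schema OUTPUT_SCHEMA =
      Schema.builder()
          .addStringField("otherScalarField")
          .addArrayField("row_field1_values", FieldType.INT32)
          .build();

  /** Reads the nested rows by field name and collects row_field1 into a list. */
  static Row project(Row input) {
    Collection<Row> nested = input.getArray("array_of_nested_rows");
    List<Integer> values = new ArrayList<>();
    for (Row r : nested) {
      values.add(r.getInt32("row_field1"));
    }
    return Row.withSchema(OUTPUT_SCHEMA)
        .addValue(input.getString("otherScalarField"))
        .addValue(values)
        .build();
  }

  /** Applies the projection to every element and attaches the output schema. */
  static PCollection<Row> apply(PCollection<Row> input) {
    return input
        .apply(MapElements.into(TypeDescriptors.rows()).via((Row row) -> project(row)))
        .setRowSchema(OUTPUT_SCHEMA);
  }
}
```

Since the result carries a row schema, it can be fed back into a SqlTransform for any remaining SQL steps.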
For the UDF side of things, we haven't put much work into making nested rows work well with UDFs. WrappedRow is intended to be an internal wrapper for BeamCalcRel; we should probably be passing a schema Row instead, which gives you access to fields by name. I filed a JIRA for this: https://issues.apache.org/jira/browse/BEAM-11871

Andrew

On Wed, Feb 24, 2021 at 10:33 PM Zhiheng Huang <[email protected]> wrote:

> Hi beam users,
>
> We have a use case where we have a schema such as:
>
> Schema.of(
>     Field.of("array_of_nested_rows",
>         FieldType.array(FieldType.row(
>             Schema.of(Field.of("row_field1", FieldType.INT32))))),
>     Field.of("otherScalarField", FieldType.STRING)
> )
>
> We would like to select "array_of_nested_rows.row_field1" as a list of
> ints together with "otherScalarField" as the output. For example, in
> BigQuery we can achieve this with:
>
> SELECT
>   otherScalarField,
>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
> FROM
>   table
>
> Trying this query with beam SQL yields:
>
> Unable to convert query select array(select score from UNNEST(Yt8mAnnotation)) from PCOLLECTION
> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query select array(select score from UNNEST(Yt8mAnnotation)) from PCOLLECTION
>   at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
>   at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
>   at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
>   at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>   ...
>
> Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL.
> Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
> There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part of the original plan is as follows
> 56:LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])
>   8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam, PCOLLECTION]])
>   54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
>     52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
>       50:Uncollect(subset=[rel#51:Subset#3.NONE])
> ...
>
> We have also tried to define a UDF that takes in array_of_nested_rows.
> This doesn't work out either because the input param passed into the UDF
> eval function is a list of WrappedRow
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
> which doesn't allow us to query a field value by its name. It only supports
> getting the field value given an index. This is useless for us since we do
> not know how to get the row field schema in the eval function.
>
> Do you have any suggestions about how to achieve this? We are using beam
> 2.22.0.
>
> Thanks a lot!
>
