I see. Thanks for the info!

On Thu, Feb 25, 2021 at 6:11 PM Andrew Pilloud <[email protected]> wrote:

> There is some recent work to improve unnest
> <https://github.com/apache/beam/pull/12843> that went into Beam 2.25.0+,
> it might cover your use case. It looks like we have no support for
> the Collect operator, which is your problem here. I verified 'SELECT
> ARRAY(SELECT f_int FROM PCOLLECTION)' doesn't work and filed
> https://issues.apache.org/jira/browse/BEAM-11872
>
> For the UDF side of things, we haven't put much work into making nested
> rows work well with UDFs. WrappedRow is intended to be an internal wrapper
> for BeamCalcRel, we should probably be passing a schema Row, which gives
> you access to fields by name. I filed a JIRA for this:
> https://issues.apache.org/jira/browse/BEAM-11871
>
> Andrew
>
> On Wed, Feb 24, 2021 at 10:33 PM Zhiheng Huang <[email protected]>
> wrote:
>
>> Hi beam users,
>>
>> We have a use case where we have a schema such as:
>>
>> Schema.of(
>>   Field.of("array_of_nested_rows",
>>     FieldType.array(FieldType.row(
>>       Schema.of(Field.of("row_field1", FieldType.INT32))))),
>>   Field.of("otherScalarField", FieldType.STRING)
>> )
>>
>> We would like to select "array_of_nested_rows.row_field1" as a list of
>> ints together with "otherScalarField" as the output.
>> For example, in BigQuery we can achieve this with:
>>
>> SELECT
>>   otherScalarField,
>>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
>> FROM
>>   table
>>
>> Trying this query with beam SQL yields:
>>
>> Unable to convert query select array(select score from UNNEST(Yt8mAnnotation)) from PCOLLECTION
>> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query select array(select score from UNNEST(Yt8mAnnotation)) from PCOLLECTION
>>   at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
>>   at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
>>   at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
>>   at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>>   ...
>>
>> Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>> There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL.
>> Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
>> There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part of the original plan is as follows
>> 56:LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])
>>   8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam, PCOLLECTION]])
>>   54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
>>     52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
>>       50:Uncollect(subset=[rel#51:Subset#3.NONE])
>>         ...
>>
>> We have also tried to define a UDF that takes in array_of_nested_rows.
>> This doesn't work out either because the input param passed into the UDF
>> eval function is a list of WrappedRow
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
>> which doesn't allow us to query field value with its name. It only supports
>> getting the field value given an index. This is useless for us since we do
>> not know how to get the row field schema in the eval function.
>>
>> Do you have any suggestions about how to achieve this? We are using beam
>> 2.22.0
>>
>> Thanks a lot!
>>
>

--
Sylvon Huang
