Hi Beam users,

We have a use case where we have a schema such as:

Schema.of(
    Field.of("array_of_nested_rows",
             FieldType.array(FieldType.row(
                 Schema.of(Field.of("row_field1", FieldType.INT32))))),
    Field.of("otherScalarField", FieldType.STRING)
)
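
For illustration, an element with this schema might be built like this (values
are made up, just to make the shape concrete):

Schema nestedSchema = Schema.of(Field.of("row_field1", FieldType.INT32));
Schema schema = Schema.of(
    Field.of("array_of_nested_rows", FieldType.array(FieldType.row(nestedSchema))),
    Field.of("otherScalarField", FieldType.STRING));

// e.g. {"array_of_nested_rows": [{"row_field1": 1}, {"row_field1": 2}],
//       "otherScalarField": "foo"}
Row element = Row.withSchema(schema)
    .addValue(Arrays.asList(
        Row.withSchema(nestedSchema).addValue(1).build(),
        Row.withSchema(nestedSchema).addValue(2).build()))
    .addValue("foo")
    .build();

For this element, the output row we are after would be something like
("foo", [1, 2]).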

We would like to select "array_of_nested_rows.row_field1" as a list of ints,
together with "otherScalarField", in the output. In BigQuery, for example, we
can achieve this with:

SELECT
  otherScalarField,
  ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
FROM
  table
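
On the Beam side, we apply the query with SqlTransform roughly like this (a
minimal sketch; "input" stands for our PCollection<Row> with the schema above):

PCollection<Row> result =
    input.apply(
        SqlTransform.query(
            "SELECT otherScalarField, "
                + "ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows)) "
                + "FROM PCOLLECTION"));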

Trying this query with Beam SQL yields:

Unable to convert query select array(select score from
UNNEST(Yt8mAnnotation)) from PCOLLECTION
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
convert query select array(select score from UNNEST(Yt8mAnnotation)) from
PCOLLECTION
at
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
at
org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
...

Caused by:
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
There are not enough rules to produce a node with desired properties:
convention=BEAM_LOGICAL.
Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part of
the original plan is as follows
56:LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
  8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam,
PCOLLECTION]])
  54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
    52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
      50:Uncollect(subset=[rel#51:Subset#3.NONE])
...

We have also tried defining a UDF that takes array_of_nested_rows as input.
This doesn't work either, because the parameter passed into the UDF's eval
function is a list of WrappedRow
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
which doesn't let us look up a field value by name; it only supports getting
a field value by index. That is not usable for us, since we do not know how
to obtain the nested row's schema inside the eval function.
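
For reference, the UDF attempt looked roughly like this (a simplified sketch of
what we tried; the positional get(0) is exactly the part we want to avoid,
since nothing inside eval tells us that row_field1 sits at index 0):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;

public class ExtractRowField1 implements BeamSqlUdf {
  // At runtime each element of the incoming list is a WrappedRow, which (as
  // far as we can tell) only behaves like a java.util.List read by position,
  // not by field name.
  public static List<Integer> eval(List<Object> arrayOfNestedRows) {
    List<Integer> out = new ArrayList<>();
    for (Object nestedRow : arrayOfNestedRows) {
      // Hard-coded index 0, hoping it corresponds to row_field1.
      out.add((Integer) ((List<Object>) nestedRow).get(0));
    }
    return out;
  }
}

Registered along these lines (also a sketch):

SqlTransform.query(
        "SELECT otherScalarField, extract_row_field1(array_of_nested_rows) FROM PCOLLECTION")
    .registerUdf("extract_row_field1", ExtractRowField1.class);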

Do you have any suggestions about how to achieve this? We are using Beam
2.22.0.

Thanks a lot!
