Aha, yes, this is indeed another bug in the transform. The renamed schema is set on the top-level Row but not on any nested rows, so the nested rows still carry their original field names.
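
To make the failure mode concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam code: ToySchema, ToyRow, shallowRename, deepRename and toMap are all hypothetical names invented for illustration, and toMap only imitates a converter that recurses on the values and reads field names from each row's own schema, as described for BigQueryUtils.toTableRow further down the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not Beam classes) of a shallow vs. deep rename. */
public class NestedRenameSketch {

    /** Stand-in for a schema: field names, plus a nested schema for row-typed fields. */
    static final class ToySchema {
        final List<String> names = new ArrayList<>();
        final List<ToySchema> nested = new ArrayList<>(); // null for non-row fields

        ToySchema field(String name) { names.add(name); nested.add(null); return this; }
        ToySchema rowField(String name, ToySchema schema) { names.add(name); nested.add(schema); return this; }
    }

    /** Stand-in for a row: every row, including nested ones, holds its own schema. */
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;

        ToyRow(ToySchema schema, Object... values) {
            this.schema = schema;
            this.values = Arrays.asList(values);
        }
    }

    /** Shallow rename: new schema on the top-level row, nested rows reused untouched. */
    static ToyRow shallowRename(ToyRow in, ToySchema renamed) {
        return new ToyRow(renamed, in.values.toArray());
    }

    /** Deep rename: nested rows are rebuilt with the matching nested schema. */
    static ToyRow deepRename(ToyRow in, ToySchema renamed) {
        Object[] out = new Object[in.values.size()];
        for (int i = 0; i < out.length; i++) {
            Object v = in.values.get(i);
            ToySchema child = renamed.nested.get(i);
            out[i] = (v instanceof ToyRow && child != null) ? deepRename((ToyRow) v, child) : v;
        }
        return new ToyRow(renamed, out);
    }

    /** Recurses on values and takes field names from each row's own schema. */
    static Map<String, Object> toMap(ToyRow row) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < row.values.size(); i++) {
            Object v = row.values.get(i);
            m.put(row.schema.names.get(i), v instanceof ToyRow ? toMap((ToyRow) v) : v);
        }
        return m;
    }

    public static void main(String[] args) {
        ToySchema nestedOld = new ToySchema().field("field1_0");
        ToySchema old = new ToySchema().field("field0_0").rowField("field0_1", nestedOld);
        ToyRow row = new ToyRow(old, "value0_0", new ToyRow(nestedOld, "value1_0"));

        ToySchema nestedNew = new ToySchema().field("nestedStringField");
        ToySchema renamed = new ToySchema().field("stringField").rowField("nestedField", nestedNew);

        // Nested row keeps its old name: {stringField=value0_0, nestedField={field1_0=value1_0}}
        System.out.println(toMap(shallowRename(row, renamed)));
        // Nested row is renamed too: {stringField=value0_0, nestedField={nestedStringField=value1_0}}
        System.out.println(toMap(deepRename(row, renamed)));
    }
}
```

In this toy model the shallow variant yields nestedField.field1_0, which is the same shape as the field named in the BigQuery error. It says nothing about why DirectRunner and DataflowRunner differ in the real pipeline.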

On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

> Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than messages per person.
>
>    - NeedsRunner - The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
>    - Stack Trace - There wasn't any unfortunately because no exception thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
>    - Info Logging Findings (In Lieu of a Stack Trace)
>       - The Beam Schema was as expected with all renames applied.
>       - The example I provided was heavily stripped down in order to isolate the problem. My work example which a bit impractical because it's part of some generic tooling has 4 levels of nesting and also produces the correct output too.
>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner however, only the top-level renames were reflected in the TableRow and all renames in the nested fields weren't.
>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema
>    - My Current Workaround - I forked RenameFields and replaced the attachValues in expand method to be a "deep" rename. This is obviously inefficient and I will not be submitting a PR for that.
>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>
> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>
>> This transform is the same across all runners. A few comments on the test:
>>
>>    - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
>>    - I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>
>> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>>
>> Reuven
>>
>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>>
>>>> Hi Matthew,
>>>>
>>>> > The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>
>>>> The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone, it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>>
>>>> That being said, we may only run these tests continuously with the DirectRunner, I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>>
>>> That is correct. The tests are tests _of the transform_ so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>>
>>> Kenn
>>>
>>>> > The error message I’m receiving, : Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0, suggests the BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>
>>>> Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.
>>>>
>>>> Brian
>>>>
>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>
>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>
>>>>> I’m trying to use the RenameFields transform prior to inserting into BigQuery on nested fields. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I’m receiving, : Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0, suggests the BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>
>>>>> The code for RenameFields seems simple enough but does it behave differently in different runners? Will a deep attachValues be necessary in order get the nested renames to work across all runners? Is there something wrong in my code?
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>
>>>>> The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>
>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>
>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>
>>>>>> import java.io.File;
>>>>>> import java.util.Arrays;
>>>>>> import java.util.HashSet;
>>>>>> import java.util.stream.Collectors;
>>>>>>
>>>>>> import static java.util.Arrays.asList;
>>>>>>
>>>>>> public class BQRenameFields {
>>>>>>     public static void main(String[] args) {
>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>         DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>         options.setFilesToStage(
>>>>>>             Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>                 .map(entry -> (new File(entry)).toString())
>>>>>>                 .collect(Collectors.toList()));
>>>>>>
>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>>         Schema nestedSchema = Schema.builder().addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING)).build();
>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>             .addFields(field, nested, runner)
>>>>>>             .build();
>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>             "value0_0", Row.withSchema(nestedSchema).attachValues("value1_0"), options.getRunner().toString());
>>>>>>         pipeline
>>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>             .apply(RenameFields.<Row>create()
>>>>>>                 .rename("field0_0", "stringField")
>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>                 .rename("field0_2", "runner"))
>>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>>                 .to(new TableReference().setProjectId("lt-dia-lake-exp-raw").setDatasetId("prototypes").setTableId("matto_renameFields"))
>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>                 .withSchemaUpdateOptions(new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>                 .useBeamSchema());
>>>>>>         pipeline.run();
>>>>>>     }
>>>>>> }