Thank you everyone for your input. I think it will be easiest to respond to all the feedback in a single message rather than in separate replies per person.
- NeedsRunner - The tests do run eventually, so all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.

- Stack Trace - There wasn't one, unfortunately, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the JSON response that came back from a request against the BQ API.

- Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.

- Info Logging Findings (In Lieu of a Stack Trace)
  - The Beam Schema was as expected, with all renames applied.
  - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical to share because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
  - BigQueryUtils.toTableRow(Row) returns the expected TableRow with DirectRunner. With DataflowRunner, however, only the top-level renames were reflected in the TableRow; none of the renames in the nested fields were.
  - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row's schema to get the field names. This makes sense to me, but if a value is itself a Row then its schema appears to be inconsistent with the top-level schema.

- My Current Workaround - I forked RenameFields and replaced the attachValues in the expand method with a "deep" rename. This is obviously inefficient, so I will not be submitting a PR for it.

- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442

On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:

> This transform is the same across all runners. A few comments on the test:
>
> - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
> - I recommend capturing the RenameFields output into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
> - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>
> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>
> Reuven
>
> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hi Matthew,
>>>
>>> > The unit tests also seem to be disabled for this as well, so I don't know if the PTransform behaves as expected.
>>>
>>> The exclusion of NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>
>>> That being said, we may only run these tests continuously with the DirectRunner. I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>
>> That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>
>> Kenn
>>
>>> > The error message I'm receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>
>>> Is there a stack trace associated with this error? It would be helpful to see where the error is coming from.
>>>
>>> Brian
>>>
>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>
>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>
>>>> I'm trying to use the RenameFields transform on nested fields prior to inserting into BigQuery. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I'm receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>
>>>> The code for RenameFields seems simple enough, but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners?
>>>> Is there something wrong in my code?
>>>>
>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>
>>>> The unit tests also seem to be disabled for this as well, so I don't know if the PTransform behaves as expected.
>>>>
>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>
>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>
>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>
>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>> import org.apache.beam.sdk.values.Row;
>>>>>
>>>>> import java.io.File;
>>>>> import java.util.Arrays;
>>>>> import java.util.HashSet;
>>>>> import java.util.stream.Collectors;
>>>>>
>>>>> import static java.util.Arrays.asList;
>>>>>
>>>>> public class BQRenameFields {
>>>>>     public static void main(String[] args) {
>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>         DataflowPipelineOptions options =
>>>>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>         // Stage the classpath so DataflowRunner can upload the job's jars.
>>>>>         options.setFilesToStage(
>>>>>             Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>                 .map(entry -> new File(entry).toString())
>>>>>                 .collect(Collectors.toList()));
>>>>>
>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>>         Schema nestedSchema = Schema.builder()
>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>             .build();
>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>         Schema rowSchema = Schema.builder()
>>>>>             .addFields(field, nested, runner)
>>>>>             .build();
>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>             "value0_0",
>>>>>             Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>             options.getRunner().toString());
>>>>>         pipeline
>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>             .apply(RenameFields.<Row>create()
>>>>>                 .rename("field0_0", "stringField")
>>>>>                 .rename("field0_1", "nestedField")
>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>                 .rename("field0_2", "runner"))
>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>                 .to(new TableReference()
>>>>>                     .setProjectId("lt-dia-lake-exp-raw")
>>>>>                     .setDatasetId("prototypes")
>>>>>                     .setTableId("matto_renameFields"))
>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>                 .withSchemaUpdateOptions(new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>                 .useBeamSchema());
>>>>>         pipeline.run();
>>>>>     }
>>>>> }
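For anyone curious what the "deep" rename workaround mentioned at the top of the thread amounts to conceptually, here is a rough standalone sketch (this is NOT the forked RenameFields code, which isn't shareable): it models a row as a nested Map and applies a dotted-path rename recursively, so that renames inside nested rows show up in the values themselves rather than only in the top-level schema. All names here are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: "rows" are nested Maps, a rename target is a
// dotted path like "field0_1.field1_0". A real implementation would
// operate on Beam Rows and their Schemas.
public class DeepRenameSketch {
    @SuppressWarnings("unchecked")
    public static Map<String, Object> rename(Map<String, Object> row, String path, String newName) {
        Map<String, Object> out = new LinkedHashMap<>();
        int dot = path.indexOf('.');
        String head = (dot < 0) ? path : path.substring(0, dot);
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String key = e.getKey();
            Object value = e.getValue();
            if (key.equals(head)) {
                if (dot < 0) {
                    key = newName; // leaf of the path: rename this field
                } else if (value instanceof Map) {
                    // recurse into the nested "row" with the remaining path
                    value = rename((Map<String, Object>) value, path.substring(dot + 1), newName);
                }
            }
            out.put(key, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field0_0", "value0_0");
        row.put("field0_1", nested);

        // Rename the nested field first (paths use original names),
        // then rename the enclosing top-level field.
        Map<String, Object> renamed =
            rename(rename(row, "field0_1.field1_0", "nestedStringField"), "field0_1", "nestedField");
        System.out.println(renamed);
        // prints {field0_0=value0_0, nestedField={nestedStringField=value1_0}}
    }
}
```

The key difference from the stock transform, as I understand it, is that the rename is pushed down into the nested values rather than applied only to the schema, which is why it sidesteps the schema/value inconsistency described above, at the cost of rebuilding every nested structure.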