I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:

> Some more context - the problem is that RenameFields outputs (in this
> case) Java Row objects that are inconsistent with the actual schema.
> For example, if you have the following schema:
>
> Row {
>   field1: Row {
>     field2: string
>   }
> }
>
> And rename field1.field2 -> renamed, you'll get the following schema:
>
> Row {
>   field1: Row {
>     renamed: string
>   }
> }
>
> However, the Java object for the _nested_ row will return the old schema
> if getSchema() is called on it. This is because we only update the schema
> on the top-level row.
>
> I think this explains why your test works in the direct runner. If the
> row ever goes through an encode/decode path, it will come back correct.
> The original incorrect Java objects are no longer around, and new
> (consistent) objects are constructed from the raw data and the
> PCollection schema. Dataflow tends to fuse ParDos together, so the
> following ParDo will see the incorrect Row object. I bet the DirectRunner
> is encoding and decoding in between, which fixes the object.
>
> You can validate this theory by forcing a shuffle after RenameFields
> using Reshuffle. It should fix the issue. If it does, let me know and
> I'll work on a fix to RenameFields.
>
> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>
>> Aha, yes, this is indeed another bug in the transform. The schema is
>> set on the top-level Row but not on any nested rows.
>>
>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>> wrote:
>>
>>> Thank you everyone for your input. I believe it will be easiest to
>>> respond to all feedback in a single message rather than messages per
>>> person.
>>>
>>> - NeedsRunner - The tests are run eventually, so all good on my end.
>>> I was trying to run the smallest subset of test cases possible and
>>> didn't venture beyond `gradle test`.
>>> - Stack Trace - There wasn't one, unfortunately, because no exception
>>> was thrown in the code. The Beam Row was translated into a BQ TableRow
>>> and an insertion was attempted. The error "message" was part of the
>>> response JSON that came back from a request against the BQ API.
>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>> field0_1.nestedStringField is what I am looking for.
>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>   - The Beam Schema was as expected, with all renames applied.
>>>   - The example I provided was heavily stripped down in order to
>>>   isolate the problem. My work example, which is a bit impractical
>>>   because it's part of some generic tooling, has 4 levels of nesting
>>>   and also produces the correct output.
>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>   DirectRunner. In DataflowRunner, however, only the top-level renames
>>>   were reflected in the TableRow; the renames in the nested fields
>>>   weren't.
>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses
>>>   the Row.schema to get the field names. This makes sense to me, but
>>>   if a value is itself a Row, then its schema appears to be
>>>   inconsistent with the top-level schema.
>>> - My Current Workaround - I forked RenameFields and replaced the
>>> attachValues call in the expand method with a "deep" rename. This is
>>> obviously inefficient and I will not be submitting a PR for it.
>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>
>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> This transform is the same across all runners. A few comments on the
>>>> test:
>>>>
>>>> - Using attachValues directly is error prone (per the comment on the
>>>> method). I recommend using the withFieldValue builders instead.
>>>> - I recommend capturing the RenameFields output PCollection in a
>>>> local variable of type PCollection<Row> and printing out the schema
>>>> (which you can get using the PCollection.getSchema method) to ensure
>>>> that the output schema looks like you expect.
>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>> nestedStringField results in field0_1.nestedStringField; if you
>>>> wanted to flatten, then the better transform would be
>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>
>>>> This all being said, eyeballing the implementation of RenameFields
>>>> makes me think that it is buggy in the case where you specify a
>>>> top-level field multiple times like you do. I think it is simply
>>>> adding the top-level field into the output schema multiple times, and
>>>> the second time it is with the field0_1 base name; I have no idea why
>>>> your test doesn't catch this in the DirectRunner, as it's equally
>>>> broken there. Could you file a JIRA about this issue and assign it to
>>>> me?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Matthew,
>>>>>>
>>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>>> don't know if the PTransform behaves as expected.
>>>>>>
>>>>>> The exclusion of NeedsRunner tests is just a quirk in our testing
>>>>>> framework. NeedsRunner indicates that a test suite can't be
>>>>>> executed with the SDK alone; it needs a runner. So that exclusion
>>>>>> just makes sure we don't run the test when we're verifying the SDK
>>>>>> by itself in the :sdks:java:core:test task. The test is still run
>>>>>> in other tasks where we have a runner, most notably in the Java
>>>>>> PreCommit [1], where we run it as part of the
>>>>>> :runners:direct-java:test task.
>>>>>>
>>>>>> That being said, we may only run these tests continuously with the
>>>>>> DirectRunner; I'm not sure if we test them on all the runners like
>>>>>> we do with ValidatesRunner tests.
>>>>>
>>>>> That is correct. The tests are tests _of the transform_, so they run
>>>>> only on the DirectRunner. They are not tests of the runner, which is
>>>>> only responsible for correctly implementing Beam's primitives. The
>>>>> transform should not behave differently on different runners, except
>>>>> for fundamental differences in how they schedule work and
>>>>> checkpoint.
>>>>>
>>>>> Kenn
>>>>>
>>>>>> > The error message I'm receiving, "Error while reading data, error
>>>>>> message: JSON parsing error in row starting at position 0: No such
>>>>>> field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>> use the original name for the nested field and not the substitute
>>>>>> name.
>>>>>>
>>>>>> Is there a stack trace associated with this error? It would be
>>>>>> helpful to see where the error is coming from.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1]
>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>
>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm trying to use the RenameFields transform on nested fields
>>>>>>> prior to inserting into BigQuery. Insertion into BigQuery is
>>>>>>> successful with DirectRunner, but DataflowRunner has an issue with
>>>>>>> renamed nested fields. The error message I'm receiving, "Error
>>>>>>> while reading data, error message: JSON parsing error in row
>>>>>>> starting at position 0: No such field: nestedField.field1_0",
>>>>>>> suggests that BigQuery is trying to use the original name for the
>>>>>>> nested field and not the substitute name.
>>>>>>>
>>>>>>> The code for RenameFields seems simple enough, but does it behave
>>>>>>> differently in different runners? Will a deep attachValues be
>>>>>>> necessary in order to get the nested renames to work across all
>>>>>>> runners? Is there something wrong in my code?
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>
>>>>>>> The unit tests also seem to be disabled for this as well, so I
>>>>>>> don't know if the PTransform behaves as expected.
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>
>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>
>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>
>>>>>>>> import java.io.File;
>>>>>>>> import java.util.Arrays;
>>>>>>>> import java.util.HashSet;
>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>
>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>
>>>>>>>> public class BQRenameFields {
>>>>>>>>   public static void main(String[] args) {
>>>>>>>>     PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>     DataflowPipelineOptions options =
>>>>>>>>         PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>     options.setFilesToStage(
>>>>>>>>         Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>             .map(entry -> (new File(entry)).toString())
>>>>>>>>             .collect(Collectors.toList()));
>>>>>>>>
>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>
>>>>>>>>     Schema nestedSchema = Schema.builder()
>>>>>>>>         .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>         .build();
>>>>>>>>     Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>     Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>     Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>     Schema rowSchema = Schema.builder()
>>>>>>>>         .addFields(field, nested, runner)
>>>>>>>>         .build();
>>>>>>>>     Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>         "value0_0",
>>>>>>>>         Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>         options.getRunner().toString());
>>>>>>>>     pipeline
>>>>>>>>         .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>         .apply(RenameFields.<Row>create()
>>>>>>>>             .rename("field0_0", "stringField")
>>>>>>>>             .rename("field0_1", "nestedField")
>>>>>>>>             .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>             .rename("field0_2", "runner"))
>>>>>>>>         .apply(BigQueryIO.<Row>write()
>>>>>>>>             .to(new TableReference()
>>>>>>>>                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>                 .setDatasetId("prototypes")
>>>>>>>>                 .setTableId("matto_renameFields"))
>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>             .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>             .useBeamSchema());
>>>>>>>>     pipeline.run();
>>>>>>>>   }
>>>>>>>> }
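[Editor's note] Reuven's diagnosis above - the rename rewrites the top-level schema but leaves nested Row objects carrying their old field names - can be illustrated outside of Beam. The following is a minimal plain-Java sketch, not Beam code: maps stand in for Row objects and their schemas, and all class and method names here are invented for illustration. It contrasts a shallow rename (only top-level keys rewritten, as RenameFields effectively does) with a deep rename that recurses into nested rows (the behavior of Matthew's fork).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model: a "row" is a map whose keys play the role of its schema's
// field names. A nested row is a map value.
public class ShallowVsDeepRename {

  // Shallow rename: rewrites only top-level keys; nested maps keep
  // their old keys (mirroring the reported RenameFields bug).
  public static Map<String, Object> shallowRename(
      Map<String, Object> row, String from, String to) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      out.put(e.getKey().equals(from) ? to : e.getKey(), e.getValue());
    }
    return out;
  }

  // Deep rename: recurses into nested maps so their keys are rewritten
  // too, keeping nested objects consistent with the top-level schema.
  @SuppressWarnings("unchecked")
  public static Map<String, Object> deepRename(
      Map<String, Object> row, String from, String to) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      Object v = e.getValue();
      if (v instanceof Map) {
        v = deepRename((Map<String, Object>) v, from, to);
      }
      out.put(e.getKey().equals(from) ? to : e.getKey(), v);
    }
    return out;
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("field1_0", "value1_0");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("field0_1", nested);

    // Shallow: the nested row still carries the old field name.
    Map<String, Object> shallow = shallowRename(row, "field1_0", "nestedStringField");
    if (!((Map<String, Object>) shallow.get("field0_1")).containsKey("field1_0")) {
      throw new AssertionError("shallow rename should leave nested key stale");
    }

    // Deep: the nested field name is rewritten as well.
    Map<String, Object> deep = deepRename(row, "field1_0", "nestedStringField");
    if (!((Map<String, Object>) deep.get("field0_1")).containsKey("nestedStringField")) {
      throw new AssertionError("deep rename should rewrite nested key");
    }
    System.out.println("ok");
  }
}
```

In this model, a consumer that reads field names from the nested object itself (as BigQueryUtils.toTableRow does with Row.getSchema) sees the stale name after a shallow rename, which matches the "No such field: nestedField.field1_0" error.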
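[Editor's note] Reuven's point that "if the row ever goes through an encode/decode path, it will come back correct" can also be sketched in plain Java. This is a toy model, not Beam's coder machinery: encoding keeps only the raw values, and decoding rebuilds every object from the authoritative schema, so any stale per-object field names disappear. A schema is modeled as a map from field name to either null (leaf) or a nested schema; all names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the encode/decode round trip that a shuffle (e.g. a
// Reshuffle after RenameFields) forces between fused stages.
public class EncodeDecodeFix {

  // Encode: strip field names, keep values in field order.
  public static List<Object> encode(Map<String, Object> row) {
    return new ArrayList<>(row.values());
  }

  // Decode: rebuild the row from raw values plus the authoritative
  // schema, recursing into nested rows. Stale names cannot survive
  // because field names come only from the schema.
  @SuppressWarnings("unchecked")
  public static Map<String, Object> decode(
      List<Object> values, Map<String, Map<String, ?>> schema) {
    Map<String, Object> out = new LinkedHashMap<>();
    int i = 0;
    for (Map.Entry<String, Map<String, ?>> f : schema.entrySet()) {
      Object v = values.get(i++);
      if (f.getValue() != null) { // nested row: rebuild it recursively
        v = decode(encode((Map<String, Object>) v),
                   (Map<String, Map<String, ?>>) f.getValue());
      }
      out.put(f.getKey(), v);
    }
    return out;
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // Stale object: the nested row is still keyed by the old name.
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("field1_0", "value1_0");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("field0_1", nested);

    // Authoritative (renamed) schema, as the PCollection carries it.
    Map<String, Map<String, ?>> nestedSchema = new LinkedHashMap<>();
    nestedSchema.put("renamed", null);
    Map<String, Map<String, ?>> schema = new LinkedHashMap<>();
    schema.put("field0_1", nestedSchema);

    // After the round trip, the nested row matches the schema again.
    Map<String, Object> fixed = decode(encode(row), schema);
    if (!((Map<String, Object>) fixed.get("field0_1")).containsKey("renamed")) {
      throw new AssertionError("decode should rebuild nested row from schema");
    }
    System.out.println("ok");
  }
}
```

This is why the DirectRunner (which encodes and decodes between transforms) masks the bug while Dataflow's fusion exposes it, and why forcing a shuffle after RenameFields is a plausible diagnostic.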