+dev <d...@beam.apache.org>

> I bet the DirectRunner is encoding and decoding in between, which fixes the object.
Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?

Brian

On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

> I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
>
> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>
>> Some more context - the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema. For example, if you have the following schema:
>>
>> Row {
>>   field1: Row {
>>     field2: string
>>   }
>> }
>>
>> and rename field1.field2 -> renamed, you'll get the following schema:
>>
>> Row {
>>   field1: Row {
>>     renamed: string
>>   }
>> }
>>
>> However, the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.
>>
>> I think this explains why your test works in the direct runner. If the row ever goes through an encode/decode path, it will come back correct. The original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>
>> You can validate this theory by forcing a shuffle after RenameFields using Reshuffle. It should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.
>>
>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Aha, yes, this is indeed another bug in the transform. The schema is set on the top-level Row but not on any nested rows.
>>>
>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>
>>>> Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than messages per person.
>>>>
>>>> - NeedsRunner - The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
>>>> - Stack Trace - There wasn't one, unfortunately, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>   - The Beam Schema was as expected with all renames applied.
>>>>   - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner, however, only the top-level renames were reflected in the TableRow; the renames in the nested fields weren't.
>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema.
>>>> - My Current Workaround - I forked RenameFields and replaced the attachValues in the expand method with a "deep" rename. This is obviously inefficient and I will not be submitting a PR for that.
>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>
>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> This transform is the same across all runners. A few comments on the test:
>>>>>
>>>>> - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
>>>>> - I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
>>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>
>>>>> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>
>>>>>>> Hi Matthew,
>>>>>>>
>>>>>>> > The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>>>>
>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing framework.
>>>>>>> NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>>>>>
>>>>>>> That being said, we may only run these tests continuously with the DirectRunner; I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>>>>>
>>>>>> That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> > The error message I’m receiving: "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0" suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>
>>>>>>> Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>
>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I’m trying to use the RenameFields transform prior to inserting into BigQuery on nested fields.
>>>>>>>> Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I’m receiving: "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0" suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>
>>>>>>>> The code for RenameFields seems simple enough, but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners? Is there something wrong in my code?
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>
>>>>>>>> The unit tests also seem to be disabled for this as well, so I don’t know if the PTransform behaves as expected.
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>
>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>
>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>
>>>>>>>>> import java.io.File;
>>>>>>>>> import java.util.Arrays;
>>>>>>>>> import java.util.HashSet;
>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>
>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>
>>>>>>>>> public class BQRenameFields {
>>>>>>>>>   public static void main(String[] args) {
>>>>>>>>>     PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>     DataflowPipelineOptions options =
>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>     options.setFilesToStage(
>>>>>>>>>         Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>             .map(entry -> (new File(entry)).toString())
>>>>>>>>>             .collect(Collectors.toList()));
>>>>>>>>>
>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>
>>>>>>>>>     Schema nestedSchema =
>>>>>>>>>         Schema.builder()
>>>>>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>             .build();
>>>>>>>>>     Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>     Schema.Field nested =
>>>>>>>>>         Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>     Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>     Schema rowSchema = Schema.builder().addFields(field, nested, runner).build();
>>>>>>>>>     Row testRow =
>>>>>>>>>         Row.withSchema(rowSchema)
>>>>>>>>>             .attachValues(
>>>>>>>>>                 "value0_0",
>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>     pipeline
>>>>>>>>>         .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>         .apply(
>>>>>>>>>             RenameFields.<Row>create()
>>>>>>>>>                 .rename("field0_0", "stringField")
>>>>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>                 .rename("field0_2", "runner"))
>>>>>>>>>         .apply(
>>>>>>>>>             BigQueryIO.<Row>write()
>>>>>>>>>                 .to(
>>>>>>>>>                     new TableReference()
>>>>>>>>>                         .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>                         .setDatasetId("prototypes")
>>>>>>>>>                         .setTableId("matto_renameFields"))
>>>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>                 .withSchemaUpdateOptions(
>>>>>>>>>                     new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>                 .useBeamSchema());
>>>>>>>>>     pipeline.run();
>>>>>>>>>   }
>>>>>>>>> }
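The pipeline above renames field0_1.field1_0 in place, and per Reuven's note earlier in the thread, rename preserves nesting while Select.fieldNameAs would flatten. The difference can be sketched stand-alone with plain Java over nested Maps; the helpers here are hypothetical and model only the shape of the output, not Beam's API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical map-based model (not Beam's API) of rename vs. select-as.
public class RenameVsSelectSketch {

  // RenameFields-like: rename a nested field; the nesting is preserved.
  @SuppressWarnings("unchecked")
  static Map<String, Object> renameNested(
      Map<String, Object> row, String outer, String oldName, String newName) {
    Map<String, Object> copy = new LinkedHashMap<>(row);
    Map<String, Object> inner =
        new LinkedHashMap<>((Map<String, Object>) copy.get(outer));
    inner.put(newName, inner.remove(oldName));
    copy.put(outer, inner);
    return copy;
  }

  // Select.fieldNameAs-like: pull the nested value up to the top level.
  @SuppressWarnings("unchecked")
  static Map<String, Object> selectAs(
      Map<String, Object> row, String outer, String oldName, String newName) {
    Map<String, Object> out = new LinkedHashMap<>();
    out.put(newName, ((Map<String, Object>) row.get(outer)).get(oldName));
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> inner = new LinkedHashMap<>();
    inner.put("field1_0", "value1_0");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("field0_1", inner);

    // Rename keeps the nesting: field0_1.field1_0 -> field0_1.nestedStringField
    System.out.println(renameNested(row, "field0_1", "field1_0", "nestedStringField"));
    // {field0_1={nestedStringField=value1_0}}

    // Select-as flattens: field0_1.field1_0 -> top-level nestedStringField
    System.out.println(selectAs(row, "field0_1", "field1_0", "nestedStringField"));
    // {nestedStringField=value1_0}
  }
}
```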