Would it be caught by CoderProperties?

Kenn
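For context on the question above: CoderProperties is Beam's test utility for coder laws such as the encode/decode round trip. A minimal, self-contained sketch of that round-trip property, using a toy length-prefixed string-list coder rather than the Beam API (the class and method names here are illustrative, not Beam's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for a coder: length-prefixed encoding of a list of strings.
public class RoundTripCheck {
    static byte[] encode(List<String> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(value.size());
            for (String s : value) {
                byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
                out.writeInt(utf8.length);
                out.write(utf8);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> decode(byte[] encoded) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
            int size = in.readInt();
            List<String> result = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                byte[] utf8 = new byte[in.readInt()];
                in.readFully(utf8);
                result.add(new String(utf8, StandardCharsets.UTF_8));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> value = Arrays.asList("value0_0", "value1_0");
        // The round-trip property: decoding the encoding yields an equal value.
        if (!decode(encode(value)).equals(value)) {
            throw new AssertionError("coder round trip failed");
        }
        System.out.println("round trip ok");
    }
}
```

Note that this property exercises the coder in isolation; it would likely only surface the issue discussed in this thread if a test also compared an in-memory Row against its re-decoded form.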
On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <[email protected]> wrote:

> I don't think this bug is schema specific - we created a Java object that is inconsistent with its encoded form, which could happen to any transform.
>
> This does seem to be a gap in DirectRunner testing though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.
>
> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <[email protected]> wrote:
>
>> +dev <[email protected]>
>>
>> > I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>
>> Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <[email protected]> wrote:
>>
>>> I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
>>>
>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Some more context - the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema. For example, if you have the following schema:
>>>>
>>>> Row {
>>>>   field1: Row {
>>>>     field2: string
>>>>   }
>>>> }
>>>>
>>>> and rename field1.field2 -> renamed, you'll get the following schema:
>>>>
>>>> Row {
>>>>   field1: Row {
>>>>     renamed: string
>>>>   }
>>>> }
>>>>
>>>> However, the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.
>>>>
>>>> I think this explains why your test works in the direct runner. If the row ever goes through an encode/decode path, it will come back correct.
>>>> The original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>>>
>>>> You can validate this theory by forcing a shuffle after RenameFields using Reshuffle; that should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.
>>>>
>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Aha, yes, this is indeed another bug in the transform. The schema is set on the top-level Row but not on any nested rows.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <[email protected]> wrote:
>>>>>
>>>>>> Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than in messages per person.
>>>>>>
>>>>>> - NeedsRunner - The tests are run eventually, so all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
>>>>>> - Stack Trace - There wasn't any, unfortunately, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back from a request against the BQ API.
>>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
>>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>   - The Beam Schema was as expected, with all renames applied.
>>>>>>   - The example I provided was heavily stripped down in order to isolate the problem.
>>>>>>     My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
>>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner, however, only the top-level renames were reflected in the TableRow; the renames in the nested fields weren't.
>>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema.
>>>>>> - My Current Workaround - I forked RenameFields and replaced the attachValues in the expand method with a "deep" rename. This is obviously inefficient and I will not be submitting a PR for it.
>>>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> This transform is the same across all runners. A few comments on the test:
>>>>>>>
>>>>>>> - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
>>>>>>> - I recommend capturing the RenameFields output into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
>>>>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
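Matthew's "deep rename" workaround mentioned above recurses through nested values, applying renames at every level rather than only at the top. A minimal, self-contained sketch of that recursion, using plain Java maps in place of Beam Rows (Beam Rows are immutable and schema-backed, so the real fix rebuilds each nested Row against the renamed schema; all names here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the "deep rename" idea using plain maps in place of Beam Rows.
public class DeepRename {
    // renames maps a dotted path in the *original* schema (e.g.
    // "field0_1.field1_0") to a new field name.
    static Map<String, Object> deepRename(Map<String, Object> row,
                                          Map<String, String> renames,
                                          String prefix) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            String newName = renames.getOrDefault(path, e.getKey());
            Object value = e.getValue();
            if (value instanceof Map) {
                // Recurse into nested "rows" so nested renames are applied too.
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                value = deepRename(nested, renames, path);
            }
            out.put(newName, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field0_0", "value0_0");
        row.put("field0_1", nested);

        Map<String, String> renames = new LinkedHashMap<>();
        renames.put("field0_0", "stringField");
        renames.put("field0_1", "nestedField");
        renames.put("field0_1.field1_0", "nestedStringField");

        System.out.println(deepRename(row, renames, ""));
        // {stringField=value0_0, nestedField={nestedStringField=value1_0}}
    }
}
```

As noted in the thread, doing this eagerly on every element is inefficient compared to fixing the schemas once in the transform itself; this only illustrates the recursion shape.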
>>>>>>>
>>>>>>> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthew,
>>>>>>>>>
>>>>>>>>> > The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>>>>>>
>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>>>>>>>
>>>>>>>>> That being said, we may only run these tests continuously with the DirectRunner; I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>>>>>>>
>>>>>>>> That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>> > The error message I’m receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>
>>>>>>>>> Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.
>>>>>>>>>
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>
>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I’m trying to use the RenameFields transform on nested fields prior to inserting into BigQuery. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I’m receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>>
>>>>>>>>>> The code for RenameFields seems simple enough, but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners?
>>>>>>>>>> Is there something wrong in my code?
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>
>>>>>>>>>> The unit tests also seem to be disabled for this, so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>
>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>
>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>
>>>>>>>>>>> import java.io.File;
>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>
>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>
>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>             Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>>                 .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>                 .collect(Collectors.toList()));
>>>>>>>>>>>
>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>
>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>             .build();
>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>             .addFields(field, nested, runner)
>>>>>>>>>>>             .build();
>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>             "value0_0",
>>>>>>>>>>>             Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>             options.getRunner().toString());
>>>>>>>>>>>         pipeline
>>>>>>>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>             .apply(RenameFields.<Row>create()
>>>>>>>>>>>                 .rename("field0_0", "stringField")
>>>>>>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>                 .rename("field0_2", "runner"))
>>>>>>>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>                 .to(new TableReference()
>>>>>>>>>>>                     .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>                     .setDatasetId("prototypes")
>>>>>>>>>>>                     .setTableId("matto_renameFields"))
>>>>>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>                 .withSchemaUpdateOptions(new HashSet<>(asList(
>>>>>>>>>>>                     BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>                 .useBeamSchema());
>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>     }
>>>>>>>>>>> }
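Reuven's suggested diagnostic earlier in the thread (forcing an encode/decode between RenameFields and the BigQuery write) would slot into the pipeline above roughly as follows. This is a sketch, not a tested fix; Reshuffle.viaRandomKey() is one way to force each element through a materialization boundary:

```java
// Sketch only: insert a shuffle between RenameFields and BigQueryIO so every
// Row is encoded and decoded, replacing the inconsistent in-memory objects
// with ones rebuilt from the raw data and the PCollection schema.
.apply(RenameFields.<Row>create()
    .rename("field0_0", "stringField")
    .rename("field0_1", "nestedField")
    .rename("field0_1.field1_0", "nestedStringField")
    .rename("field0_2", "runner"))
.apply(Reshuffle.viaRandomKey())  // forces an encode/decode of each element
.apply(BigQueryIO.<Row>write()
    // ... same BigQuery configuration as above ...
    .useBeamSchema());
```

If the nested renames then show up correctly in the BigQuery table, that supports the fused-ParDo theory described in the thread.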
