Mutability checking might catch that. I meant to suggest not putting the check in the pipeline, but offering a testing discipline that will catch such issues. One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester, like Guava's EqualsTester. Then it could run through all the properties without a user setting up test suites. The downside is that the test failure signal gets aggregated.
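A CoderTester along those lines might look roughly like the sketch below. This is hypothetical: the class name, the encode/decode function pair standing in for a real Coder, and the aggregated-failure return value are all assumptions, not Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical CoderTester in the spirit of Guava's EqualsTester: collect
// sample values, run every coder property over them, and report all
// failures at once (the aggregated-signal downside mentioned above).
public class CoderTester<T> {
  private final Function<T, byte[]> encode;  // stand-in for Coder.encode
  private final Function<byte[], T> decode;  // stand-in for Coder.decode
  private final List<T> samples = new ArrayList<>();

  public CoderTester(Function<T, byte[]> encode, Function<byte[], T> decode) {
    this.encode = encode;
    this.decode = decode;
  }

  public CoderTester<T> addSample(T value) {
    samples.add(value);
    return this;
  }

  // Runs all properties over all samples and returns the aggregated failures.
  public List<String> test() {
    List<String> failures = new ArrayList<>();
    for (T value : samples) {
      byte[] bytes = encode.apply(value);
      // Property: decode(encode(x)) must equal x.
      if (!decode.apply(bytes).equals(value)) {
        failures.add("decodeEncodeEqual failed for: " + value);
      }
      // Property: encoding must be deterministic.
      if (!Arrays.equals(bytes, encode.apply(value))) {
        failures.add("determinism failed for: " + value);
      }
    }
    return failures;
  }
}
```

A user would register samples once and get every property checked, at the cost of one combined failure report instead of one test per property.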
Kenn

On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:

Could the DirectRunner just do an equality check whenever it does an encode/decode? It sounds like it's already effectively performing a CoderProperties.coderDecodeEncodeEqual for every element, just omitting the equality check.

On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:

There is no bug in the Coder itself, so that wouldn't catch it. We could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if the Direct runner already does an encode/decode before that ParDo, then that would have fixed the problem before we could see it.

On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:

Would it be caught by CoderProperties?

Kenn

On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:

I don't think this bug is schema-specific - we created a Java object that is inconsistent with its encoded form, which could happen to any transform.

This does seem to be a gap in DirectRunner testing though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:

+dev <d...@beam.apache.org>

> I bet the DirectRunner is encoding and decoding in between, which fixes the object.

Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?

Brian

On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
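For illustration, the per-element check Brian describes - asserting that decode(encode(x)) equals x at the point where the runner already round-trips the element - could be sketched like this. Toy encode/decode functions stand in for the coder; this is not actual DirectRunner code.

```java
import java.util.Objects;
import java.util.function.Function;

// Sketch of an equality check a runner could perform wherever it already
// encodes and decodes an element: if the round trip changes the element,
// fail loudly instead of silently "fixing" the object.
public class DecodeEncodeCheck {
  public static <T> T roundTripChecked(
      T element, Function<T, byte[]> encode, Function<byte[], T> decode) {
    T decoded = decode.apply(encode.apply(element));
    if (!Objects.equals(element, decoded)) {
      throw new IllegalStateException(
          "Coder round trip changed the element: " + element + " -> " + decoded);
    }
    return decoded;
  }
}
```

Applied per element, this would surface an object that is inconsistent with its encoded form at the point of encoding, rather than letting the round trip silently repair it.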
On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:

Some more context - the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema. For example, if you have the following schema:

Row {
  field1: Row {
    field2: string
  }
}

And rename field1.field2 -> renamed, you'll get the following schema:

Row {
  field1: Row {
    renamed: string
  }
}

However, the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.

I think this explains why your test works in the direct runner. If the row ever goes through an encode/decode path, it will come back correct. The original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.

You can validate this theory by forcing a shuffle after RenameFields using Reshuffle. It should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.

On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:

Aha, yes, this is indeed another bug in the transform. The schema is set on the top-level Row but not on any nested rows.

On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

Thank you everyone for your input.
I believe it will be easiest to respond to all feedback in a single message rather than messages per person.

- NeedsRunner - The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
- Stack Trace - There wasn't one, unfortunately, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
- Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
- Info Logging Findings (In Lieu of a Stack Trace)
  - The Beam Schema was as expected, with all renames applied.
  - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
  - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner, however, only the top-level renames were reflected in the TableRow; the renames in the nested fields weren't.
  - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema.
- My Current Workaround - I forked RenameFields and replaced the attachValues in the expand method with a "deep" rename. This is obviously inefficient and I will not be submitting a PR for that.
- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442

On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:

This transform is the same across all runners. A few comments on the test:

- Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
- I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
- RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").

This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
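To make the rename-versus-flatten distinction concrete, here is a toy illustration with plain Java maps standing in for Beam Rows and schemas (the helper names are hypothetical, not Beam code): renaming keeps the value nested under field0_1, while a flattening select lifts it to the top level.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy illustration (plain maps, not Beam Rows) of the point above:
// renaming field0_1.field1_0 preserves the nesting, while a flattening
// select pulls the nested value up to a top-level field.
public class RenameVsFlatten {
  // RenameFields-style: rename the nested key in place; nesting preserved.
  public static Map<String, Object> renameNested(Map<String, Object> row) {
    Map<String, Object> out = new LinkedHashMap<>(row);
    @SuppressWarnings("unchecked")
    Map<String, Object> nested =
        new LinkedHashMap<>((Map<String, Object>) out.get("field0_1"));
    nested.put("nestedStringField", nested.remove("field1_0"));
    out.put("field0_1", nested);
    return out;
  }

  // Select-style: lift the nested value to a top-level field.
  public static Map<String, Object> flattenSelect(Map<String, Object> row) {
    @SuppressWarnings("unchecked")
    Map<String, Object> nested = (Map<String, Object>) row.get("field0_1");
    Map<String, Object> out = new LinkedHashMap<>();
    out.put("nestedStringField", nested.get("field1_0"));
    return out;
  }
}
```

With renameNested the result still has the shape {field0_1: {nestedStringField: ...}}; flattenSelect instead yields {nestedStringField: ...}.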
Reuven

On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote, replying inline to Brian Hulette's message below:

On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:

Hi Matthew,

> The unit tests also seem to be disabled for this as well and so I don't know if the PTransform behaves as expected.

The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.

That being said, we may only run these tests continuously with the DirectRunner; I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.

Kenn's reply:

That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.

Kenn

Brian's message, continued:

> The error message I'm receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.

Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.

Brian

[1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/

On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

I'm trying to use the RenameFields transform prior to inserting into BigQuery on nested fields. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I'm receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.

The code for RenameFields seems simple enough, but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners? Is there something wrong in my code?
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186

The unit tests also seem to be disabled for this as well, and so I don't know if the PTransform behaves as expected.

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java

package ca.loblaw.cerebro.PipelineControl;

import com.google.api.services.bigquery.model.TableReference;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;

public class BQRenameFields {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setFilesToStage(
            Arrays.stream(System.getProperty("java.class.path")
                    .split(File.pathSeparator))
                .map(entry -> (new File(entry)).toString())
                .collect(Collectors.toList()));

        Pipeline pipeline = Pipeline.create(options);

        Schema nestedSchema = Schema.builder()
            .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
            .build();
        Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
        Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
        Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
        Schema rowSchema = Schema.builder()
            .addFields(field, nested, runner)
            .build();
        Row testRow = Row.withSchema(rowSchema).attachValues(
            "value0_0",
            Row.withSchema(nestedSchema).attachValues("value1_0"),
            options.getRunner().toString());

        pipeline
            .apply(Create.of(testRow).withRowSchema(rowSchema))
            .apply(RenameFields.<Row>create()
                .rename("field0_0", "stringField")
                .rename("field0_1", "nestedField")
                .rename("field0_1.field1_0", "nestedStringField")
                .rename("field0_2", "runner"))
            .apply(BigQueryIO.<Row>write()
                .to(new TableReference()
                    .setProjectId("lt-dia-lake-exp-raw")
                    .setDatasetId("prototypes")
                    .setTableId("matto_renameFields"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(new HashSet<>(
                    asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
                .useBeamSchema());
        pipeline.run();
    }
}
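For reference, the "deep" rename workaround Matthew describes earlier in the thread could be sketched along these lines. Plain maps stand in for Beam Rows, rename keys are dotted paths, and this is illustrative only, not the actual forked implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a "deep" rename: recurse into nested structures so a rename
// like field0_1.field1_0 -> nestedStringField reaches every level,
// instead of touching only the top level.
public class DeepRename {
  public static Map<String, Object> rename(
      Map<String, Object> row, String path, String newName) {
    Map<String, Object> out = new LinkedHashMap<>(row);
    int dot = path.indexOf('.');
    if (dot < 0) {
      // Base case: rename the field at this level.
      out.put(newName, out.remove(path));
    } else {
      // Recursive case: descend into the nested row and rename there.
      String head = path.substring(0, dot);
      @SuppressWarnings("unchecked")
      Map<String, Object> nested = (Map<String, Object>) out.get(head);
      out.put(head, rename(nested, path.substring(dot + 1), newName));
    }
    return out;
  }
}
```

Because fresh maps are built at each level, the input is left untouched; a real fix inside RenameFields would presumably rebuild the nested Rows (and their schemas) in the same recursive fashion.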