> One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualityTester.
Reuven's point still applies here though. This issue is not due to a bug in SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode. I'm assuming a CoderTester would require manually generating inputs right? These input Rows represent an illegal state that we wouldn't test with. (That being said I like the idea of a CoderTester in general)

Brian

On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles <k...@apache.org> wrote:

> Mutability checking might catch that.
>
> I meant to suggest not putting the check in the pipeline, but offering a testing discipline that will catch such issues. One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualityTester. Then it can run through all the properties without a user setting up test suites. Downside is that the test failure signal gets aggregated.
>
> Kenn
>
> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Could the DirectRunner just do an equality check whenever it does an encode/decode? It sounds like it's already effectively performing a CoderProperties.coderDecodeEncodeEqual for every element, just omitting the equality check.
>>
>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:
>>
>>> There is no bug in the Coder itself, so that wouldn't catch it. We could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if the Direct runner already does an encode/decode before that ParDo, then that would have fixed the problem before we could see it.
>>>
>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Would it be caught by CoderProperties?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I don't think this bug is schema specific - we created a Java object that is inconsistent with its encoded form, which could happen to any transform.
>>>>>
>>>>> This does seem to be a gap in DirectRunner testing though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> > I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>>>>>
>>>>>> Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Some more context - the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema. For example if you have the following schema:
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>   field1: Row {
>>>>>>>>     field2: string
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>   field1: Row {
>>>>>>>>     renamed: string
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> However the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.
>>>>>>>>
>>>>>>>> I think this explains why your test works in the direct runner. If the row ever goes through an encode/decode path, it will come back correct.
>>>>>>>> The original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>>>>>>>
>>>>>>>> You can validate this theory by forcing a shuffle after RenameFields using Reshuffle. It should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Aha, yes this is indeed another bug in the transform. The schema is set on the top-level Row but not on any nested rows.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than messages per person.
>>>>>>>>>>
>>>>>>>>>> - NeedsRunner - The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
>>>>>>>>>> - Stack Trace - There wasn't any unfortunately because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
>>>>>>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
>>>>>>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>>>   - The Beam Schema was as expected with all renames applied.
>>>>>>>>>>   - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
>>>>>>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner however, only the top-level renames were reflected in the TableRow and all renames in the nested fields weren't.
>>>>>>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema.
>>>>>>>>>> - My Current Workaround - I forked RenameFields and replaced the attachValues call in the expand method with a "deep" rename. This is obviously inefficient and I will not be submitting a PR for that.
>>>>>>>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This transform is the same across all runners. A few comments on the test:
>>>>>>>>>>>
>>>>>>>>>>> - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
>>>>>>>>>>> - I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
>>>>>>>>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>>>>>>>
>>>>>>>>>>> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone, it needs a runner.
>>>>>>>>>>>>> So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That being said, we may only run these tests continuously with the DirectRunner, I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That is correct. The tests are tests _of the transform_ so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>>> > The error message I’m receiving, : Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0, suggests the BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I’m trying to use the RenameFields transform on nested fields prior to inserting into BigQuery. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I’m receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The code for RenameFields seems simple enough but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners? Is there something wrong in my code?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The unit tests also seem to be disabled for this as well and so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>         DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>>>             Arrays.stream(System.getProperty("java.class.path").
>>>>>>>>>>>>>>>                 split(File.pathSeparator)).
>>>>>>>>>>>>>>>                 map(entry -> (new File(entry)).toString()).collect(Collectors.toList()));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Schema nestedSchema = Schema.builder().addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING)).build();
>>>>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>>>>             .addFields(field, nested, runner)
>>>>>>>>>>>>>>>             .build();
>>>>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues("value0_0", Row.withSchema(nestedSchema).attachValues("value1_0"), options.getRunner().toString());
>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>>>>             .apply(RenameFields.<Row>create()
>>>>>>>>>>>>>>>                 .rename("field0_0", "stringField")
>>>>>>>>>>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>>>>>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>>>>                 .rename("field0_2", "runner"))
>>>>>>>>>>>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>>>>                 .to(new TableReference().setProjectId("lt-dia-lake-exp-raw").setDatasetId("prototypes").setTableId("matto_renameFields"))
>>>>>>>>>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>>>>                 .withSchemaUpdateOptions(new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>>>>                 .useBeamSchema());
>>>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
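
For anyone reading this thread later, the failure mode discussed above (a schema updated on the top-level row only, then "fixed" by an encode/decode round trip) can be reproduced in isolation. What follows is only a toy sketch under stated assumptions: ToySchema, ToyRow, shallowRename, encode, decode and fieldPaths are hypothetical stand-ins written for this illustration. They are not Beam's Schema, Row, RenameFields or SchemaCoder, and the real classes are considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Toy model of the failure mode discussed in this thread.
// ToySchema and ToyRow are hypothetical stand-ins, NOT Beam's Schema and Row.
final class StaleNestedSchemaDemo {

    /** Field names plus, per field, a nested schema (null for a plain string field). */
    static final class ToySchema {
        final List<String> names;
        final List<ToySchema> nested;

        ToySchema(List<String> names, List<ToySchema> nested) {
            this.names = names;
            this.nested = nested;
        }
    }

    /** A row carries its own schema reference; values are String or ToyRow. */
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;

        ToyRow(ToySchema schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    static ToySchema leaf(String name) {
        return new ToySchema(Arrays.asList(name), Collections.singletonList((ToySchema) null));
    }

    static ToySchema outer(String name, ToySchema inner) {
        return new ToySchema(Arrays.asList(name), Arrays.asList(inner));
    }

    /** Schema of the collection after renaming field1.field2 -> renamed. */
    static final ToySchema RENAMED = outer("field1", leaf("renamed"));

    /** Shallow rename: the new schema is attached to the top-level row only. */
    static ToyRow shallowRename(ToyRow row, ToySchema renamed) {
        return new ToyRow(renamed, row.values); // nested rows keep their old schema
    }

    /** Encoding keeps only the leaf values, in order; per-row schemas are dropped. */
    static void encode(ToyRow row, List<String> out) {
        for (Object value : row.values) {
            if (value instanceof ToyRow) {
                encode((ToyRow) value, out);
            } else {
                out.add((String) value);
            }
        }
    }

    /** Decoding rebuilds every row, nested ones included, from the collection schema. */
    static ToyRow decode(ToySchema schema, Iterator<String> raw) {
        List<Object> values = new ArrayList<>();
        for (ToySchema child : schema.nested) {
            values.add(child == null ? raw.next() : decode(child, raw));
        }
        return new ToyRow(schema, values);
    }

    static ToyRow roundTrip(ToyRow row, ToySchema collectionSchema) {
        List<String> raw = new ArrayList<>();
        encode(row, raw);
        return decode(collectionSchema, raw.iterator());
    }

    /** Field paths as a sink would see them: names come from each row's own schema. */
    static List<String> fieldPaths(ToyRow row, String prefix) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < row.values.size(); i++) {
            String path = prefix + row.schema.names.get(i);
            Object value = row.values.get(i);
            if (value instanceof ToyRow) {
                paths.addAll(fieldPaths((ToyRow) value, path + "."));
            } else {
                paths.add(path);
            }
        }
        return paths;
    }

    /** Builds Row { field1: Row { field2 } } and applies the shallow rename. */
    static ToyRow renamedRow() {
        ToySchema oldInner = leaf("field2");
        ToyRow inner = new ToyRow(oldInner, Arrays.asList((Object) "value"));
        ToyRow original = new ToyRow(outer("field1", oldInner), Arrays.asList((Object) inner));
        return shallowRename(original, RENAMED);
    }

    public static void main(String[] args) {
        ToyRow renamed = renamedRow();
        // A fused downstream step receives the in-memory object as-is.
        System.out.println("fused step sees:     " + fieldPaths(renamed, ""));
        // A step behind an encode/decode boundary receives freshly built rows.
        System.out.println("after encode/decode: " + fieldPaths(roundTrip(renamed, RENAMED), ""));
    }
}
```

In this toy model the first line prints [field1.field2] (the stale nested name) and the second prints [field1.renamed]. That is the same shape as the DataflowRunner vs. DirectRunner difference reported above, and it is also why a check placed after an encode/decode boundary would not see the inconsistent object.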