Could the DirectRunner just do an equality check whenever it does an encode/decode? It sounds like it's already effectively performing a CoderProperties.coderDecodeEncodeEqual for every element, just omitting the equality check.
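The check being proposed is essentially decode(encode(x)).equals(x) for every element. A minimal sketch of that round-trip check, using plain Java serialization as a stand-in for a real Beam Coder (the class and method names here are made up for illustration, not Beam API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripCheckSketch {

  // Encode with the stand-in "coder" (plain Java serialization).
  static byte[] encode(Serializable value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  // The check a runner could perform wherever it already round-trips an
  // element: decode the encoded form and compare it to the original.
  static boolean roundTripEquals(Serializable value)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(encode(value)))) {
      return value.equals(in.readObject());
    }
  }

  public static void main(String[] args) throws Exception {
    // A value consistent with its encoded form survives the round trip.
    if (!roundTripEquals("some element")) {
      throw new AssertionError("round trip changed the element");
    }
    System.out.println("round trip OK");
  }
}
```

Note that such a check only catches the bug in this thread if equals() is sensitive to the inconsistency (for Row, the cached nested schema), which is part of what makes the bug subtle.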
On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:

> There is no bug in the Coder itself, so that wouldn't catch it. We could
> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but
> if the DirectRunner already does an encode/decode before that ParDo, then
> that would have fixed the problem before we could see it.
>
> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Would it be caught by CoderProperties?
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I don't think this bug is schema-specific - we created a Java object
>>> that is inconsistent with its encoded form, which could happen to any
>>> transform.
>>>
>>> This does seem to be a gap in DirectRunner testing, though. It also
>>> makes it hard to test using PAssert, as I believe that puts everything
>>> in a side input, forcing an encode/decode.
>>>
>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>> fixes the object.
>>>>
>>>> Do we need better testing of schema-aware (and potentially other
>>>> built-in) transforms in the face of fusion to root out issues like
>>>> this?
>>>>
>>>> Brian
>>>>
>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>
>>>>> I have some other work-related things I need to do this week, so I
>>>>> will likely report back on this over the weekend. Thank you for the
>>>>> explanation. It makes perfect sense now.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>> this case) Java Row objects that are inconsistent with the actual
>>>>>> schema.
>>>>>> For example, if you have the following schema:
>>>>>>
>>>>>> Row {
>>>>>>   field1: Row {
>>>>>>     field2: string
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> and you rename field1.field2 -> renamed, you'll get the following
>>>>>> schema:
>>>>>>
>>>>>> Row {
>>>>>>   field1: Row {
>>>>>>     renamed: string
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> However, the Java object for the _nested_ row will return the old
>>>>>> schema if getSchema() is called on it. This is because we only
>>>>>> update the schema on the top-level row.
>>>>>>
>>>>>> I think this explains why your test works in the direct runner. If
>>>>>> the row ever goes through an encode/decode path, it will come back
>>>>>> correct: the original incorrect Java objects are no longer around,
>>>>>> and new (consistent) objects are constructed from the raw data and
>>>>>> the PCollection schema. Dataflow tends to fuse ParDos together, so
>>>>>> the following ParDo will see the incorrect Row object. I bet the
>>>>>> DirectRunner is encoding and decoding in between, which fixes the
>>>>>> object.
>>>>>>
>>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>>> using Reshuffle. It should fix the issue. If it does, let me know
>>>>>> and I'll work on a fix to RenameFields.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Aha, yes, this is indeed another bug in the transform. The schema
>>>>>>> is set on the top-level Row but not on any nested rows.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you everyone for your input. I believe it will be easiest
>>>>>>>> to respond to all feedback in a single message rather than in
>>>>>>>> separate messages per person.
>>>>>>>>
>>>>>>>> - NeedsRunner - The tests are run eventually, so all good on my
>>>>>>>> end. I was trying to run the smallest subset of test cases
>>>>>>>> possible and didn't venture beyond `gradle test`.
>>>>>>>> - Stack Trace - There wasn't one, unfortunately, because no
>>>>>>>> exception was thrown in the code. The Beam Row was translated into
>>>>>>>> a BQ TableRow and an insertion was attempted. The error "message"
>>>>>>>> was part of the response JSON that came back as a result of a
>>>>>>>> request against the BQ API.
>>>>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>>>> field0_1.nestedStringField is what I am looking for.
>>>>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>   - The Beam Schema was as expected, with all renames applied.
>>>>>>>>   - The example I provided was heavily stripped down in order to
>>>>>>>>   isolate the problem. My work example, which is a bit impractical
>>>>>>>>   because it's part of some generic tooling, has 4 levels of
>>>>>>>>   nesting and also produces the correct output.
>>>>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>>>>>>   DirectRunner. In DataflowRunner, however, only the top-level
>>>>>>>>   renames were reflected in the TableRow; the renames in the
>>>>>>>>   nested fields weren't.
>>>>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>>>>>   uses the Row's schema to get the field names. This makes sense
>>>>>>>>   to me, but if a value is actually a Row then its schema appears
>>>>>>>>   to be inconsistent with the top-level schema.
>>>>>>>> - My Current Workaround - I forked RenameFields and replaced the
>>>>>>>> attachValues in the expand method with a "deep" rename. This is
>>>>>>>> obviously inefficient and I will not be submitting a PR for that.
>>>>>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> This transform is the same across all runners.
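The "deep" rename workaround mentioned in the list above might look roughly like the following, sketched on plain nested maps rather than Beam Rows (all names here are hypothetical; an actual fix inside RenameFields would rewrite the nested schemas and cached Row schemas instead):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeepRenameSketch {

  // renames maps a dotted field path ("field0_1.field1_0") to a new leaf
  // name; nested maps stand in for nested rows.
  static Map<String, Object> deepRename(Map<String, Object> row, Map<String, String> renames) {
    return rename(row, renames, "");
  }

  @SuppressWarnings("unchecked")
  private static Map<String, Object> rename(
      Map<String, Object> row, Map<String, String> renames, String prefix) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
      // Paths are built from the *original* names, matching how
      // RenameFields addresses fields.
      String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
      Object value = e.getValue();
      if (value instanceof Map) {
        // Recurse so renames apply at every level, not just the top.
        value = rename((Map<String, Object>) value, renames, path);
      }
      out.put(renames.getOrDefault(path, e.getKey()), value);
    }
    return out;
  }
}
```

With the renames from the test (field0_1 -> nestedField, field0_1.field1_0 -> nestedStringField), a row {field0_1={field1_0=value1_0}} comes out as {nestedField={nestedStringField=value1_0}}, renamed at every level rather than only the top.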
>>>>>>>>> A few comments on the test:
>>>>>>>>>
>>>>>>>>> - Using attachValues directly is error prone (per the comment on
>>>>>>>>> the method). I recommend using the withFieldValue builders
>>>>>>>>> instead.
>>>>>>>>> - I recommend capturing the RenameFields PCollection into a local
>>>>>>>>> variable of type PCollection<Row> and printing out the schema
>>>>>>>>> (which you can get using the PCollection.getSchema method) to
>>>>>>>>> ensure that the output schema looks like you expect.
>>>>>>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>>>>>>> nestedStringField results in field0_1.nestedStringField; if you
>>>>>>>>> wanted to flatten, then the better transform would be
>>>>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>>
>>>>>>>>> This all being said, eyeballing the implementation of
>>>>>>>>> RenameFields makes me think that it is buggy in the case where
>>>>>>>>> you specify a top-level field multiple times like you do. I think
>>>>>>>>> it is simply adding the top-level field into the output schema
>>>>>>>>> multiple times, and the second time is with the field0_1 base
>>>>>>>>> name; I have no idea why your test doesn't catch this in the
>>>>>>>>> DirectRunner, as it's equally broken there. Could you file a JIRA
>>>>>>>>> about this issue and assign it to me?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>
>>>>>>>>>>> > The unit tests also seem to be disabled for this as well and
>>>>>>>>>>> so I don't know if the PTransform behaves as expected.
>>>>>>>>>>>
>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>>> testing framework.
>>>>>>>>>>> NeedsRunner indicates that a test suite can't be executed with
>>>>>>>>>>> the SDK alone; it needs a runner. So that exclusion just makes
>>>>>>>>>>> sure we don't run the test when we're verifying the SDK by
>>>>>>>>>>> itself in the :sdks:java:core:test task. The test is still run
>>>>>>>>>>> in other tasks where we have a runner, most notably in the Java
>>>>>>>>>>> PreCommit [1], where we run it as part of the
>>>>>>>>>>> :runners:direct-java:test task.
>>>>>>>>>>>
>>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>>> the DirectRunner; I'm not sure if we test them on all the
>>>>>>>>>>> runners like we do with ValidatesRunner tests.
>>>>>>>>>>
>>>>>>>>>> That is correct. The tests are tests _of the transform_, so they
>>>>>>>>>> run only on the DirectRunner. They are not tests of the runner,
>>>>>>>>>> which is only responsible for correctly implementing Beam's
>>>>>>>>>> primitives. The transform should not behave differently on
>>>>>>>>>> different runners, except for fundamental differences in how
>>>>>>>>>> they schedule work and checkpoint.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>> > The error message I'm receiving, ": Error while reading data,
>>>>>>>>>>> error message: JSON parsing error in row starting at position
>>>>>>>>>>> 0: No such field: nestedField.field1_0", suggests that BigQuery
>>>>>>>>>>> is trying to use the original name for the nested field and not
>>>>>>>>>>> the substitute name.
>>>>>>>>>>>
>>>>>>>>>>> Is there a stack trace associated with this error? It would be
>>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>>
>>>>>>>>>>> Brian
>>>>>>>>>>>
>>>>>>>>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to use the RenameFields transform on nested fields
>>>>>>>>>>>> prior to inserting into BigQuery. Insertion into BigQuery is
>>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue
>>>>>>>>>>>> with renamed nested fields. The error message I'm receiving,
>>>>>>>>>>>> ": Error while reading data, error message: JSON parsing error
>>>>>>>>>>>> in row starting at position 0: No such field:
>>>>>>>>>>>> nestedField.field1_0", suggests that BigQuery is trying to use
>>>>>>>>>>>> the original name for the nested field and not the substitute
>>>>>>>>>>>> name.
>>>>>>>>>>>>
>>>>>>>>>>>> The code for RenameFields seems simple enough, but does it
>>>>>>>>>>>> behave differently in different runners? Will a deep
>>>>>>>>>>>> attachValues be necessary in order to get the nested renames
>>>>>>>>>>>> to work across all runners? Is there something wrong in my
>>>>>>>>>>>> code?
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>
>>>>>>>>>>>> The unit tests also seem to be disabled, so I don't know if
>>>>>>>>>>>> the PTransform behaves as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>
>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>   public static void main(String[] args) {
>>>>>>>>>>>>>     PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>>     DataflowPipelineOptions options =
>>>>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>>     options.setFilesToStage(
>>>>>>>>>>>>>         Arrays.stream(System.getProperty("java.class.path")
>>>>>>>>>>>>>             .split(File.pathSeparator)).
>>>>>>>>>>>>>             map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>>             .collect(Collectors.toList()));
>>>>>>>>>>>>>
>>>>>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     Schema nestedSchema =
>>>>>>>>>>>>>         Schema.builder()
>>>>>>>>>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>>             .build();
>>>>>>>>>>>>>     Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>>     Schema.Field nested =
>>>>>>>>>>>>>         Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>>     Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>>     Schema rowSchema = Schema.builder().addFields(field, nested, runner).build();
>>>>>>>>>>>>>     Row testRow =
>>>>>>>>>>>>>         Row.withSchema(rowSchema)
>>>>>>>>>>>>>             .attachValues(
>>>>>>>>>>>>>                 "value0_0",
>>>>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>>>>     pipeline
>>>>>>>>>>>>>         .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>>         .apply(RenameFields.<Row>create()
>>>>>>>>>>>>>             .rename("field0_0", "stringField")
>>>>>>>>>>>>>             .rename("field0_1", "nestedField")
>>>>>>>>>>>>>             .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>>             .rename("field0_2", "runner"))
>>>>>>>>>>>>>         .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>>             .to(new TableReference()
>>>>>>>>>>>>>                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>>                 .setDatasetId("prototypes")
>>>>>>>>>>>>>                 .setTableId("matto_renameFields"))
>>>>>>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>>             .withSchemaUpdateOptions(
>>>>>>>>>>>>>                 new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.
>>>>>>>>>>>>>                     ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>>             .useBeamSchema());
>>>>>>>>>>>>>     pipeline.run();
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }
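The stale-schema behavior described earlier in the thread can be modeled without Beam at all: a row object caches its own schema, a shallow rename updates only the collection-level schema, and rebuilding rows from raw values (what an encode/decode at a Reshuffle boundary effectively does) restores consistency. A toy sketch under those assumptions, with made-up classes that only loosely mirror Row/Schema:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StaleNestedSchemaSketch {

  // A row caches its own schema (here just an ordered list of field
  // names) alongside its values, like Beam's Row caches a Schema.
  static class ToyRow {
    final List<String> schema;
    final List<Object> values;

    ToyRow(List<String> schema, List<Object> values) {
      this.schema = schema;
      this.values = values;
    }
  }

  // "Decode": rebuild every row from raw values against the
  // collection-level schema, which is what crossing an encode/decode
  // boundary effectively does. nestedSchemas maps a field name to the
  // schema of the row stored under it (keyed one level deep, which is
  // enough for this sketch).
  static ToyRow rebuild(ToyRow row, List<String> schema, Map<String, List<String>> nestedSchemas) {
    List<Object> newValues = new ArrayList<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object v = row.values.get(i);
      if (v instanceof ToyRow) {
        newValues.add(rebuild((ToyRow) v, nestedSchemas.get(schema.get(i)), nestedSchemas));
      } else {
        newValues.add(v);
      }
    }
    return new ToyRow(schema, newValues);
  }

  public static void main(String[] args) {
    // State after a shallow rename of field1.field2 -> renamed: the
    // collection-level schema already says "renamed", but the nested row
    // object still carries its stale cached schema ["field2"].
    ToyRow nested = new ToyRow(List.of("field2"), List.of("x"));
    ToyRow top = new ToyRow(List.of("field1"), List.of(nested));
    Map<String, List<String>> collectionNested = Map.of("field1", List.of("renamed"));

    System.out.println(((ToyRow) top.values.get(0)).schema);   // prints [field2]

    ToyRow fixed = rebuild(top, List.of("field1"), collectionNested);
    System.out.println(((ToyRow) fixed.values.get(0)).schema); // prints [renamed]
  }
}
```

This is why forcing a shuffle after RenameFields masks the bug: the rebuild happens at the shuffle boundary, while a fused downstream ParDo sees the stale object directly.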