FYI - this should be fixed by https://github.com/apache/beam/pull/14960
On Thu, Jun 3, 2021 at 10:00 AM Reuven Lax <re...@google.com> wrote:

Correct.

On Thu, Jun 3, 2021 at 9:51 AM Kenneth Knowles <k...@apache.org> wrote:

I still don't quite grok the details of how this succeeds or fails in different situations. The invalid row succeeds in serialization because the coder is not sensitive to the way in which it is invalid?

Kenn

On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette <bhule...@google.com> wrote:

> One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualityTester.

Reuven's point still applies here, though. This issue is not due to a bug in SchemaCoder; it's a problem with the Row we gave SchemaCoder to encode. I'm assuming a CoderTester would require manually generating inputs, right? These input Rows represent an illegal state that we wouldn't test with. (That being said, I like the idea of a CoderTester in general.)

Brian

On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles <k...@apache.org> wrote:

Mutability checking might catch that.

I meant to suggest not putting the check in the pipeline, but offering a testing discipline that will catch such issues. One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualityTester. Then it could run through all the properties without a user setting up test suites. The downside is that the test-failure signal gets aggregated.

Kenn

On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:

Could the DirectRunner just do an equality check whenever it does an encode/decode? It sounds like it's already effectively performing a CoderProperties.coderDecodeEncodeEqual for every element, just omitting the equality check.
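[Editor's note: the decode(encode(x)).equals(x) property discussed above can be sketched in plain Java. Java serialization stands in for Beam's Coder API here, so the class and method names are illustrative, not Beam's.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTrip {
    // Encode then decode a value, mirroring the round trip a runner performs
    // at stage boundaries. A coderDecodeEncodeEqual-style check would assert
    // that the result equals the input; the thread's point is that the
    // DirectRunner does the round trip but skips the equality assertion.
    static <T extends Serializable> T roundTrip(T value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T decoded = (T) in.readObject();
            return decoded;
        }
    }

    public static void main(String[] args) throws Exception {
        String original = "value1_0";
        // prints true: the decoded copy equals the original
        System.out.println(original.equals(roundTrip(original)));
    }
}
```

Note that such a check only catches coders (or inputs) for which the round trip is lossy or inconsistent; as Reuven points out below, it would not catch this bug, where the coder itself is fine.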
On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:

There is no bug in the Coder itself, so that wouldn't catch it. We could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if the DirectRunner already does an encode/decode before that ParDo, then that would have fixed the problem before we could see it.

On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:

Would it be caught by CoderProperties?

Kenn

On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:

I don't think this bug is schema-specific - we created a Java object that is inconsistent with its encoded form, which could happen to any transform.

This does seem to be a gap in DirectRunner testing, though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:

+dev <d...@beam.apache.org>

> I bet the DirectRunner is encoding and decoding in between, which fixes the object.

Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?

Brian

On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
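[Editor's note: the "encode/decode fixes the object" effect discussed in this thread can be modeled with plain Java. Decoding rebuilds an object from the raw values plus the advertised schema, so a stale in-memory object does not survive the boundary. Field-name lists stand in for Beam Schemas; nothing below is Beam API.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EncodeDecodeRebuild {
    // "Encode" keeps only the positional values, as a schema-based coder does.
    static List<Object> encode(Map<String, Object> row) {
        return new ArrayList<>(row.values());
    }

    // "Decode" reattaches field names from the schema the collection
    // advertises, discarding whatever names the old object carried.
    static Map<String, Object> decode(List<Object> values, List<String> schema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < schema.size(); i++) {
            row.put(schema.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Object built with the old field name, inconsistent with the
        // advertised schema (like the stale nested Row after RenameFields).
        Map<String, Object> stale = new LinkedHashMap<>();
        stale.put("field1_0", "value1_0");

        List<String> advertised = List.of("nestedStringField");

        // A fused ParDo sees the stale object directly:
        System.out.println(stale.keySet());                          // [field1_0]
        // After an encode/decode boundary (e.g. a shuffle), it is rebuilt
        // consistently with the advertised schema:
        System.out.println(decode(encode(stale), advertised).keySet());
    }
}
```

This is why fusing runners surface the bug while a runner that serializes between stages masks it.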
On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:

Some more context - the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema. For example, if you have the following schema:

    Row {
      field1: Row {
        field2: string
      }
    }

and rename field1.field2 -> renamed, you'll get the following schema:

    Row {
      field1: Row {
        renamed: string
      }
    }

However, the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.

I think this explains why your test works in the direct runner. If the row ever goes through an encode/decode path, it will come back correct. The original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.

You can validate this theory by forcing a shuffle after RenameFields using Reshuffle. It should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.

On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:

Aha, yes, this is indeed another bug in the transform.
The schema is set on the top-level Row but not on any nested rows.

On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than messages per person.

- NeedsRunner - The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
- Stack Trace - There wasn't any, unfortunately, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
- Desired Behaviour - (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
- Info Logging Findings (In Lieu of a Stack Trace)
  - The Beam Schema was as expected, with all renames applied.
  - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
  - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner, however, only the top-level renames were reflected in the TableRow; the renames in the nested fields weren't.
  - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row then its schema appears to be inconsistent with the top-level schema.
- My Current Workaround - I forked RenameFields and replaced the attachValues in the expand method with a "deep" rename. This is obviously inefficient and I will not be submitting a PR for it.
- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442

On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:

This transform is the same across all runners. A few comments on the test:

- Using attachValues directly is error-prone (per the comment on the method). I recommend using the withFieldValue builders instead.
- I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
- RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time it is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?

Reuven

On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:

On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Matthew,
>
> > The unit tests also seem to be disabled for this as well and so I don't know if the PTransform behaves as expected.
>
> The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
> That being said, we may only run these tests continuously with the DirectRunner; I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.

That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.

Kenn

> > The error message I'm receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>
> Is there a stack trace associated with this error? It would be helpful to see where the error is coming from.
>
> Brian
>
> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/

On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

I'm trying to use the RenameFields transform on nested fields prior to inserting into BigQuery.
Insertion into >>>>>>>>>>>>>>>>> BigQuery is >>>>>>>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue >>>>>>>>>>>>>>>>> with renamed >>>>>>>>>>>>>>>>> nested fields The error message I’m receiving, : Error >>>>>>>>>>>>>>>>> while reading data, error message: JSON parsing error in row >>>>>>>>>>>>>>>>> starting at >>>>>>>>>>>>>>>>> position 0: No such field: nestedField.field1_0, suggests >>>>>>>>>>>>>>>>> the BigQuery is trying to use the original name for the >>>>>>>>>>>>>>>>> nested field and >>>>>>>>>>>>>>>>> not the substitute name. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The code for RenameFields seems simple enough but does it >>>>>>>>>>>>>>>>> behave differently in different runners? Will a deep >>>>>>>>>>>>>>>>> attachValues be >>>>>>>>>>>>>>>>> necessary in order get the nested renames to work across all >>>>>>>>>>>>>>>>> runners? Is >>>>>>>>>>>>>>>>> there something wrong in my code? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186 >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The unit tests also seem to be disabled for this as well >>>>>>>>>>>>>>>>> and so I don’t know if the PTransform behaves as expected. 
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java

package ca.loblaw.cerebro.PipelineControl;

import com.google.api.services.bigquery.model.TableReference;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;

public class BQRenameFields {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setFilesToStage(
            Arrays.stream(System.getProperty("java.class.path")
                    .split(File.pathSeparator))
                .map(entry -> new File(entry).toString())
                .collect(Collectors.toList()));

        Pipeline pipeline = Pipeline.create(options);

        Schema nestedSchema = Schema.builder()
            .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
            .build();
        Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
        Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
        Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
        Schema rowSchema = Schema.builder()
            .addFields(field, nested, runner)
            .build();
        Row testRow = Row.withSchema(rowSchema)
            .attachValues(
                "value0_0",
                Row.withSchema(nestedSchema).attachValues("value1_0"),
                options.getRunner().toString());

        pipeline
            .apply(Create.of(testRow).withRowSchema(rowSchema))
            .apply(RenameFields.<Row>create()
                .rename("field0_0", "stringField")
                .rename("field0_1", "nestedField")
                .rename("field0_1.field1_0", "nestedStringField")
                .rename("field0_2", "runner"))
            .apply(BigQueryIO.<Row>write()
                .to(new TableReference()
                    .setProjectId("lt-dia-lake-exp-raw")
                    .setDatasetId("prototypes")
                    .setTableId("matto_renameFields"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(new HashSet<>(
                    asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
                .useBeamSchema());
        pipeline.run();
    }
}