I still don't quite grok the details of how this succeeds or fails in
different situations. The invalid row succeeds in serialization because the
coder is not sensitive to the way in which it is invalid?

Kenn

On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette <bhule...@google.com> wrote:

> > One thing that's been on the back burner for a long time is making
> CoderProperties into a CoderTester like Guava's EqualityTester.
>
> Reuven's point still applies here though. This issue is not due to a bug
> in SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode.
> I'm assuming a CoderTester would require manually generating inputs right?
> These input Rows represent an illegal state that we wouldn't test with.
> (That being said I like the idea of a CoderTester in general)
>
> Brian
>
> On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Mutability checking might catch that.
>>
>> I meant to suggest not putting the check in the pipeline, but offering a
>> testing discipline that will catch such issues. One thing that's been on
>> the back burner for a long time is making CoderProperties into a
>> CoderTester like Guava's EqualityTester. Then it can run through all the
>> properties without a user setting up test suites. Downside is that the test
>> failure signal gets aggregated.
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Could the DirectRunner just do an equality check whenever it does an
>>> encode/decode? It sounds like it's already effectively performing
>>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>>> the equality check.
>>>
>>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> There is no bug in the Coder itself, so that wouldn't catch it. We
>>>> could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo,
>>>> but if the Direct runner already does an encode/decode before that ParDo,
>>>> then that would have fixed the problem before we could see it.
>>>>
>>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Would it be caught by CoderProperties?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I don't think this bug is schema specific - we created a Java object
>>>>>> that is inconsistent with its encoded form, which could happen to any
>>>>>> transform.
>>>>>>
>>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>>> makes it hard to test using PAssert, as I believe that puts everything 
>>>>>> in a
>>>>>> side input, forcing an encoding/decoding.
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> +dev <d...@beam.apache.org>
>>>>>>>
>>>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>>>> fixes the object.
>>>>>>>
>>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have some other work-related things I need to do this week, so I
>>>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>>>> explanation.  It makes perfect sense now.
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>>>> this case) Java Row objects that are inconsistent with the actual 
>>>>>>>>> schema.
>>>>>>>>> For example if you have the following schema:
>>>>>>>>>
>>>>>>>>> Row {
>>>>>>>>>    field1: Row {
>>>>>>>>>       field2: string
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> And rename field1.field2 -> renamed, you'll get the following
>>>>>>>>> schema
>>>>>>>>>
>>>>>>>>> Row {
>>>>>>>>>   field1: Row {
>>>>>>>>>      renamed: string
>>>>>>>>>    }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> However the Java object for the _nested_ row will return the old
>>>>>>>>> schema if getSchema() is called on it. This is because we only update 
>>>>>>>>> the
>>>>>>>>> schema on the top-level row.
>>>>>>>>>
>>>>>>>>> I think this explains why your test works in the direct runner. If
>>>>>>>>> the row ever goes through an encode/decode path, it will come back 
>>>>>>>>> correct.
>>>>>>>>> The original incorrect Java objects are no longer around, and new
>>>>>>>>> (consistent) objects are constructed from the raw data and the 
>>>>>>>>> PCollection
>>>>>>>>> schema. Dataflow tends to fuse ParDos together, so the following 
>>>>>>>>> ParDo will
>>>>>>>>> see the incorrect Row object. I bet the DirectRunner is encoding and
>>>>>>>>> decoding in between, which fixes the object.
>>>>>>>>>
>>>>>>>>> You can validate this theory by forcing a shuffle after
>>>>>>>>> RenameFields using Reshufflle. It should fix the issue If it does, 
>>>>>>>>> let me
>>>>>>>>> know and I'll work on a fix to RenameFields.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Aha, yes this indeed another bug in the transform. The schema is
>>>>>>>>>> set on the top-level Row but not on any nested rows.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you everyone for your input.  I believe it will be easiest
>>>>>>>>>>> to respond to all feedback in a single message rather than messages 
>>>>>>>>>>> per
>>>>>>>>>>> person.
>>>>>>>>>>>
>>>>>>>>>>>    - NeedsRunner - The tests are run eventually, so obviously
>>>>>>>>>>>    all good on my end.  I was trying to run the smallest subset of 
>>>>>>>>>>> test cases
>>>>>>>>>>>    possible and didn't venture beyond `gradle test`.
>>>>>>>>>>>    - Stack Trace - There wasn't any unfortunately because no
>>>>>>>>>>>    exception thrown in the code.  The Beam Row was translated into 
>>>>>>>>>>> a BQ
>>>>>>>>>>>    TableRow and an insertion was attempted.  The error "message" 
>>>>>>>>>>> was part of
>>>>>>>>>>>    the response JSON that came back as a result of a request 
>>>>>>>>>>> against the BQ
>>>>>>>>>>>    API.
>>>>>>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField)
>>>>>>>>>>>    -> field0_1.nestedStringField is what I am looking for.
>>>>>>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>>>>       - The Beam Schema was as expected with all renames
>>>>>>>>>>>       applied.
>>>>>>>>>>>       - The example I provided was heavily stripped down in
>>>>>>>>>>>       order to isolate the problem.  My work example which a bit 
>>>>>>>>>>> impractical
>>>>>>>>>>>       because it's part of some generic tooling has 4 levels of 
>>>>>>>>>>> nesting and also
>>>>>>>>>>>       produces the correct output too.
>>>>>>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected
>>>>>>>>>>>       TableRow in DirectRunner.  In DataflowRunner however, only 
>>>>>>>>>>> the top-level
>>>>>>>>>>>       renames were reflected in the TableRow and all renames in the 
>>>>>>>>>>> nested fields
>>>>>>>>>>>       weren't.
>>>>>>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row
>>>>>>>>>>>       values and uses the Row.schema to get the field names.  This 
>>>>>>>>>>> makes sense to
>>>>>>>>>>>       me, but if a value is actually a Row then its schema appears 
>>>>>>>>>>> to be
>>>>>>>>>>>       inconsistent with the top-level schema
>>>>>>>>>>>    - My Current Workaround - I forked RenameFields and replaced
>>>>>>>>>>>    the attachValues in expand method to be a "deep" rename.  This 
>>>>>>>>>>> is obviously
>>>>>>>>>>>    inefficient and I will not be submitting a PR for that.
>>>>>>>>>>>    - JIRA ticket -
>>>>>>>>>>>    https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This transform is the same across all runners. A few comments
>>>>>>>>>>>> on the test:
>>>>>>>>>>>>
>>>>>>>>>>>>   - Using attachValues directly is error prone (per the comment
>>>>>>>>>>>> on the method). I recommend using the withFieldValue builders 
>>>>>>>>>>>> instead.
>>>>>>>>>>>>   - I recommend capturing the RenameFields PCollection into a
>>>>>>>>>>>> local variable of type PCollection<Row> and printing out the 
>>>>>>>>>>>> schema (which
>>>>>>>>>>>> you can get using the PCollection.getSchema method) to ensure that 
>>>>>>>>>>>> the
>>>>>>>>>>>> output schema looks like you expect.
>>>>>>>>>>>>    - RenameFields doesn't flatten. So renaming
>>>>>>>>>>>> field0_1.field1_0 - > nestedStringField results in
>>>>>>>>>>>> field0_1.nestedStringField; if you wanted to flatten, then the 
>>>>>>>>>>>> better
>>>>>>>>>>>> transform would be Select.fieldNameAs("field0_1.field1_0",
>>>>>>>>>>>> nestedStringField).
>>>>>>>>>>>>
>>>>>>>>>>>> This all being said, eyeballing the implementation of
>>>>>>>>>>>> RenameFields makes me think that it is buggy in the case where you 
>>>>>>>>>>>> specify
>>>>>>>>>>>> a top-level field multiple times like you do. I think it is simply
>>>>>>>>>>>> adding the top-level field into the output schema multiple times, 
>>>>>>>>>>>> and the
>>>>>>>>>>>> second time is with the field0_1 base name; I have no idea why 
>>>>>>>>>>>> your test
>>>>>>>>>>>> doesn't catch this in the DirectRunner, as it's equally broken 
>>>>>>>>>>>> there. Could
>>>>>>>>>>>> you file a JIRA about this issue and assign it to me?
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <
>>>>>>>>>>>> k...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <
>>>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The unit tests also seem to be disabled for this as well
>>>>>>>>>>>>>> and so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't 
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>> executed with the SDK alone, it needs a runner. So that 
>>>>>>>>>>>>>> exclusion just
>>>>>>>>>>>>>> makes sure we don't run the test when we're verifying the SDK by 
>>>>>>>>>>>>>> itself in
>>>>>>>>>>>>>> the :sdks:java:core:test task. The test is still run in other 
>>>>>>>>>>>>>> tasks where
>>>>>>>>>>>>>> we have a runner, most notably in the Java PreCommit [1], where 
>>>>>>>>>>>>>> we run it
>>>>>>>>>>>>>> as part of the :runners:direct-java:test task.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That being said, we may only run these tests continuously
>>>>>>>>>>>>>> with the DirectRunner, I'm not sure if we test them on all the 
>>>>>>>>>>>>>> runners like
>>>>>>>>>>>>>> we do with ValidatesRunner tests.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> That is correct. The tests are tests _of the transform_ so
>>>>>>>>>>>>> they run only on the DirectRunner. They are not tests of the 
>>>>>>>>>>>>> runner, which
>>>>>>>>>>>>> is only responsible for correctly implementing Beam's primitives. 
>>>>>>>>>>>>> The
>>>>>>>>>>>>> transform should not behave differently on different runners, 
>>>>>>>>>>>>> except for
>>>>>>>>>>>>> fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The error message I’m receiving, : Error while reading
>>>>>>>>>>>>>> data, error message: JSON parsing error in row starting at 
>>>>>>>>>>>>>> position 0: No
>>>>>>>>>>>>>> such field: nestedField.field1_0, suggests the BigQuery is
>>>>>>>>>>>>>> trying to use the original name for the nested field and not the 
>>>>>>>>>>>>>> substitute
>>>>>>>>>>>>>> name.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’m trying to use the RenameFields transform prior to
>>>>>>>>>>>>>>> inserting into BigQuery on nested fields.  Insertion into 
>>>>>>>>>>>>>>> BigQuery is
>>>>>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue 
>>>>>>>>>>>>>>> with renamed
>>>>>>>>>>>>>>> nested fields  The error message I’m receiving, : Error
>>>>>>>>>>>>>>> while reading data, error message: JSON parsing error in row 
>>>>>>>>>>>>>>> starting at
>>>>>>>>>>>>>>> position 0: No such field: nestedField.field1_0, suggests
>>>>>>>>>>>>>>> the BigQuery is trying to use the original name for the nested 
>>>>>>>>>>>>>>> field and
>>>>>>>>>>>>>>> not the substitute name.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The code for RenameFields seems simple enough but does it
>>>>>>>>>>>>>>> behave differently in different runners?  Will a deep 
>>>>>>>>>>>>>>> attachValues be
>>>>>>>>>>>>>>> necessary in order get the nested renames to work across all 
>>>>>>>>>>>>>>> runners? Is
>>>>>>>>>>>>>>> there something wrong in my code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The unit tests also seem to be disabled for this as well and
>>>>>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import
>>>>>>>>>>>>>>>> com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>>>>> import
>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
>>>>>>>>>>>>>>>> ;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import static java.util.Arrays.*asList*;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>>         PipelineOptionsFactory.*register*(
>>>>>>>>>>>>>>>> DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>>>>> PipelineOptionsFactory.*fromArgs*(args).as(
>>>>>>>>>>>>>>>> DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>>>>                 Arrays.*stream*(System.*getProperty*(
>>>>>>>>>>>>>>>> "java.class.path").
>>>>>>>>>>>>>>>>                         split(File.*pathSeparator*)).
>>>>>>>>>>>>>>>>                         map(entry -> (new
>>>>>>>>>>>>>>>> File(entry)).toString()).collect(Collectors.*toList*()));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.*create*(options);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         Schema nestedSchema = Schema.*builder*().addField(
>>>>>>>>>>>>>>>> Schema.Field.*nullable*("field1_0", Schema.FieldType.
>>>>>>>>>>>>>>>> *STRING*)).build();
>>>>>>>>>>>>>>>>         Schema.Field field = Schema.Field.*nullable*(
>>>>>>>>>>>>>>>> "field0_0", Schema.FieldType.*STRING*);
>>>>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.*nullable*(
>>>>>>>>>>>>>>>> "field0_1", Schema.FieldType.*row*(nestedSchema));
>>>>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.*nullable*(
>>>>>>>>>>>>>>>> "field0_2", Schema.FieldType.*STRING*);
>>>>>>>>>>>>>>>>         Schema rowSchema = Schema.*builder*()
>>>>>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>>>         Row testRow = Row.*withSchema*(rowSchema
>>>>>>>>>>>>>>>> ).attachValues("value0_0", Row.*withSchema*(nestedSchema
>>>>>>>>>>>>>>>> ).attachValues("value1_0"), options
>>>>>>>>>>>>>>>> .getRunner().toString());
>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>                 .apply(Create.*of*(testRow).withRowSchema(
>>>>>>>>>>>>>>>> rowSchema))
>>>>>>>>>>>>>>>>                 .apply(RenameFields.<Row>*create*()
>>>>>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>>>>>                         .rename("field0_1.field1_0",
>>>>>>>>>>>>>>>> "nestedStringField")
>>>>>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>*write*()
>>>>>>>>>>>>>>>>                         .to(new
>>>>>>>>>>>>>>>> TableReference().setProjectId("lt-dia-lake-exp-raw"
>>>>>>>>>>>>>>>> ).setDatasetId("prototypes").setTableId(
>>>>>>>>>>>>>>>> "matto_renameFields"))
>>>>>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.
>>>>>>>>>>>>>>>> Write.CreateDisposition.*CREATE_IF_NEEDED*)
>>>>>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.
>>>>>>>>>>>>>>>> Write.WriteDisposition.*WRITE_APPEND*)
>>>>>>>>>>>>>>>>                         .withSchemaUpdateOptions(new
>>>>>>>>>>>>>>>> HashSet<>(*asList*(BigQueryIO.Write.SchemaUpdateOption.
>>>>>>>>>>>>>>>> *ALLOW_FIELD_ADDITION*)))
>>>>>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>

Reply via email to