> One thing that's been on the back burner for a long time is making
CoderProperties into a CoderTester like Guava's EqualityTester.

Reuven's point still applies here though. This issue is not due to a bug in
SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode. I'm
assuming a CoderTester would require manually generating inputs right?
These input Rows represent an illegal state that we wouldn't test with.
(That being said I like the idea of a CoderTester in general)

Brian

On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles <k...@apache.org> wrote:

> Mutability checking might catch that.
>
> I meant to suggest not putting the check in the pipeline, but offering a
> testing discipline that will catch such issues. One thing that's been on
> the back burner for a long time is making CoderProperties into a
> CoderTester like Guava's EqualityTester. Then it can run through all the
> properties without a user setting up test suites. Downside is that the test
> failure signal gets aggregated.
>
> Kenn
>
> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Could the DirectRunner just do an equality check whenever it does an
>> encode/decode? It sounds like it's already effectively performing
>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>> the equality check.
>>
>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:
>>
>>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>>> the Direct runner already does an encode/decode before that ParDo, then
>>> that would have fixed the problem before we could see it.
>>>
>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Would it be caught by CoderProperties?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I don't think this bug is schema specific - we created a Java object
>>>>> that is inconsistent with its encoded form, which could happen to any
>>>>> transform.
>>>>>
>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>> makes it hard to test using PAssert, as I believe that puts everything in 
>>>>> a
>>>>> side input, forcing an encoding/decoding.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>>> fixes the object.
>>>>>>
>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> I have some other work-related things I need to do this week, so I
>>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>>> explanation.  It makes perfect sense now.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>>> this case) Java Row objects that are inconsistent with the actual 
>>>>>>>> schema.
>>>>>>>> For example if you have the following schema:
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>    field1: Row {
>>>>>>>>       field2: string
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>   field1: Row {
>>>>>>>>      renamed: string
>>>>>>>>    }
>>>>>>>> }
>>>>>>>>
>>>>>>>> However the Java object for the _nested_ row will return the old
>>>>>>>> schema if getSchema() is called on it. This is because we only update 
>>>>>>>> the
>>>>>>>> schema on the top-level row.
>>>>>>>>
>>>>>>>> I think this explains why your test works in the direct runner. If
>>>>>>>> the row ever goes through an encode/decode path, it will come back 
>>>>>>>> correct.
>>>>>>>> The original incorrect Java objects are no longer around, and new
>>>>>>>> (consistent) objects are constructed from the raw data and the 
>>>>>>>> PCollection
>>>>>>>> schema. Dataflow tends to fuse ParDos together, so the following ParDo 
>>>>>>>> will
>>>>>>>> see the incorrect Row object. I bet the DirectRunner is encoding and
>>>>>>>> decoding in between, which fixes the object.
>>>>>>>>
>>>>>>>> You can validate this theory by forcing a shuffle after
>>>>>>>> RenameFields using Reshufflle. It should fix the issue If it does, let 
>>>>>>>> me
>>>>>>>> know and I'll work on a fix to RenameFields.
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Aha, yes this indeed another bug in the transform. The schema is
>>>>>>>>> set on the top-level Row but not on any nested rows.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you everyone for your input.  I believe it will be easiest
>>>>>>>>>> to respond to all feedback in a single message rather than messages 
>>>>>>>>>> per
>>>>>>>>>> person.
>>>>>>>>>>
>>>>>>>>>>    - NeedsRunner - The tests are run eventually, so obviously
>>>>>>>>>>    all good on my end.  I was trying to run the smallest subset of 
>>>>>>>>>> test cases
>>>>>>>>>>    possible and didn't venture beyond `gradle test`.
>>>>>>>>>>    - Stack Trace - There wasn't any unfortunately because no
>>>>>>>>>>    exception thrown in the code.  The Beam Row was translated into a 
>>>>>>>>>> BQ
>>>>>>>>>>    TableRow and an insertion was attempted.  The error "message" was 
>>>>>>>>>> part of
>>>>>>>>>>    the response JSON that came back as a result of a request against 
>>>>>>>>>> the BQ
>>>>>>>>>>    API.
>>>>>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField)
>>>>>>>>>>    -> field0_1.nestedStringField is what I am looking for.
>>>>>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>>>>>>       - The example I provided was heavily stripped down in
>>>>>>>>>>       order to isolate the problem.  My work example which a bit 
>>>>>>>>>> impractical
>>>>>>>>>>       because it's part of some generic tooling has 4 levels of 
>>>>>>>>>> nesting and also
>>>>>>>>>>       produces the correct output too.
>>>>>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected
>>>>>>>>>>       TableRow in DirectRunner.  In DataflowRunner however, only the 
>>>>>>>>>> top-level
>>>>>>>>>>       renames were reflected in the TableRow and all renames in the 
>>>>>>>>>> nested fields
>>>>>>>>>>       weren't.
>>>>>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values
>>>>>>>>>>       and uses the Row.schema to get the field names.  This makes 
>>>>>>>>>> sense to me,
>>>>>>>>>>       but if a value is actually a Row then its schema appears to be 
>>>>>>>>>> inconsistent
>>>>>>>>>>       with the top-level schema
>>>>>>>>>>    - My Current Workaround - I forked RenameFields and replaced
>>>>>>>>>>    the attachValues in expand method to be a "deep" rename.  This is 
>>>>>>>>>> obviously
>>>>>>>>>>    inefficient and I will not be submitting a PR for that.
>>>>>>>>>>    - JIRA ticket -
>>>>>>>>>>    https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This transform is the same across all runners. A few comments on
>>>>>>>>>>> the test:
>>>>>>>>>>>
>>>>>>>>>>>   - Using attachValues directly is error prone (per the comment
>>>>>>>>>>> on the method). I recommend using the withFieldValue builders 
>>>>>>>>>>> instead.
>>>>>>>>>>>   - I recommend capturing the RenameFields PCollection into a
>>>>>>>>>>> local variable of type PCollection<Row> and printing out the schema 
>>>>>>>>>>> (which
>>>>>>>>>>> you can get using the PCollection.getSchema method) to ensure that 
>>>>>>>>>>> the
>>>>>>>>>>> output schema looks like you expect.
>>>>>>>>>>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0
>>>>>>>>>>> - > nestedStringField results in field0_1.nestedStringField; if you 
>>>>>>>>>>> wanted
>>>>>>>>>>> to flatten, then the better transform would be
>>>>>>>>>>> Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>>>>>>>
>>>>>>>>>>> This all being said, eyeballing the implementation of
>>>>>>>>>>> RenameFields makes me think that it is buggy in the case where you 
>>>>>>>>>>> specify
>>>>>>>>>>> a top-level field multiple times like you do. I think it is simply
>>>>>>>>>>> adding the top-level field into the output schema multiple times, 
>>>>>>>>>>> and the
>>>>>>>>>>> second time is with the field0_1 base name; I have no idea why your 
>>>>>>>>>>> test
>>>>>>>>>>> doesn't catch this in the DirectRunner, as it's equally broken 
>>>>>>>>>>> there. Could
>>>>>>>>>>> you file a JIRA about this issue and assign it to me?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <
>>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The unit tests also seem to be disabled for this as well and
>>>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't 
>>>>>>>>>>>>> be
>>>>>>>>>>>>> executed with the SDK alone, it needs a runner. So that exclusion 
>>>>>>>>>>>>> just
>>>>>>>>>>>>> makes sure we don't run the test when we're verifying the SDK by 
>>>>>>>>>>>>> itself in
>>>>>>>>>>>>> the :sdks:java:core:test task. The test is still run in other 
>>>>>>>>>>>>> tasks where
>>>>>>>>>>>>> we have a runner, most notably in the Java PreCommit [1], where 
>>>>>>>>>>>>> we run it
>>>>>>>>>>>>> as part of the :runners:direct-java:test task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>>>>> the DirectRunner, I'm not sure if we test them on all the runners 
>>>>>>>>>>>>> like we
>>>>>>>>>>>>> do with ValidatesRunner tests.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That is correct. The tests are tests _of the transform_ so they
>>>>>>>>>>>> run only on the DirectRunner. They are not tests of the runner, 
>>>>>>>>>>>> which is
>>>>>>>>>>>> only responsible for correctly implementing Beam's primitives. The
>>>>>>>>>>>> transform should not behave differently on different runners, 
>>>>>>>>>>>> except for
>>>>>>>>>>>> fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> > The error message I’m receiving, : Error while reading
>>>>>>>>>>>>> data, error message: JSON parsing error in row starting at 
>>>>>>>>>>>>> position 0: No
>>>>>>>>>>>>> such field: nestedField.field1_0, suggests the BigQuery is
>>>>>>>>>>>>> trying to use the original name for the nested field and not the 
>>>>>>>>>>>>> substitute
>>>>>>>>>>>>> name.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I’m trying to use the RenameFields transform prior to
>>>>>>>>>>>>>> inserting into BigQuery on nested fields.  Insertion into 
>>>>>>>>>>>>>> BigQuery is
>>>>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue 
>>>>>>>>>>>>>> with renamed
>>>>>>>>>>>>>> nested fields  The error message I’m receiving, : Error
>>>>>>>>>>>>>> while reading data, error message: JSON parsing error in row 
>>>>>>>>>>>>>> starting at
>>>>>>>>>>>>>> position 0: No such field: nestedField.field1_0, suggests
>>>>>>>>>>>>>> the BigQuery is trying to use the original name for the nested 
>>>>>>>>>>>>>> field and
>>>>>>>>>>>>>> not the substitute name.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The code for RenameFields seems simple enough but does it
>>>>>>>>>>>>>> behave differently in different runners?  Will a deep 
>>>>>>>>>>>>>> attachValues be
>>>>>>>>>>>>>> necessary in order get the nested renames to work across all 
>>>>>>>>>>>>>> runners? Is
>>>>>>>>>>>>>> there something wrong in my code?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The unit tests also seem to be disabled for this as well and
>>>>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference
>>>>>>>>>>>>>>> ;
>>>>>>>>>>>>>>> import
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
>>>>>>>>>>>>>>> ;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import static java.util.Arrays.*asList*;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>         PipelineOptionsFactory.*register*(
>>>>>>>>>>>>>>> DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>>>> PipelineOptionsFactory.*fromArgs*(args).as(
>>>>>>>>>>>>>>> DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>>>                 Arrays.*stream*(System.*getProperty*(
>>>>>>>>>>>>>>> "java.class.path").
>>>>>>>>>>>>>>>                         split(File.*pathSeparator*)).
>>>>>>>>>>>>>>>                         map(entry -> (new
>>>>>>>>>>>>>>> File(entry)).toString()).collect(Collectors.*toList*()));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.*create*(options);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Schema nestedSchema = Schema.*builder*().addField(
>>>>>>>>>>>>>>> Schema.Field.*nullable*("field1_0", Schema.FieldType.
>>>>>>>>>>>>>>> *STRING*)).build();
>>>>>>>>>>>>>>>         Schema.Field field = Schema.Field.*nullable*(
>>>>>>>>>>>>>>> "field0_0", Schema.FieldType.*STRING*);
>>>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.*nullable*(
>>>>>>>>>>>>>>> "field0_1", Schema.FieldType.*row*(nestedSchema));
>>>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.*nullable*(
>>>>>>>>>>>>>>> "field0_2", Schema.FieldType.*STRING*);
>>>>>>>>>>>>>>>         Schema rowSchema = Schema.*builder*()
>>>>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>>         Row testRow = Row.*withSchema*(rowSchema
>>>>>>>>>>>>>>> ).attachValues("value0_0", Row.*withSchema*(nestedSchema
>>>>>>>>>>>>>>> ).attachValues("value1_0"), options.getRunner().toString());
>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>                 .apply(Create.*of*(testRow).withRowSchema(
>>>>>>>>>>>>>>> rowSchema))
>>>>>>>>>>>>>>>                 .apply(RenameFields.<Row>*create*()
>>>>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>>>>                         .rename("field0_1.field1_0",
>>>>>>>>>>>>>>> "nestedStringField")
>>>>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>*write*()
>>>>>>>>>>>>>>>                         .to(new
>>>>>>>>>>>>>>> TableReference().setProjectId("lt-dia-lake-exp-raw"
>>>>>>>>>>>>>>> ).setDatasetId("prototypes").setTableId("matto_renameFields"
>>>>>>>>>>>>>>> ))
>>>>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.
>>>>>>>>>>>>>>> Write.CreateDisposition.*CREATE_IF_NEEDED*)
>>>>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.
>>>>>>>>>>>>>>> Write.WriteDisposition.*WRITE_APPEND*)
>>>>>>>>>>>>>>>                         .withSchemaUpdateOptions(new
>>>>>>>>>>>>>>> HashSet<>(*asList*(BigQueryIO.Write.SchemaUpdateOption.
>>>>>>>>>>>>>>> *ALLOW_FIELD_ADDITION*)))
>>>>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Reply via email to