Would it be caught by CoderProperties?

Kenn

On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <[email protected]> wrote:

> I don't think this bug is schema specific - we created a Java object that
> is inconsistent with its encoded form, which could happen to any transform.
>
> This does seem to be a gap in DirectRunner testing though. It also makes
> it hard to test using PAssert, as I believe that puts everything in a side
> input, forcing an encoding/decoding.
>
> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <[email protected]> wrote:
>
>> +dev <[email protected]>
>>
>> > I bet the DirectRunner is encoding and decoding in between, which fixes
>> the object.
>>
>> Do we need better testing of schema-aware (and potentially other
>> built-in) transforms in the face of fusion to root out issues like this?
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <[email protected]>
>> wrote:
>>
>>> I have some other work-related things I need to do this week, so I will
>>> likely report back on this over the weekend.  Thank you for the
>>> explanation.  It makes perfect sense now.
>>>
>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Some more context - the problem is that RenameFields outputs (in this
>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>> For example if you have the following schema:
>>>>
>>>> Row {
>>>>    field1: Row {
>>>>       field2: string
>>>>     }
>>>> }
>>>>
>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>
>>>> Row {
>>>>   field1: Row {
>>>>      renamed: string
>>>>    }
>>>> }
>>>>
>>>> However the Java object for the _nested_ row will return the old schema
>>>> if getSchema() is called on it. This is because we only update the schema
>>>> on the top-level row.
>>>>
>>>> I think this explains why your test works in the direct runner. If the
>>>> row ever goes through an encode/decode path, it will come back correct. The
>>>> original incorrect Java objects are no longer around, and new (consistent)
>>>> objects are constructed from the raw data and the PCollection schema.
>>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>>> between, which fixes the object.
>>>>
>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>> using Reshufflle. It should fix the issue If it does, let me know and I'll
>>>> work on a fix to RenameFields.
>>>>
>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Aha, yes this indeed another bug in the transform. The schema is set
>>>>> on the top-level Row but not on any nested rows.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>>> respond to all feedback in a single message rather than messages per 
>>>>>> person.
>>>>>>
>>>>>>    - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>    good on my end.  I was trying to run the smallest subset of test cases
>>>>>>    possible and didn't venture beyond `gradle test`.
>>>>>>    - Stack Trace - There wasn't any unfortunately because no
>>>>>>    exception thrown in the code.  The Beam Row was translated into a BQ
>>>>>>    TableRow and an insertion was attempted.  The error "message" was 
>>>>>> part of
>>>>>>    the response JSON that came back as a result of a request against the 
>>>>>> BQ
>>>>>>    API.
>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>>    field0_1.nestedStringField is what I am looking for.
>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>>       - The example I provided was heavily stripped down in order to
>>>>>>       isolate the problem.  My work example which a bit impractical 
>>>>>> because it's
>>>>>>       part of some generic tooling has 4 levels of nesting and also 
>>>>>> produces the
>>>>>>       correct output too.
>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>>>>       in DirectRunner.  In DataflowRunner however, only the top-level 
>>>>>> renames
>>>>>>       were reflected in the TableRow and all renames in the nested 
>>>>>> fields weren't.
>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>>>       uses the Row.schema to get the field names.  This makes sense to 
>>>>>> me, but if
>>>>>>       a value is actually a Row then its schema appears to be 
>>>>>> inconsistent with
>>>>>>       the top-level schema
>>>>>>    - My Current Workaround - I forked RenameFields and replaced the
>>>>>>    attachValues in expand method to be a "deep" rename.  This is 
>>>>>> obviously
>>>>>>    inefficient and I will not be submitting a PR for that.
>>>>>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> This transform is the same across all runners. A few comments on the
>>>>>>> test:
>>>>>>>
>>>>>>>   - Using attachValues directly is error prone (per the comment on
>>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>>>> variable of type PCollection<Row> and printing out the schema (which you
>>>>>>> can get using the PCollection.getSchema method) to ensure that the 
>>>>>>> output
>>>>>>> schema looks like you expect.
>>>>>>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0 - >
>>>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted 
>>>>>>> to
>>>>>>> flatten, then the better transform would be
>>>>>>> Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>>>
>>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>>> makes me think that it is buggy in the case where you specify a 
>>>>>>> top-level
>>>>>>> field multiple times like you do. I think it is simply adding the 
>>>>>>> top-level
>>>>>>> field into the output schema multiple times, and the second time is with
>>>>>>> the field0_1 base name; I have no idea why your test doesn't catch this 
>>>>>>> in
>>>>>>> the DirectRunner, as it's equally broken there. Could you file a JIRA 
>>>>>>> about
>>>>>>> this issue and assign it to me?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthew,
>>>>>>>>>
>>>>>>>>> > The unit tests also seem to be disabled for this as well and so
>>>>>>>>> I don’t know if the PTransform behaves as expected.
>>>>>>>>>
>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>>>> framework. NeedsRunner indicates that a test suite can't be executed 
>>>>>>>>> with
>>>>>>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>>>>>>> don't run the test when we're verifying the SDK by itself in the
>>>>>>>>> :sdks:java:core:test task. The test is still run in other tasks where 
>>>>>>>>> we
>>>>>>>>> have a runner, most notably in the Java PreCommit [1], where we run 
>>>>>>>>> it as
>>>>>>>>> part of the :runners:direct-java:test task.
>>>>>>>>>
>>>>>>>>> That being said, we may only run these tests continuously with the
>>>>>>>>> DirectRunner, I'm not sure if we test them on all the runners like we 
>>>>>>>>> do
>>>>>>>>> with ValidatesRunner tests.
>>>>>>>>>
>>>>>>>>
>>>>>>>> That is correct. The tests are tests _of the transform_ so they run
>>>>>>>> only on the DirectRunner. They are not tests of the runner, which is 
>>>>>>>> only
>>>>>>>> responsible for correctly implementing Beam's primitives. The transform
>>>>>>>> should not behave differently on different runners, except for 
>>>>>>>> fundamental
>>>>>>>> differences in how they schedule work and checkpoint.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>> > The error message I’m receiving, : Error while reading data,
>>>>>>>>> error message: JSON parsing error in row starting at position 0: No 
>>>>>>>>> such
>>>>>>>>> field: nestedField.field1_0, suggests the BigQuery is trying to
>>>>>>>>> use the original name for the nested field and not the substitute 
>>>>>>>>> name.
>>>>>>>>>
>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>
>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I’m trying to use the RenameFields transform prior to inserting
>>>>>>>>>> into BigQuery on nested fields.  Insertion into BigQuery is 
>>>>>>>>>> successful with
>>>>>>>>>> DirectRunner, but DataflowRunner has an issue with renamed nested 
>>>>>>>>>> fields
>>>>>>>>>>  The error message I’m receiving, : Error while reading data,
>>>>>>>>>> error message: JSON parsing error in row starting at position 0: No 
>>>>>>>>>> such
>>>>>>>>>> field: nestedField.field1_0, suggests the BigQuery is trying to
>>>>>>>>>> use the original name for the nested field and not the substitute 
>>>>>>>>>> name.
>>>>>>>>>>
>>>>>>>>>> The code for RenameFields seems simple enough but does it behave
>>>>>>>>>> differently in different runners?  Will a deep attachValues be 
>>>>>>>>>> necessary in
>>>>>>>>>> order get the nested renames to work across all runners? Is there 
>>>>>>>>>> something
>>>>>>>>>> wrong in my code?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>
>>>>>>>>>> The unit tests also seem to be disabled for this as well and so I
>>>>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>
>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>
>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>> import
>>>>>>>>>>> org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
>>>>>>>>>>> ;
>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>
>>>>>>>>>>> import java.io.File;
>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>
>>>>>>>>>>> import static java.util.Arrays.*asList*;
>>>>>>>>>>>
>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>         PipelineOptionsFactory.*register*(
>>>>>>>>>>> DataflowPipelineOptions.class);
>>>>>>>>>>>         DataflowPipelineOptions options = PipelineOptionsFactory
>>>>>>>>>>> .*fromArgs*(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>                 Arrays.*stream*(System.*getProperty*(
>>>>>>>>>>> "java.class.path").
>>>>>>>>>>>                         split(File.*pathSeparator*)).
>>>>>>>>>>>                         map(entry -> (new
>>>>>>>>>>> File(entry)).toString()).collect(Collectors.*toList*()));
>>>>>>>>>>>
>>>>>>>>>>>         Pipeline pipeline = Pipeline.*create*(options);
>>>>>>>>>>>
>>>>>>>>>>>         Schema nestedSchema = Schema.*builder*().addField(Schema
>>>>>>>>>>> .Field.*nullable*("field1_0", Schema.FieldType.*STRING*
>>>>>>>>>>> )).build();
>>>>>>>>>>>         Schema.Field field = Schema.Field.*nullable*("field0_0"
>>>>>>>>>>> , Schema.FieldType.*STRING*);
>>>>>>>>>>>         Schema.Field nested = Schema.Field.*nullable*("field0_1"
>>>>>>>>>>> , Schema.FieldType.*row*(nestedSchema));
>>>>>>>>>>>         Schema.Field runner = Schema.Field.*nullable*("field0_2"
>>>>>>>>>>> , Schema.FieldType.*STRING*);
>>>>>>>>>>>         Schema rowSchema = Schema.*builder*()
>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>                 .build();
>>>>>>>>>>>         Row testRow = Row.*withSchema*(rowSchema).attachValues(
>>>>>>>>>>> "value0_0", Row.*withSchema*(nestedSchema).attachValues(
>>>>>>>>>>> "value1_0"), options.getRunner().toString());
>>>>>>>>>>>         pipeline
>>>>>>>>>>>                 .apply(Create.*of*(testRow).withRowSchema(
>>>>>>>>>>> rowSchema))
>>>>>>>>>>>                 .apply(RenameFields.<Row>*create*()
>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>                         .rename("field0_1.field1_0",
>>>>>>>>>>> "nestedStringField")
>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>                 .apply(BigQueryIO.<Row>*write*()
>>>>>>>>>>>                         .to(new TableReference().setProjectId(
>>>>>>>>>>> "lt-dia-lake-exp-raw").setDatasetId("prototypes").setTableId(
>>>>>>>>>>> "matto_renameFields"))
>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.
>>>>>>>>>>> CreateDisposition.*CREATE_IF_NEEDED*)
>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.
>>>>>>>>>>> WriteDisposition.*WRITE_APPEND*)
>>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>> *asList*(BigQueryIO.Write.SchemaUpdateOption.
>>>>>>>>>>> *ALLOW_FIELD_ADDITION*)))
>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to