+dev <d...@beam.apache.org>

> I bet the DirectRunner is encoding and decoding in between, which fixes
the object.

Do we need better testing of schema-aware (and potentially other built-in)
transforms in the face of fusion to root out issues like this?

Brian

On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
wrote:

> I have some other work-related things I need to do this week, so I will
> likely report back on this over the weekend.  Thank you for the
> explanation.  It makes perfect sense now.
>
> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>
>> Some more context - the problem is that RenameFields outputs (in this
>> case) Java Row objects that are inconsistent with the actual schema.
>> For example if you have the following schema:
>>
>> Row {
>>    field1: Row {
>>       field2: string
>>     }
>> }
>>
>> And rename field1.field2 -> renamed, you'll get the following schema
>>
>> Row {
>>   field1: Row {
>>      renamed: string
>>    }
>> }
>>
>> However the Java object for the _nested_ row will return the old schema
>> if getSchema() is called on it. This is because we only update the schema
>> on the top-level row.
>>
>> I think this explains why your test works in the direct runner. If the
>> row ever goes through an encode/decode path, it will come back correct. The
>> original incorrect Java objects are no longer around, and new (consistent)
>> objects are constructed from the raw data and the PCollection schema.
>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>> between, which fixes the object.
>>
>> You can validate this theory by forcing a shuffle after RenameFields
>> using Reshufflle. It should fix the issue If it does, let me know and I'll
>> work on a fix to RenameFields.
>>
>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Aha, yes this indeed another bug in the transform. The schema is set on
>>> the top-level Row but not on any nested rows.
>>>
>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>>> wrote:
>>>
>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>> respond to all feedback in a single message rather than messages per 
>>>> person.
>>>>
>>>>    - NeedsRunner - The tests are run eventually, so obviously all good
>>>>    on my end.  I was trying to run the smallest subset of test cases 
>>>> possible
>>>>    and didn't venture beyond `gradle test`.
>>>>    - Stack Trace - There wasn't any unfortunately because no exception
>>>>    thrown in the code.  The Beam Row was translated into a BQ TableRow and 
>>>> an
>>>>    insertion was attempted.  The error "message" was part of the response 
>>>> JSON
>>>>    that came back as a result of a request against the BQ API.
>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>    field0_1.nestedStringField is what I am looking for.
>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>       - The Beam Schema was as expected with all renames applied.
>>>>       - The example I provided was heavily stripped down in order to
>>>>       isolate the problem.  My work example which a bit impractical 
>>>> because it's
>>>>       part of some generic tooling has 4 levels of nesting and also 
>>>> produces the
>>>>       correct output too.
>>>>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>>       DirectRunner.  In DataflowRunner however, only the top-level renames 
>>>> were
>>>>       reflected in the TableRow and all renames in the nested fields 
>>>> weren't.
>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>       uses the Row.schema to get the field names.  This makes sense to me, 
>>>> but if
>>>>       a value is actually a Row then its schema appears to be inconsistent 
>>>> with
>>>>       the top-level schema
>>>>    - My Current Workaround - I forked RenameFields and replaced the
>>>>    attachValues in expand method to be a "deep" rename.  This is obviously
>>>>    inefficient and I will not be submitting a PR for that.
>>>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>
>>>>
>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> This transform is the same across all runners. A few comments on the
>>>>> test:
>>>>>
>>>>>   - Using attachValues directly is error prone (per the comment on the
>>>>> method). I recommend using the withFieldValue builders instead.
>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>> variable of type PCollection<Row> and printing out the schema (which you
>>>>> can get using the PCollection.getSchema method) to ensure that the output
>>>>> schema looks like you expect.
>>>>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0 - >
>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>>>>> flatten, then the better transform would be
>>>>> Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>
>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>> makes me think that it is buggy in the case where you specify a top-level
>>>>> field multiple times like you do. I think it is simply adding the 
>>>>> top-level
>>>>> field into the output schema multiple times, and the second time is with
>>>>> the field0_1 base name; I have no idea why your test doesn't catch this in
>>>>> the DirectRunner, as it's equally broken there. Could you file a JIRA 
>>>>> about
>>>>> this issue and assign it to me?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Matthew,
>>>>>>>
>>>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>
>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>> framework. NeedsRunner indicates that a test suite can't be executed 
>>>>>>> with
>>>>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>>>>> don't run the test when we're verifying the SDK by itself in the
>>>>>>> :sdks:java:core:test task. The test is still run in other tasks where we
>>>>>>> have a runner, most notably in the Java PreCommit [1], where we run it 
>>>>>>> as
>>>>>>> part of the :runners:direct-java:test task.
>>>>>>>
>>>>>>> That being said, we may only run these tests continuously with the
>>>>>>> DirectRunner, I'm not sure if we test them on all the runners like we do
>>>>>>> with ValidatesRunner tests.
>>>>>>>
>>>>>>
>>>>>> That is correct. The tests are tests _of the transform_ so they run
>>>>>> only on the DirectRunner. They are not tests of the runner, which is only
>>>>>> responsible for correctly implementing Beam's primitives. The transform
>>>>>> should not behave differently on different runners, except for 
>>>>>> fundamental
>>>>>> differences in how they schedule work and checkpoint.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>> > The error message I’m receiving, : Error while reading data,
>>>>>>> error message: JSON parsing error in row starting at position 0: No such
>>>>>>> field: nestedField.field1_0, suggests the BigQuery is trying to use
>>>>>>> the original name for the nested field and not the substitute name.
>>>>>>>
>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>> helpful to see where the error is coming from.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>
>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I’m trying to use the RenameFields transform prior to inserting
>>>>>>>> into BigQuery on nested fields.  Insertion into BigQuery is successful 
>>>>>>>> with
>>>>>>>> DirectRunner, but DataflowRunner has an issue with renamed nested 
>>>>>>>> fields
>>>>>>>>  The error message I’m receiving, : Error while reading data,
>>>>>>>> error message: JSON parsing error in row starting at position 0: No 
>>>>>>>> such
>>>>>>>> field: nestedField.field1_0, suggests the BigQuery is trying to
>>>>>>>> use the original name for the nested field and not the substitute name.
>>>>>>>>
>>>>>>>> The code for RenameFields seems simple enough but does it behave
>>>>>>>> differently in different runners?  Will a deep attachValues be 
>>>>>>>> necessary in
>>>>>>>> order get the nested renames to work across all runners? Is there 
>>>>>>>> something
>>>>>>>> wrong in my code?
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>
>>>>>>>> The unit tests also seem to be disabled for this as well and so I
>>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>
>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>
>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>> import
>>>>>>>>> org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>
>>>>>>>>> import java.io.File;
>>>>>>>>> import java.util.Arrays;
>>>>>>>>> import java.util.HashSet;
>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>
>>>>>>>>> import static java.util.Arrays.*asList*;
>>>>>>>>>
>>>>>>>>> public class BQRenameFields {
>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>         PipelineOptionsFactory.*register*(DataflowPipelineOptions.
>>>>>>>>> class);
>>>>>>>>>         DataflowPipelineOptions options = PipelineOptionsFactory.
>>>>>>>>> *fromArgs*(args).as(DataflowPipelineOptions.class);
>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>                 Arrays.*stream*(System.*getProperty*(
>>>>>>>>> "java.class.path").
>>>>>>>>>                         split(File.*pathSeparator*)).
>>>>>>>>>                         map(entry -> (new
>>>>>>>>> File(entry)).toString()).collect(Collectors.*toList*()));
>>>>>>>>>
>>>>>>>>>         Pipeline pipeline = Pipeline.*create*(options);
>>>>>>>>>
>>>>>>>>>         Schema nestedSchema = Schema.*builder*().addField(Schema.
>>>>>>>>> Field.*nullable*("field1_0", Schema.FieldType.*STRING*)).build();
>>>>>>>>>         Schema.Field field = Schema.Field.*nullable*("field0_0",
>>>>>>>>> Schema.FieldType.*STRING*);
>>>>>>>>>         Schema.Field nested = Schema.Field.*nullable*("field0_1",
>>>>>>>>> Schema.FieldType.*row*(nestedSchema));
>>>>>>>>>         Schema.Field runner = Schema.Field.*nullable*("field0_2",
>>>>>>>>> Schema.FieldType.*STRING*);
>>>>>>>>>         Schema rowSchema = Schema.*builder*()
>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>                 .build();
>>>>>>>>>         Row testRow = Row.*withSchema*(rowSchema).attachValues(
>>>>>>>>> "value0_0", Row.*withSchema*(nestedSchema).attachValues("value1_0"
>>>>>>>>> ), options.getRunner().toString());
>>>>>>>>>         pipeline
>>>>>>>>>                 .apply(Create.*of*(testRow).withRowSchema(
>>>>>>>>> rowSchema))
>>>>>>>>>                 .apply(RenameFields.<Row>*create*()
>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>                         .rename("field0_1.field1_0",
>>>>>>>>> "nestedStringField")
>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>                 .apply(BigQueryIO.<Row>*write*()
>>>>>>>>>                         .to(new TableReference().setProjectId(
>>>>>>>>> "lt-dia-lake-exp-raw").setDatasetId("prototypes").setTableId(
>>>>>>>>> "matto_renameFields"))
>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.
>>>>>>>>> CreateDisposition.*CREATE_IF_NEEDED*)
>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.
>>>>>>>>> WriteDisposition.*WRITE_APPEND*)
>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>> *asList*(BigQueryIO.Write.SchemaUpdateOption.
>>>>>>>>> *ALLOW_FIELD_ADDITION*)))
>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>         pipeline.run();
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>

Reply via email to