Could the DirectRunner just do an equality check whenever it does an
encode/decode? It sounds like it's already effectively performing
a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
the equality check.
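
To make the idea concrete, here is a rough, self-contained sketch in plain Java. It deliberately uses no Beam classes: Schema, Row, encode, decode and roundTripEquals below are made-up stand-ins for illustration, not the real API, and I'm assuming a coder that serializes only the values and rebuilds rows from the collection's schema. It models a row whose nested schema is stale after a rename and shows that comparing the decoded copy against the original catches it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical toy model; none of these types are Beam classes.
final class RoundTripCheck {
    /** Toy schema: field names plus a nested schema per field (null for a string field). */
    static final class Schema {
        final List<String> names;
        final List<Schema> nested;
        Schema(List<String> names, List<Schema> nested) {
            this.names = names;
            this.nested = nested;
        }
        @Override public boolean equals(Object o) {
            return o instanceof Schema
                    && names.equals(((Schema) o).names)
                    && nested.equals(((Schema) o).nested);
        }
        @Override public int hashCode() { return Objects.hash(names, nested); }
    }

    /** Toy row: the schema the object claims to have, plus its values (String or Row). */
    static final class Row {
        final Schema schema;
        final List<Object> values;
        Row(Schema schema, Object... values) {
            this.schema = schema;
            this.values = Arrays.asList(values);
        }
        @Override public boolean equals(Object o) {
            return o instanceof Row
                    && schema.equals(((Row) o).schema)
                    && values.equals(((Row) o).values);
        }
        @Override public int hashCode() { return Objects.hash(schema, values); }
    }

    /** Single-field schema helper; pass null as child for a string field. */
    static Schema of(String name, Schema child) {
        return new Schema(Collections.singletonList(name), Collections.singletonList(child));
    }

    /** Encodes only the leaf values; the schema held by the object is not written. */
    static List<String> encode(Row row) {
        List<String> raw = new ArrayList<>();
        for (Object v : row.values) {
            if (v instanceof Row) raw.addAll(encode((Row) v));
            else raw.add((String) v);
        }
        return raw;
    }

    /** Rebuilds rows from raw values and the collection schema, so nested schemas are consistent. */
    static Row decode(Schema schema, Iterator<String> raw) {
        Object[] values = new Object[schema.names.size()];
        for (int i = 0; i < values.length; i++) {
            Schema child = schema.nested.get(i);
            values[i] = (child == null) ? raw.next() : decode(child, raw);
        }
        return new Row(schema, values);
    }

    /** The extra check: does the decoded copy equal the element that was encoded? */
    static boolean roundTripEquals(Schema collectionSchema, Row row) {
        return decode(collectionSchema, encode(row).iterator()).equals(row);
    }

    public static void main(String[] args) {
        Schema oldNested = of("field2", null);
        Schema newNested = of("renamed", null);
        Schema newTop = of("field1", newNested);

        // Top-level schema updated, nested row still carries the old schema.
        Row stale = new Row(newTop, new Row(oldNested, "value"));
        Row consistent = new Row(newTop, new Row(newNested, "value"));

        System.out.println(roundTripEquals(newTop, stale));      // false
        System.out.println(roundTripEquals(newTop, consistent)); // true
    }
}
```

In this toy version the round trip silently repairs the stale row, which is exactly why only the comparison against the pre-encode object reveals the inconsistency. Whether the real Row equality compares nested schemas the same way is something I have not checked.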

On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:

> There is no bug in the Coder itself, so that wouldn't catch it. We could
> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
> the Direct runner already does an encode/decode before that ParDo, then
> that would have fixed the problem before we could see it.
>
> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Would it be caught by CoderProperties?
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I don't think this bug is schema specific - we created a Java object
>>> that is inconsistent with its encoded form, which could happen to any
>>> transform.
>>>
>>> This does seem to be a gap in DirectRunner testing though. It also makes
>>> it hard to test using PAssert, as I believe that puts everything in a side
>>> input, forcing an encoding/decoding.
>>>
>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>> fixes the object.
>>>>
>>>> Do we need better testing of schema-aware (and potentially other
>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>
>>>> Brian
>>>>
>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have some other work-related things I need to do this week, so I
>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>> explanation.  It makes perfect sense now.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Some more context - the problem is that RenameFields outputs (in this
>>>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>>>> For example if you have the following schema:
>>>>>>
>>>>>> Row {
>>>>>>   field1: Row {
>>>>>>     field2: string
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> And rename field1.field2 -> renamed, you'll get the following schema:
>>>>>>
>>>>>> Row {
>>>>>>   field1: Row {
>>>>>>     renamed: string
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> However the Java object for the _nested_ row will return the old
>>>>>> schema if getSchema() is called on it. This is because we only update the
>>>>>> schema on the top-level row.
>>>>>>
>>>>>> I think this explains why your test works in the direct runner. If
>>>>>> the row ever goes through an encode/decode path, it will come back
>>>>>> correct. The original incorrect Java objects are no longer around, and
>>>>>> new (consistent) objects are constructed from the raw data and the
>>>>>> PCollection schema. Dataflow tends to fuse ParDos together, so the
>>>>>> following ParDo will see the incorrect Row object. I bet the
>>>>>> DirectRunner is encoding and decoding in between, which fixes the
>>>>>> object.
>>>>>>
>>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>>> using Reshuffle. It should fix the issue. If it does, let me know and
>>>>>> I'll work on a fix to RenameFields.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is
>>>>>>> set on the top-level Row but not on any nested rows.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>>>>> respond to all feedback in a single message rather than messages per
>>>>>>>> person.
>>>>>>>>
>>>>>>>>    - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>>>    good on my end.  I was trying to run the smallest subset of test
>>>>>>>>    cases possible and didn't venture beyond `gradle test`.
>>>>>>>>    - Stack Trace - There wasn't any, unfortunately, because no
>>>>>>>>    exception was thrown in the code.  The Beam Row was translated into
>>>>>>>>    a BQ TableRow and an insertion was attempted.  The error "message"
>>>>>>>>    was part of the response JSON that came back as a result of a
>>>>>>>>    request against the BQ API.
>>>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>>>>    field0_1.nestedStringField is what I am looking for.
>>>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>>>>       - The example I provided was heavily stripped down in order
>>>>>>>>       to isolate the problem.  My work example, which is a bit
>>>>>>>>       impractical because it's part of some generic tooling, has 4
>>>>>>>>       levels of nesting and also produces the correct output.
>>>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected
>>>>>>>>       TableRow in DirectRunner.  In DataflowRunner, however, only the
>>>>>>>>       top-level renames were reflected in the TableRow; the renames
>>>>>>>>       in the nested fields weren't.
>>>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values
>>>>>>>>       and uses the Row.schema to get the field names.  This makes
>>>>>>>>       sense to me, but if a value is actually a Row then its schema
>>>>>>>>       appears to be inconsistent with the top-level schema.
>>>>>>>>    - My Current Workaround - I forked RenameFields and replaced
>>>>>>>>    the attachValues in the expand method to be a "deep" rename.  This
>>>>>>>>    is obviously inefficient and I will not be submitting a PR for that.
>>>>>>>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> This transform is the same across all runners. A few comments on
>>>>>>>>> the test:
>>>>>>>>>
>>>>>>>>>   - Using attachValues directly is error prone (per the comment on
>>>>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>>>>   - I recommend capturing the RenameFields PCollection into a
>>>>>>>>> local variable of type PCollection<Row> and printing out the schema
>>>>>>>>> (which you can get using the PCollection.getSchema method) to ensure
>>>>>>>>> that the output schema looks like you expect.
>>>>>>>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>>>>>>> nestedStringField results in field0_1.nestedStringField; if you
>>>>>>>>> wanted to flatten, then the better transform would be
>>>>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>>
>>>>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>>>>> makes me think that it is buggy in the case where you specify a
>>>>>>>>> top-level field multiple times like you do. I think it is simply
>>>>>>>>> adding the top-level field into the output schema multiple times, and
>>>>>>>>> the second time is with the field0_1 base name; I have no idea why
>>>>>>>>> your test doesn't catch this in the DirectRunner, as it's equally
>>>>>>>>> broken there. Could you file a JIRA about this issue and assign it to
>>>>>>>>> me?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <
>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>
>>>>>>>>>>> > The unit tests also seem to be disabled for this as well and
>>>>>>>>>>> so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>
>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't be
>>>>>>>>>>> executed with the SDK alone; it needs a runner. So that exclusion
>>>>>>>>>>> just makes sure we don't run the test when we're verifying the SDK
>>>>>>>>>>> by itself in the :sdks:java:core:test task. The test is still run in
>>>>>>>>>>> other tasks where we have a runner, most notably in the Java
>>>>>>>>>>> PreCommit [1], where we run it as part of the
>>>>>>>>>>> :runners:direct-java:test task.
>>>>>>>>>>>
>>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>>> the DirectRunner; I'm not sure if we test them on all the runners
>>>>>>>>>>> like we do with ValidatesRunner tests.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That is correct. The tests are tests _of the transform_ so they
>>>>>>>>>> run only on the DirectRunner. They are not tests of the runner,
>>>>>>>>>> which is only responsible for correctly implementing Beam's
>>>>>>>>>> primitives. The transform should not behave differently on different
>>>>>>>>>> runners, except for fundamental differences in how they schedule
>>>>>>>>>> work and checkpoint.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> > The error message I’m receiving, : Error while reading data,
>>>>>>>>>>> error message: JSON parsing error in row starting at position 0: No
>>>>>>>>>>> such field: nestedField.field1_0, suggests the BigQuery is trying to
>>>>>>>>>>> use the original name for the nested field and not the substitute
>>>>>>>>>>> name.
>>>>>>>>>>>
>>>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>>
>>>>>>>>>>> Brian
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I’m trying to use the RenameFields transform prior to inserting
>>>>>>>>>>>> into BigQuery on nested fields.  Insertion into BigQuery is
>>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue with
>>>>>>>>>>>> renamed nested fields.  The error message I’m receiving, : Error
>>>>>>>>>>>> while reading data, error message: JSON parsing error in row
>>>>>>>>>>>> starting at position 0: No such field: nestedField.field1_0,
>>>>>>>>>>>> suggests that BigQuery is trying to use the original name for the
>>>>>>>>>>>> nested field and not the substitute name.
>>>>>>>>>>>>
>>>>>>>>>>>> The code for RenameFields seems simple enough, but does it
>>>>>>>>>>>> behave differently in different runners?  Will a deep attachValues
>>>>>>>>>>>> be necessary in order to get the nested renames to work across all
>>>>>>>>>>>> runners? Is there something wrong in my code?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>
>>>>>>>>>>>> The unit tests also seem to be disabled for this as well and so
>>>>>>>>>>>> I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>
>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>>         // Stage every entry on the local classpath.
>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>>
>>>>>>>>>>>>>         // Top-level row with one nested row field (field0_1).
>>>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>>>                 "value0_0",
>>>>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>>                 // field0_1 is renamed both at the top level and via a nested path.
>>>>>>>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>>                         .to(new TableReference()
>>>>>>>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>
