Aha, yes, this is indeed another bug in the transform. The schema is set on the
top-level Row but not on any nested rows.
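
To make the failure mode concrete, here is a minimal sketch that does not use
Beam at all. The Schema and Row classes below are simplified, hypothetical
stand-ins (not Beam's real classes), and shallowAttach, deepAttach, and toMap
are names invented for this illustration. It assumes the converter names
fields from each row's own schema, as described further down the thread for
BigQueryUtils.toTableRow(Row).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the bug; these are NOT Beam's Schema and Row classes. */
class NestedRenameSketch {

  /** Ordered field names; a nested field also carries a child schema (null for leaves). */
  static final class Schema {
    final List<String> names = new ArrayList<>();
    final List<Schema> children = new ArrayList<>();

    Schema field(String name) {
      names.add(name);
      children.add(null);
      return this;
    }

    Schema row(String name, Schema child) {
      names.add(name);
      children.add(child);
      return this;
    }
  }

  /** Positional values plus the schema used to name them. */
  static final class Row {
    final Schema schema;
    final List<Object> values;

    Row(Schema schema, List<Object> values) {
      this.schema = schema;
      this.values = values;
    }
  }

  /** Shallow re-attach: only the top-level row receives the renamed schema. */
  static Row shallowAttach(Schema renamed, Row row) {
    return new Row(renamed, row.values);
  }

  /** Deep re-attach: nested rows are rebuilt against the matching child schema too. */
  static Row deepAttach(Schema renamed, Row row) {
    List<Object> out = new ArrayList<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object value = row.values.get(i);
      Schema child = renamed.children.get(i);
      out.add(value instanceof Row && child != null ? deepAttach(child, (Row) value) : value);
    }
    return new Row(renamed, out);
  }

  /** Converts a row to nested maps, naming fields from each row's OWN schema. */
  static Map<String, Object> toMap(Row row) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object value = row.values.get(i);
      result.put(row.schema.names.get(i), value instanceof Row ? toMap((Row) value) : value);
    }
    return result;
  }

  /** Input row built with the original field names from the example in this thread. */
  static Row sampleInput() {
    Schema nested = new Schema().field("field1_0");
    Schema top = new Schema().field("field0_0").row("field0_1", nested);
    Row nestedRow = new Row(nested, Arrays.<Object>asList("value1_0"));
    return new Row(top, Arrays.<Object>asList("value0_0", nestedRow));
  }

  /** Schema after the renames, at both nesting levels. */
  static Schema renamedSchema() {
    Schema nested = new Schema().field("nestedStringField");
    return new Schema().field("stringField").row("nestedField", nested);
  }

  public static void main(String[] args) {
    Row input = sampleInput();
    Schema renamed = renamedSchema();
    // Nested row still carries the old schema, so the old nested name leaks out:
    // {stringField=value0_0, nestedField={field1_0=value1_0}}
    System.out.println(toMap(shallowAttach(renamed, input)));
    // Nested row rebuilt with the renamed child schema:
    // {stringField=value0_0, nestedField={nestedStringField=value1_0}}
    System.out.println(toMap(deepAttach(renamed, input)));
  }
}
```

In this model the shallow variant produces nestedField.field1_0, which matches
the field name in the BigQuery error quoted below. The deep variant is only
one possible shape for a fix and copies every nested row, so it trades speed
for correctness.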

On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
wrote:

> Thank you everyone for your input.  I believe it will be easiest to
> respond to all feedback in a single message rather than messages per person.
>
>    - NeedsRunner - The tests are run eventually, so obviously all good on
>    my end.  I was trying to run the smallest subset of test cases possible and
>    didn't venture beyond `gradle test`.
>    - Stack Trace - There wasn't any, unfortunately, because no exception was
>    thrown in the code.  The Beam Row was translated into a BQ TableRow and an
>    insertion was attempted.  The error "message" was part of the response JSON
>    that came back as a result of a request against the BQ API.
>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>    field0_1.nestedStringField is what I am looking for.
>    - Info Logging Findings (In Lieu of a Stack Trace)
>       - The Beam Schema was as expected with all renames applied.
>       - The example I provided was heavily stripped down in order to
>       isolate the problem.  My work example, which is a bit impractical
>       because it's part of some generic tooling, has 4 levels of nesting and
>       also produces the correct output.
>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>       DirectRunner.  In DataflowRunner however, only the top-level renames
>       were reflected in the TableRow and all renames in the nested fields
>       weren't.
>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses
>       the Row.schema to get the field names.  This makes sense to me, but if a
>       value is actually a Row then its schema appears to be inconsistent with
>       the top-level schema.
>    - My Current Workaround - I forked RenameFields and replaced the
>    attachValues call in the expand method with a "deep" rename.  This is
>    obviously inefficient and I will not be submitting a PR for that.
>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>
>
> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>
>> This transform is the same across all runners. A few comments on the test:
>>
>>   - Using attachValues directly is error prone (per the comment on the
>> method). I recommend using the withFieldValue builders instead.
>>   - I recommend capturing the RenameFields PCollection into a local
>> variable of type PCollection<Row> and printing out the schema (which you
>> can get using the PCollection.getSchema method) to ensure that the output
>> schema looks like you expect.
>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>> flatten, then the better transform would be
>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>
>> This all being said, eyeballing the implementation of RenameFields makes
>> me think that it is buggy in the case where you specify a top-level field
>> multiple times like you do. I think it is simply adding the top-level field
>> into the output schema multiple times, and the second time is with the
>> field0_1 base name; I have no idea why your test doesn't catch this in the
>> DirectRunner, as it's equally broken there. Could you file a JIRA about
>> this issue and assign it to me?
>>
>> Reuven
>>
>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>>
>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Matthew,
>>>>
>>>> > The unit tests also seem to be disabled for this, so I
>>>> don’t know if the PTransform behaves as expected.
>>>>
>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>> framework. NeedsRunner indicates that a test suite can't be executed with
>>>> the SDK alone; it needs a runner. So that exclusion just makes sure we
>>>> don't run the test when we're verifying the SDK by itself in the
>>>> :sdks:java:core:test task. The test is still run in other tasks where we
>>>> have a runner, most notably in the Java PreCommit [1], where we run it as
>>>> part of the :runners:direct-java:test task.
>>>>
>>>> That being said, we may only run these tests continuously with the
>>>> DirectRunner; I'm not sure if we test them on all the runners like we do
>>>> with ValidatesRunner tests.
>>>>
>>>
>>> That is correct. The tests are tests _of the transform_ so they run only
>>> on the DirectRunner. They are not tests of the runner, which is only
>>> responsible for correctly implementing Beam's primitives. The transform
>>> should not behave differently on different runners, except for fundamental
>>> differences in how they schedule work and checkpoint.
>>>
>>> Kenn
>>>
>>>
>>>> > The error message I’m receiving, "Error while reading data, error
>>>> message: JSON parsing error in row starting at position 0: No such field:
>>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>>> original name for the nested field and not the substitute name.
>>>>
>>>> Is there a stacktrace associated with this error? It would be helpful
>>>> to see where the error is coming from.
>>>>
>>>> Brian
>>>>
>>>>
>>>> [1]
>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>
>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>> matthew.ouy...@gmail.com> wrote:
>>>>
>>>>> I’m trying to use the RenameFields transform prior to inserting into
>>>>> BigQuery on nested fields.  Insertion into BigQuery is successful with
>>>>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>>>>> The error message I’m receiving, "Error while reading data, error
>>>>> message: JSON parsing error in row starting at position 0: No such field:
>>>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>>>> original name for the nested field and not the substitute name.
>>>>>
>>>>> The code for RenameFields seems simple enough but does it behave
>>>>> differently in different runners?  Will a deep attachValues be necessary
>>>>> in order to get the nested renames to work across all runners? Is there
>>>>> something wrong in my code?
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>
>>>>> The unit tests also seem to be disabled for this, so I
>>>>> don’t know if the PTransform behaves as expected.
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>
>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>
>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>
>>>>>> import java.io.File;
>>>>>> import java.util.Arrays;
>>>>>> import java.util.HashSet;
>>>>>> import java.util.stream.Collectors;
>>>>>>
>>>>>> import static java.util.Arrays.asList;
>>>>>>
>>>>>> public class BQRenameFields {
>>>>>>     public static void main(String[] args) {
>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>         DataflowPipelineOptions options = PipelineOptionsFactory
>>>>>>                 .fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>         options.setFilesToStage(
>>>>>>                 Arrays.stream(System.getProperty("java.class.path")
>>>>>>                         .split(File.pathSeparator))
>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>                         .collect(Collectors.toList()));
>>>>>>
>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>                 .build();
>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>                 .addFields(field, nested, runner)
>>>>>>                 .build();
>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>                 "value0_0",
>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>                 options.getRunner().toString());
>>>>>>         pipeline
>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>                         .rename("field0_0", "stringField")
>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>                         .rename("field0_2", "runner"))
>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>                         .to(new TableReference()
>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>                                 .setDatasetId("prototypes")
>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>                         .useBeamSchema());
>>>>>>         pipeline.run();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>
