This transform is the same across all runners. A few comments on the test:

  - Using attachValues directly is error-prone (per the comment on the
    method). I recommend using the withFieldValue builders instead.
  - I recommend capturing the output of RenameFields in a local variable
    of type PCollection<Row> and printing its schema (available via the
    PCollection.getSchema method) to check that the output schema looks
    the way you expect.
  - RenameFields doesn't flatten. Renaming field0_1.field1_0 ->
    nestedStringField results in field0_1.nestedStringField; if you wanted
    to flatten, the better transform would be
    Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
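
To make the "doesn't flatten" point concrete, here is a minimal sketch in
plain Java. It is not Beam code and not how RenameFields is implemented:
the class RenameSketch, its rename helper, and the map-based stand-in for
a schema are all hypothetical, chosen only to show that a rename on a
dotted path changes the leaf name and leaves the nesting alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: a "schema" is a map from field name to either a type name
// (String) or a nested schema (Map). None of this is Beam API.
public class RenameSketch {

    /** Renames the field at a dotted path; the field keeps its nesting level. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> rename(Map<String, Object> schema, String path, String newName) {
        int dot = path.indexOf('.');
        String head = dot < 0 ? path : path.substring(0, dot);
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : schema.entrySet()) {
            if (!e.getKey().equals(head)) {
                // Unrelated field: copy through unchanged.
                out.put(e.getKey(), e.getValue());
            } else if (dot < 0) {
                // Last path component: rename the field at this level.
                out.put(newName, e.getValue());
            } else if (e.getValue() instanceof Map) {
                // Descend into the nested schema, keeping the parent's name.
                String rest = path.substring(dot + 1);
                out.put(head, rename((Map<String, Object>) e.getValue(), rest, newName));
            } else {
                throw new IllegalArgumentException(head + " is not a nested field");
            }
        }
        return out;
    }

    /** Same shape as the schema in the test: field0_1 is a nested row. */
    static Map<String, Object> exampleSchema() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "STRING");
        Map<String, Object> top = new LinkedHashMap<>();
        top.put("field0_0", "STRING");
        top.put("field0_1", nested);
        top.put("field0_2", "STRING");
        return top;
    }

    public static void main(String[] args) {
        System.out.println(rename(exampleSchema(), "field0_1.field1_0", "nestedStringField"));
    }
}
```

Running main prints
{field0_0=STRING, field0_1={nestedStringField=STRING}, field0_2=STRING}:
the renamed field is still nested under field0_1, not hoisted to the top
level.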

That said, eyeballing the implementation of RenameFields makes me think
that it is buggy in the case where you specify a top-level field multiple
times, as you do. I think it is simply adding the top-level field to the
output schema multiple times, the second time with the field0_1 base name.
I have no idea why your test doesn't catch this in the DirectRunner, as
it's equally broken there. Could you file a JIRA about this issue and
assign it to me?

Reuven

On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi Matthew,
>>
>> > The unit tests also seem to be disabled for this as well and so I don’t
>> > know if the PTransform behaves as expected.
>>
>> The exclusion for NeedsRunner tests is just a quirk in our testing
>> framework. NeedsRunner indicates that a test suite can't be executed with
>> the SDK alone; it needs a runner. So that exclusion just makes sure we
>> don't run the test when we're verifying the SDK by itself in the
>> :sdks:java:core:test task. The test is still run in other tasks where we
>> have a runner, most notably in the Java PreCommit [1], where we run it as
>> part of the :runners:direct-java:test task.
>>
>> That being said, we may only run these tests continuously with the
>> DirectRunner. I'm not sure if we test them on all the runners like we do
>> with ValidatesRunner tests.
>>
>
> That is correct. The tests are tests _of the transform_ so they run only
> on the DirectRunner. They are not tests of the runner, which is only
> responsible for correctly implementing Beam's primitives. The transform
> should not behave differently on different runners, except for fundamental
> differences in how they schedule work and checkpoint.
>
> Kenn
>
>
>> > The error message I’m receiving, "Error while reading data, error
>> > message: JSON parsing error in row starting at position 0: No such field:
>> > nestedField.field1_0", suggests that BigQuery is trying to use the
>> > original name for the nested field and not the substitute name.
>>
>> Is there a stacktrace associated with this error? It would be helpful to
>> see where the error is coming from.
>>
>> Brian
>>
>>
>> [1]
>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>
>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>> wrote:
>>
>>> I’m trying to use the RenameFields transform prior to inserting into
>>> BigQuery on nested fields.  Insertion into BigQuery is successful with
>>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>>> The error message I’m receiving, "Error while reading data, error
>>> message: JSON parsing error in row starting at position 0: No such field:
>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>> original name for the nested field and not the substitute name.
>>>
>>> The code for RenameFields seems simple enough but does it behave
>>> differently in different runners?  Will a deep attachValues be necessary in
>>> order to get the nested renames to work across all runners? Is there something
>>> wrong in my code?
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>
>>> The unit tests also seem to be disabled for this as well and so I don’t
>>> know if the PTransform behaves as expected.
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>
>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>
>>>> import com.google.api.services.bigquery.model.TableReference;
>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.schemas.Schema;
>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>> import org.apache.beam.sdk.transforms.Create;
>>>> import org.apache.beam.sdk.values.Row;
>>>>
>>>> import java.io.File;
>>>> import java.util.Arrays;
>>>> import java.util.HashSet;
>>>> import java.util.stream.Collectors;
>>>>
>>>> import static java.util.Arrays.asList;
>>>>
>>>> public class BQRenameFields {
>>>>     public static void main(String[] args) {
>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>         DataflowPipelineOptions options =
>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>         options.setFilesToStage(
>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>                         .map(entry -> (new File(entry)).toString())
>>>>                         .collect(Collectors.toList()));
>>>>
>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>         Schema nestedSchema = Schema.builder()
>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>                 .build();
>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>         Schema rowSchema = Schema.builder()
>>>>                 .addFields(field, nested, runner)
>>>>                 .build();
>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>                 "value0_0",
>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>                 options.getRunner().toString());
>>>>         pipeline
>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>                 .apply(RenameFields.<Row>create()
>>>>                         .rename("field0_0", "stringField")
>>>>                         .rename("field0_1", "nestedField")
>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>                         .rename("field0_2", "runner"))
>>>>                 .apply(BigQueryIO.<Row>write()
>>>>                         .to(new TableReference()
>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>                                 .setDatasetId("prototypes")
>>>>                                 .setTableId("matto_renameFields"))
>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>                         .useBeamSchema());
>>>>         pipeline.run();
>>>>     }
>>>> }
>>>>
>>>
