I’m trying to use the RenameFields transform on nested fields before
inserting into BigQuery. The insert succeeds with DirectRunner, but
DataflowRunner fails on the renamed nested fields. The error message I’m
receiving is:

Error while reading data, error message: JSON parsing error in row
starting at position 0: No such field: nestedField.field1_0

This suggests that BigQuery is being sent the original name of the nested
field rather than the substitute name.

The code for RenameFields seems simple enough, but does it behave
differently in different runners? Will a deep attachValues be necessary in
order to get the nested renames to work across all runners? Is there
something wrong in my code?
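
To make the "deep attachValues" question concrete, here is a toy model in plain Java of what I mean. It does not use Beam's classes and is not a claim about how RenameFields is implemented: ToySchema, ToyRow, shallowAttach and deepAttach are hypothetical names made up for illustration, and the only assumption is that every row (including a nested one) carries its own schema. Under that assumption, attaching the renamed schema to the top-level row alone leaves the nested row reporting its original field name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShallowVsDeepRename {
    // Hypothetical stand-in for a schema: ordered field names plus, per field,
    // the nested schema (null when the field is not a nested row).
    static final class ToySchema {
        final List<String> names;
        final List<ToySchema> nested;
        ToySchema(List<String> names, List<ToySchema> nested) {
            this.names = names;
            this.nested = nested;
        }
    }

    // Hypothetical stand-in for a row: each row carries its own schema.
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;
        ToyRow(ToySchema schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    // Shallow: only the top-level row gets the renamed schema;
    // nested rows are passed through untouched.
    static ToyRow shallowAttach(ToySchema renamed, ToyRow row) {
        return new ToyRow(renamed, row.values);
    }

    // Deep: nested rows are rebuilt recursively with the renamed nested schema.
    static ToyRow deepAttach(ToySchema renamed, ToyRow row) {
        List<Object> values = new ArrayList<>();
        for (int i = 0; i < row.values.size(); i++) {
            Object value = row.values.get(i);
            if (value instanceof ToyRow) {
                values.add(deepAttach(renamed.nested.get(i), (ToyRow) value));
            } else {
                values.add(value);
            }
        }
        return new ToyRow(renamed, values);
    }

    // Name of the first field as reported by the nested row at index 1.
    static String nestedFieldName(ToyRow row) {
        return ((ToyRow) row.values.get(1)).schema.names.get(0);
    }

    // Same shape as the test row in my pipeline, minus the runner field.
    static ToyRow originalRow() {
        ToySchema nested = new ToySchema(
                Arrays.asList("field1_0"), Arrays.asList((ToySchema) null));
        ToySchema outer = new ToySchema(
                Arrays.asList("field0_0", "field0_1"), Arrays.asList(null, nested));
        ToyRow nestedRow = new ToyRow(nested, Arrays.asList((Object) "value1_0"));
        return new ToyRow(outer, Arrays.asList((Object) "value0_0", nestedRow));
    }

    // The schema after the renames I am asking for.
    static ToySchema renamedSchema() {
        ToySchema nested = new ToySchema(
                Arrays.asList("nestedStringField"), Arrays.asList((ToySchema) null));
        return new ToySchema(
                Arrays.asList("stringField", "nestedField"), Arrays.asList(null, nested));
    }

    public static void main(String[] args) {
        ToyRow row = originalRow();
        ToySchema renamed = renamedSchema();
        // prints "shallow: field1_0"
        System.out.println("shallow: " + nestedFieldName(shallowAttach(renamed, row)));
        // prints "deep: nestedStringField"
        System.out.println("deep: " + nestedFieldName(deepAttach(renamed, row)));
    }
}
```

In this toy model the shallow version yields a row whose nested field is still called field1_0, which at least resembles the name in the error above. Whether this is what actually happens on DataflowRunner is exactly what I am unsure about.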

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186

The unit tests for this also seem to be disabled, so I don’t know if the
PTransform behaves as expected.

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java

> package ca.loblaw.cerebro.PipelineControl;
>
> import com.google.api.services.bigquery.model.TableReference;
> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.schemas.transforms.RenameFields;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.Row;
>
> import java.io.File;
> import java.util.Arrays;
> import java.util.HashSet;
> import java.util.stream.Collectors;
>
> import static java.util.Arrays.asList;
>
> public class BQRenameFields {
>     public static void main(String[] args) {
>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>         DataflowPipelineOptions options =
>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>         options.setFilesToStage(
>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>                         .map(entry -> (new File(entry)).toString())
>                         .collect(Collectors.toList()));
>
>         Pipeline pipeline = Pipeline.create(options);
>
>         Schema nestedSchema = Schema.builder()
>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>                 .build();
>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>         Schema rowSchema = Schema.builder()
>                 .addFields(field, nested, runner)
>                 .build();
>         Row testRow = Row.withSchema(rowSchema).attachValues(
>                 "value0_0",
>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>                 options.getRunner().toString());
>         pipeline
>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>                 .apply(RenameFields.<Row>create()
>                         .rename("field0_0", "stringField")
>                         .rename("field0_1", "nestedField")
>                         .rename("field0_1.field1_0", "nestedStringField")
>                         .rename("field0_2", "runner"))
>                 .apply(BigQueryIO.<Row>write()
>                         .to(new TableReference()
>                                 .setProjectId("lt-dia-lake-exp-raw")
>                                 .setDatasetId("prototypes")
>                                 .setTableId("matto_renameFields"))
>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                         .withSchemaUpdateOptions(new HashSet<>(asList(
>                                 BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>                         .useBeamSchema());
>         pipeline.run();
>     }
> }
>
