This transform is the same across all runners. A few comments on the test:

  - Using attachValues directly is error prone (per the comment on the method). I recommend using the withFieldValue builders instead.
  - I recommend capturing the RenameFields output PCollection in a local variable of type PCollection<Row> and printing out its schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
  - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").

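To make the last point concrete, here is a rough sketch of the difference between renaming in place and flattening. It is plain Java with nested maps standing in for rows, not Beam code and not how RenameFields or Select are implemented; the class RenameVsFlatten and the helpers rename and selectAs are hypothetical names made up for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: nested Maps stand in for Beam Rows. This is NOT Beam's implementation.
class RenameVsFlatten {

    // Rename the field at a dotted path in place. Only the last path segment
    // changes; the field stays nested under its original parent.
    static Map<String, Object> rename(Map<String, Object> row, String path, String newName) {
        int dot = path.indexOf('.');
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String key = e.getKey();
            Object value = e.getValue();
            if (dot < 0) {
                // Leaf of the path: swap the name if this is the targeted field.
                out.put(key.equals(path) ? newName : key, value);
            } else if (key.equals(path.substring(0, dot)) && value instanceof Map) {
                // Intermediate segment: recurse into the nested row, keep the parent name.
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                out.put(key, rename(child, path.substring(dot + 1), newName));
            } else {
                out.put(key, value);
            }
        }
        return out;
    }

    // Select the value at a dotted path and expose it as a single top-level
    // field named alias (the "flattening" behaviour).
    static Map<String, Object> selectAs(Map<String, Object> row, String path, String alias) {
        Object current = row;
        for (String part : path.split("\\.")) {
            current = ((Map<?, ?>) current).get(part);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        out.put(alias, current);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field0_0", "value0_0");
        row.put("field0_1", nested);

        // Renamed field is still nested under field0_1.
        System.out.println(rename(row, "field0_1.field1_0", "nestedStringField"));
        // Selected field is pulled up to the top level.
        System.out.println(selectAs(row, "field0_1.field1_0", "nestedStringField"));
    }
}
```

In this model the rename leaves the value under field0_1, while the select pulls it up to the top level and drops everything that was not selected.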
This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?

Reuven

On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi Matthew,
>>
>> > The unit tests also seem to be disabled for this as well and so I don’t
>> know if the PTransform behaves as expected.
>>
>> The exclusion for NeedsRunner tests is just a quirk in our testing
>> framework. NeedsRunner indicates that a test suite can't be executed with
>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>> don't run the test when we're verifying the SDK by itself in the
>> :sdks:java:core:test task. The test is still run in other tasks where we
>> have a runner, most notably in the Java PreCommit [1], where we run it as
>> part of the :runners:direct-java:test task.
>>
>> That being said, we may only run these tests continuously with the
>> DirectRunner, I'm not sure if we test them on all the runners like we do
>> with ValidatesRunner tests.
>>
>
> That is correct. The tests are tests _of the transform_ so they run only
> on the DirectRunner. They are not tests of the runner, which is only
> responsible for correctly implementing Beam's primitives. The transform
> should not behave differently on different runners, except for fundamental
> differences in how they schedule work and checkpoint.
>
> Kenn
>
>
>> > The error message I’m receiving: Error while reading data, error
>> message: JSON parsing error in row starting at position 0: No such field:
>> nestedField.field1_0, suggests that BigQuery is trying to use the
>> original name for the nested field and not the substitute name.
>>
>> Is there a stacktrace associated with this error? It would be helpful to
>> see where the error is coming from.
>>
>> Brian
>>
>>
>> [1]
>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>
>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>> wrote:
>>
>>> I’m trying to use the RenameFields transform prior to inserting into
>>> BigQuery on nested fields. Insertion into BigQuery is successful with
>>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>>> The error message I’m receiving: Error while reading data, error
>>> message: JSON parsing error in row starting at position 0: No such field:
>>> nestedField.field1_0, suggests that BigQuery is trying to use the
>>> original name for the nested field and not the substitute name.
>>>
>>> The code for RenameFields seems simple enough but does it behave
>>> differently in different runners? Will a deep attachValues be necessary in
>>> order to get the nested renames to work across all runners? Is there something
>>> wrong in my code?
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>
>>> The unit tests also seem to be disabled for this as well and so I don’t
>>> know if the PTransform behaves as expected.
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>
>>>
>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>
>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>
>>>> import com.google.api.services.bigquery.model.TableReference;
>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.schemas.Schema;
>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>> import org.apache.beam.sdk.transforms.Create;
>>>> import org.apache.beam.sdk.values.Row;
>>>>
>>>> import java.io.File;
>>>> import java.util.Arrays;
>>>> import java.util.HashSet;
>>>> import java.util.stream.Collectors;
>>>>
>>>> import static java.util.Arrays.asList;
>>>>
>>>> public class BQRenameFields {
>>>>     public static void main(String[] args) {
>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>         DataflowPipelineOptions options =
>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>         options.setFilesToStage(
>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>                         .map(entry -> (new File(entry)).toString())
>>>>                         .collect(Collectors.toList()));
>>>>
>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>         Schema nestedSchema = Schema.builder()
>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>                 .build();
>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>         Schema rowSchema = Schema.builder()
>>>>                 .addFields(field, nested, runner)
>>>>                 .build();
>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>                 "value0_0",
>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>                 options.getRunner().toString());
>>>>         pipeline
>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>                 .apply(RenameFields.<Row>create()
>>>>                         .rename("field0_0", "stringField")
>>>>                         .rename("field0_1", "nestedField")
>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>                         .rename("field0_2", "runner"))
>>>>                 .apply(BigQueryIO.<Row>write()
>>>>                         .to(new TableReference()
>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>                                 .setDatasetId("prototypes")
>>>>                                 .setTableId("matto_renameFields"))
>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>                         .useBeamSchema());
>>>>         pipeline.run();
>>>>     }
>>>> }
>>>>
>>>