This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fe327ef [SQL] Make BigQuery schema conversion order-aware new 120394b Merge pull request #8193 from akedin/bq-schema-order fe327ef is described below commit fe327ef97acc92f31711a242685710c5dfbc8249 Author: akedin <ke...@google.com> AuthorDate: Mon Apr 1 15:48:51 2019 -0700 [SQL] Make BigQuery schema conversion order-aware --- .../{BeamBigQueryTable.java => BigQueryTable.java} | 30 +++++++++++----------- .../provider/bigquery/BigQueryTableProvider.java | 6 +---- .../bigquery/BigQueryTableProviderTest.java | 6 ++--- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 11 ++++---- 4 files changed, 24 insertions(+), 29 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java similarity index 72% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index fdbcea4..6f3f56a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -20,25 +20,26 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.schemas.Schema; +import 
org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** - * {@code BeamBigQueryTable} represent a BigQuery table as a target. This provider does not - * currently support being a source. + * {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently + * support being a source. */ @Experimental -public class BeamBigQueryTable extends BaseBeamTable implements Serializable { - private String tableSpec; +class BigQueryTable extends BaseBeamTable implements Serializable { + String bqLocation; - public BeamBigQueryTable(Schema beamSchema, String tableSpec) { - super(beamSchema); - this.tableSpec = tableSpec; + BigQueryTable(Table table) { + super(table.getSchema()); + this.bqLocation = table.getLocation(); } @Override @@ -48,9 +49,12 @@ public class BeamBigQueryTable extends BaseBeamTable implements Serializable { @Override public PCollection<Row> buildIOReader(PBegin begin) { - // TODO: make this more generic. 
return begin - .apply(BigQueryIO.read(BigQueryUtils.toBeamRow(schema)).from(tableSpec)) + .apply( + "Read Input BQ Rows", + BigQueryIO.read(record -> BigQueryUtils.toBeamRow(record.getRecord(), getSchema())) + .from(bqLocation) + .withCoder(SchemaCoder.of(getSchema()))) .setRowSchema(getSchema()); } @@ -60,10 +64,6 @@ public class BeamBigQueryTable extends BaseBeamTable implements Serializable { BigQueryIO.<Row>write() .withSchema(BigQueryUtils.toTableSchema(getSchema())) .withFormatFunction(BigQueryUtils.toTableRow()) - .to(tableSpec)); - } - - String getTableSpec() { - return tableSpec; + .to(bqLocation)); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java index 2eceb23..e96fe29 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; -import org.apache.beam.sdk.schemas.Schema; /** * BigQuery table provider. 
@@ -49,9 +48,6 @@ public class BigQueryTableProvider extends InMemoryMetaTableProvider { @Override public BeamSqlTable buildBeamSqlTable(Table table) { - Schema schema = table.getSchema(); - String filePattern = table.getLocation(); - - return new BeamBigQueryTable(schema, filePattern); + return new BigQueryTable(table); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java index 3aa9089..47983e2 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java @@ -43,10 +43,10 @@ public class BigQueryTableProviderTest { BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); assertNotNull(sqlTable); - assertTrue(sqlTable instanceof BeamBigQueryTable); + assertTrue(sqlTable instanceof BigQueryTable); - BeamBigQueryTable bqTable = (BeamBigQueryTable) sqlTable; - assertEquals("project:dataset.table", bqTable.getTableSpec()); + BigQueryTable bqTable = (BigQueryTable) sqlTable; + assertEquals("project:dataset.table", bqTable.bqLocation); } private static Table fakeTable(String name) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 6d1d15a..c51bb5c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -203,13 +203,12 @@ public class BigQueryUtils { } public static Row toBeamRow(GenericRecord 
record, Schema schema) { - List<Object> values = new ArrayList(); - for (int i = 0; i < record.getSchema().getFields().size(); i++) { - org.apache.avro.Schema.Field avroField = record.getSchema().getFields().get(i); - values.add(AvroUtils.convertAvroFormat(schema.getField(i), record.get(avroField.name()))); - } + List<Object> valuesInOrder = + schema.getFields().stream() + .map(field -> AvroUtils.convertAvroFormat(field, record.get(field.getName()))) + .collect(toList()); - return Row.withSchema(schema).addValues(values).build(); + return Row.withSchema(schema).addValues(valuesInOrder).build(); } /** Convert a BigQuery TableRow to a Beam Row. */