This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fe327ef  [SQL] Make BigQuery schema conversion order-aware
     new 120394b  Merge pull request #8193 from akedin/bq-schema-order
fe327ef is described below

commit fe327ef97acc92f31711a242685710c5dfbc8249
Author: akedin <ke...@google.com>
AuthorDate: Mon Apr 1 15:48:51 2019 -0700

    [SQL] Make BigQuery schema conversion order-aware
---
 .../{BeamBigQueryTable.java => BigQueryTable.java} | 30 +++++++++++-----------
 .../provider/bigquery/BigQueryTableProvider.java   |  6 +----
 .../bigquery/BigQueryTableProviderTest.java        |  6 ++---
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    | 11 ++++----
 4 files changed, 24 insertions(+), 29 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
similarity index 72%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index fdbcea4..6f3f56a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -20,25 +20,26 @@ package 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /**
- * {@code BeamBigQueryTable} represent a BigQuery table as a target. This 
provider does not
- * currently support being a source.
+ * {@code BigQueryTable} represent a BigQuery table as a target. This provider 
does not currently
+ * support being a source.
  */
 @Experimental
-public class BeamBigQueryTable extends BaseBeamTable implements Serializable {
-  private String tableSpec;
+class BigQueryTable extends BaseBeamTable implements Serializable {
+  String bqLocation;
 
-  public BeamBigQueryTable(Schema beamSchema, String tableSpec) {
-    super(beamSchema);
-    this.tableSpec = tableSpec;
+  BigQueryTable(Table table) {
+    super(table.getSchema());
+    this.bqLocation = table.getLocation();
   }
 
   @Override
@@ -48,9 +49,12 @@ public class BeamBigQueryTable extends BaseBeamTable 
implements Serializable {
 
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
-    // TODO: make this more generic.
     return begin
-        
.apply(BigQueryIO.read(BigQueryUtils.toBeamRow(schema)).from(tableSpec))
+        .apply(
+            "Read Input BQ Rows",
+            BigQueryIO.read(record -> 
BigQueryUtils.toBeamRow(record.getRecord(), getSchema()))
+                .from(bqLocation)
+                .withCoder(SchemaCoder.of(getSchema())))
         .setRowSchema(getSchema());
   }
 
@@ -60,10 +64,6 @@ public class BeamBigQueryTable extends BaseBeamTable 
implements Serializable {
         BigQueryIO.<Row>write()
             .withSchema(BigQueryUtils.toTableSchema(getSchema()))
             .withFormatFunction(BigQueryUtils.toTableRow())
-            .to(tableSpec));
-  }
-
-  String getTableSpec() {
-    return tableSpec;
+            .to(bqLocation));
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
index 2eceb23..e96fe29 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
-import org.apache.beam.sdk.schemas.Schema;
 
 /**
  * BigQuery table provider.
@@ -49,9 +48,6 @@ public class BigQueryTableProvider extends 
InMemoryMetaTableProvider {
 
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
-    Schema schema = table.getSchema();
-    String filePattern = table.getLocation();
-
-    return new BeamBigQueryTable(schema, filePattern);
+    return new BigQueryTable(table);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
index 3aa9089..47983e2 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
@@ -43,10 +43,10 @@ public class BigQueryTableProviderTest {
     BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
 
     assertNotNull(sqlTable);
-    assertTrue(sqlTable instanceof BeamBigQueryTable);
+    assertTrue(sqlTable instanceof BigQueryTable);
 
-    BeamBigQueryTable bqTable = (BeamBigQueryTable) sqlTable;
-    assertEquals("project:dataset.table", bqTable.getTableSpec());
+    BigQueryTable bqTable = (BigQueryTable) sqlTable;
+    assertEquals("project:dataset.table", bqTable.bqLocation);
   }
 
   private static Table fakeTable(String name) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 6d1d15a..c51bb5c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -203,13 +203,12 @@ public class BigQueryUtils {
   }
 
   public static Row toBeamRow(GenericRecord record, Schema schema) {
-    List<Object> values = new ArrayList();
-    for (int i = 0; i < record.getSchema().getFields().size(); i++) {
-      org.apache.avro.Schema.Field avroField = 
record.getSchema().getFields().get(i);
-      values.add(AvroUtils.convertAvroFormat(schema.getField(i), 
record.get(avroField.name())));
-    }
+    List<Object> valuesInOrder =
+        schema.getFields().stream()
+            .map(field -> AvroUtils.convertAvroFormat(field, 
record.get(field.getName())))
+            .collect(toList());
 
-    return Row.withSchema(schema).addValues(values).build();
+    return Row.withSchema(schema).addValues(valuesInOrder).build();
   }
 
   /** Convert a BigQuery TableRow to a Beam Row. */

Reply via email to