This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 389f649  [BEAM-7755] adding support for nested rows and arrays in BigQuery to Beam row conversion (#9089)
389f649 is described below

commit 389f649691344f5b86e1f7092db157dfe004c2bb
Author: Sahith Nallapareddy <[email protected]>
AuthorDate: Fri Jul 26 16:21:34 2019 -0400

    [BEAM-7755] adding support for nested rows and arrays in BigQuery to Beam row conversion (#9089)
---
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    | 39 +++++++----
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     | 79 ++++++++++++++++++++--
 2 files changed, 100 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 286420e..1316a18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -34,6 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.schemas.LogicalTypes;
 import org.apache.beam.sdk.schemas.Schema;
@@ -313,7 +314,15 @@ public class BigQueryUtils {
   public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptions options) {
     List<Object> valuesInOrder =
         schema.getFields().stream()
-            .map(field -> convertAvroFormat(field, record.get(field.getName()), options))
+            .map(
+                field -> {
+                  try {
+                    return convertAvroFormat(field.getType(), record.get(field.getName()), options);
+                  } catch (Exception cause) {
+                    throw new IllegalArgumentException(
+                        "Error converting field " + field + ": " + cause.getMessage(), cause);
+                  }
+                })
             .collect(toList());
 
     return Row.withSchema(schema).addValues(valuesInOrder).build();
@@ -471,14 +480,13 @@ public class BigQueryUtils {
    * Beam field.
    */
   public static Object convertAvroFormat(
-      Field beamField, Object avroValue, BigQueryUtils.ConversionOptions options) {
-    TypeName beamFieldTypeName = beamField.getType().getTypeName();
+      FieldType beamFieldType, Object avroValue, BigQueryUtils.ConversionOptions options) {
+    TypeName beamFieldTypeName = beamFieldType.getTypeName();
     if (avroValue == null) {
-      if (beamField.getType().getNullable()) {
+      if (beamFieldType.getNullable()) {
         return null;
       } else {
-        throw new IllegalArgumentException(
-            String.format("Field %s not nullable", beamField.getName()));
+        throw new IllegalArgumentException(String.format("Field %s not nullable", beamFieldType));
       }
     }
     switch (beamFieldTypeName) {
@@ -505,9 +513,9 @@ public class BigQueryUtils {
       case STRING:
         return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue);
       case ARRAY:
-        return convertAvroArray(beamField, avroValue);
+        return convertAvroArray(beamFieldType, avroValue, options);
       case LOGICAL_TYPE:
-        String identifier = beamField.getType().getLogicalType().getIdentifier();
+        String identifier = beamFieldType.getLogicalType().getIdentifier();
         if (SQL_DATE_TIME_TYPES.contains(identifier)) {
           switch (options.getTruncateTimestamps()) {
             case TRUNCATE:
@@ -524,6 +532,13 @@ public class BigQueryUtils {
         } else {
           throw new RuntimeException("Unknown logical type " + identifier);
         }
+      case ROW:
+        Schema rowSchema = beamFieldType.getRowSchema();
+        if (rowSchema == null) {
+          throw new IllegalArgumentException("Nested ROW missing row schema");
+        }
+        GenericData.Record record = (GenericData.Record) avroValue;
+        return toBeamRow(record, rowSchema, options);
       case DECIMAL:
         throw new RuntimeException("Does not support converting DECIMAL type value");
       case MAP:
@@ -553,14 +568,14 @@ public class BigQueryUtils {
     return new Instant((long) value / 1000);
   }
 
-  private static Object convertAvroArray(Field beamField, Object value) {
+  private static Object convertAvroArray(
+      FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
     // Check whether the type of array element is equal.
     List<Object> values = (List<Object>) value;
     List<Object> ret = new ArrayList();
+    FieldType collectionElement = beamField.getCollectionElementType();
     for (Object v : values) {
-      ret.add(
-          convertAvroPrimitiveTypes(
-              beamField.getType().getCollectionElementType().getTypeName(), v));
+      ret.add(convertAvroFormat(collectionElement, v, options));
     }
     return (Object) ret;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 86b2d27..67d997e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -35,8 +35,10 @@ import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
+import org.apache.avro.generic.GenericData;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
@@ -172,6 +174,20 @@ public class BigQueryUtilsTest {
   private static final TableSchema BQ_ARRAY_ROW_TYPE =
       new TableSchema().setFields(Arrays.asList(ROWS));
 
+  private static final Schema AVRO_FLAT_TYPE =
+      Schema.builder()
+          .addNullableField("id", Schema.FieldType.INT64)
+          .addNullableField("value", Schema.FieldType.DOUBLE)
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("valid", Schema.FieldType.BOOLEAN)
+          .build();
+
+  private static final Schema AVRO_ARRAY_TYPE =
+      Schema.builder().addArrayField("rows", Schema.FieldType.row(AVRO_FLAT_TYPE)).build();
+
+  private static final Schema AVRO_ARRAY_ARRAY_TYPE =
+      Schema.builder().addArrayField("array_rows", Schema.FieldType.row(AVRO_ARRAY_TYPE)).build();
+
   @Test
   public void testToTableSchema_flat() {
     TableSchema schema = toTableSchema(FLAT_TYPE);
@@ -289,7 +305,9 @@ public class BigQueryUtilsTest {
         IllegalArgumentException.class,
         () ->
             BigQueryUtils.convertAvroFormat(
-                Schema.Field.of("dummy", Schema.FieldType.DATETIME), 1000000001L, REJECT_OPTIONS));
+                Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
+                1000000001L,
+                REJECT_OPTIONS));
   }
 
   @Test
@@ -297,7 +315,9 @@ public class BigQueryUtilsTest {
     long millis = 123456789L;
     assertThat(
         BigQueryUtils.convertAvroFormat(
-            Schema.Field.of("dummy", Schema.FieldType.DATETIME), millis * 1000, REJECT_OPTIONS),
+            Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
+            millis * 1000,
+            REJECT_OPTIONS),
         equalTo(new Instant(millis)));
   }
 
@@ -306,7 +326,7 @@ public class BigQueryUtilsTest {
     long millis = 123456789L;
     assertThat(
         BigQueryUtils.convertAvroFormat(
-            Schema.Field.of("dummy", Schema.FieldType.DATETIME),
+            Schema.Field.of("dummy", Schema.FieldType.DATETIME).getType(),
             millis * 1000 + 123,
             TRUNCATE_OPTIONS),
         equalTo(new Instant(millis)));
@@ -319,7 +339,8 @@ public class BigQueryUtilsTest {
         IllegalArgumentException.class,
         () ->
             BigQueryUtils.convertAvroFormat(
-                Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
+                Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType()))
+                    .getType(),
                 1000000001L,
                 REJECT_OPTIONS));
   }
@@ -329,7 +350,7 @@ public class BigQueryUtilsTest {
     long millis = 123456789L;
     assertThat(
         BigQueryUtils.convertAvroFormat(
-            Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
+            Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())).getType(),
             millis * 1000,
             REJECT_OPTIONS),
         equalTo(new Instant(millis)));
@@ -340,7 +361,7 @@ public class BigQueryUtilsTest {
     long millis = 123456789L;
     assertThat(
         BigQueryUtils.convertAvroFormat(
-            Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
+            Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())).getType(),
             millis * 1000 + 123,
             TRUNCATE_OPTIONS),
         equalTo(new Instant(millis)));
@@ -422,4 +443,50 @@ public class BigQueryUtilsTest {
     Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
     assertEquals(ARRAY_ROW_ROW, beamRow);
   }
+
+  @Test
+  public void testToBeamRow_avro_array_row() {
+    Row flatRowExpected =
+        Row.withSchema(AVRO_FLAT_TYPE).addValues(123L, 123.456, "test", false).build();
+    Row expected =
+        Row.withSchema(AVRO_ARRAY_TYPE).addValues((Object) Arrays.asList(flatRowExpected)).build();
+    GenericData.Record record = new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
+    GenericData.Record flat = new GenericData.Record(AvroUtils.toAvroSchema(AVRO_FLAT_TYPE));
+    flat.put("id", 123L);
+    flat.put("value", 123.456);
+    flat.put("name", "test");
+    flat.put("valid", false);
+    record.put("rows", Arrays.asList(flat));
+    Row beamRow =
+        BigQueryUtils.toBeamRow(
+            record, AVRO_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
+    assertEquals(expected, beamRow);
+  }
+
+  @Test
+  public void testToBeamRow_avro_array_array_row() {
+    Row flatRowExpected =
+        Row.withSchema(AVRO_FLAT_TYPE).addValues(123L, 123.456, "test", false).build();
+    Row arrayRowExpected =
+        Row.withSchema(AVRO_ARRAY_TYPE).addValues((Object) Arrays.asList(flatRowExpected)).build();
+    Row expected =
+        Row.withSchema(AVRO_ARRAY_ARRAY_TYPE)
+            .addValues((Object) Arrays.asList(arrayRowExpected))
+            .build();
+    GenericData.Record arrayRecord =
+        new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
+    GenericData.Record flat = new GenericData.Record(AvroUtils.toAvroSchema(AVRO_FLAT_TYPE));
+    GenericData.Record record =
+        new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_ARRAY_TYPE));
+    flat.put("id", 123L);
+    flat.put("value", 123.456);
+    flat.put("name", "test");
+    flat.put("valid", false);
+    arrayRecord.put("rows", Arrays.asList(flat));
+    record.put("array_rows", Arrays.asList(arrayRecord));
+    Row beamRow =
+        BigQueryUtils.toBeamRow(
+            record, AVRO_ARRAY_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
+    assertEquals(expected, beamRow);
+  }
 }

Reply via email to