Repository: incubator-beam Updated Branches: refs/heads/master 495d8b358 -> 8245f9b4b
Support BigQuery DATE type Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/287061b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/287061b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/287061b7 Branch: refs/heads/master Commit: 287061b75ddaf6908dc14726876059443083f62c Parents: 495d8b3 Author: Pei He <pe...@google.com> Authored: Wed Sep 21 19:56:01 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Sep 22 14:25:51 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 4 ++++ .../sdk/io/gcp/bigquery/BigQueryTableRowIterator.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 11 ++++++++--- .../io/gcp/bigquery/BigQueryTableRowIteratorTest.java | 9 ++++++--- 4 files changed, 19 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index d9b5423..2cc2df7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; @@ -162,11 +163,13 @@ class BigQueryAvroUtils { .put("BOOLEAN", Type.BOOLEAN) .put("TIMESTAMP", Type.LONG) .put("RECORD", Type.RECORD) + .put("DATE", Type.STRING) .build(); // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field // is required, so it may not be null. String bqType = fieldSchema.getType(); Type expectedAvroType = fieldMap.get(bqType); + verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType); verify( avroType == expectedAvroType, "Expected Avro schema type %s, not %s, for BigQuery %s field %s", @@ -176,6 +179,7 @@ class BigQueryAvroUtils { fieldSchema.getName()); switch (fieldSchema.getType()) { case "STRING": + case "DATE": // Avro will use a CharSequence to represent String objects, but it may not always use // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 420f30c..d7423a1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -253,7 +253,7 @@ class BigQueryTableRowIterator implements AutoCloseable { return BigQueryAvroUtils.formatTimestamp((String) v); } - // Returns the original value for String and base64 encoded BYTES + // Returns the original value for String, base64 encoded BYTES, and DATE string. return v; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index b9199b0..6924732 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -32,6 +32,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; +import org.apache.avro.util.Utf8; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.junit.Test; @@ -65,6 +66,7 @@ public class BigQueryAvroUtilsTest { new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"), new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"), new TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"), + new TableFieldSchema().setName("anniversary").setType("DATE").setMode("NULLABLE"), new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE") .setFields(subFields), new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED") @@ -83,16 +85,17 @@ public class BigQueryAvroUtilsTest { assertEquals(row, convertedRow); } { - // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, BYTES, and FLOAT. + // Test type conversion for INTEGER, FLOAT, TIMESTAMP, BOOLEAN, BYTES, and DATE. GenericRecord record = new GenericData.Record(avroSchema); byte[] soundBytes = "chirp,chirp".getBytes(); - ByteBuffer soundByteBuffer = ByteBuffer.allocate(soundBytes.length).put(soundBytes); + ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes); soundByteBuffer.rewind(); record.put("number", 5L); record.put("quality", 5.0); record.put("birthday", 5L); record.put("flighted", Boolean.TRUE); record.put("sound", soundByteBuffer); + record.put("anniversary", new Utf8("2000-01-01")); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); TableRow row = new TableRow() .set("number", "5") @@ -100,7 +103,8 @@ public class BigQueryAvroUtilsTest { .set("quality", 5.0) .set("associates", new ArrayList<TableRow>()) .set("flighted", Boolean.TRUE) - .set("sound", BaseEncoding.base64().encode(soundBytes)); + .set("sound", BaseEncoding.base64().encode(soundBytes)) + .set("anniversary", "2000-01-01"); assertEquals(row, convertedRow); } { @@ -133,6 +137,7 @@ public class BigQueryAvroUtilsTest { @Nullable Long birthday; // Exercises TIMESTAMP. @Nullable Boolean flighted; @Nullable ByteBuffer sound; + @Nullable Utf8 anniversary; @Nullable SubBird scion; SubBird[] associates; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java index 4f0cac9..7bd24df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java @@ -124,7 +124,8 @@ public class BigQueryTableRowIteratorTest { Arrays.asList( new TableFieldSchema().setName("name").setType("STRING"), new TableFieldSchema().setName("answer").setType("INTEGER"), - new TableFieldSchema().setName("photo").setType("BYTES")))); + new TableFieldSchema().setName("photo").setType("BYTES"), + new TableFieldSchema().setName("anniversary").setType("DATE")))); } private TableRow rawRow(Object... args) { @@ -168,10 +169,10 @@ public class BigQueryTableRowIteratorTest { String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes); // Mock table data fetch. when(mockTabledataList.execute()) - .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded))); + .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded, "2000-01-01"))); // Run query and verify - String query = "SELECT name, count, photoBytes from table"; + String query = "SELECT name, count, photo, anniversary from table"; try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { iterator.open(); @@ -181,9 +182,11 @@ public class BigQueryTableRowIteratorTest { assertTrue(row.containsKey("name")); assertTrue(row.containsKey("answer")); assertTrue(row.containsKey("photo")); + assertTrue(row.containsKey("anniversary")); assertEquals("Arthur", row.get("name")); assertEquals(42, row.get("answer")); assertEquals(photoBytesEncoded, row.get("photo")); + assertEquals("2000-01-01", row.get("anniversary")); assertFalse(iterator.advance()); }