Repository: incubator-beam
Updated Branches:
  refs/heads/master 495d8b358 -> 8245f9b4b


Support BigQuery DATE type


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/287061b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/287061b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/287061b7

Branch: refs/heads/master
Commit: 287061b75ddaf6908dc14726876059443083f62c
Parents: 495d8b3
Author: Pei He <pe...@google.com>
Authored: Wed Sep 21 19:56:01 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 22 14:25:51 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java      |  4 ++++
 .../sdk/io/gcp/bigquery/BigQueryTableRowIterator.java    |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java  | 11 ++++++++---
 .../io/gcp/bigquery/BigQueryTableRowIteratorTest.java    |  9 ++++++---
 4 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index d9b5423..2cc2df7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
@@ -162,11 +163,13 @@ class BigQueryAvroUtils {
             .put("BOOLEAN", Type.BOOLEAN)
             .put("TIMESTAMP", Type.LONG)
             .put("RECORD", Type.RECORD)
+            .put("DATE", Type.STRING)
             .build();
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
     // is required, so it may not be null.
     String bqType = fieldSchema.getType();
     Type expectedAvroType = fieldMap.get(bqType);
+    verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType);
     verify(
         avroType == expectedAvroType,
         "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
@@ -176,6 +179,7 @@ class BigQueryAvroUtils {
         fieldSchema.getName());
     switch (fieldSchema.getType()) {
       case "STRING":
+      case "DATE":
         // Avro will use a CharSequence to represent String objects, but it may not always use
         // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
         verify(v instanceof CharSequence, "Expected CharSequence (String), got 
%s", v.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 420f30c..d7423a1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -253,7 +253,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
       return BigQueryAvroUtils.formatTimestamp((String) v);
     }
 
-    // Returns the original value for String and base64 encoded BYTES
+    // Returns the original value for String, base64 encoded BYTES, and DATE string.
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index b9199b0..6924732 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -32,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
+import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.junit.Test;
@@ -65,6 +66,7 @@ public class BigQueryAvroUtilsTest {
             new 
TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
             new 
TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
             new 
TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
+            new 
TableFieldSchema().setName("anniversary").setType("DATE").setMode("NULLABLE"),
             new 
TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
                 .setFields(subFields),
             new 
TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
@@ -83,16 +85,17 @@ public class BigQueryAvroUtilsTest {
       assertEquals(row, convertedRow);
     }
     {
-      // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, BYTES, and FLOAT.
+      // Test type conversion for INTEGER, FLOAT, TIMESTAMP, BOOLEAN, BYTES, and DATE.
       GenericRecord record = new GenericData.Record(avroSchema);
       byte[] soundBytes = "chirp,chirp".getBytes();
-      ByteBuffer soundByteBuffer = 
ByteBuffer.allocate(soundBytes.length).put(soundBytes);
+      ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes);
       soundByteBuffer.rewind();
       record.put("number", 5L);
       record.put("quality", 5.0);
       record.put("birthday", 5L);
       record.put("flighted", Boolean.TRUE);
       record.put("sound", soundByteBuffer);
+      record.put("anniversary", new Utf8("2000-01-01"));
       TableRow convertedRow = 
BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
       TableRow row = new TableRow()
           .set("number", "5")
@@ -100,7 +103,8 @@ public class BigQueryAvroUtilsTest {
           .set("quality", 5.0)
           .set("associates", new ArrayList<TableRow>())
           .set("flighted", Boolean.TRUE)
-          .set("sound", BaseEncoding.base64().encode(soundBytes));
+          .set("sound", BaseEncoding.base64().encode(soundBytes))
+          .set("anniversary", "2000-01-01");
       assertEquals(row, convertedRow);
     }
     {
@@ -133,6 +137,7 @@ public class BigQueryAvroUtilsTest {
     @Nullable Long birthday;  // Exercises TIMESTAMP.
     @Nullable Boolean flighted;
     @Nullable ByteBuffer sound;
+    @Nullable Utf8 anniversary;
     @Nullable SubBird scion;
     SubBird[] associates;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/287061b7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
index 4f0cac9..7bd24df 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -124,7 +124,8 @@ public class BigQueryTableRowIteratorTest {
                     Arrays.asList(
                         new 
TableFieldSchema().setName("name").setType("STRING"),
                         new 
TableFieldSchema().setName("answer").setType("INTEGER"),
-                        new 
TableFieldSchema().setName("photo").setType("BYTES"))));
+                        new 
TableFieldSchema().setName("photo").setType("BYTES"),
+                        new 
TableFieldSchema().setName("anniversary").setType("DATE"))));
   }
 
   private TableRow rawRow(Object... args) {
@@ -168,10 +169,10 @@ public class BigQueryTableRowIteratorTest {
     String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
     // Mock table data fetch.
     when(mockTabledataList.execute())
-        .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded)));
+        .thenReturn(rawDataList(rawRow("Arthur", 42, photoBytesEncoded, 
"2000-01-01")));
 
     // Run query and verify
-    String query = "SELECT name, count, photoBytes from table";
+    String query = "SELECT name, count, photo, anniversary from table";
     try (BigQueryTableRowIterator iterator =
             BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
       iterator.open();
@@ -181,9 +182,11 @@ public class BigQueryTableRowIteratorTest {
       assertTrue(row.containsKey("name"));
       assertTrue(row.containsKey("answer"));
       assertTrue(row.containsKey("photo"));
+      assertTrue(row.containsKey("anniversary"));
       assertEquals("Arthur", row.get("name"));
       assertEquals(42, row.get("answer"));
       assertEquals(photoBytesEncoded, row.get("photo"));
+      assertEquals("2000-01-01", row.get("anniversary"));
 
       assertFalse(iterator.advance());
     }

Reply via email to