Repository: beam Updated Branches: refs/heads/release-2.0.0 7dfc45563 -> 8f1834432
This closes #2953 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40bdbcb2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40bdbcb2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40bdbcb2 Branch: refs/heads/release-2.0.0 Commit: 40bdbcb28d5507c601d0791059c3f6ac9581a8f8 Parents: 7dfc455 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon May 8 13:41:01 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon May 8 16:00:33 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 +++---- .../bigquery/DynamicDestinationsHelpers.java | 3 ++- .../sdk/io/gcp/bigquery/TableDestination.java | 13 +++++++++--- .../io/gcp/bigquery/TableDestinationCoder.java | 19 +++++++++--------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 ++++++++++---------- 5 files changed, 36 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 304864a..8fb05ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -641,7 +641,6 @@ public class BigQueryIO { public static <T> Write<T> write() { return new AutoValue_BigQueryIO_Write.Builder<T>() .setValidate(true) - .setTableDescription("") .setBigQueryServices(new BigQueryServicesImpl()) .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED) .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY) @@ -684,7 +683,7 @@ public class BigQueryIO { abstract CreateDisposition getCreateDisposition(); abstract WriteDisposition getWriteDisposition(); /** Table description. Default is empty. */ - abstract String getTableDescription(); + @Nullable abstract String getTableDescription(); /** An option to indicate if table validation is desired. Default is true. */ abstract boolean getValidate(); abstract BigQueryServices getBigQueryServices(); @@ -1027,8 +1026,8 @@ public class BigQueryIO { .withLabel("Table WriteDisposition")) .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) - .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription()) - .withLabel("Table Description"), ""); + .addIfNotNull(DisplayData.item("tableDescription", getTableDescription()) + .withLabel("Table Description")); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 72a3314..530e2b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -40,9 +40,10 @@ class DynamicDestinationsHelpers { */ static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> { private final ValueProvider<String> tableSpec; + @Nullable private final String tableDescription; - ConstantTableDestinations(ValueProvider<String> tableSpec, String tableDescription) { + ConstantTableDestinations(ValueProvider<String> tableSpec, @Nullable String tableDescription) { this.tableSpec = tableSpec; this.tableDescription = tableDescription; } http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 7a82c54..ecf35d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; import java.io.Serializable; import java.util.Objects; +import javax.annotation.Nullable; /** * Encapsulates a BigQuery table destination. @@ -28,15 +29,16 @@ import java.util.Objects; public class TableDestination implements Serializable { private static final long serialVersionUID = 1L; private final String tableSpec; + @Nullable private final String tableDescription; - public TableDestination(String tableSpec, String tableDescription) { + public TableDestination(String tableSpec, @Nullable String tableDescription) { this.tableSpec = tableSpec; this.tableDescription = tableDescription; } - public TableDestination(TableReference tableReference, String tableDescription) { + public TableDestination(TableReference tableReference, @Nullable String tableDescription) { this.tableSpec = BigQueryHelpers.toTableSpec(tableReference); this.tableDescription = tableDescription; } @@ -49,13 +51,18 @@ public class TableDestination implements Serializable { return BigQueryHelpers.parseTableSpec(tableSpec); } + @Nullable public String getTableDescription() { return tableDescription; } @Override public String toString() { - return "tableSpec: " + tableSpec + " tableDescription: " + tableDescription; + String toString = "tableSpec: " + tableSpec; + if (tableDescription != null) { + toString += " tableDescription: " + tableDescription; + } + return toString; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 3059e2a..01bc558 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -22,13 +22,16 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; /** A coder for {@link TableDestination} objects. */ public class TableDestinationCoder extends AtomicCoder<TableDestination> { private static final TableDestinationCoder INSTANCE = new TableDestinationCoder(); - private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final Coder<String> tableSpecCoder = StringUtf8Coder.of(); + private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of()); public static TableDestinationCoder of() { return INSTANCE; @@ -40,19 +43,17 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.getTableSpec(), outStream, context.nested()); - stringCoder.encode(value.getTableDescription(), outStream, context.nested()); + tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested()); + tableDescriptionCoder.encode(value.getTableDescription(), outStream, context); } @Override public TableDestination decode(InputStream inStream, Context context) throws IOException { - return new TableDestination( - stringCoder.decode(inStream, context.nested()), - stringCoder.decode(inStream, context.nested())); + String tableSpec = tableSpecCoder.decode(inStream, context.nested()); + String tableDescription = tableDescriptionCoder.decode(inStream, context); + return new TableDestination(tableSpec, tableDescription); } @Override - public void verifyDeterministic() throws NonDeterministicException { - return; - } + public void verifyDeterministic() throws NonDeterministicException {} } http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 6c15f87..0d3f000 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -595,6 +595,7 @@ public class BigQueryIOTest implements Serializable { new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.writeTableRows().to("dataset-id.table-id") + .withTableDescription(null) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema().setFields( ImmutableList.of( @@ -981,7 +982,7 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.writeTableRows().to("foo.com:project:somedataset.sometable"); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test @@ -999,7 +1000,7 @@ public class BigQueryIOTest implements Serializable { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, - "", + null, false); } @@ -1010,7 +1011,7 @@ public class BigQueryIOTest implements Serializable { checkWriteObject( write, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, - ""); + null); } @Test @@ -1022,7 +1023,7 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to(table); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test @@ -1032,7 +1033,7 @@ public class BigQueryIOTest implements Serializable { BigQueryIO.<TableRow>write().to("foo.com:project:somedataset.sometable").withSchema(schema); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); + schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test @@ -1042,7 +1043,7 @@ public class BigQueryIOTest implements Serializable { .withCreateDisposition(CreateDisposition.CREATE_NEVER); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, ""); + null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null); } @Test @@ -1052,7 +1053,7 @@ public class BigQueryIOTest implements Serializable { .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test @@ -1062,7 +1063,7 @@ public class BigQueryIOTest implements Serializable { .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null); } @Test @@ -1072,7 +1073,7 @@ public class BigQueryIOTest implements Serializable { .withWriteDisposition(WriteDisposition.WRITE_APPEND); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null); } @Test @@ -1082,7 +1083,7 @@ public class BigQueryIOTest implements Serializable { .withWriteDisposition(WriteDisposition.WRITE_EMPTY); checkWriteObject( write, "foo.com:project", "somedataset", "sometable", - null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); + null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test