Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a252a77 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a252a77 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a252a77 Branch: refs/heads/master Commit: 1a252a771127febe551fda5d499c7ecb3b95cf23 Parents: 1adcbae Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Mar 13 16:15:21 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Mar 14 15:54:36 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/BigQueryTornadoes.java | 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java | 4 +-- .../examples/cookbook/MaxPerKeyExamples.java | 2 +- .../org/apache/beam/sdk/io/package-info.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++++++------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++---------- 8 files changed, 48 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index 079674a..d3c9167 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -156,7 +156,7 @@ public class BigQueryTornadoes { fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.Read.from(options.getInput())) + p.apply(BigQueryIO.read().from(options.getInput())) .apply(new CountTornadoes()) .apply(BigQueryIO.Write .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 37f9d79..fc54b13 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -200,7 +200,7 @@ public class CombinePerKeyExamples { fields.add(new TableFieldSchema().setName("all_plays").setType("STRING")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.Read.from(options.getInput())) + p.apply(BigQueryIO.read().from(options.getInput())) .apply(new PlaysForWord()) .apply(BigQueryIO.Write .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index fb6b507..714a8f2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -238,7 +238,7 @@ public class FilterExamples { TableSchema schema = buildWeatherSchemaProjection(); - p.apply(BigQueryIO.Read.from(options.getInput())) + p.apply(BigQueryIO.read().from(options.getInput())) .apply(ParDo.of(new ProjectionFn())) .apply(new BelowGlobalMean(options.getMonthFilter())) .apply(BigQueryIO.Write http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java index 7cf0942..05a3ad3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -166,8 +166,8 @@ public class JoinExamples { Pipeline p = Pipeline.create(options); // the following two 'applys' create multiple inputs to our pipeline, one for each // of our two input sources. - PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE)); - PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES)); + PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE)); + PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES)); PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); formattedResults.apply(TextIO.Write.to(options.getOutput())); p.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java index eabc42b..7e7bc72 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -149,7 +149,7 @@ public class MaxPerKeyExamples { fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.Read.from(options.getInput())) + p.apply(BigQueryIO.read().from(options.getInput())) .apply(new MaxMeanTemp()) .apply(BigQueryIO.Write .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java index c4ff158..c65d7dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java @@ -24,7 +24,7 @@ * from existing storage: * <pre>{@code * PCollection<TableRow> inputData = pipeline.apply( - * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations")); + * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations")); * }</pre> * and {@code Write} transforms that persist PCollections to external storage: * <pre> {@code http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e039c8c..dfb7ea6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory; * This produces a {@link PCollection} of {@link TableRow TableRows} as output: * <pre>{@code * PCollection<TableRow> weatherData = pipeline.apply( - * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations")); + * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations")); * }</pre> * * <p>See {@link TableRow} for more information on the {@link TableRow} object. @@ -177,7 +177,7 @@ import org.slf4j.LoggerFactory; * * <pre>{@code * PCollection<TableRow> meanTemperatureData = pipeline.apply( - * BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")); + * BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")); * }</pre> * * <p>When creating a BigQuery input transform, users should provide either a query or a table. @@ -454,6 +454,14 @@ public class BigQueryIO { * } * }}</pre> */ + public static Read read() { + return new AutoValue_BigQueryIO_Read.Builder() + .setValidate(true) + .setBigQueryServices(new BigQueryServicesImpl()) + .build(); + } + + /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> { @Nullable abstract ValueProvider<String> getJsonTableRef(); @@ -477,25 +485,26 @@ public class BigQueryIO { abstract Read build(); } - private static Builder builder() { - return new AutoValue_BigQueryIO_Read.Builder() - .setValidate(true) - .setBigQueryServices(new BigQueryServicesImpl()); + /** Ensures that methods of the from() / fromQuery() family are called at most once. */ + private void ensureFromNotCalledYet() { + checkState( + getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called"); } /** * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or * {@code "[dataset_id].[table_id]"} for tables within the current project. */ - public static Read from(String tableSpec) { + public Read from(String tableSpec) { return from(StaticValueProvider.of(tableSpec)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ - public static Read from(ValueProvider<String> tableSpec) { - return builder() + public Read from(ValueProvider<String> tableSpec) { + ensureFromNotCalledYet(); + return toBuilder() .setJsonTableRef( NestedValueProvider.of( NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), @@ -505,21 +514,22 @@ public class BigQueryIO { /** * Reads results received after executing the given query. */ - public static Read fromQuery(String query) { + public Read fromQuery(String query) { return fromQuery(StaticValueProvider.of(query)); } /** * Same as {@code from(String)}, but with a {@link ValueProvider}. */ - public static Read fromQuery(ValueProvider<String> query) { - return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build(); + public Read fromQuery(ValueProvider<String> query) { + ensureFromNotCalledYet(); + return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build(); } /** * Reads a BigQuery table specified as a {@link TableReference} object. */ - public static Read from(TableReference table) { + public Read from(TableReference table) { return from(StaticValueProvider.of(toTableSpec(table))); } http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 888d9c1..f6a7fb4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -727,13 +727,13 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildTableBasedSource() { - BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable"); + BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable"); checkReadTableObject(read, "foo.com:project", "somedataset", "sometable"); } @Test public void testBuildQueryBasedSource() { - BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query"); + BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query"); checkReadQueryObject(read, "foo_query"); } @@ -742,7 +742,7 @@ public class BigQueryIOTest implements Serializable { // This test just checks that using withoutValidation will not trigger object // construction errors. BigQueryIO.Read read = - BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation(); checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false); } @@ -751,14 +751,14 @@ public class BigQueryIOTest implements Serializable { // This test just checks that using withoutValidation will not trigger object // construction errors. BigQueryIO.Read read = - BigQueryIO.Read.fromQuery("some_query").withoutValidation(); + BigQueryIO.read().fromQuery("some_query").withoutValidation(); checkReadQueryObjectWithValidate(read, "some_query", false); } @Test public void testBuildTableBasedSourceWithDefaultProject() { BigQueryIO.Read read = - BigQueryIO.Read.from("somedataset.sometable"); + BigQueryIO.read().from("somedataset.sometable"); checkReadTableObject(read, null, "somedataset", "sometable"); } @@ -768,7 +768,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Read read = BigQueryIO.Read.from(table); + BigQueryIO.Read read = BigQueryIO.read().from(table); checkReadTableObject(read, "foo.com:project", "somedataset", "sometable"); } @@ -800,7 +800,7 @@ public class BigQueryIOTest implements Serializable { thrown.expect(RuntimeException.class); // Message will be one of following depending on the execution environment. thrown.expectMessage(Matchers.containsString("Unsupported")); - p.apply(BigQueryIO.Read.from(tableRef) + p.apply(BigQueryIO.read().from(tableRef) .withTestServices(fakeBqServices)); } @@ -817,7 +817,7 @@ public class BigQueryIOTest implements Serializable { "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference," + " which only applies to queries"); p.apply("ReadMyTable", - BigQueryIO.Read + BigQueryIO.read() .from("foo.com:project:somedataset.sometable") .withoutResultFlattening()); p.run(); @@ -836,7 +836,7 @@ public class BigQueryIOTest implements Serializable { "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference," + " which only applies to queries"); p.apply( - BigQueryIO.Read + BigQueryIO.read() .from("foo.com:project:somedataset.sometable") .withoutValidation() .withoutResultFlattening()); @@ -856,7 +856,7 @@ public class BigQueryIOTest implements Serializable { "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference," + " which only applies to queries"); p.apply( - BigQueryIO.Read + BigQueryIO.read() .from("foo.com:project:somedataset.sometable") .usingStandardSql()); p.run(); @@ -929,7 +929,7 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); PCollection<String> output = p - .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable") + .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation()) .apply(ParDo.of(new DoFn<TableRow, String>() { @@ -1260,7 +1260,7 @@ public class BigQueryIOTest implements Serializable { public void testBuildSourceDisplayDataTable() { String tableSpec = "project:dataset.tableid"; - BigQueryIO.Read read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.read() .from(tableSpec) .withoutResultFlattening() .usingStandardSql() @@ -1276,7 +1276,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSourceDisplayDataQuery() { - BigQueryIO.Read read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.read() .fromQuery("myQuery") .withoutResultFlattening() .usingStandardSql() @@ -1295,7 +1295,7 @@ public class BigQueryIOTest implements Serializable { @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient") public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - BigQueryIO.Read read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.read() .from("project:dataset.tableId") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) @@ -1312,7 +1312,7 @@ public class BigQueryIOTest implements Serializable { @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient") public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - BigQueryIO.Read read = BigQueryIO.Read + BigQueryIO.Read read = BigQueryIO.read() .fromQuery("foobar") .withTestServices(new FakeBigQueryServices() .withDatasetService(mockDatasetService) @@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBigQueryIOGetName() { - assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName()); + assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName()); assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName()); } @@ -2317,7 +2317,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Read read = BigQueryIO.Read.from( + BigQueryIO.Read read = BigQueryIO.read().from( options.getInputTable()).withoutValidation(); pipeline.apply(read); // Test that this doesn't throw. @@ -2330,7 +2330,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Read read = BigQueryIO.Read.fromQuery( + BigQueryIO.Read read = BigQueryIO.read().fromQuery( options.getInputQuery()).withoutValidation(); pipeline.apply(read); // Test that this doesn't throw.