Updates BigQueryTableSource to consider data in the streaming buffer when determining the estimated size.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2c9fba4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2c9fba4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2c9fba4 Branch: refs/heads/mr-runner Commit: b2c9fba4dd5f7f5a0ac0045f9ff8f30d55088a34 Parents: 482d178 Author: chamik...@google.com <chamik...@google.com> Authored: Sat Oct 21 19:20:07 2017 -0700 Committer: chamik...@google.com <chamik...@google.com> Committed: Thu Nov 2 15:20:27 2017 -0700 ---------------------------------------------------------------------- .../io/gcp/bigquery/BigQueryTableSource.java | 10 ++- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 81 +++++++++++++++++++- 2 files changed, 88 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index f717cb7..dbac00f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ 
-102,8 +103,13 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> { TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class), BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class)); - Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) - .getTable(table).getNumBytes(); + Table tableRef = bqServices.getDatasetService(options.as(BigQueryOptions.class)) + .getTable(table); + Long numBytes = tableRef.getNumBytes(); + if (tableRef.getStreamingBuffer() != null) { + numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue(); + } + tableSizeBytes.compareAndSet(null, numBytes); } return tableSizeBytes.get(); http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index aa818c6..5b4b7e6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -41,6 +41,7 @@ import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatistics4; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Streamingbuffer; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableFieldSchema; @@ -64,6 +65,7 @@ import java.io.IOException; import 
java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.math.BigInteger; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -343,7 +345,7 @@ public class BigQueryIOTest implements Serializable { })); PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L))); - p.run(); + p.run(); } @Test @@ -1697,6 +1699,83 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testEstimatedSizeWithoutStreamingBuffer() throws Exception { + FakeDatasetService fakeDatasetService = new FakeDatasetService(); + FakeJobService fakeJobService = new FakeJobService(); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(fakeJobService) + .withDatasetService(fakeDatasetService); + + List<TableRow> data = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); + + TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + fakeDatasetService.createDataset("project", "data_set", "", "", null); + fakeDatasetService.createTable(new Table().setTableReference(table) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))))); + fakeDatasetService.insertAll(table, data, null); + + String stepUuid = "testStepUuid"; + BoundedSource<TableRow> bqSource = BigQueryTableSource.create( + stepUuid, + StaticValueProvider.of(table), + fakeBqServices, + TableRowJsonCoder.of(), + BigQueryIO.TableRowParser.INSTANCE); + + PipelineOptions options = 
PipelineOptionsFactory.create(); + assertEquals(108, bqSource.getEstimatedSizeBytes(options)); + } + + @Test + public void testEstimatedSizeWithStreamingBuffer() throws Exception { + FakeDatasetService fakeDatasetService = new FakeDatasetService(); + FakeJobService fakeJobService = new FakeJobService(); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(fakeJobService) + .withDatasetService(fakeDatasetService); + + List<TableRow> data = ImmutableList.of( + new TableRow().set("name", "a").set("number", 1L), + new TableRow().set("name", "b").set("number", 2L), + new TableRow().set("name", "c").set("number", 3L), + new TableRow().set("name", "d").set("number", 4L), + new TableRow().set("name", "e").set("number", 5L), + new TableRow().set("name", "f").set("number", 6L)); + + TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name"); + fakeDatasetService.createDataset("project", "data_set", "", "", null); + fakeDatasetService.createTable(new Table().setTableReference(table) + .setSchema(new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.valueOf(10)))); + fakeDatasetService.insertAll(table, data, null); + + String stepUuid = "testStepUuid"; + BoundedSource<TableRow> bqSource = BigQueryTableSource.create( + stepUuid, + StaticValueProvider.of(table), + fakeBqServices, + TableRowJsonCoder.of(), + BigQueryIO.TableRowParser.INSTANCE); + + PipelineOptions options = PipelineOptionsFactory.create(); + assertEquals(118, bqSource.getEstimatedSizeBytes(options)); + } + + @Test public void testBigQueryQuerySourceInitSplit() throws Exception { TableReference dryRunTable = new TableReference();