Repository: beam
Updated Branches:
  refs/heads/master 29e054a8d -> 391fb77c3
Cache result of BigQuerySourceBase.split

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1533e2b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1533e2b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1533e2b9

Branch: refs/heads/master
Commit: 1533e2b9bc49971929277b804587d93d8d2cae4c
Parents: 29e054a
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Apr 19 10:09:42 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Wed Apr 19 11:39:21 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 31 +++++++++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 18 +++++-------
 .../sdk/io/gcp/bigquery/FakeJobService.java     |  9 ++++++
 3 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 1b90dc3..4142da9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -69,6 +69,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   protected final BigQueryServices bqServices;
   protected final ValueProvider<String> executingProject;

+  private List<BoundedSource<TableRow>> cachedSplitResult;
+
   BigQuerySourceBase(
       ValueProvider<String> jobIdToken,
       String
extractDestinationDir,
@@ -83,17 +85,24 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   @Override
   public List<BoundedSource<TableRow>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    TableReference tableToExtract = getTableToExtract(bqOptions);
-    JobService jobService = bqServices.getJobService(bqOptions);
-    String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
-    List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
-    TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
-        .getTable(tableToExtract).getSchema();
-
-    cleanupTempResource(bqOptions);
-    return createSources(tempFiles, tableSchema);
+    // split() can be called multiple times, e.g. Dataflow runner may call it multiple times
+    // with different desiredBundleSizeBytes in case the split() call produces too many sources.
+    // We ignore desiredBundleSizeBytes anyway, however in any case, we should not initiate
+    // another BigQuery extract job for the repeated split() calls.
+    if (cachedSplitResult == null) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReference tableToExtract = getTableToExtract(bqOptions);
+      JobService jobService = bqServices.getJobService(bqOptions);
+      String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
+      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+      TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+          .getTable(tableToExtract).getSchema();
+
+      cleanupTempResource(bqOptions);
+      cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema));
+    }
+    return cachedSplitResult;
   }

   protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d0004e4..62c5b5f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatistics;
@@ -1230,17 +1229,10 @@ public class BigQueryIOTest implements Serializable {

   @Test
   public void testBigQueryTableSourceInitSplit() throws Exception {
-    Job extractJob = new Job();
-    JobStatistics jobStats = new
JobStatistics();
-    JobStatistics4 extractStats = new JobStatistics4();
-    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
-    jobStats.setExtract(extractStats);
-    extractJob.setStatus(new JobStatus())
-        .setStatistics(jobStats);
-
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    FakeJobService fakeJobService = new FakeJobService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService())
+        .withJobService(fakeJobService)
         .withDatasetService(fakeDatasetService);

     List<TableRow> expected = ImmutableList.of(
@@ -1280,8 +1272,14 @@ public class BigQueryIOTest implements Serializable {

     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
+    // Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
+    sources = bqSource.split(200, options);
+    assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    // A repeated call to split() should not have caused a duplicate extract job.
+    assertEquals(1, fakeJobService.getNumExtractJobCalls());
   }

   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index a2454fb..cffd873 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -95,6 +95,7 @@ class FakeJobService implements JobService, Serializable {

   private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
       HashBasedTable.create();
+  private static int numExtractJobCalls = 0;

   private static final com.google.common.collect.Table<String, String, List<String>>
       filesForLoadJobs = HashBasedTable.create();
@@ -136,6 +137,8 @@ class FakeJobService implements JobService, Serializable {
     checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
         "Only extract to AVRO is supported");
     synchronized (allJobs) {
+      ++numExtractJobCalls;
+
       Job job = new Job();
       job.setJobReference(jobRef);
       job.setConfiguration(new JobConfiguration().setExtract(extractConfig));
@@ -145,6 +148,12 @@ class FakeJobService implements JobService, Serializable {
     }
   }

+  public int getNumExtractJobCalls() {
+    synchronized (allJobs) {
+      return numExtractJobCalls;
+    }
+  }
+
   @Override
   public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
       throws IOException, InterruptedException {