Repository: incubator-beam Updated Branches: refs/heads/master 84e8bfb13 -> 0800a8cb2
Forward port DataflowJavaSDK-351 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9febae72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9febae72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9febae72 Branch: refs/heads/master Commit: 9febae72666ba23cedbfbefdf2a19f6b9d6f14bf Parents: 84e8bfb Author: Pei He <pe...@google.com> Authored: Mon Sep 12 21:02:35 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Sep 13 11:16:56 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 +-- .../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 46 ++++++++++++++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 42 ++++++++++++++- .../gcp/bigquery/BigQueryServicesImplTest.java | 56 ++++++++++++++++++++ 5 files changed, 152 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 1306e59..91f6073 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -390,10 +390,6 @@ public class BigQueryIO { "Validation of query \"%1$s\" failed. 
If the query depends on an earlier stage of the" + " pipeline, This validation can be disabled using #withoutValidation."; - // The maximum number of retries to poll a BigQuery job in the cleanup phase. - // We expect the jobs have already DONE, and don't need a high max retires. - private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10; - private Bound() { this( null /* name */, @@ -582,8 +578,8 @@ public class BigQueryIO { JobReference jobRef = new JobReference() .setProjectId(executingProject) .setJobId(getExtractJobId(jobIdToken)); - Job extractJob = bqServices.getJobService(bqOptions).pollJob( - jobRef, CLEANUP_JOB_POLL_MAX_RETRIES); + Job extractJob = bqServices.getJobService(bqOptions) + .getJob(jobRef); Collection<String> extractFiles = null; if (extractJob != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index c0951fc..16b3a39 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -99,6 +99,13 @@ interface BigQueryServices extends Serializable { */ JobStatistics dryRunQuery(String projectId, String query) throws InterruptedException, IOException; + + /** + * Gets the specified {@link Job} by the given {@link JobReference}. + * + * <p>Returns null if the job is not found.
+ */ + Job getJob(JobReference jobRef) throws IOException, InterruptedException; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 20dadff..7d98401 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -283,6 +283,52 @@ class BigQueryServicesImpl implements BigQueryServices { Sleeper.DEFAULT, backoff).getStatistics(); } + + /** + * {@inheritDoc} + * + * <p>Retries the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries.
+ */ + @Override + public Job getJob(JobReference jobRef) throws IOException, InterruptedException { + BackOff backoff = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); + return getJob(jobRef, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) + throws IOException, InterruptedException { + String jobId = jobRef.getJobId(); + Exception lastException; + do { + try { + return client.jobs().get(jobRef.getProjectId(), jobId).execute(); + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemNotFound(e)) { + LOG.info("No BigQuery job with job id {} found.", jobId); + return null; + } + LOG.warn( + "Ignoring the error encountered while trying to query the BigQuery job {}", + jobId, e); + lastException = e; + } catch (IOException e) { + LOG.warn( + "Ignoring the error encountered while trying to query the BigQuery job {}", + jobId, e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to find BigQuery job: %s, aborting after %d retries.", + jobRef, MAX_RPC_RETRIES), + lastException); + } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 19eeca5..b8dcdc8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -229,17 +229,21 @@ public class 
BigQueryIOTest implements Serializable { private Object[] startJobReturns; private Object[] pollJobReturns; + private Object[] getJobReturns; private String executingProject; // Both counts will be reset back to zeros after serialization. // This is a work around for DoFn's verifyUnmodified check. private transient int startJobCallsCount; private transient int pollJobStatusCallsCount; + private transient int getJobCallsCount; public FakeJobService() { this.startJobReturns = new Object[0]; this.pollJobReturns = new Object[0]; + this.getJobReturns = new Object[0]; this.startJobCallsCount = 0; this.pollJobStatusCallsCount = 0; + this.getJobCallsCount = 0; } /** @@ -254,6 +258,16 @@ public class BigQueryIOTest implements Serializable { } /** + * Sets the return values to mock {@link JobService#getJob}. + * + * <p>Throws if the {@link Object} is an {@link InterruptedException}; returns it if it is a {@link Job} or {@code null}. + */ + public FakeJobService getJobReturns(Object... getJobReturns) { + this.getJobReturns = getJobReturns; + return this; + } + + /** * Sets the return values to mock {@link JobService#pollJob}. * * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
@@ -350,6 +364,32 @@ public class BigQueryIOTest implements Serializable { throws InterruptedException, IOException { throw new UnsupportedOperationException(); } + + @Override + public Job getJob(JobReference jobRef) throws InterruptedException { + if (!Strings.isNullOrEmpty(executingProject)) { + checkArgument( + jobRef.getProjectId().equals(executingProject), + "Project id: %s is not equal to executing project: %s", + jobRef.getProjectId(), executingProject); + } + + if (getJobCallsCount < getJobReturns.length) { + Object ret = getJobReturns[getJobCallsCount++]; + if (ret == null) { + return null; + } else if (ret instanceof Job) { + return (Job) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; + } else { + throw new RuntimeException("Unexpected return type: " + ret.getClass()); + } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + getJobReturns.length); + } + } } @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -570,7 +610,7 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done") - .pollJobReturns(Status.UNKNOWN) + .getJobReturns((Job) null) .verifyExecutingProject(bqOptions.getProject())) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", 1)), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 16cb004..fb472fc 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -251,6 +251,62 @@ public class BigQueryServicesImplTest { } @Test + public void testGetJobSucceeds() throws Exception { + Job testJob = new Job(); + testJob.setStatus(new JobStatus()); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.JobServiceImpl jobService = + new BigQueryServicesImpl.JobServiceImpl(bigquery); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(testJob, job); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + @Test + public void testGetJobNotFound() throws Exception { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(404); + + BigQueryServicesImpl.JobServiceImpl jobService = + new BigQueryServicesImpl.JobServiceImpl(bigquery); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(null, job); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + @Test + public void testGetJobThrows() throws Exception { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(401); + + BigQueryServicesImpl.JobServiceImpl jobService = + new BigQueryServicesImpl.JobServiceImpl(bigquery); + JobReference jobRef = new JobReference() 
+ .setProjectId("projectId") + .setJobId("jobId"); + thrown.expect(IOException.class); + thrown.expectMessage(String.format("Unable to find BigQuery job: %s", jobRef)); + + jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + } + + @Test public void testExecuteWithRetries() throws IOException, InterruptedException { Table testTable = new Table();