Repository: incubator-beam
Updated Branches:
  refs/heads/master 84e8bfb13 -> 0800a8cb2


Forward port DataflowJavaSDK-351 to Beam


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9febae72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9febae72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9febae72

Branch: refs/heads/master
Commit: 9febae72666ba23cedbfbefdf2a19f6b9d6f14bf
Parents: 84e8bfb
Author: Pei He <pe...@google.com>
Authored: Mon Sep 12 21:02:35 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 11:16:56 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  8 +--
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  7 +++
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 46 ++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 42 ++++++++++++++-
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 56 ++++++++++++++++++++
 5 files changed, 152 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1306e59..91f6073 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -390,10 +390,6 @@ public class BigQueryIO {
           "Validation of query \"%1$s\" failed. If the query depends on an 
earlier stage of the"
           + " pipeline, This validation can be disabled using 
#withoutValidation.";
 
-      // The maximum number of retries to poll a BigQuery job in the cleanup 
phase.
-      // We expect the jobs have already DONE, and don't need a high max 
retires.
-      private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;
-
       private Bound() {
         this(
             null /* name */,
@@ -582,8 +578,8 @@ public class BigQueryIO {
                 JobReference jobRef = new JobReference()
                     .setProjectId(executingProject)
                     .setJobId(getExtractJobId(jobIdToken));
-                Job extractJob = bqServices.getJobService(bqOptions).pollJob(
-                    jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
+                Job extractJob = bqServices.getJobService(bqOptions)
+                    .getJob(jobRef);
 
                 Collection<String> extractFiles = null;
                 if (extractJob != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index c0951fc..16b3a39 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -99,6 +99,13 @@ interface BigQueryServices extends Serializable {
      */
     JobStatistics dryRunQuery(String projectId, String query)
         throws InterruptedException, IOException;
+
+    /**
+     * Gets the specified {@link Job} by the given {@link JobReference}.
+     *
+     * <p>Returns {@code null} if the job is not found.
+     */
+    Job getJob(JobReference jobRef) throws IOException, InterruptedException;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 20dadff..7d98401 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -283,6 +283,52 @@ class BigQueryServicesImpl implements BigQueryServices {
           Sleeper.DEFAULT,
           backoff).getStatistics();
     }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_RETRIES} times until it 
succeeds.
+     *
+     * @throws IOException if it exceeds max RPC retries.
+     */
+    @Override
+    public Job getJob(JobReference jobRef) throws IOException, 
InterruptedException {
+      BackOff backoff =
+          FluentBackoff.DEFAULT
+              
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
+     return getJob(jobRef, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
+        throws IOException, InterruptedException {
+      String jobId = jobRef.getJobId();
+      Exception lastException;
+      do {
+        try {
+          return client.jobs().get(jobRef.getProjectId(), jobId).execute();
+        } catch (GoogleJsonResponseException e) {
+          if (errorExtractor.itemNotFound(e)) {
+            LOG.info("No BigQuery job with job id {} found.", jobId);
+            return null;
+          }
+          LOG.warn(
+              "Ignoring the error encountered while trying to query the 
BigQuery job {}",
+              jobId, e);
+          lastException = e;
+        } catch (IOException e) {
+          LOG.warn(
+              "Ignoring the error encountered while trying to query the 
BigQuery job {}",
+              jobId, e);
+          lastException = e;
+        }
+      } while (nextBackOff(sleeper, backoff));
+      throw new IOException(
+          String.format(
+              "Unable to find BigQuery job: %s, aborting after %d retries.",
+              jobRef, MAX_RPC_RETRIES),
+          lastException);
+    }
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 19eeca5..b8dcdc8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -229,17 +229,21 @@ public class BigQueryIOTest implements Serializable {
 
     private Object[] startJobReturns;
     private Object[] pollJobReturns;
+    private Object[] getJobReturns;
     private String executingProject;
     // All call counts will be reset back to zero after serialization.
     // This is a work around for DoFn's verifyUnmodified check.
     private transient int startJobCallsCount;
     private transient int pollJobStatusCallsCount;
+    private transient int getJobCallsCount;
 
     public FakeJobService() {
       this.startJobReturns = new Object[0];
       this.pollJobReturns = new Object[0];
+      this.getJobReturns = new Object[0];
       this.startJobCallsCount = 0;
       this.pollJobStatusCallsCount = 0;
+      this.getJobCallsCount = 0;
     }
 
     /**
@@ -254,6 +258,16 @@ public class BigQueryIOTest implements Serializable {
     }
 
     /**
+     * Sets the return values to mock {@link JobService#getJob}.
+     *
+     * <p>Throws if the {@link Object} is an {@link InterruptedException}; 
returns it if it is {@code null} or a {@link Job}.
+     */
+    public FakeJobService getJobReturns(Object... getJobReturns) {
+      this.getJobReturns = getJobReturns;
+      return this;
+    }
+
+    /**
      * Sets the return values to mock {@link JobService#pollJob}.
      *
      * <p>Throws if the {@link Object} is a {@link Exception}, returns 
otherwise.
@@ -350,6 +364,32 @@ public class BigQueryIOTest implements Serializable {
         throws InterruptedException, IOException {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Job getJob(JobReference jobRef) throws InterruptedException {
+      if (!Strings.isNullOrEmpty(executingProject)) {
+        checkArgument(
+            jobRef.getProjectId().equals(executingProject),
+            "Project id: %s is not equal to executing project: %s",
+            jobRef.getProjectId(), executingProject);
+      }
+
+      if (getJobCallsCount < getJobReturns.length) {
+        Object ret = getJobReturns[getJobCallsCount++];
+        if (ret == null) {
+          return null;
+        } else if (ret instanceof Job) {
+          return (Job) ret;
+        } else if (ret instanceof InterruptedException) {
+          throw (InterruptedException) ret;
+        } else {
+          throw new RuntimeException("Unexpected return type: " + 
ret.getClass());
+        }
+      } else {
+        throw new RuntimeException(
+            "Exceeded expected number of calls: " + getJobReturns.length);
+      }
+    }
   }
 
   @Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -570,7 +610,7 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done")
-            .pollJobReturns(Status.UNKNOWN)
+            .getJobReturns((Job) null)
             .verifyExecutingProject(bqOptions.getProject()))
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", 1)),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9febae72/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 16cb004..fb472fc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -251,6 +251,62 @@ public class BigQueryServicesImplTest {
   }
 
   @Test
+  public void testGetJobSucceeds() throws Exception {
+    Job testJob = new Job();
+    testJob.setStatus(new JobStatus());
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testJob));
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+
+    assertEquals(testJob, job);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  @Test
+  public void testGetJobNotFound() throws Exception {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(404);
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+
+    assertEquals(null, job);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  @Test
+  public void testGetJobThrows() throws Exception {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(401);
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    thrown.expect(IOException.class);
+    thrown.expectMessage(String.format("Unable to find BigQuery job: %s", 
jobRef));
+
+    jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
+  }
+
+  @Test
   public void testExecuteWithRetries() throws IOException, 
InterruptedException {
     Table testTable = new Table();
 

Reply via email to