[ 
https://issues.apache.org/jira/browse/BEAM-5105?focusedWorklogId=152059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152059
 ]

ASF GitHub Bot logged work on BEAM-5105:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/18 11:42
            Start Date: 07/Oct/18 11:42
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #6416: 
[BEAM-5105] Better parallelize BigQuery load jobs
URL: https://github.com/apache/beam/pull/6416#discussion_r223210018
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 ##########
 @@ -79,24 +86,202 @@ public RetryJobIdResult(RetryJobId jobId, boolean shouldRetry) {
     }
   }
 
+  // A class that waits for pending jobs, retrying them according to policy if they fail.
+  static class PendingJobManager {
+    private static class JobInfo {
+      private final PendingJob pendingJob;
+      @Nullable private final SerializableFunction<PendingJob, Exception> onSuccess;
+
+      public JobInfo(PendingJob pendingJob, SerializableFunction<PendingJob, Exception> onSuccess) {
+        this.pendingJob = pendingJob;
+        this.onSuccess = onSuccess;
+      }
+    }
+
+    private List<JobInfo> pendingJobs = Lists.newArrayList();
+
+    // Add a pending job and a function to call when the job has completed successfully.
+    PendingJobManager addPendingJob(
+        PendingJob pendingJob, @Nullable SerializableFunction<PendingJob, Exception> onSuccess) {
+      this.pendingJobs.add(new JobInfo(pendingJob, onSuccess));
+      return this;
+    }
+
+    void waitForDone() throws Exception {
+      BackOff backoff =
+          BackOffAdapter.toGcpBackOff(
+              FluentBackoff.DEFAULT
+                  .withMaxRetries(Integer.MAX_VALUE)
+                  .withInitialBackoff(Duration.standardSeconds(1))
+                  .withMaxBackoff(Duration.standardMinutes(1))
+                  .backoff());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      while (!pendingJobs.isEmpty()) {
+        List<JobInfo> retryJobs = Lists.newArrayList();
+        for (JobInfo jobInfo : pendingJobs) {
+          if (jobInfo.pendingJob.pollJob()) {
+            // Job has completed successfully.
+            Exception e = jobInfo.onSuccess.apply(null);
+            if (e != null) {
+              throw e;
+            }
+          } else {
+            // Job failed, start it again. If it has hit the maximum number of retries then
+            // runJob will throw an exception.
+            jobInfo.pendingJob.runJob();
+            retryJobs.add(jobInfo);
 
 Review comment:
   good catch. fixed
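
The wait-and-retry loop in the hunk above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the code in the pull request: `PendingJobLoop`, `Job`, `Entry`, and the doubling backoff are hypothetical stand-ins for `PendingJobManager`, `PendingJob`, `JobInfo`, and `FluentBackoff`. The sketch also assumes that the success callback may be null and that it should receive the completed job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PendingJobLoop {
  // Hypothetical job abstraction: poll() reports success or failure,
  // start() (re)submits the job and is expected to throw once retries run out.
  interface Job {
    boolean poll();

    void start() throws Exception;
  }

  // Pairs a job with an optional callback to run when it succeeds.
  static class Entry {
    final Job job;
    final Consumer<Job> onSuccess; // may be null

    Entry(Job job, Consumer<Job> onSuccess) {
      this.job = job;
      this.onSuccess = onSuccess;
    }
  }

  // Polls every pending job; failed jobs are restarted and polled again
  // after a sleep that doubles each round, capped at maxBackoffMillis.
  static void waitForDone(List<Entry> pending, long initialBackoffMillis, long maxBackoffMillis)
      throws Exception {
    long backoff = initialBackoffMillis;
    while (!pending.isEmpty()) {
      List<Entry> retry = new ArrayList<>();
      for (Entry entry : pending) {
        if (entry.job.poll()) {
          // Success: invoke the callback only if one was supplied.
          if (entry.onSuccess != null) {
            entry.onSuccess.accept(entry.job);
          }
        } else {
          // Failure: restart the job and keep it for the next round.
          entry.job.start();
          retry.add(entry);
        }
      }
      pending = retry;
      if (!pending.isEmpty()) {
        Thread.sleep(backoff);
        backoff = Math.min(backoff * 2, maxBackoffMillis);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] polls = {0};
    Job flaky = new Job() {
      @Override
      public boolean poll() {
        return ++polls[0] >= 3; // fails twice, then succeeds
      }

      @Override
      public void start() {}
    };
    List<Entry> jobs = new ArrayList<>();
    jobs.add(new Entry(flaky, job -> System.out.println("job succeeded")));
    waitForDone(jobs, 10, 100);
  }
}
```

Swapping the pending list for the retry list at the end of each round means that only jobs that failed are polled again.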

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152059)
    Time Spent: 1h 40m  (was: 1.5h)

> Move load job poll to finishBundle() method to better parallelize execution
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-5105
>                 URL: https://issues.apache.org/jira/browse/BEAM-5105
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> It appears that when we write to BigQuery using WriteTablesDoFn we start a 
> load job and wait for that job to finish.
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L318]
>  
> In cases where we are trying to write a PCollection of tables (for example, 
> when users use the dynamic destinations feature), this relies on dynamic work 
> rebalancing to parallelize execution of load jobs. If the runner does not 
> support dynamic work rebalancing, or does not perform dynamic work rebalancing 
> for some reason, this could have significant performance drawbacks. For 
> example, scheduling times for load jobs will add up.
>  
> A better approach might be to start load jobs in the process() method but wait 
> for all load jobs to finish in the finishBundle() method. This will parallelize 
> any overheads as well as job execution (assuming more than one job is 
> scheduled by BQ).
>  
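
The approach proposed in the description can be sketched as follows. This is a minimal illustration under stated assumptions, not Beam's WriteTables code: `DeferredLoadJobs` and `BundleWriter` are hypothetical names, and a thread pool with a short sleep stands in for BigQuery load jobs. Only the shape matters: `processElement` starts a job and returns immediately, while `finishBundle` waits for everything started in the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DeferredLoadJobs {
  // Hypothetical stand-in for a DoFn that writes one table per element.
  static class BundleWriter {
    private final ExecutorService service;
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    BundleWriter(ExecutorService service) {
      this.service = service;
    }

    // Starts a simulated load job and returns without waiting for it.
    void processElement(String table) {
      pending.add(
          CompletableFuture.supplyAsync(
              () -> {
                try {
                  Thread.sleep(100); // simulated scheduling and execution time
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
                return "loaded:" + table;
              },
              service));
    }

    // Blocks until every job started in this bundle has finished.
    List<String> finishBundle() {
      List<String> results = new ArrayList<>();
      for (CompletableFuture<String> job : pending) {
        results.add(job.join());
      }
      pending.clear();
      return results;
    }
  }

  public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(4);
    BundleWriter writer = new BundleWriter(service);
    for (String table : new String[] {"a", "b", "c"}) {
      writer.processElement(table);
    }
    System.out.println(writer.finishBundle());
    service.shutdown();
  }
}
```

With three jobs and four threads, the simulated jobs overlap, so the bundle finishes in roughly the time of one job rather than the sum of all three.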



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
