[ https://issues.apache.org/jira/browse/BEAM-5105?focusedWorklogId=152059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152059 ]
ASF GitHub Bot logged work on BEAM-5105:
----------------------------------------

Author: ASF GitHub Bot
Created on: 07/Oct/18 11:42
Start Date: 07/Oct/18 11:42
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #6416: [BEAM-5105] Better parallelize BigQuery load jobs
URL: https://github.com/apache/beam/pull/6416#discussion_r223210018

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 ##########
 @@ -79,24 +86,202 @@ public RetryJobIdResult(RetryJobId jobId, boolean shouldRetry) {
     }
   }
+  // A class that waits for pending jobs, retrying them according to policy if they fail.
+  static class PendingJobManager {
+    private static class JobInfo {
+      private final PendingJob pendingJob;
+      @Nullable private final SerializableFunction<PendingJob, Exception> onSuccess;
+
+      public JobInfo(PendingJob pendingJob, SerializableFunction<PendingJob, Exception> onSuccess) {
+        this.pendingJob = pendingJob;
+        this.onSuccess = onSuccess;
+      }
+    }
+
+    private List<JobInfo> pendingJobs = Lists.newArrayList();
+
+    // Add a pending job and a function to call when the job has completed successfully.
+    PendingJobManager addPendingJob(
+        PendingJob pendingJob, @Nullable SerializableFunction<PendingJob, Exception> onSuccess) {
+      this.pendingJobs.add(new JobInfo(pendingJob, onSuccess));
+      return this;
+    }
+
+    void waitForDone() throws Exception {
+      BackOff backoff =
+          BackOffAdapter.toGcpBackOff(
+              FluentBackoff.DEFAULT
+                  .withMaxRetries(Integer.MAX_VALUE)
+                  .withInitialBackoff(Duration.standardSeconds(1))
+                  .withMaxBackoff(Duration.standardMinutes(1))
+                  .backoff());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      while (!pendingJobs.isEmpty()) {
+        List<JobInfo> retryJobs = Lists.newArrayList();
+        for (JobInfo jobInfo : pendingJobs) {
+          if (jobInfo.pendingJob.pollJob()) {
+            // Job has completed successfully.
+            Exception e = jobInfo.onSuccess.apply(null);
+            if (e != null) {
+              throw e;
+            }
+          } else {
+            // Job failed, start it again. If it has hit the maximum number of retries then runJob
+            // will throw an exception.
+            jobInfo.pendingJob.runJob();
+            retryJobs.add(jobInfo);

 Review comment:
    good catch. fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 152059)
Time Spent: 1h 40m (was: 1.5h)

> Move load job poll to finishBundle() method to better parallelize execution
> ---------------------------------------------------------------------------
>
> Key: BEAM-5105
> URL: https://issues.apache.org/jira/browse/BEAM-5105
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Chamikara Jayalath
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> It appears that when we write to BigQuery using WriteTablesDoFn, we start a
> load job and wait for that job to finish.
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L318]
>
> In cases where we are trying to write a PCollection of tables (for example,
> when users use the dynamic destinations feature), this relies on dynamic work
> rebalancing to parallelize execution of load jobs. If the runner does not
> support dynamic work rebalancing, or does not execute dynamic work rebalancing
> for some reason, this could have significant performance drawbacks. For
> example, scheduling times for load jobs will add up.
>
> A better approach might be to start load jobs in the process() method but wait
> for all load jobs to finish in the finishBundle() method.
> This will parallelize any overheads as well as job execution (assuming more
> than one job is scheduled by BQ).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
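The issue description proposes starting load jobs without blocking and then waiting for all of them in one place. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions: SimulatedJob, PendingJobTracker, Status, the retry limit and the 1 ms to 8 ms backoff are hypothetical names and values invented for this example, not Beam or BigQuery APIs. A real implementation would start and poll actual BigQuery load jobs, as the PendingJobManager in the quoted diff does with FluentBackoff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Beam or BigQuery API): start every job first, then wait for all of them.
class PendingJobsSketch {

  enum Status { RUNNING, SUCCEEDED, FAILED }

  // Hypothetical stand-in for an asynchronous load job. It reports RUNNING until it has been
  // polled pollsUntilDone times, then FAILED for its first failuresBeforeSuccess attempts,
  // and SUCCEEDED after that.
  static class SimulatedJob {
    final String name;
    private final int pollsUntilDone;
    private int failuresLeft;
    private int polls;
    int attempts;

    SimulatedJob(String name, int pollsUntilDone, int failuresBeforeSuccess) {
      this.name = name;
      this.pollsUntilDone = pollsUntilDone;
      this.failuresLeft = failuresBeforeSuccess;
    }

    // Starts (or restarts) the job and returns immediately instead of blocking.
    void start() {
      attempts++;
      polls = 0;
    }

    Status poll() {
      polls++;
      if (polls < pollsUntilDone) {
        return Status.RUNNING;
      }
      if (failuresLeft > 0) {
        failuresLeft--;
        return Status.FAILED;
      }
      return Status.SUCCEEDED;
    }
  }

  // Tracks outstanding jobs. startJob() plays the role of the per-element work in process(),
  // and waitForAll() plays the role of the single wait in finishBundle().
  static class PendingJobTracker {
    private final int maxAttempts;
    private final List<SimulatedJob> pending = new ArrayList<>();
    final List<String> completionOrder = new ArrayList<>();
    int rounds;

    PendingJobTracker(int maxAttempts) {
      this.maxAttempts = maxAttempts;
    }

    void startJob(SimulatedJob job) {
      job.start();
      pending.add(job);
    }

    // Polls every outstanding job once per round, restarts failed jobs until they reach
    // maxAttempts, and sleeps with a capped exponential backoff between rounds.
    void waitForAll() {
      long sleepMillis = 1;
      while (!pending.isEmpty()) {
        rounds++;
        List<SimulatedJob> stillPending = new ArrayList<>();
        for (SimulatedJob job : pending) {
          Status status = job.poll();
          if (status == Status.SUCCEEDED) {
            completionOrder.add(job.name);
          } else if (status == Status.FAILED) {
            if (job.attempts >= maxAttempts) {
              throw new IllegalStateException("Job " + job.name + " failed " + job.attempts + " times");
            }
            job.start(); // Retry: start a fresh attempt and keep tracking it.
            stillPending.add(job);
          } else {
            stillPending.add(job);
          }
        }
        pending.clear();
        pending.addAll(stillPending);
        if (!pending.isEmpty()) {
          try {
            Thread.sleep(sleepMillis);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for jobs", e);
          }
          sleepMillis = Math.min(sleepMillis * 2, 8);
        }
      }
    }
  }
}
```

For example, starting job A (done after 3 polls), job B (done after 2 polls, fails once) and job C (done after 1 poll) with maxAttempts = 3 and then calling waitForAll() finishes in 4 polling rounds, completing C, A and B in that order. Waiting on each job in turn would instead take 3 + (2 + 2) + 1 = 8 sequential polls, which is the kind of accumulated scheduling delay the description points out.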