[ https://issues.apache.org/jira/browse/BEAM-3433?focusedWorklogId=103015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103015 ]
ASF GitHub Bot logged work on BEAM-3433: ---------------------------------------- Author: ASF GitHub Bot Created on: 17/May/18 17:53 Start Date: 17/May/18 17:53 Worklog Time Spent: 10m Work Description: jkff closed pull request #5178: [BEAM-3433] Allow a GCP project to be explicitly set for a load job URL: https://github.com/apache/beam/pull/5178 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index c3f3134523d..daebf47c5eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -73,6 +74,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. 
*/ class BatchLoads<DestinationT> extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> { @@ -127,12 +129,14 @@ private int numFileShards; private Duration triggeringFrequency; private ValueProvider<String> customGcsTempLocation; + private ValueProvider<String> loadJobProjectId; BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition, boolean singletonTable, DynamicDestinations<?, DestinationT> dynamicDestinations, Coder<DestinationT> destinationCoder, - ValueProvider<String> customGcsTempLocation) { + ValueProvider<String> customGcsTempLocation, + @Nullable ValueProvider<String> loadJobProjectId) { bigQueryServices = new BigQueryServicesImpl(); this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; @@ -144,6 +148,7 @@ this.numFileShards = DEFAULT_NUM_FILE_SHARDS; this.triggeringFrequency = null; this.customGcsTempLocation = customGcsTempLocation; + this.loadJobProjectId = loadJobProjectId; } void setTestServices(BigQueryServices bigQueryServices) { @@ -507,7 +512,8 @@ public void processElement(ProcessContext c) { WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - dynamicDestinations)); + dynamicDestinations, + loadJobProjectId)); } // In the case where the files fit into a single load job, there's no need to write temporary @@ -536,7 +542,8 @@ void writeSinglePartition( writeDisposition, createDisposition, sideInputs, - dynamicDestinations)); + dynamicDestinations, + loadJobProjectId)); } private WriteResult writeResult(Pipeline p) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e9eb3cb4fd3..e5aa287b2b7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1122,6 +1122,8 @@ static String getExtractDestinationUri(String extractDestinationDir) { abstract Method getMethod(); + @Nullable abstract ValueProvider<String> getLoadJobProjectId(); + @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy(); @Nullable abstract ValueProvider<String> getCustomGcsTempLocation(); @@ -1151,6 +1153,7 @@ static String getExtractDestinationUri(String extractDestinationDir) { abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency); abstract Builder<T> setMethod(Method method); + abstract Builder<T> setLoadJobProjectId(ValueProvider<String> loadJobProjectId); abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy); @@ -1391,6 +1394,20 @@ static String getExtractDestinationUri(String extractDestinationDir) { return toBuilder().setMethod(method).build(); } + /** + * Set the project the BigQuery load job will be initiated from. This is only applicable when + * the write method is set to {@link Method#FILE_LOADS}. If omitted, the project of the + * destination table is used. + */ + public Write<T> withLoadJobProjectId(String loadJobProjectId) { + return withLoadJobProjectId(StaticValueProvider.of(loadJobProjectId)); + } + + public Write<T> withLoadJobProjectId(ValueProvider<String> loadJobProjectId) { + checkArgument(loadJobProjectId != null, "loadJobProjectId can not be null"); + return toBuilder().setLoadJobProjectId(loadJobProjectId).build(); + } + /** * Choose the frequency at which file writes are triggered. 
* @@ -1620,7 +1637,8 @@ public WriteResult expand(PCollection<T> input) { getJsonTableRef() != null, dynamicDestinations, destinationCoder, - getCustomGcsTempLocation()); + getCustomGcsTempLocation(), + getLoadJobProjectId()); batchLoads.setTestServices(getBigQueryServices()); if (getMaxFilesPerBundle() != null) { batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index cd128a1dfe5..01e3243d326 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -44,6 +44,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -90,6 +91,7 @@ private final List<PCollectionView<?>> sideInputs; private final TupleTag<KV<TableDestination, String>> mainOutputTag; private final TupleTag<String> temporaryFilesTag; + private final ValueProvider<String> loadJobProjectId; private class WriteTablesDoFn @@ -187,7 +189,8 @@ public WriteTables( WriteDisposition writeDisposition, CreateDisposition createDisposition, List<PCollectionView<?>> sideInputs, - DynamicDestinations<?, DestinationT> dynamicDestinations) { + DynamicDestinations<?, DestinationT> dynamicDestinations, + @Nullable ValueProvider<String> loadJobProjectId) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.loadJobIdPrefixView = 
loadJobIdPrefixView; @@ -197,6 +200,7 @@ public WriteTables( this.dynamicDestinations = dynamicDestinations; this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput"); this.temporaryFilesTag = new TupleTag<>("TemporaryFiles"); + this.loadJobProjectId = loadJobProjectId; } @Override @@ -251,7 +255,7 @@ private void load( if (timePartitioning != null) { loadConfig.setTimePartitioning(timePartitioning); } - String projectId = ref.getProjectId(); + String projectId = loadJobProjectId == null ? ref.getProjectId() : loadJobProjectId.get(); Job lastFailedLoadJob = null; String bqLocation = BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 762a3bdd4db..bf47ab1a00d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1168,7 +1168,7 @@ public void testWriteTables() throws Exception { BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED, sideInputs, - new IdentityDynamicTables()); + new IdentityDynamicTables(), null); PCollection<KV<TableDestination, String>> writeTablesOutput = writeTablesInput.apply(writeTables); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 103015) Time Spent: 1h 50m (was: 1h 40m) > Allow BigQueryIO to use a different project for the load job in batch mode. > --------------------------------------------------------------------------- > > Key: BEAM-3433 > URL: https://issues.apache.org/jira/browse/BEAM-3433 > Project: Beam > Issue Type: Bug > Components: io-java-gcp > Reporter: Kevin Peterson > Assignee: Chamikara Jayalath > Priority: Minor > Time Spent: 1h 50m > Remaining Estimate: 0h > > BigQueryIO is currently configured to always run a batch load job using the > same project as the destination table: > https://github.com/apache/beam/blob/192b4c70927901860312f8c8acd27bd47e4a4259/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L256 > This may not always be desirable, since a pipeline may have write access to a > dataset in a different project, but not jobs.create access in that project. > This parameter should be settable in the interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)