[ https://issues.apache.org/jira/browse/BEAM-3433?focusedWorklogId=103015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103015 ]

ASF GitHub Bot logged work on BEAM-3433:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/18 17:53
            Start Date: 17/May/18 17:53
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5178: [BEAM-3433] Allow a GCP project to be explicitly set for a load job
URL: https://github.com/apache/beam/pull/5178
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index c3f3134523d..daebf47c5eb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -73,6 +74,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads<DestinationT>
     extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
@@ -127,12 +129,14 @@
   private int numFileShards;
   private Duration triggeringFrequency;
   private ValueProvider<String> customGcsTempLocation;
+  private ValueProvider<String> loadJobProjectId;
 
   BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
              boolean singletonTable,
              DynamicDestinations<?, DestinationT> dynamicDestinations,
              Coder<DestinationT> destinationCoder,
-             ValueProvider<String> customGcsTempLocation) {
+             ValueProvider<String> customGcsTempLocation,
+             @Nullable ValueProvider<String> loadJobProjectId) {
     bigQueryServices = new BigQueryServicesImpl();
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
@@ -144,6 +148,7 @@
     this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
     this.triggeringFrequency = null;
     this.customGcsTempLocation = customGcsTempLocation;
+    this.loadJobProjectId = loadJobProjectId;
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
@@ -507,7 +512,8 @@ public void processElement(ProcessContext c) {
                 WriteDisposition.WRITE_EMPTY,
                 CreateDisposition.CREATE_IF_NEEDED,
                 sideInputs,
-                dynamicDestinations));
+                dynamicDestinations,
+                loadJobProjectId));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -536,7 +542,8 @@ void writeSinglePartition(
                 writeDisposition,
                 createDisposition,
                 sideInputs,
-                dynamicDestinations));
+                dynamicDestinations,
+                loadJobProjectId));
   }
 
   private WriteResult writeResult(Pipeline p) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e9eb3cb4fd3..e5aa287b2b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1122,6 +1122,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
 
     abstract Method getMethod();
 
+    @Nullable abstract ValueProvider<String> getLoadJobProjectId();
+
     @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
 
     @Nullable abstract ValueProvider<String> getCustomGcsTempLocation();
@@ -1151,6 +1153,7 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency);
 
       abstract Builder<T> setMethod(Method method);
+      abstract Builder<T> setLoadJobProjectId(ValueProvider<String> loadJobProjectId);
 
       abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
 
@@ -1391,6 +1394,20 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setMethod(method).build();
     }
 
+    /**
+     * Set the project the BigQuery load job will be initiated from. This is only applicable when
+     * the write method is set to {@link Method#FILE_LOADS}. If omitted, the project of the
+     * destination table is used.
+     */
+    public Write<T> withLoadJobProjectId(String loadJobProjectId) {
+      return withLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
+    }
+
+    public Write<T> withLoadJobProjectId(ValueProvider<String> loadJobProjectId) {
+      checkArgument(loadJobProjectId != null, "loadJobProjectId can not be null");
+      return toBuilder().setLoadJobProjectId(loadJobProjectId).build();
+    }
+
     /**
      * Choose the frequency at which file writes are triggered.
      *
@@ -1620,7 +1637,8 @@ public WriteResult expand(PCollection<T> input) {
             getJsonTableRef() != null,
             dynamicDestinations,
             destinationCoder,
-            getCustomGcsTempLocation());
+            getCustomGcsTempLocation(),
+            getLoadJobProjectId());
         batchLoads.setTestServices(getBigQueryServices());
         if (getMaxFilesPerBundle() != null) {
           batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index cd128a1dfe5..01e3243d326 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -44,6 +44,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -90,6 +91,7 @@
   private final List<PCollectionView<?>> sideInputs;
   private final TupleTag<KV<TableDestination, String>> mainOutputTag;
   private final TupleTag<String> temporaryFilesTag;
+  private final ValueProvider<String> loadJobProjectId;
 
 
   private class WriteTablesDoFn
@@ -187,7 +189,8 @@ public WriteTables(
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       List<PCollectionView<?>> sideInputs,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      @Nullable ValueProvider<String> loadJobProjectId) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.loadJobIdPrefixView = loadJobIdPrefixView;
@@ -197,6 +200,7 @@ public WriteTables(
     this.dynamicDestinations = dynamicDestinations;
     this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput");
     this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
+    this.loadJobProjectId = loadJobProjectId;
   }
 
   @Override
@@ -251,7 +255,7 @@ private void load(
     if (timePartitioning != null) {
       loadConfig.setTimePartitioning(timePartitioning);
     }
-    String projectId = ref.getProjectId();
+    String projectId = loadJobProjectId == null ? ref.getProjectId() : loadJobProjectId.get();
     Job lastFailedLoadJob = null;
     String bqLocation =
         BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 762a3bdd4db..bf47ab1a00d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1168,7 +1168,7 @@ public void testWriteTables() throws Exception {
             BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
             sideInputs,
-            new IdentityDynamicTables());
+            new IdentityDynamicTables(), null);
 
     PCollection<KV<TableDestination, String>> writeTablesOutput =
         writeTablesInput.apply(writeTables);
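
For context on how the new option is exercised end to end, a minimal usage
sketch follows. The input PCollection "rows", the "schema" variable, the table
spec, and the project id "load-billing-project" are illustrative assumptions,
not taken from the PR:

    // Route the BigQuery load job through a project other than the one that
    // owns the destination table. Only applies with Method.FILE_LOADS, since
    // the new setting affects batch load jobs only.
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("destination-project:dataset.table")
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withLoadJobProjectId("load-billing-project")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

With this configuration, WriteTables prefers loadJobProjectId over
ref.getProjectId() when submitting the load job, so bigquery.jobs.create
permission is only needed in "load-billing-project".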


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 103015)
    Time Spent: 1h 50m  (was: 1h 40m)

> Allow BigQueryIO to use a different project for the load job in batch mode.
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-3433
>                 URL: https://issues.apache.org/jira/browse/BEAM-3433
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Kevin Peterson
>            Assignee: Chamikara Jayalath
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> BigQueryIO is currently configured to always run a batch load job using the 
> same project as the destination table: 
> https://github.com/apache/beam/blob/192b4c70927901860312f8c8acd27bd47e4a4259/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L256
> This may not always be desirable, since a pipeline may have write access to a 
> dataset in a different project, but not jobs.create access in that project. 
> This parameter should be settable in the interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
