[ https://issues.apache.org/jira/browse/BEAM-4835?focusedWorklogId=132781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132781 ]

ASF GitHub Bot logged work on BEAM-4835:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Aug/18 23:54
            Start Date: 08/Aug/18 23:54
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #6008: [BEAM-4835] Adding in additional options for BigQueryIO insert statements
URL: https://github.com/apache/beam/pull/6008
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 16c89e7f781..aebe0bc8e55 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -116,6 +116,7 @@
   private BigQueryServices bigQueryServices;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
+  private final boolean ignoreUnknownValues;
   // Indicates that we are writing to a constant single table. If this is the case, we will create
   // the table, even if there is no data in it.
   private final boolean singletonTable;
@@ -138,7 +139,8 @@
       DynamicDestinations<?, DestinationT> dynamicDestinations,
       Coder<DestinationT> destinationCoder,
       ValueProvider<String> customGcsTempLocation,
-      @Nullable ValueProvider<String> loadJobProjectId) {
+      @Nullable ValueProvider<String> loadJobProjectId,
+      boolean ignoreUnknownValues) {
     bigQueryServices = new BigQueryServicesImpl();
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
@@ -151,6 +153,7 @@
     this.triggeringFrequency = null;
     this.customGcsTempLocation = customGcsTempLocation;
     this.loadJobProjectId = loadJobProjectId;
+    this.ignoreUnknownValues = ignoreUnknownValues;
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
@@ -532,7 +535,8 @@ public void processElement(ProcessContext c) {
                 sideInputs,
                 dynamicDestinations,
                 loadJobProjectId,
-                maxRetryJobs));
+                maxRetryJobs,
+                ignoreUnknownValues));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -563,7 +567,8 @@ void writeSinglePartition(
                 sideInputs,
                 dynamicDestinations,
                 loadJobProjectId,
-                maxRetryJobs));
+                maxRetryJobs,
+                ignoreUnknownValues));
   }
 
   private WriteResult writeResult(Pipeline p) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 9b1515f9bd6..564e80b494f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1053,6 +1053,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
         .setNumFileShards(0)
         .setMethod(Write.Method.DEFAULT)
         .setExtendedErrorInfo(false)
+        .setSkipInvalidRows(false)
+        .setIgnoreUnknownValues(false)
         .build();
   }
 
@@ -1161,6 +1163,10 @@ static String getExtractDestinationUri(String extractDestinationDir) {
 
     abstract boolean getExtendedErrorInfo();
 
+    abstract Boolean getSkipInvalidRows();
+
+    abstract Boolean getIgnoreUnknownValues();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -1208,6 +1214,10 @@ static String getExtractDestinationUri(String extractDestinationDir) {
 
       abstract Builder<T> setExtendedErrorInfo(boolean extendedErrorInfo);
 
+      abstract Builder<T> setSkipInvalidRows(Boolean skipInvalidRows);
+
+      abstract Builder<T> setIgnoreUnknownValues(Boolean ignoreUnknownValues);
+
       abstract Write<T> build();
     }
 
@@ -1497,6 +1507,23 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setExtendedErrorInfo(true).build();
     }
 
+    /**
+     * Insert all valid rows of a request, even if invalid rows exist. This is only applicable when
+     * the write method is set to {@link Method#STREAMING_INSERTS}. The default value is false,
+     * which causes the entire request to fail if any invalid rows exist.
+     */
+    public Write<T> skipInvalidRows() {
+      return toBuilder().setSkipInvalidRows(true).build();
+    }
+
+    /**
+     * Accept rows that contain values that do not match the schema. The unknown values are ignored.
+     * Default is false, which treats unknown values as errors.
+     */
+    public Write<T> ignoreUnknownValues() {
+      return toBuilder().setIgnoreUnknownValues(true).build();
+    }
+
     @VisibleForTesting
     /** This method is for test usage only */
     public Write<T> withTestServices(BigQueryServices testServices) {
@@ -1682,7 +1709,9 @@ public WriteResult expand(PCollection<T> input) {
             new StreamingInserts<>(getCreateDisposition(), dynamicDestinations)
                 .withInsertRetryPolicy(retryPolicy)
                 .withTestServices((getBigQueryServices()))
-                .withExtendedErrorInfo(getExtendedErrorInfo());
+                .withExtendedErrorInfo(getExtendedErrorInfo())
+                .withSkipInvalidRows(getSkipInvalidRows())
+                .withIgnoreUnknownValues(getIgnoreUnknownValues());
         return rowsWithDestination.apply(streamingInserts);
       } else {
         checkArgument(
@@ -1697,7 +1726,8 @@ public WriteResult expand(PCollection<T> input) {
                 dynamicDestinations,
                 destinationCoder,
                 getCustomGcsTempLocation(),
-                getLoadJobProjectId());
+                getLoadJobProjectId(),
+                getIgnoreUnknownValues());
         batchLoads.setTestServices(getBigQueryServices());
         if (getMaxFilesPerBundle() != null) {
           batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 12bbd359a9f..85df41adc63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -141,7 +141,9 @@ void createDataset(
         @Nullable List<String> insertIdList,
         InsertRetryPolicy retryPolicy,
         List<ValueInSingleWindow<T>> failedInserts,
-        ErrorContainer<T> errorContainer)
+        ErrorContainer<T> errorContainer,
+        boolean skipInvalidRows,
+        boolean ignoreUnknownValues)
         throws IOException, InterruptedException;
 
     /** Patch BigQuery {@link Table} description. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index fd4b4396330..809b80a302c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -671,7 +671,9 @@ public void deleteDataset(String projectId, String datasetId)
         final Sleeper sleeper,
         InsertRetryPolicy retryPolicy,
         List<ValueInSingleWindow<T>> failedInserts,
-        ErrorContainer<T> errorContainer)
+        ErrorContainer<T> errorContainer,
+        boolean skipInvalidRows,
+        boolean ignoreUnknownValues)
         throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
@@ -716,6 +718,8 @@ public void deleteDataset(String projectId, String datasetId)
               || i == rowsToPublish.size() - 1) {
             TableDataInsertAllRequest content = new TableDataInsertAllRequest();
             content.setRows(rows);
+            content.setSkipInvalidRows(skipInvalidRows);
+            content.setIgnoreUnknownValues(ignoreUnknownValues);
 
             final Bigquery.Tabledata.InsertAll insert =
                 client
@@ -818,7 +822,9 @@ public void deleteDataset(String projectId, String datasetId)
         @Nullable List<String> insertIdList,
         InsertRetryPolicy retryPolicy,
         List<ValueInSingleWindow<T>> failedInserts,
-        ErrorContainer<T> errorContainer)
+        ErrorContainer<T> errorContainer,
+        boolean skipInvalidRows,
+        boolean ignoreUnknownValues)
         throws IOException, InterruptedException {
       return insertAll(
           ref,
@@ -828,7 +834,9 @@ public void deleteDataset(String projectId, String datasetId)
           Sleeper.DEFAULT,
           retryPolicy,
           failedInserts,
-          errorContainer);
+          errorContainer,
+          skipInvalidRows,
+          ignoreUnknownValues);
     }
 
     @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index b3658c524a8..438611d4d7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -35,6 +35,8 @@
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private InsertRetryPolicy retryPolicy;
   private boolean extendedErrorInfo;
+  private final boolean skipInvalidRows;
+  private final boolean ignoreUnknownValues;
 
   /** Constructor. */
   public StreamingInserts(
@@ -45,6 +47,8 @@ public StreamingInserts(
         dynamicDestinations,
         new BigQueryServicesImpl(),
         InsertRetryPolicy.alwaysRetry(),
+        false,
+        false,
         false);
   }
 
@@ -54,29 +58,73 @@ private StreamingInserts(
       DynamicDestinations<?, DestinationT> dynamicDestinations,
       BigQueryServices bigQueryServices,
       InsertRetryPolicy retryPolicy,
-      boolean extendedErrorInfo) {
+      boolean extendedErrorInfo,
+      boolean skipInvalidRows,
+      boolean ignoreUnknownValues) {
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
     this.bigQueryServices = bigQueryServices;
     this.retryPolicy = retryPolicy;
     this.extendedErrorInfo = extendedErrorInfo;
+    this.skipInvalidRows = skipInvalidRows;
+    this.ignoreUnknownValues = ignoreUnknownValues;
   }
 
   /** Specify a retry policy for failed inserts. */
   public StreamingInserts<DestinationT> withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
     return new StreamingInserts<>(
-        createDisposition, dynamicDestinations, bigQueryServices, retryPolicy, extendedErrorInfo);
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues);
   }
 
   /** Specify whether to use extended error info or not. */
   public StreamingInserts<DestinationT> withExtendedErrorInfo(boolean extendedErrorInfo) {
     return new StreamingInserts<>(
-        createDisposition, dynamicDestinations, bigQueryServices, retryPolicy, extendedErrorInfo);
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues);
+  }
+
+  StreamingInserts<DestinationT> withSkipInvalidRows(boolean skipInvalidRows) {
+    return new StreamingInserts<>(
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues);
+  }
+
+  StreamingInserts<DestinationT> withIgnoreUnknownValues(boolean ignoreUnknownValues) {
+    return new StreamingInserts<>(
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues);
   }
 
   StreamingInserts<DestinationT> withTestServices(BigQueryServices bigQueryServices) {
     return new StreamingInserts<>(
-        createDisposition, dynamicDestinations, bigQueryServices, retryPolicy, extendedErrorInfo);
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues);
   }
 
   @Override
@@ -91,6 +139,8 @@ public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
         new StreamingWriteTables()
             .withTestServices(bigQueryServices)
             .withInsertRetryPolicy(retryPolicy)
-            .withExtendedErrorInfo(extendedErrorInfo));
+            .withExtendedErrorInfo(extendedErrorInfo)
+            .withSkipInvalidRows(skipInvalidRows)
+            .withIgnoreUnknownValues(ignoreUnknownValues));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 2af900e86b1..33a1e4327b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -44,6 +44,8 @@
   private final InsertRetryPolicy retryPolicy;
   private final TupleTag<ErrorT> failedOutputTag;
   private final ErrorContainer<ErrorT> errorContainer;
+  private final boolean skipInvalidRows;
+  private final boolean ignoreUnknownValues;
 
   /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
   private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
@@ -58,11 +60,15 @@
       BigQueryServices bqServices,
       InsertRetryPolicy retryPolicy,
       TupleTag<ErrorT> failedOutputTag,
-      ErrorContainer<ErrorT> errorContainer) {
+      ErrorContainer<ErrorT> errorContainer,
+      boolean skipInvalidRows,
+      boolean ignoreUnknownValues) {
     this.bqServices = bqServices;
     this.retryPolicy = retryPolicy;
     this.failedOutputTag = failedOutputTag;
     this.errorContainer = errorContainer;
+    this.skipInvalidRows = skipInvalidRows;
+    this.ignoreUnknownValues = ignoreUnknownValues;
   }
 
   /** Prepares a target BigQuery table. */
@@ -128,7 +134,9 @@ private void flushRows(
                     uniqueIds,
                     retryPolicy,
                     failedInserts,
-                    errorContainer);
+                    errorContainer,
+                    skipInvalidRows,
+                    ignoreUnknownValues);
         byteCounter.inc(totalBytes);
       } catch (IOException e) {
         throw new RuntimeException(e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 8767f97b7e6..a812385c2e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -49,28 +49,49 @@
   private InsertRetryPolicy retryPolicy;
   private boolean extendedErrorInfo;
   private static final String FAILED_INSERTS_TAG_ID = "failedInserts";
+  private final boolean skipInvalidRows;
+  private final boolean ignoreUnknownValues;
 
   public StreamingWriteTables() {
-    this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry(), false);
+    this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry(), false, false, false);
   }
 
   private StreamingWriteTables(
-      BigQueryServices bigQueryServices, InsertRetryPolicy retryPolicy, boolean extendedErrorInfo) {
+      BigQueryServices bigQueryServices,
+      InsertRetryPolicy retryPolicy,
+      boolean extendedErrorInfo,
+      boolean skipInvalidRows,
+      boolean ignoreUnknownValues) {
     this.bigQueryServices = bigQueryServices;
     this.retryPolicy = retryPolicy;
     this.extendedErrorInfo = extendedErrorInfo;
+    this.skipInvalidRows = skipInvalidRows;
+    this.ignoreUnknownValues = ignoreUnknownValues;
   }
 
   StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) {
-    return new StreamingWriteTables(bigQueryServices, retryPolicy, extendedErrorInfo);
+    return new StreamingWriteTables(
+        bigQueryServices, retryPolicy, extendedErrorInfo, skipInvalidRows, ignoreUnknownValues);
   }
 
   StreamingWriteTables withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
-    return new StreamingWriteTables(bigQueryServices, retryPolicy, extendedErrorInfo);
+    return new StreamingWriteTables(
+        bigQueryServices, retryPolicy, extendedErrorInfo, skipInvalidRows, ignoreUnknownValues);
   }
 
   StreamingWriteTables withExtendedErrorInfo(boolean extendedErrorInfo) {
-    return new StreamingWriteTables(bigQueryServices, retryPolicy, extendedErrorInfo);
+    return new StreamingWriteTables(
+        bigQueryServices, retryPolicy, extendedErrorInfo, skipInvalidRows, ignoreUnknownValues);
+  }
+
+  StreamingWriteTables withSkipInvalidRows(boolean skipInvalidRows) {
+    return new StreamingWriteTables(
+        bigQueryServices, retryPolicy, extendedErrorInfo, skipInvalidRows, ignoreUnknownValues);
+  }
+
+  StreamingWriteTables withIgnoreUnknownValues(boolean ignoreUnknownValues) {
+    return new StreamingWriteTables(
+        bigQueryServices, retryPolicy, extendedErrorInfo, skipInvalidRows, ignoreUnknownValues);
   }
 
   @Override
@@ -140,7 +161,12 @@ public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
                 "StreamingWrite",
                 ParDo.of(
                         new StreamingWriteFn<>(
-                            bigQueryServices, retryPolicy, failedInsertsTag, errorContainer))
+                            bigQueryServices,
+                            retryPolicy,
+                            failedInsertsTag,
+                            errorContainer,
+                            skipInvalidRows,
+                            ignoreUnknownValues))
                     .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag)));
     PCollection<T> failedInserts = tuple.get(failedInsertsTag);
     failedInserts.setCoder(coder);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index b6ad1378dbf..a509fa2eda7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -102,6 +102,7 @@
   private final TupleTag<String> temporaryFilesTag;
   private final ValueProvider<String> loadJobProjectId;
   private final int maxRetryJobs;
+  private final boolean ignoreUnknownValues;
 
   private class WriteTablesDoFn
       extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
@@ -200,7 +201,8 @@ public WriteTables(
       List<PCollectionView<?>> sideInputs,
       DynamicDestinations<?, DestinationT> dynamicDestinations,
       @Nullable ValueProvider<String> loadJobProjectId,
-      int maxRetryJobs) {
+      int maxRetryJobs,
+      boolean ignoreUnknownValues) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.loadJobIdPrefixView = loadJobIdPrefixView;
@@ -212,6 +214,7 @@ public WriteTables(
     this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
     this.loadJobProjectId = loadJobProjectId;
     this.maxRetryJobs = maxRetryJobs;
+    this.ignoreUnknownValues = ignoreUnknownValues;
   }
 
   @Override
@@ -264,7 +267,8 @@ private void load(
             .setSourceUris(gcsUris)
             .setWriteDisposition(writeDisposition.name())
             .setCreateDisposition(createDisposition.name())
-            .setSourceFormat("NEWLINE_DELIMITED_JSON");
+            .setSourceFormat("NEWLINE_DELIMITED_JSON")
+            .setIgnoreUnknownValues(ignoreUnknownValues);
     if (timePartitioning != null) {
       loadConfig.setTimePartitioning(timePartitioning);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index b4d0a6b14f1..971fed8d52f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1193,7 +1193,8 @@ public void testWriteTables() throws Exception {
             sideInputs,
             new IdentityDynamicTables(),
             null,
-            4);
+            4,
+            false);
 
     PCollection<KV<TableDestination, String>> writeTablesOutput =
         writeTablesInput.apply(writeTables);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 731c8bf707c..9f20e5087e9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -25,6 +25,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
 import com.google.api.services.bigquery.model.TableDataList;
@@ -93,23 +95,25 @@
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
   @Mock private LowLevelHttpResponse response;
+  private MockLowLevelHttpRequest request;
   private Bigquery bigquery;
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
 
+    // Set up the MockHttpRequest for future inspection
+    request =
+        new MockLowLevelHttpRequest() {
+          @Override
+          public LowLevelHttpResponse execute() throws IOException {
+            return response;
+          }
+        };
+
     // A mock transport that lets us mock the API responses.
     MockHttpTransport transport =
-        new MockHttpTransport.Builder()
-            .setLowLevelHttpRequest(
-                new MockLowLevelHttpRequest() {
-                  @Override
-                  public LowLevelHttpResponse execute() throws IOException {
-                    return response;
-                  }
-                })
-            .build();
+        new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
 
     // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
     bigquery =
@@ -511,7 +515,9 @@ public void testInsertRetry() throws Exception {
         new MockSleeper(),
         InsertRetryPolicy.alwaysRetry(),
         null,
-        null);
+        null,
+        false,
+        false);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -558,7 +564,9 @@ public void testInsertRetrySelectRows() throws Exception {
         new MockSleeper(),
         InsertRetryPolicy.alwaysRetry(),
         null,
-        null);
+        null,
+        false,
+        false);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -601,7 +609,9 @@ public void testInsertFailsGracefully() throws Exception {
           new MockSleeper(),
           InsertRetryPolicy.alwaysRetry(),
           null,
-          null);
+          null,
+          false,
+          false);
       fail();
     } catch (IOException e) {
       assertThat(e, instanceOf(IOException.class));
@@ -647,7 +657,9 @@ public void testInsertDoesNotRetry() throws Throwable {
           new MockSleeper(),
           InsertRetryPolicy.alwaysRetry(),
           null,
-          null);
+          null,
+          false,
+          false);
       fail();
     } catch (RuntimeException e) {
       verify(response, times(1)).getStatusCode();
@@ -716,11 +728,81 @@ public void testInsertRetryPolicy() throws InterruptedException, IOException {
         new MockSleeper(),
         InsertRetryPolicy.retryTransientErrors(),
         failedInserts,
-        ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
+        ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+        false,
+        false);
     assertEquals(1, failedInserts.size());
     expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
   }
 
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} respects the skipInvalidRows and
+   * ignoreUnknownValues parameters.
+   */
+  @Test
+  public void testSkipInvalidRowsIgnoreUnknownValuesStreaming()
+      throws InterruptedException, IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<ValueInSingleWindow<TableRow>> rows =
+        ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
+
+    final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+    // Return a 200 response each time
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(allRowsSucceeded))
+        .thenReturn(toStream(allRowsSucceeded));
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    // First, test with both flags disabled
+    dataService.insertAll(
+        ref,
+        rows,
+        null,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        new MockSleeper(),
+        InsertRetryPolicy.neverRetry(),
+        Lists.newArrayList(),
+        ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+        false,
+        false);
+
+    TableDataInsertAllRequest parsedRequest =
+        fromString(request.getContentAsString(), TableDataInsertAllRequest.class);
+
+    assertFalse(parsedRequest.getSkipInvalidRows());
+    assertFalse(parsedRequest.getIgnoreUnknownValues());
+
+    // Then with both enabled
+    dataService.insertAll(
+        ref,
+        rows,
+        null,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        new MockSleeper(),
+        InsertRetryPolicy.neverRetry(),
+        Lists.newArrayList(),
+        ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+        true,
+        true);
+
+    parsedRequest = fromString(request.getContentAsString(), TableDataInsertAllRequest.class);
+
+    assertTrue(parsedRequest.getSkipInvalidRows());
+    assertTrue(parsedRequest.getIgnoreUnknownValues());
+  }
+
+  /** A helper to convert a string response back to a {@link GenericJson} subclass. */
+  private static <T extends GenericJson> T fromString(String content, Class<T> clazz)
+      throws IOException {
+    return JacksonFactory.getDefaultInstance().fromString(content, clazz);
+  }
+
   /** A helper to wrap a {@link GenericJson} object in a content stream. */
   private static InputStream toStream(GenericJson content) throws IOException {
     return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
@@ -887,7 +969,9 @@ public void testSimpleErrorRetrieval() throws InterruptedException, IOException
         new MockSleeper(),
         InsertRetryPolicy.neverRetry(),
         failedInserts,
-        ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
+        ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+        false,
+        false);
 
     assertThat(failedInserts, is(rows));
   }
@@ -939,7 +1023,9 @@ public void testExtendedErrorRetrieval() throws InterruptedException, IOExceptio
         new MockSleeper(),
         InsertRetryPolicy.neverRetry(),
         failedInserts,
-        ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER);
+        ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER,
+        false,
+        false);
 
     assertThat(failedInserts, is(expected));
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 48c92064d01..12adc2e6b44 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -204,7 +204,8 @@ public void testInsertAll() throws Exception {
     long totalBytes = 0;
     try {
       totalBytes =
-          datasetService.insertAll(ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null);
+          datasetService.insertAll(
+              ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null, false, false);
     } finally {
       verifyInsertAll(5);
       // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 9d0692a26e7..f8b241b0ade 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -204,7 +204,8 @@ public long insertAll(
               GlobalWindow.INSTANCE,
               PaneInfo.ON_TIME_AND_ONLY_FIRING));
     }
-    return insertAll(ref, windowedRows, insertIdList, InsertRetryPolicy.alwaysRetry(), null, null);
+    return insertAll(
+        ref, windowedRows, insertIdList, InsertRetryPolicy.alwaysRetry(), null, null, false, false);
   }
 
   @Override
@@ -214,7 +215,9 @@ public long insertAll(
       @Nullable List<String> insertIdList,
       InsertRetryPolicy retryPolicy,
       List<ValueInSingleWindow<T>> failedInserts,
-      ErrorContainer<T> errorContainer)
+      ErrorContainer<T> errorContainer,
+      boolean skipInvalidRows,
+      boolean ignoreUnknownValues)
       throws IOException, InterruptedException {
     Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> insertErrors = getInsertErrors();
     synchronized (tables) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 132781)
    Time Spent: 50m  (was: 40m)

> Add more flexible options for data loading to BigQueryIO.Write
> --------------------------------------------------------------
>
>                 Key: BEAM-4835
>                 URL: https://issues.apache.org/jira/browse/BEAM-4835
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Gene Peters
>            Assignee: Chamikara Jayalath
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> As part of the BigQuery API, there are a few options exposed to end-users 
> which allow for more flexible data loading.
> For both 
> [streaming|https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableDataInsertAllRequest.html#setIgnoreUnknownValues-java.lang.Boolean-]
>  and 
> [batch|https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/JobConfigurationLoad.html#setIgnoreUnknownValues-java.lang.Boolean-]
>  inserts, the flag "ignoreUnknownValues" can be set, which indicates if 
> BigQuery should accept rows that contain values that do not match the schema. 
> [In 
> addition,|https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableDataInsertAllRequest.html#setSkipInvalidRows-java.lang.Boolean-]
>  streaming inserts allow for the option of accepting an inserted batch of 
> rows even if some of the rows are invalid.
> I've made the necessary code changes to make this available within 
> BigQueryIO.Write and will be attaching the pull request to this ticket for 
> review. Both flags are off by default.
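>
> A minimal usage sketch of the two new options (the project, dataset, table, coder choice, and
> input row below are illustrative assumptions, not taken from this PR):
>
>   import com.google.api.services.bigquery.model.TableRow;
>   import org.apache.beam.sdk.Pipeline;
>   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
>   import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
>   import org.apache.beam.sdk.options.PipelineOptionsFactory;
>   import org.apache.beam.sdk.transforms.Create;
>
>   public class LenientBigQueryWrite {
>     public static void main(String[] args) {
>       Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>       p.apply(
>               Create.of(new TableRow().set("known_field", "foo").set("unknown_field", "bar"))
>                   .withCoder(TableRowJsonCoder.of()))
>           .apply(
>               BigQueryIO.writeTableRows()
>                   .to("my-project:my_dataset.my_table")
>                   // Assumes the target table already exists, so no schema is supplied.
>                   .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>                   // skipInvalidRows applies only to streaming inserts;
>                   // ignoreUnknownValues applies to both streaming inserts and load jobs.
>                   .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>                   // Keep the valid rows of an insert request even if some rows are invalid.
>                   .skipInvalidRows()
>                   // Accept rows whose extra fields are not in the table schema.
>                   .ignoreUnknownValues());
>       p.run().waitUntilFinish();
>     }
>   }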
> Let me know if you have any questions or feedback about this!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
