[ 
https://issues.apache.org/jira/browse/BEAM-5514?focusedWorklogId=173882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173882
 ]

ASF GitHub Bot logged work on BEAM-5514:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/18 03:18
            Start Date: 11/Dec/18 03:18
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #7189: [BEAM-5514] 
BigQueryIO doesn't handle quotaExceeded errors properly
URL: https://github.com/apache/beam/pull/7189
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index cbdb44f8ade3..f147634f9cab 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -736,16 +736,15 @@ public void deleteDataset(String projectId, String 
datasetId)
                         try {
                           return insert.execute().getInsertErrors();
                         } catch (IOException e) {
-                          if (new ApiErrorExtractor().rateLimited(e)) {
-                            LOG.info("BigQuery insertAll exceeded rate limit, 
retrying");
-                            try {
-                              sleeper.sleep(backoff1.nextBackOffMillis());
-                            } catch (InterruptedException interrupted) {
-                              throw new IOException(
-                                  "Interrupted while waiting before retrying 
insertAll");
-                            }
-                          } else {
-                            throw e;
+                          LOG.info(
+                              String.format(
+                                  "BigQuery insertAll error, retrying: %s",
+                                  
ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+                          try {
+                            sleeper.sleep(backoff1.nextBackOffMillis());
+                          } catch (InterruptedException interrupted) {
+                            throw new IOException(
+                                "Interrupted while waiting before retrying 
insertAll");
                           }
                         }
                       }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 9f20e5087e92..48fd5b9eea41 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -490,9 +490,9 @@ public void testExecuteWithRetries() throws IOException, 
InterruptedException {
         PaneInfo.ON_TIME_AND_ONLY_FIRING);
   }
 
-  /** Tests that {@link DatasetServiceImpl#insertAll} retries quota rate 
limited attempts. */
+  /** Tests that {@link DatasetServiceImpl#insertAll} retries rate limited 
attempts. */
   @Test
-  public void testInsertRetry() throws Exception {
+  public void testInsertRateLimitRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
     List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
@@ -521,8 +521,43 @@ public void testInsertRetry() throws Exception {
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, 
retrying");
+    expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
   }
+
+  /** Tests that {@link DatasetServiceImpl#insertAll} retries quota exceeded 
attempts. */
+  @Test
+  public void testInsertQuotaExceededRetry() throws Exception {
+    TableReference ref =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    rows.add(wrapValue(new TableRow()));
+
+    // First response is 403 quota exceeded, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("quotaExceeded", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    dataService.insertAll(
+        ref,
+        rows,
+        null,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(),
+        null,
+        null,
+        false,
+        false);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+  }
+
   // A BackOff that makes a total of 4 attempts
   private static final FluentBackoff TEST_BACKOFF =
       FluentBackoff.DEFAULT
@@ -626,15 +661,18 @@ public void testInsertFailsGracefully() throws Exception {
     expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
   }
 
-  /** Tests that {@link DatasetServiceImpl#insertAll} does not retry 
non-rate-limited attempts. */
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} retries other 
non-rate-limited,
+   * non-quota-exceeded attempts.
+   */
   @Test
-  public void testInsertDoesNotRetry() throws Throwable {
+  public void testInsertOtherRetry() throws Throwable {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
     List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
-    // First response is 403 not-rate-limited, second response has valid 
payload but should not
+    // First response is 403 non-{rate-limited, quota-exceeded}, second 
response has valid payload but should not
     // be invoked.
     when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
     when(response.getStatusCode()).thenReturn(403).thenReturn(200);
@@ -642,31 +680,23 @@ public void testInsertDoesNotRetry() throws Throwable {
         .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 
403)))
         .thenReturn(toStream(new TableDataInsertAllResponse()));
 
-    thrown.expect(GoogleJsonResponseException.class);
-    thrown.expectMessage("actually forbidden");
-
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-
-    try {
-      dataService.insertAll(
-          ref,
-          rows,
-          null,
-          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
-          new MockSleeper(),
-          InsertRetryPolicy.alwaysRetry(),
-          null,
-          null,
-          false,
-          false);
-      fail();
-    } catch (RuntimeException e) {
-      verify(response, times(1)).getStatusCode();
-      verify(response, times(1)).getContent();
-      verify(response, times(1)).getContentType();
-      throw e.getCause();
-    }
+    dataService.insertAll(
+        ref,
+        rows,
+        null,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(),
+        null,
+        null,
+        false,
+        false);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 173882)
    Time Spent: 4h  (was: 3h 50m)

> BigQueryIO doesn't handle quotaExceeded errors properly
> -------------------------------------------------------
>
>                 Key: BEAM-5514
>                 URL: https://issues.apache.org/jira/browse/BEAM-5514
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Kevin Peterson
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> When exceeding a streaming quota for BigQuery insertAll requests, BigQuery 
> returns a 403 with reason "quotaExceeded".
> The current implementation of BigQueryIO does not consider this to be a rate 
> limited exception, and therefore does not perform exponential backoff 
> properly, leading to repeated calls to BQ.
> The actual error is in the 
> [ApiErrorExtractor|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/ApiErrorExtractor.java#L263]
>  class, which is called from 
> [BigQueryServicesImpl|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L739]
>  to determine how to retry the failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to