[ https://issues.apache.org/jira/browse/BEAM-5514?focusedWorklogId=173882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173882 ]
ASF GitHub Bot logged work on BEAM-5514: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Dec/18 03:18 Start Date: 11/Dec/18 03:18 Worklog Time Spent: 10m Work Description: chamikaramj closed pull request #7189: [BEAM-5514] BigQueryIO doesn't handle quotaExceeded errors properly URL: https://github.com/apache/beam/pull/7189 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index cbdb44f8ade3..f147634f9cab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -736,16 +736,15 @@ public void deleteDataset(String projectId, String datasetId) try { return insert.execute().getInsertErrors(); } catch (IOException e) { - if (new ApiErrorExtractor().rateLimited(e)) { - LOG.info("BigQuery insertAll exceeded rate limit, retrying"); - try { - sleeper.sleep(backoff1.nextBackOffMillis()); - } catch (InterruptedException interrupted) { - throw new IOException( - "Interrupted while waiting before retrying insertAll"); - } - } else { - throw e; + LOG.info( + String.format( + "BigQuery insertAll error, retrying: %s", + ApiErrorExtractor.INSTANCE.getErrorMessage(e))); + try { + sleeper.sleep(backoff1.nextBackOffMillis()); + } catch (InterruptedException interrupted) { + throw new IOException( + "Interrupted while waiting before retrying insertAll"); } } } diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 9f20e5087e92..48fd5b9eea41 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -490,9 +490,9 @@ public void testExecuteWithRetries() throws IOException, InterruptedException { PaneInfo.ON_TIME_AND_ONLY_FIRING); } - /** Tests that {@link DatasetServiceImpl#insertAll} retries quota rate limited attempts. */ + /** Tests that {@link DatasetServiceImpl#insertAll} retries rate limited attempts. */ @Test - public void testInsertRetry() throws Exception { + public void testInsertRateLimitRetry() throws Exception { TableReference ref = new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); @@ -521,8 +521,43 @@ public void testInsertRetry() throws Exception { verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); - expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying"); + expectedLogs.verifyInfo("BigQuery insertAll error, retrying:"); } + + /** Tests that {@link DatasetServiceImpl#insertAll} retries quota exceeded attempts. */ + @Test + public void testInsertQuotaExceededRetry() throws Exception { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); + rows.add(wrapValue(new TableRow())); + + // First response is 403 quota exceeded, second response has valid payload. 
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("quotaExceeded", 403))) + .thenReturn(toStream(new TableDataInsertAllResponse())); + + DatasetServiceImpl dataService = + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + dataService.insertAll( + ref, + rows, + null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), + new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), + null, + null, + false, + false); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + expectedLogs.verifyInfo("BigQuery insertAll error, retrying:"); + } + // A BackOff that makes a total of 4 attempts private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT @@ -626,15 +661,18 @@ public void testInsertFailsGracefully() throws Exception { expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); } - /** Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts. */ + /** + * Tests that {@link DatasetServiceImpl#insertAll} retries other non-rate-limited, + * non-quota-exceeded attempts. + */ @Test - public void testInsertDoesNotRetry() throws Throwable { + public void testInsertOtherRetry() throws Throwable { TableReference ref = new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); rows.add(wrapValue(new TableRow())); - // First response is 403 not-rate-limited, second response has valid payload but should not + // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload but should not // be invoked. 
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); when(response.getStatusCode()).thenReturn(403).thenReturn(200); @@ -642,31 +680,23 @@ public void testInsertDoesNotRetry() throws Throwable { .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403))) .thenReturn(toStream(new TableDataInsertAllResponse())); - thrown.expect(GoogleJsonResponseException.class); - thrown.expectMessage("actually forbidden"); - DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - - try { - dataService.insertAll( - ref, - rows, - null, - BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), - new MockSleeper(), - InsertRetryPolicy.alwaysRetry(), - null, - null, - false, - false); - fail(); - } catch (RuntimeException e) { - verify(response, times(1)).getStatusCode(); - verify(response, times(1)).getContent(); - verify(response, times(1)).getContentType(); - throw e.getCause(); - } + dataService.insertAll( + ref, + rows, + null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), + new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), + null, + null, + false, + false); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + expectedLogs.verifyInfo("BigQuery insertAll error, retrying:"); } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 173882) Time Spent: 4h (was: 3h 50m) > BigQueryIO doesn't handle quotaExceeded errors properly > ------------------------------------------------------- > > Key: BEAM-5514 > URL: https://issues.apache.org/jira/browse/BEAM-5514 > Project: Beam > Issue Type: Bug > Components: io-java-gcp > Reporter: Kevin Peterson > Assignee: Heejong Lee > Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > When exceeding a streaming quota for BigQuery insertAll requests, BigQuery > returns a 403 with reason "quotaExceeded". > The current implementation of BigQueryIO does not consider this to be a rate > limited exception, and therefore does not perform exponential backoff > properly, leading to repeated calls to BQ. > The actual error is in the > [ApiErrorExtractor|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/ApiErrorExtractor.java#L263] > class, which is called from > [BigQueryServicesImpl|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L739] > to determine how to retry the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)