Repository: incubator-beam Updated Branches: refs/heads/master ef2c51400 -> 4a0e426a8
[BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters By default, BigQueryIO initializes an ExecutorService; however, AppEngine doesn't allow threads to be modified to be daemon threads. This change uses GcsOptions.getExecutorService() to obtain the ExecutorService instead. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/623bed9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/623bed9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/623bed9e Branch: refs/heads/master Commit: 623bed9ea3f1ce3b7efd1d875d1839b2737ebda3 Parents: ef2c514 Author: Lucas Amorim <lucasamo...@protonmail.com> Authored: Tue Jun 28 14:28:17 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Jun 30 09:54:12 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 10 +++---- .../beam/sdk/util/BigQueryTableInserter.java | 31 +++++++++++++------- .../sdk/util/BigQueryTableInserterTest.java | 17 +++++++---- .../apache/beam/sdk/util/BigQueryUtilTest.java | 12 +++++--- .../util/RetryHttpRequestInitializerTest.java | 5 +++- 5 files changed, 48 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/623bed9e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index a9d85b8..790e3ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -1594,7 +1594,7 @@ public class BigQueryIO { TableReference table) { try { Bigquery client = 
Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); if (!inserter.isEmpty(table)) { throw new IllegalArgumentException( "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table)); @@ -2084,7 +2084,7 @@ public class BigQueryIO { for (String tableSpec : tableRows.keySet()) { TableReference tableReference = getOrCreateTable(options, tableSpec); flushRows(client, tableReference, tableRows.get(tableSpec), - uniqueIdsForTableRows.get(tableSpec)); + uniqueIdsForTableRows.get(tableSpec), options); } tableRows.clear(); uniqueIdsForTableRows.clear(); @@ -2109,7 +2109,7 @@ public class BigQueryIO { if (!createdTables.contains(tableSpec)) { TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class); Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED, tableSchema); createdTables.add(tableSpec); @@ -2121,10 +2121,10 @@ public class BigQueryIO { /** Writes the accumulated rows into BigQuery with streaming API. 
*/ private void flushRows(Bigquery client, TableReference tableReference, - List<TableRow> tableRows, List<String> uniqueIds) { + List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) { if (!tableRows.isEmpty()) { try { - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/623bed9e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java index f87a3c4..84004a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import com.google.api.client.util.BackOff; @@ -38,7 +40,6 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +51,7 @@ import java.util.List; import 
java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -83,43 +82,51 @@ public class BigQueryTableInserter { private final TableReference defaultRef; private final long maxRowsPerBatch; - private static final ExecutorService executor = MoreExecutors.getExitingExecutorService( - (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS); + private ExecutorService executor; /** * Constructs a new row inserter. * * @param client a BigQuery client + * @param options a PipelineOptions object */ - public BigQueryTableInserter(Bigquery client) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options) { this.client = client; this.defaultRef = null; this.maxRowsPerBatch = MAX_ROWS_PER_BATCH; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** * Constructs a new row inserter. * * @param client a BigQuery client + * @param options a PipelineOptions object * @param defaultRef identifies the table to insert into - * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)} + * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)} */ @Deprecated - public BigQueryTableInserter(Bigquery client, TableReference defaultRef) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + TableReference defaultRef) { this.client = client; this.defaultRef = defaultRef; this.maxRowsPerBatch = MAX_ROWS_PER_BATCH; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** * Constructs a new row inserter. 
* * @param client a BigQuery client + * @param options a PipelineOptions object + * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery */ - public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + int maxRowsPerBatch) { this.client = client; this.defaultRef = null; this.maxRowsPerBatch = maxRowsPerBatch; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** @@ -127,13 +134,15 @@ public class BigQueryTableInserter { * * @param client a BigQuery client * @param defaultRef identifies the default table to insert into - * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)} + * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)} */ @Deprecated - public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + TableReference defaultRef, int maxRowsPerBatch) { this.client = client; this.defaultRef = defaultRef; this.maxRowsPerBatch = maxRowsPerBatch; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/623bed9e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java index 7d9c8a8..344e916 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static 
org.mockito.Mockito.when; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import com.google.api.client.googleapis.json.GoogleJsonError; @@ -75,6 +77,7 @@ public class BigQueryTableInserterTest { @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class); @Mock private LowLevelHttpResponse response; private Bigquery bigquery; + private PipelineOptions options; @Before public void setUp() { @@ -97,6 +100,8 @@ public class BigQueryTableInserterTest { new Bigquery.Builder( transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) .build(); + + options = PipelineOptionsFactory.create(); } @After @@ -139,7 +144,7 @@ public class BigQueryTableInserterTest { when(response.getStatusCode()).thenReturn(200); when(response.getContent()).thenReturn(toStream(testTable)); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( new Table(), @@ -160,7 +165,7 @@ public class BigQueryTableInserterTest { public void testCreateTableSucceedsAlreadyExists() throws IOException { when(response.getStatusCode()).thenReturn(409); // 409 means already exists - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( new Table(), @@ -191,7 +196,7 @@ public class BigQueryTableInserterTest { .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) .thenReturn(toStream(testTable)); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( testTable, @@ -227,7 +232,7 @@ public class BigQueryTableInserterTest { 
thrown.expect(GoogleJsonResponseException.class); thrown.expectMessage("actually forbidden"); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); try { inserter.tryCreateTable( new Table(), @@ -261,7 +266,7 @@ public class BigQueryTableInserterTest { .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) .thenReturn(toStream(new TableDataInsertAllResponse())); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); inserter.insertAll(ref, rows); verify(response, times(2)).getStatusCode(); @@ -291,7 +296,7 @@ public class BigQueryTableInserterTest { thrown.expect(GoogleJsonResponseException.class); thrown.expectMessage("actually forbidden"); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); try { inserter.insertAll(ref, rows); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/623bed9e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java index 65fbeb7..c033a7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import 
org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Sum; @@ -81,10 +83,12 @@ public class BigQueryUtilTest { @Mock private Bigquery.Tables.Get mockTablesGet; @Mock private Bigquery.Tabledata mockTabledata; @Mock private Bigquery.Tabledata.List mockTabledataList; + private PipelineOptions options; @Before public void setUp() { MockitoAnnotations.initMocks(this); + this.options = PipelineOptionsFactory.create(); } @After @@ -369,7 +373,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND, BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); @@ -387,7 +391,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); @@ -408,7 +412,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); try { inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, @@ -432,7 +436,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, 5); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5); List<TableRow> rows = new ArrayList<>(); 
List<String> ids = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/623bed9e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java index 7d212d4..83ffaa1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java @@ -30,6 +30,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpResponse; @@ -281,7 +283,8 @@ public class RetryHttpRequestInitializerTest { // RetryHttpInitializer. Bigquery b = new Bigquery.Builder( transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(b); + + BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create()); TableReference t = new TableReference() .setProjectId("project").setDatasetId("dataset").setTableId("table");