Repository: incubator-beam Updated Branches: refs/heads/master 1a34bf038 -> 5a80bdc61
[BEAM-48] Code cleanup in BigQueryIO 1. Use toJsonString() to avoid try catch 2. add getTableWithDefaultProject in BigQuery.Write Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ddfd8a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ddfd8a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ddfd8a4 Branch: refs/heads/master Commit: 6ddfd8a45c70b5ec741e14f7d759fb6c33513530 Parents: 1a34bf0 Author: Pei He <pe...@google.com> Authored: Fri May 20 17:16:58 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Jun 7 17:23:33 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 50 ++++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ddfd8a4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 030dde0..38009bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -635,13 +635,13 @@ public class BigQueryIO { } /** - * Returns the table to write, or {@code null} if reading from a query instead. + * Returns the table to read, or {@code null} if reading from a query instead. * - * <p>If the table's project is not specified, use the default one. + * <p>If the table's project is not specified, use the executing project. */ @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) { TableReference table = getTable(); - if (table != null && table.getProjectId() == null) { + if (table != null && Strings.isNullOrEmpty(table.getProjectId())) { // If user does not specify a project we assume the table to be located in // the default project. table.setProjectId(bqOptions.getProject()); @@ -652,6 +652,7 @@ public class BigQueryIO { /** * Returns the table to read, or {@code null} if reading from a query instead. */ + @Nullable public TableReference getTable() { return fromJsonString(jsonTableRef, TableReference.class); } @@ -767,11 +768,7 @@ public class BigQueryIO { String executingProject) { super(jobIdToken, extractDestinationDir, bqServices, executingProject); checkNotNull(table, "table"); - try { - this.jsonTable = JSON_FACTORY.toString(table); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize table to JSON strings.", e); - } + this.jsonTable = toJsonString(table); this.tableSizeBytes = new AtomicReference<>(); } @@ -849,11 +846,7 @@ public class BigQueryIO { super(jobIdToken, extractDestinationDir, bqServices, checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId()); this.query = checkNotNull(query, "query"); - try { - this.jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize table to JSON strings.", e); - } + this.jsonQueryTempTable = toJsonString(queryTempTableRef); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.dryRunJobStats = new AtomicReference<>(); } @@ -1075,12 +1068,8 @@ public class BigQueryIO { new SerializableFunction<GenericRecord, TableRow>() { @Override public TableRow apply(GenericRecord input) { - try { - return AvroUtils.convertGenericRecordToTableRow( - input, JSON_FACTORY.fromString(jsonSchema, TableSchema.class)); - } catch (IOException e) { - throw new RuntimeException("Failed to convert GenericRecord to TableRow", e); - } + return AvroUtils.convertGenericRecordToTableRow( + input, fromJsonString(jsonSchema, TableSchema.class)); }}; List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); @@ -1670,13 +1659,7 @@ public class BigQueryIO { // The user specified a table. if (jsonTableRef != null && validate) { - TableReference table = getTable(); - - // If user does not specify a project we assume the table to be located in the project - // configured in BigQueryOptions. - if (Strings.isNullOrEmpty(table.getProjectId())) { - table.setProjectId(options.getProject()); - } + TableReference table = getTableWithDefaultProject(options); // Check for destination table presence and emptiness for early failure notification. // Note that a presence check can fail when the table or dataset is created by an earlier @@ -1807,6 +1790,21 @@ public class BigQueryIO { return fromJsonString(jsonSchema, TableSchema.class); } + /** + * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}. + * + * <p>If the table's project is not specified, use the executing project. + */ + @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) { + TableReference table = getTable(); + if (table != null && Strings.isNullOrEmpty(table.getProjectId())) { + // If user does not specify a project we assume the table to be located in + // the default project. + table.setProjectId(bqOptions.getProject()); + } + return table; + } + /** Returns the table reference, or {@code null}. */ @Nullable public TableReference getTable() {