Condense BigQueryIO.Write.Bound into BigQueryIO.Write
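For readers tracking the API surface: the user-visible effect of this commit is that BigQueryIO.Write now extends PTransform<PCollection<TableRow>, PDone> directly, absorbing the fluent configuration methods that previously lived on the nested Bound class. A minimal sketch of the call-site migration, mirroring the test updates in the diff below (the table spec and schema are placeholders, and the two declarations target the SDK before and after this commit respectively, so they are illustrative fragments rather than code that compiles together):

    TableSchema schema = new TableSchema();

    // Before this commit: the static factories on Write returned the
    // nested Bound class, which was the actual PTransform.
    BigQueryIO.Write.Bound oldStyle =
        BigQueryIO.Write.to("project:dataset.table")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

    // After this commit: Write itself is the PTransform, so the nested
    // type disappears and only the declared type at the call site changes.
    BigQueryIO.Write write =
        BigQueryIO.Write.to("project:dataset.table")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
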
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d1f4400 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d1f4400 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d1f4400 Branch: refs/heads/master Commit: 7d1f4400ab844c7b4e636482891be55174390431 Parents: 825338a Author: Eugene Kirpichov <[email protected]> Authored: Thu Mar 2 17:26:09 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 15:54:24 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1080 +++++++++--------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 116 +- 2 files changed, 552 insertions(+), 644 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f6c8575..90d7f67 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1415,7 +1415,43 @@ public class BigQueryIO { * } * }}</pre> */ - public static class Write { + public static class Write extends PTransform<PCollection<TableRow>, PDone> { + // Maximum number of files in a single partition. + static final int MAX_NUM_FILES = 10000; + + // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit. + static final long MAX_SIZE_BYTES = 11 * (1L << 40); + + // The maximum number of retry jobs. + private static final int MAX_RETRY_JOBS = 3; + + // The maximum number of retries to poll the status of a job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. + private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + + @Nullable private final ValueProvider<String> jsonTableRef; + + @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; + + // Table schema. The schema is required only if the table does not exist. + @Nullable private final ValueProvider<String> jsonSchema; + + // Options for creating the table. Valid values are CREATE_IF_NEEDED and + // CREATE_NEVER. + final CreateDisposition createDisposition; + + // Options for writing to the table. Valid values are WRITE_TRUNCATE, + // WRITE_APPEND and WRITE_EMPTY. + final WriteDisposition writeDisposition; + + @Nullable + final String tableDescription; + + // An option to indicate if table validation is desired. Default is true. + final boolean validate; + + @Nullable private BigQueryServices bigQueryServices; + /** * An enumeration type for the BigQuery create disposition strings. * @@ -1488,18 +1524,18 @@ public class BigQueryIO { * * <p>Refer to {@link #parseTableSpec(String)} for the specification format. */ - public static Bound to(String tableSpec) { - return new Bound().to(tableSpec); + public static Write to(String tableSpec) { + return new Write().withTableSpec(tableSpec); } /** Creates a write transformation for the given table. 
*/ - public static Bound to(ValueProvider<String> tableSpec) { - return new Bound().to(tableSpec); + public static Write to(ValueProvider<String> tableSpec) { + return new Write().withTableSpec(tableSpec); } /** Creates a write transformation for the given table. */ - public static Bound to(TableReference table) { - return new Bound().to(table); + public static Write to(TableReference table) { + return new Write().withTableRef(table); } /** @@ -1513,8 +1549,8 @@ public class BigQueryIO { * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should * always return the same table specification. */ - public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) { - return new Bound().to(tableSpecFunction); + public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) { + return new Write().withTableSpec(tableSpecFunction); } /** @@ -1524,634 +1560,547 @@ public class BigQueryIO { * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should * always return the same table reference. */ - public static Bound toTableReference( + private static Write toTableReference( SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { - return new Bound().toTableReference(tableRefFunction); + return new Write().withTableRef(tableRefFunction); + } + + private static class TranslateTableSpecFunction implements + SerializableFunction<BoundedWindow, TableReference> { + private SerializableFunction<BoundedWindow, String> tableSpecFunction; + + TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) { + this.tableSpecFunction = tableSpecFunction; + } + + @Override + public TableReference apply(BoundedWindow value) { + return parseTableSpec(tableSpecFunction.apply(value)); + } + } + + private Write() { + this( + null /* name */, + null /* jsonTableRef */, + null /* tableRefFunction */, + null /* jsonSchema */, + CreateDisposition.CREATE_IF_NEEDED, + WriteDisposition.WRITE_EMPTY, + null /* tableDescription */, + true /* validate */, + null /* bigQueryServices */); + } + + private Write(String name, @Nullable ValueProvider<String> jsonTableRef, + @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, + @Nullable ValueProvider<String> jsonSchema, + CreateDisposition createDisposition, + WriteDisposition writeDisposition, + @Nullable String tableDescription, + boolean validate, + @Nullable BigQueryServices bigQueryServices) { + super(name); + this.jsonTableRef = jsonTableRef; + this.tableRefFunction = tableRefFunction; + this.jsonSchema = jsonSchema; + this.createDisposition = checkNotNull(createDisposition, "createDisposition"); + this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); + this.tableDescription = tableDescription; + this.validate = validate; + this.bigQueryServices = bigQueryServices; } /** - * Creates a write transformation with the specified schema to use in table creation. + * Returns a copy of this write transformation, but writing to the specified table. Refer to + * {@link #parseTableSpec(String)} for the specification format. * - * <p>The schema is <i>required</i> only if writing to a table that does not already - * exist, and {@link CreateDisposition} is set to - * {@link CreateDisposition#CREATE_IF_NEEDED}. + * <p>Does not modify this object. 
*/ - public static Bound withSchema(TableSchema schema) { - return new Bound().withSchema(schema); + private Write withTableSpec(String tableSpec) { + return withTableRef(NestedValueProvider.of( + StaticValueProvider.of(tableSpec), new TableSpecToTableRef())); } /** - * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. + * Returns a copy of this write transformation, but writing to the specified table. + * + * <p>Does not modify this object. */ - public static Bound withSchema(ValueProvider<TableSchema> schema) { - return new Bound().withSchema(schema); + public Write withTableRef(TableReference table) { + return withTableSpec(StaticValueProvider.of(toTableSpec(table))); } - /** Creates a write transformation with the specified options for creating the table. */ - public static Bound withCreateDisposition(CreateDisposition disposition) { - return new Bound().withCreateDisposition(disposition); + /** + * Returns a copy of this write transformation, but writing to the specified table. Refer to + * {@link #parseTableSpec(String)} for the specification format. + * + * <p>Does not modify this object. + */ + public Write withTableSpec(ValueProvider<String> tableSpec) { + return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef())); } - /** Creates a write transformation with the specified options for writing to the table. */ - public static Bound withWriteDisposition(WriteDisposition disposition) { - return new Bound().withWriteDisposition(disposition); + /** + * Returns a copy of this write transformation, but writing to the specified table. + * + * <p>Does not modify this object. + */ + private Write withTableRef(ValueProvider<TableReference> table) { + return new Write(name, + NestedValueProvider.of(table, new TableRefToJson()), + tableRefFunction, jsonSchema, createDisposition, + writeDisposition, tableDescription, validate, bigQueryServices); } - /** Creates a write transformation with the specified table description. */ - public static Bound withTableDescription(@Nullable String tableDescription) { - return new Bound().withTableDescription(tableDescription); + /** + * Returns a copy of this write transformation, but using the specified function to determine + * which table to write to for each window. + * + * <p>Does not modify this object. + * + * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it + * should always return the same table specification. + */ + private Write withTableSpec( + SerializableFunction<BoundedWindow, String> tableSpecFunction) { + return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); } /** - * Creates a write transformation with BigQuery table validation disabled. + * Returns a copy of this write transformation, but using the specified function to determine + * which table to write to for each window. + * + * <p>Does not modify this object. + * + * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should + * always return the same table reference. 
*/ - public static Bound withoutValidation() { - return new Bound().withoutValidation(); + private Write withTableRef( + SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, tableDescription, validate, bigQueryServices); } /** - * A {@link PTransform} that can write either a bounded or unbounded - * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table. + * Returns a copy of this write transformation, but using the specified schema for rows + * to be written. + * + * <p>Does not modify this object. */ - public static class Bound extends PTransform<PCollection<TableRow>, PDone> { - // Maximum number of files in a single partition. - static final int MAX_NUM_FILES = 10000; - - // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit. - static final long MAX_SIZE_BYTES = 11 * (1L << 40); - - // The maximum number of retry jobs. - static final int MAX_RETRY_JOBS = 3; - - // The maximum number of retries to poll the status of a job. - // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. - static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - - @Nullable final ValueProvider<String> jsonTableRef; - - @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; - - // Table schema. The schema is required only if the table does not exist. - @Nullable final ValueProvider<String> jsonSchema; + public Write withSchema(TableSchema schema) { + return new Write(name, jsonTableRef, tableRefFunction, + StaticValueProvider.of(toJsonString(schema)), + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } - // Options for creating the table. Valid values are CREATE_IF_NEEDED and - // CREATE_NEVER. - final CreateDisposition createDisposition; + /** + * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. + */ + public Write withSchema(ValueProvider<TableSchema> schema) { + return new Write(name, jsonTableRef, tableRefFunction, + NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } - // Options for writing to the table. Valid values are WRITE_TRUNCATE, - // WRITE_APPEND and WRITE_EMPTY. - final WriteDisposition writeDisposition; + /** + * Returns a copy of this write transformation, but using the specified create disposition. + * + * <p>Does not modify this object. + */ + public Write withCreateDisposition(CreateDisposition createDisposition) { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } - @Nullable final String tableDescription; + /** + * Returns a copy of this write transformation, but using the specified write disposition. + * + * <p>Does not modify this object. + */ + public Write withWriteDisposition(WriteDisposition writeDisposition) { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } - // An option to indicate if table validation is desired. Default is true. - final boolean validate; + /** + * Returns a copy of this write transformation, but using the specified table description. + * + * <p>Does not modify this object. 
+ */ + public Write withTableDescription(@Nullable String tableDescription) { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, + createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + } - @Nullable private BigQueryServices bigQueryServices; + /** + * Returns a copy of this write transformation, but without BigQuery table validation. + * + * <p>Does not modify this object. + */ + public Write withoutValidation() { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, tableDescription, false, bigQueryServices); + } - private static class TranslateTableSpecFunction implements - SerializableFunction<BoundedWindow, TableReference> { - private SerializableFunction<BoundedWindow, String> tableSpecFunction; + @VisibleForTesting + Write withTestServices(BigQueryServices testServices) { + return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, tableDescription, validate, testServices); + } - TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) { - this.tableSpecFunction = tableSpecFunction; + private static void verifyTableNotExistOrEmpty( + DatasetService datasetService, + TableReference tableRef) { + try { + if (datasetService.getTable(tableRef) != null) { + checkState( + datasetService.isTableEmpty(tableRef), + "BigQuery table is not empty: %s.", + BigQueryIO.toTableSpec(tableRef)); } - - @Override - public TableReference apply(BoundedWindow value) { - return parseTableSpec(tableSpecFunction.apply(value)); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } + throw new RuntimeException( + "unable to confirm BigQuery table emptiness for table " + + BigQueryIO.toTableSpec(tableRef), e); } + } - /** - * @deprecated Should be private. Instead, use one of the factory methods in - * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an - * instance of this class. - */ - @Deprecated - public Bound() { - this( - null /* name */, - null /* jsonTableRef */, - null /* tableRefFunction */, - null /* jsonSchema */, - CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, - null /* tableDescription */, - true /* validate */, - null /* bigQueryServices */); - } - - private Bound(String name, @Nullable ValueProvider<String> jsonTableRef, - @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, - @Nullable ValueProvider<String> jsonSchema, - CreateDisposition createDisposition, - WriteDisposition writeDisposition, - @Nullable String tableDescription, - boolean validate, - @Nullable BigQueryServices bigQueryServices) { - super(name); - this.jsonTableRef = jsonTableRef; - this.tableRefFunction = tableRefFunction; - this.jsonSchema = jsonSchema; - this.createDisposition = checkNotNull(createDisposition, "createDisposition"); - this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); - this.tableDescription = tableDescription; - this.validate = validate; - this.bigQueryServices = bigQueryServices; - } - - /** - * Returns a copy of this write transformation, but writing to the specified table. Refer to - * {@link #parseTableSpec(String)} for the specification format. - * - * <p>Does not modify this object. 
- */ - public Bound to(String tableSpec) { - return toTableRef(NestedValueProvider.of( - StaticValueProvider.of(tableSpec), new TableSpecToTableRef())); - } + @Override + public void validate(PCollection<TableRow> input) { + BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - /** - * Returns a copy of this write transformation, but writing to the specified table. - * - * <p>Does not modify this object. - */ - public Bound to(TableReference table) { - return to(StaticValueProvider.of(toTableSpec(table))); - } + // Exactly one of the table and table reference can be configured. + checkState( + jsonTableRef != null || tableRefFunction != null, + "must set the table reference of a BigQueryIO.Write transform"); + checkState( + jsonTableRef == null || tableRefFunction == null, + "Cannot set both a table reference and a table function for a BigQueryIO.Write" + + " transform"); - /** - * Returns a copy of this write transformation, but writing to the specified table. Refer to - * {@link #parseTableSpec(String)} for the specification format. - * - * <p>Does not modify this object. - */ - public Bound to(ValueProvider<String> tableSpec) { - return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef())); + // Require a schema if creating one or more tables. + checkArgument( + createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null, + "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); + + // The user specified a table. + if (jsonTableRef != null && validate) { + TableReference table = getTableWithDefaultProject(options).get(); + + DatasetService datasetService = getBigQueryServices().getDatasetService(options); + // Check for destination table presence and emptiness for early failure notification. + // Note that a presence check can fail when the table or dataset is created by an earlier + // stage of the pipeline. For these cases the #withoutValidation method can be used to + // disable the check. + verifyDatasetPresence(datasetService, table); + if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { + verifyTablePresence(datasetService, table); + } + if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { + verifyTableNotExistOrEmpty(datasetService, table); + } } - /** - * Returns a copy of this write transformation, but writing to the specified table. - * - * <p>Does not modify this object. - */ - private Bound toTableRef(ValueProvider<TableReference> table) { - return new Bound(name, - NestedValueProvider.of(table, new TableRefToJson()), - tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, bigQueryServices); - } + if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) { + // We will use BigQuery's streaming write API -- validate supported dispositions. + if (tableRefFunction != null) { + checkArgument( + createDisposition != CreateDisposition.CREATE_NEVER, + "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" + + " function."); + } + if (jsonSchema == null) { + checkArgument( + createDisposition == CreateDisposition.CREATE_NEVER, + "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null."); + } - /** - * Returns a copy of this write transformation, but using the specified function to determine - * which table to write to for each window. - * - * <p>Does not modify this object. - * - * <p>{@code tableSpecFunction} should be deterministic. 
When given the same window, it - * should always return the same table specification. - */ - public Bound to( - SerializableFunction<BoundedWindow, String> tableSpecFunction) { - return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); + checkArgument( + writeDisposition != WriteDisposition.WRITE_TRUNCATE, + "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or" + + " when using a tablespec function."); + } else { + // We will use a BigQuery load job -- validate the temp location. + String tempLocation = options.getTempLocation(); + checkArgument( + !Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Write needs a GCS temp location to store temp files."); + if (bigQueryServices == null) { + try { + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), + e); + } + } } + } - /** - * Returns a copy of this write transformation, but using the specified function to determine - * which table to write to for each window. - * - * <p>Does not modify this object. - * - * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should - * always return the same table reference. - */ - public Bound toTableReference( - SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, bigQueryServices); + @Override + public PDone expand(PCollection<TableRow> input) { + Pipeline p = input.getPipeline(); + BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); + BigQueryServices bqServices = getBigQueryServices(); + + // When writing an Unbounded PCollection, or when a tablespec function is defined, we use + // StreamWithDeDup and BigQuery's streaming import API. + if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { + return input.apply( + new StreamWithDeDup(getTable(), tableRefFunction, + jsonSchema == null ? null : NestedValueProvider.of( + jsonSchema, new JsonSchemaToTableSchema()), + createDisposition, + tableDescription, + bqServices)); } - /** - * Returns a copy of this write transformation, but using the specified schema for rows - * to be written. - * - * <p>Does not modify this object. - */ - public Bound withSchema(TableSchema schema) { - return new Bound(name, jsonTableRef, tableRefFunction, - StaticValueProvider.of(toJsonString(schema)), - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); - } + ValueProvider<TableReference> table = getTableWithDefaultProject(options); - /** - * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. - */ - public Bound withSchema(ValueProvider<TableSchema> schema) { - return new Bound(name, jsonTableRef, tableRefFunction, - NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); - } + String stepUuid = randomUUIDString(); - /** - * Returns a copy of this write transformation, but using the specified create disposition. - * - * <p>Does not modify this object. 
- */ - public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); + String tempLocation = options.getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = factory.resolve( + factory.resolve(tempLocation, "BigQueryWriteTemp"), + stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), + e); } - /** - * Returns a copy of this write transformation, but using the specified write disposition. - * - * <p>Does not modify this object. - */ - public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); - } + // Create a singleton job ID token at execution time. + PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); + PCollectionView<String> jobIdTokenView = p + .apply("TriggerIdCreation", Create.of("ignored")) + .apply("CreateJobId", MapElements.via( + new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return randomUUIDString(); + } + })) + .apply(View.<String>asSingleton()); + + PCollection<TableRow> inputInGlobalWindow = + input.apply( + Window.<TableRow>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> results = inputInGlobalWindow + .apply("WriteBundles", + ParDo.of(new WriteBundles(tempFilePrefix))); + + TupleTag<KV<Long, List<String>>> multiPartitionsTag = + new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<Long, List<String>>> singlePartitionTag = + new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; + + PCollectionView<Iterable<KV<String, Long>>> resultsView = results + .apply("ResultsView", View.<KV<String, Long>>asIterable()); + PCollectionTuple partitions = singleton.apply(ParDo + .of(new WritePartition( + resultsView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(resultsView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + + // Write multiple partitions to separate temporary tables + PCollection<String> tempTables = partitions.get(multiPartitionsTag) + .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create()) + .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( + false, + bqServices, + jobIdTokenView, + tempFilePrefix, + NestedValueProvider.of(table, new TableRefToJson()), + jsonSchema, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + tableDescription)) + .withSideInputs(jobIdTokenView)); + + PCollectionView<Iterable<String>> tempTablesView = tempTables + .apply("TempTablesView", View.<String>asIterable()); + singleton.apply(ParDo + .of(new WriteRename( + bqServices, + jobIdTokenView, + NestedValueProvider.of(table, new TableRefToJson()), + writeDisposition, + createDisposition, + tempTablesView, + tableDescription)) + .withSideInputs(tempTablesView, jobIdTokenView)); + + // Write single partition to final table + partitions.get(singlePartitionTag) + .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create()) + .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( + true, + bqServices, 
+ jobIdTokenView, + tempFilePrefix, + NestedValueProvider.of(table, new TableRefToJson()), + jsonSchema, + writeDisposition, + createDisposition, + tableDescription)) + .withSideInputs(jobIdTokenView)); - /** - * Returns a copy of this write transformation, but using the specified table description. - * - * <p>Does not modify this object. - */ - public Bound withTableDescription(@Nullable String tableDescription) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); - } + return PDone.in(input.getPipeline()); + } - /** - * Returns a copy of this write transformation, but without BigQuery table validation. - * - * <p>Does not modify this object. - */ - public Bound withoutValidation() { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, false, bigQueryServices); - } + private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> { + private transient TableRowWriter writer = null; + private final String tempFilePrefix; - @VisibleForTesting - Bound withTestServices(BigQueryServices testServices) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, tableDescription, validate, testServices); + WriteBundles(String tempFilePrefix) { + this.tempFilePrefix = tempFilePrefix; } - private static void verifyTableNotExistOrEmpty( - DatasetService datasetService, - TableReference tableRef) { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {}", writer); + } try { - if (datasetService.getTable(tableRef) != null) { - checkState( - datasetService.isTableEmpty(tableRef), - "BigQuery table is not empty: %s.", - BigQueryIO.toTableSpec(tableRef)); - } - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); } - throw new RuntimeException( - "unable to confirm BigQuery table emptiness for table " - + BigQueryIO.toTableSpec(tableRef), e); + throw e; } } - @Override - public void validate(PCollection<TableRow> input) { - BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - - // Exactly one of the table and table reference can be configured. - checkState( - jsonTableRef != null || tableRefFunction != null, - "must set the table reference of a BigQueryIO.Write transform"); - checkState( - jsonTableRef == null || tableRefFunction == null, - "Cannot set both a table reference and a table function for a BigQueryIO.Write" - + " transform"); - - // Require a schema if creating one or more tables. - checkArgument( - createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null, - "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); - - // The user specified a table. 
- if (jsonTableRef != null && validate) { - TableReference table = getTableWithDefaultProject(options).get(); - - DatasetService datasetService = getBigQueryServices().getDatasetService(options); - // Check for destination table presence and emptiness for early failure notification. - // Note that a presence check can fail when the table or dataset is created by an earlier - // stage of the pipeline. For these cases the #withoutValidation method can be used to - // disable the check. - verifyDatasetPresence(datasetService, table); - if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { - verifyTablePresence(datasetService, table); - } - if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { - verifyTableNotExistOrEmpty(datasetService, table); - } - } - - if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) { - // We will use BigQuery's streaming write API -- validate supported dispositions. - if (tableRefFunction != null) { - checkArgument( - createDisposition != CreateDisposition.CREATE_NEVER, - "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" - + " function."); - } - if (jsonSchema == null) { - checkArgument( - createDisposition == CreateDisposition.CREATE_NEVER, - "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null."); - } - - checkArgument( - writeDisposition != WriteDisposition.WRITE_TRUNCATE, - "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or" - + " when using a tablespec function."); - } else { - // We will use a BigQuery load job -- validate the temp location. - String tempLocation = options.getTempLocation(); - checkArgument( - !Strings.isNullOrEmpty(tempLocation), - "BigQueryIO.Write needs a GCS temp location to store temp files."); - if (bigQueryServices == null) { - try { - GcsPath.fromUri(tempLocation); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - String.format( - "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", - tempLocation), - e); - } - } + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (writer != null) { + c.output(writer.close()); + writer = null; } } @Override - public PDone expand(PCollection<TableRow> input) { - Pipeline p = input.getPipeline(); - BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - BigQueryServices bqServices = getBigQueryServices(); - - // When writing an Unbounded PCollection, or when a tablespec function is defined, we use - // StreamWithDeDup and BigQuery's streaming import API. - if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { - return input.apply( - new StreamWithDeDup(getTable(), tableRefFunction, - jsonSchema == null ? null : NestedValueProvider.of( - jsonSchema, new JsonSchemaToTableSchema()), - createDisposition, - tableDescription, - bqServices)); - } - - ValueProvider<TableReference> table = getTableWithDefaultProject(options); - - String stepUuid = randomUUIDString(); - - String tempLocation = options.getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), - stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), - e); - } - - // Create a singleton job ID token at execution time. 
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); - PCollectionView<String> jobIdTokenView = p - .apply("TriggerIdCreation", Create.of("ignored")) - .apply("CreateJobId", MapElements.via( - new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return randomUUIDString(); - } - })) - .apply(View.<String>asSingleton()); - - PCollection<TableRow> inputInGlobalWindow = - input.apply( - Window.<TableRow>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> results = inputInGlobalWindow - .apply("WriteBundles", - ParDo.of(new WriteBundles(tempFilePrefix))); - - TupleTag<KV<Long, List<String>>> multiPartitionsTag = - new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<Long, List<String>>> singlePartitionTag = - new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; - - PCollectionView<Iterable<KV<String, Long>>> resultsView = results - .apply("ResultsView", View.<KV<String, Long>>asIterable()); - PCollectionTuple partitions = singleton.apply(ParDo - .of(new WritePartition( - resultsView, - multiPartitionsTag, - singlePartitionTag)) - .withSideInputs(resultsView) - .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - - // Write multiple partitions to separate temporary tables - PCollection<String> tempTables = partitions.get(multiPartitionsTag) - .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( - false, - bqServices, - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - jsonSchema, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - tableDescription)) - .withSideInputs(jobIdTokenView)); - - PCollectionView<Iterable<String>> tempTablesView = tempTables - .apply("TempTablesView", View.<String>asIterable()); - singleton.apply(ParDo - .of(new WriteRename( - bqServices, - jobIdTokenView, - NestedValueProvider.of(table, new TableRefToJson()), - writeDisposition, - createDisposition, - tempTablesView, - tableDescription)) - .withSideInputs(tempTablesView, jobIdTokenView)); - - // Write single partition to final table - partitions.get(singlePartitionTag) - .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( - true, - bqServices, - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - jsonSchema, - writeDisposition, - createDisposition, - tableDescription)) - .withSideInputs(jobIdTokenView)); + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - return PDone.in(input.getPipeline()); + builder + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")); } + } - private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> { - private transient TableRowWriter writer = null; - private final String tempFilePrefix; - - WriteBundles(String tempFilePrefix) { - this.tempFilePrefix = tempFilePrefix; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - if (writer == null) { - writer = new TableRowWriter(tempFilePrefix); - writer.open(UUID.randomUUID().toString()); - LOG.debug("Done opening writer {}", writer); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and 
close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (writer != null) { - c.output(writer.close()); - writer = null; - } - } + @Override + protected Coder<Void> getDefaultOutputCoder() { + return VoidCoder.of(); + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")); - } - } + builder + .addIfNotNull(DisplayData.item("table", jsonTableRef) + .withLabel("Table Reference")) + .addIfNotNull(DisplayData.item("schema", jsonSchema) + .withLabel("Table Schema")); - @Override - protected Coder<Void> getDefaultOutputCoder() { - return VoidCoder.of(); + if (tableRefFunction != null) { + builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) + .withLabel("Table Reference Function")); } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("table", jsonTableRef) - .withLabel("Table Reference")) - .addIfNotNull(DisplayData.item("schema", jsonSchema) - .withLabel("Table Schema")); - - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) - .withLabel("Table Reference Function")); - } + builder + .add(DisplayData.item("createDisposition", createDisposition.toString()) + .withLabel("Table CreateDisposition")) + .add(DisplayData.item("writeDisposition", writeDisposition.toString()) + .withLabel("Table WriteDisposition")) + .addIfNotDefault(DisplayData.item("validation", validate) + .withLabel("Validation Enabled"), true) + .addIfNotNull(DisplayData.item("tableDescription", tableDescription) + .withLabel("Table Description")); + } - builder - .add(DisplayData.item("createDisposition", createDisposition.toString()) - .withLabel("Table CreateDisposition")) - .add(DisplayData.item("writeDisposition", writeDisposition.toString()) - .withLabel("Table WriteDisposition")) - .addIfNotDefault(DisplayData.item("validation", validate) - .withLabel("Validation Enabled"), true) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) - .withLabel("Table Description")); - } + /** Returns the create disposition. */ + public CreateDisposition getCreateDisposition() { + return createDisposition; + } - /** Returns the create disposition. */ - public CreateDisposition getCreateDisposition() { - return createDisposition; - } + /** Returns the write disposition. */ + public WriteDisposition getWriteDisposition() { + return writeDisposition; + } - /** Returns the write disposition. */ - public WriteDisposition getWriteDisposition() { - return writeDisposition; - } + /** Returns the table schema. */ + public TableSchema getSchema() { + return fromJsonString( + jsonSchema == null ? null : jsonSchema.get(), TableSchema.class); + } - /** Returns the table schema. */ - public TableSchema getSchema() { - return fromJsonString( - jsonSchema == null ? 
null : jsonSchema.get(), TableSchema.class); + /** + * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}. + * + * <p>If the table's project is not specified, use the executing project. + */ + @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( + BigQueryOptions bqOptions) { + ValueProvider<TableReference> table = getTable(); + if (table == null) { + return table; } - - /** - * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}. - * - * <p>If the table's project is not specified, use the executing project. - */ - @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( - BigQueryOptions bqOptions) { - ValueProvider<TableReference> table = getTable(); - if (table == null) { - return table; - } - if (!table.isAccessible()) { - LOG.info("Using a dynamic value for table input. This must contain a project" - + " in the table reference: {}", table); - return table; - } - if (Strings.isNullOrEmpty(table.get().getProjectId())) { - // If user does not specify a project we assume the table to be located in - // the default project. - TableReference tableRef = table.get(); - tableRef.setProjectId(bqOptions.getProject()); - return NestedValueProvider.of(StaticValueProvider.of( - toJsonString(tableRef)), new JsonTableRefToTableRef()); - } + if (!table.isAccessible()) { + LOG.info("Using a dynamic value for table input. This must contain a project" + + " in the table reference: {}", table); return table; } - - /** Returns the table reference, or {@code null}. */ - @Nullable - public ValueProvider<TableReference> getTable() { - return jsonTableRef == null ? null : - NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + if (Strings.isNullOrEmpty(table.get().getProjectId())) { + // If user does not specify a project we assume the table to be located in + // the default project. + TableReference tableRef = table.get(); + tableRef.setProjectId(bqOptions.getProject()); + return NestedValueProvider.of(StaticValueProvider.of( + toJsonString(tableRef)), new JsonTableRefToTableRef()); } + return table; + } - /** Returns {@code true} if table validation is enabled. */ - public boolean getValidate() { - return validate; - } + /** Returns the table reference, or {@code null}. */ + @Nullable + public ValueProvider<TableReference> getTable() { + return jsonTableRef == null ? null : + NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + } - private BigQueryServices getBigQueryServices() { - if (bigQueryServices == null) { - bigQueryServices = new BigQueryServicesImpl(); - } - return bigQueryServices; + /** Returns {@code true} if table validation is enabled. 
*/ + public boolean getValidate() { + return validate; + } + + private BigQueryServices getBigQueryServices() { + if (bigQueryServices == null) { + bigQueryServices = new BigQueryServicesImpl(); } + return bigQueryServices; } static class TableRowWriter { @@ -2231,8 +2180,8 @@ public class BigQueryIO { List<String> currResults = Lists.newArrayList(); for (int i = 0; i < results.size(); ++i) { KV<String, Long> fileResult = results.get(i); - if (currNumFiles + 1 > Bound.MAX_NUM_FILES - || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) { + if (currNumFiles + 1 > Write.MAX_NUM_FILES + || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) { c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); currResults = Lists.newArrayList(); currNumFiles = 0; @@ -2331,13 +2280,13 @@ public class BigQueryIO { String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; - for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { + for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); - Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); Status jobStatus = parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: @@ -2361,7 +2310,7 @@ public class BigQueryIO { "Failed to create load job with id prefix %s, " + "reached max retries: %d, last failed load job: %s.", jobIdPrefix, - Bound.MAX_RETRY_JOBS, + Write.MAX_RETRY_JOBS, jobToPrettyString(lastFailedLoadJob))); } @@ -2477,13 +2426,13 @@ public class BigQueryIO { String projectId = ref.getProjectId(); Job lastFailedCopyJob = null; - for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { + for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); - Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); Status jobStatus = parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: @@ -2507,7 +2456,7 @@ public class BigQueryIO { "Failed to create copy job with id prefix %s, " + "reached max retries: %d, last failed copy job: %s.", jobIdPrefix, - Bound.MAX_RETRY_JOBS, + Write.MAX_RETRY_JOBS, jobToPrettyString(lastFailedCopyJob))); } @@ -2536,9 +2485,6 @@ public class BigQueryIO { .withLabel("Create Disposition")); } } - - /** Disallow construction of utility class. 
*/ - private Write() {} } private static String jobToPrettyString(@Nullable Job job) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index bb1528b..f403c5a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -690,11 +690,11 @@ public class BigQueryIOTest implements Serializable { } private void checkWriteObject( - BigQueryIO.Write.Bound bound, String project, String dataset, String table, + BigQueryIO.Write write, String project, String dataset, String table, TableSchema schema, CreateDisposition createDisposition, WriteDisposition writeDisposition, String tableDescription) { checkWriteObjectWithValidate( - bound, + write, project, dataset, table, @@ -706,17 +706,17 @@ public class BigQueryIOTest implements Serializable { } private void checkWriteObjectWithValidate( - BigQueryIO.Write.Bound bound, String project, String dataset, String table, + BigQueryIO.Write write, String project, String dataset, String table, TableSchema schema, CreateDisposition createDisposition, WriteDisposition writeDisposition, String tableDescription, boolean validate) { - assertEquals(project, bound.getTable().get().getProjectId()); - assertEquals(dataset, bound.getTable().get().getDatasetId()); - assertEquals(table, bound.getTable().get().getTableId()); - assertEquals(schema, bound.getSchema()); - assertEquals(createDisposition, bound.createDisposition); - assertEquals(writeDisposition, bound.writeDisposition); - assertEquals(tableDescription, bound.tableDescription); - assertEquals(validate, bound.validate); + assertEquals(project, write.getTable().get().getProjectId()); + assertEquals(dataset, write.getTable().get().getDatasetId()); + assertEquals(table, write.getTable().get().getTableId()); + assertEquals(schema, write.getSchema()); + assertEquals(createDisposition, write.createDisposition); + assertEquals(writeDisposition, write.writeDisposition); + assertEquals(tableDescription, write.tableDescription); + assertEquals(validate, write.validate); } @Before @@ -1328,10 +1328,10 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWrite() { - BigQueryIO.Write.Bound bound = + BigQueryIO.Write write = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @@ -1355,7 +1355,7 @@ public class BigQueryIOTest implements Serializable { options.as(StreamingOptions.class).setStreaming(streaming); DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - BigQueryIO.Write.Bound write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("project:dataset.table") .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) .withTestServices(new FakeBigQueryServices() @@ -1375,10 +1375,10 @@ public class BigQueryIOTest implements 
Serializable { public void testBuildWriteWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Write.Bound bound = + BigQueryIO.Write write = BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation(); checkWriteObjectWithValidate( - bound, + write, "foo.com:project", "somedataset", "sometable", @@ -1391,9 +1391,9 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteDefaultProject() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable"); + BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable"); checkWriteObject( - bound, null, "somedataset", "sometable", + write, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @@ -1403,89 +1403,80 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table); + BigQueryIO.Write write = BigQueryIO.Write.to(table); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test - @Category(NeedsRunner.class) - public void testBuildWriteWithoutTable() { - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("must set the table reference"); - p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation()); - } - - @Test public void testBuildWriteWithSchema() { TableSchema schema = new TableSchema(); - BigQueryIO.Write.Bound bound = + BigQueryIO.Write write = BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test public void testBuildWriteWithCreateDispositionNever() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_NEVER); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null); } @Test public void testBuildWriteWithCreateDispositionIfNeeded() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test public void testBuildWriteWithWriteDispositionTruncate() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null); } @Test public void 
testBuildWriteWithWriteDispositionAppend() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_APPEND); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null); } @Test public void testBuildWriteWithWriteDispositionEmpty() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_EMPTY); checkWriteObject( - bound, "foo.com:project", "somedataset", "sometable", + write, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null); } @Test public void testBuildWriteWithWriteWithTableDescription() { final String tblDescription = "foo bar table"; - BigQueryIO.Write.Bound bound = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withTableDescription(tblDescription); checkWriteObject( - bound, + write, "foo.com:project", "somedataset", "sometable", @@ -1501,7 +1492,7 @@ public class BigQueryIOTest implements Serializable { TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2"); final String tblDescription = "foo bar table"; - BigQueryIO.Write.Bound write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to(tableSpec) .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) @@ -1702,35 +1693,6 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testWriteValidateFailsTableAndTableSpec() { - p.enableAbandonedNodeEnforcement(false); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Cannot set both a table reference and a table function"); - p - .apply(Create.empty(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write - .to("dataset.table") - .to(new SerializableFunction<BoundedWindow, String>() { - @Override - public String apply(BoundedWindow input) { - return null; - } - })); - } - - @Test - public void testWriteValidateFailsNoTableAndNoTableSpec() { - p.enableAbandonedNodeEnforcement(false); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform"); - p - .apply(Create.empty(TableRowJsonCoder.of())) - .apply("name", BigQueryIO.Write.withoutValidation()); - } - - @Test public void testBigQueryTableSourceThroughJsonAPI() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(mockJobService) @@ -2094,7 +2056,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionSinglePartition() throws Exception { - long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES; + long numFiles = BigQueryIO.Write.MAX_NUM_FILES; long fileSize = 1; // One partition is needed. @@ -2104,7 +2066,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionManyFiles() throws Exception { - long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3; + long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3; long fileSize = 1; // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files. 
@@ -2115,7 +2077,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionLargeFileSize() throws Exception { long numFiles = 10; - long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3; + long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3; // One partition is needed for each group of three files. long expectedNumPartitions = 4; @@ -2382,7 +2344,7 @@ public class BigQueryIOTest implements Serializable { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - BigQueryIO.Write.Bound write = BigQueryIO.Write + BigQueryIO.Write write = BigQueryIO.Write .to(options.getOutputTable()) .withSchema(NestedValueProvider.of( options.getOutputSchema(), new JsonSchemaToTableSchema()))
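
An aside on the constants that moved from Bound to Write: MAX_NUM_FILES and MAX_SIZE_BYTES drive the grouping rule in WritePartition that the testWritePartition* updates above exercise (e.g. 10 files of MAX_SIZE_BYTES / 3 bytes each yield 4 partitions). A self-contained sketch of that rule follows; the class and method names are hypothetical, and the real DoFn emits KV pairs to the multiPartitionsTag/singlePartitionTag side outputs rather than returning a list:

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Sketch of WritePartition's grouping rule: start a new partition once a
     * group would exceed MAX_NUM_FILES files or MAX_SIZE_BYTES bytes.
     */
    class PartitionSketch {
      static final int MAX_NUM_FILES = 10_000;             // as in BigQueryIO.Write
      static final long MAX_SIZE_BYTES = 11L * (1L << 40); // 11 TiB, under BQ's 12 TiB limit

      /** fileSizes[i] is the byte size of temp file i; returns groups of file indices. */
      static List<List<Integer>> partition(long[] fileSizes) {
        List<List<Integer>> partitions = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int i = 0; i < fileSizes.length; i++) {
          // Same test as the DoFn: close out the current group before it
          // would overflow either the file-count or byte-size limit.
          if (current.size() + 1 > MAX_NUM_FILES
              || currentBytes + fileSizes[i] > MAX_SIZE_BYTES) {
            partitions.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
          }
          current.add(i);
          currentBytes += fileSizes[i];
        }
        if (!current.isEmpty()) {
          partitions.add(current); // trailing (possibly single) partition
        }
        return partitions;
      }
    }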