BigQueryIO: Remove tempLocation usage at pipeline construction time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0a2249f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0a2249f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0a2249f Branch: refs/heads/DSL_SQL Commit: d0a2249f17446156d9ce35b7d0b559b51e62b0b8 Parents: 1bc50d6 Author: Vikas Kedigehalli <vika...@google.com> Authored: Wed May 3 15:42:50 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Wed May 3 20:50:53 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 42 +++++++-------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 15 ++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +++---- .../io/gcp/bigquery/BigQueryQuerySource.java | 5 +- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++-- .../io/gcp/bigquery/BigQueryTableSource.java | 6 +-- .../io/gcp/bigquery/WriteBundlesToFiles.java | 19 +++---- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 19 +++---- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 57 ++++---------------- 9 files changed, 75 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 4e14696..78d39b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -19,11 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -44,8 +45,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; @@ -106,26 +105,23 @@ class BatchLoads<DestinationT> @Override public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { Pipeline p = input.getPipeline(); - BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - - validate(p.getOptions()); - final String stepUuid = BigQueryHelpers.randomUUIDString(); - String tempLocation = options.getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = - factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); - } - // Create a singleton job ID token at execution time. This will be used as the base for all - // load jobs issued from this instance of the transfomr. - PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); + // load jobs issued from this instance of the transform. + PCollection<String> singleton = p + .apply("Create", Create.of((Void) null)) + .apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() { + @ProcessElement + public void getTempFilePrefix(ProcessContext c) { + c.output( + resolveTempLocation( + c.getPipelineOptions().getTempLocation(), + "BigQueryWriteTemp", + stepUuid)); + } + })); + PCollectionView<String> jobIdTokenView = p.apply("TriggerIdCreation", Create.of("ignored")) .apply( @@ -152,7 +148,7 @@ class BatchLoads<DestinationT> PCollection<WriteBundlesToFiles.Result<DestinationT>> results = inputInGlobalWindow .apply("WriteBundlesToFiles", ParDo.of( - new WriteBundlesToFiles<DestinationT>(tempFilePrefix))) + new WriteBundlesToFiles<DestinationT>(stepUuid))) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = @@ -209,7 +205,7 @@ class BatchLoads<DestinationT> bigQueryServices, jobIdTokenView, schemasView, - tempFilePrefix, + stepUuid, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, dynamicDestinations)) @@ -247,7 +243,7 @@ class BatchLoads<DestinationT> bigQueryServices, jobIdTokenView, schemasView, - tempFilePrefix, + stepUuid, writeDisposition, createDisposition, dynamicDestinations)) http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 70e7a5f..6b4e518 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; /** A set of helper functions and classes used by {@link BigQueryIO}. */ public class BigQueryHelpers { @@ -304,4 +306,17 @@ public class BigQueryHelpers { .setTableId(queryTempTableId); return queryTempTableRef; } + + static String resolveTempLocation( + String tempLocationDir, String bigQueryOperationName, String stepUuid) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir); + return factory.resolve( + factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve temp destination directory in %s", + tempLocationDir), e); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 29491d8..c76ee86 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.Job; @@ -482,17 +483,7 @@ public class BigQueryIO { @Override public PCollection<TableRow> expand(PBegin input) { final String stepUuid = BigQueryHelpers.randomUUIDString(); - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); BoundedSource<TableRow> source; - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", tempLocation)); - } if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { @@ -502,14 +493,12 @@ public class BigQueryIO { getQuery(), getFlattenResults(), getUseLegacySql(), - extractDestinationDir, getBigQueryServices()); } else { source = BigQueryTableSource.create( stepUuid, getTableProvider(), - extractDestinationDir, getBigQueryServices()); } PassThroughThenCleanup.CleanupOperation cleanupOperation = @@ -517,6 +506,11 @@ public class BigQueryIO { @Override void cleanup(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + final String extractDestinationDir = + resolveTempLocation( + bqOptions.getTempLocation(), + "BigQueryExtractTemp", + stepUuid); JobReference jobRef = new JobReference() http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index 205f9cc..710c934 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -53,14 +53,12 @@ class BigQueryQuerySource extends BigQuerySourceBase { ValueProvider<String> query, Boolean flattenResults, Boolean useLegacySql, - String extractDestinationDir, BigQueryServices bqServices) { return new BigQueryQuerySource( stepUuid, query, flattenResults, useLegacySql, - extractDestinationDir, bqServices); } @@ -74,9 +72,8 @@ class BigQueryQuerySource extends BigQuerySourceBase { ValueProvider<String> query, Boolean flattenResults, Boolean useLegacySql, - String extractDestinationDir, BigQueryServices bqServices) { - super(stepUuid, extractDestinationDir, bqServices); + super(stepUuid, bqServices); this.query = checkNotNull(query, "query"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 0171046..41e298c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -64,14 +65,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; protected final String stepUuid; - protected final String extractDestinationDir; protected final BigQueryServices bqServices; private transient List<BoundedSource<TableRow>> cachedSplitResult; - BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) { + BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) { this.stepUuid = checkNotNull(stepUuid, "stepUuid"); - this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); this.bqServices = checkNotNull(bqServices, "bqServices"); } @@ -86,9 +85,13 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); + + final String extractDestinationDir = + resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); + String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); List<String> tempFiles = executeExtract( - extractJobId, tableToExtract, jobService, bqOptions.getProject()); + extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); TableSchema tableSchema = bqServices.getDatasetService(bqOptions) .getTable(tableToExtract).getSchema(); @@ -114,7 +117,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { } private List<String> executeExtract( - String jobId, TableReference table, JobService jobService, String executingProject) + String jobId, TableReference table, JobService jobService, String executingProject, + String extractDestinationDir) throws InterruptedException, IOException { JobReference jobRef = new JobReference() .setProjectId(executingProject) http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index e754bd2..1d45641 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -45,9 +45,8 @@ class BigQueryTableSource extends BigQuerySourceBase { static BigQueryTableSource create( String stepUuid, ValueProvider<TableReference> table, - String extractDestinationDir, BigQueryServices bqServices) { - return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices); + return new BigQueryTableSource(stepUuid, table, bqServices); } private final ValueProvider<String> jsonTable; @@ -56,9 +55,8 @@ class BigQueryTableSource extends BigQuerySourceBase { private BigQueryTableSource( String stepUuid, ValueProvider<TableReference> table, - String extractDestinationDir, BigQueryServices bqServices) { - super(stepUuid, extractDestinationDir, bqServices); + super(stepUuid, bqServices); this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); this.tableSizeBytes = new AtomicReference<>(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 4f609b2..e90b974 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; + import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Maps; import java.io.IOException; @@ -32,7 +34,6 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ class WriteBundlesToFiles<DestinationT> // Map from tablespec to a writer for that table. private transient Map<DestinationT, TableRowWriter> writers; - private final String tempFilePrefix; + private final String stepUuid; /** * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, @@ -104,8 +105,8 @@ class WriteBundlesToFiles<DestinationT> public void verifyDeterministic() {} } - WriteBundlesToFiles(String tempFilePrefix) { - this.tempFilePrefix = tempFilePrefix; + WriteBundlesToFiles(String stepUuid) { + this.stepUuid = stepUuid; } @StartBundle @@ -117,6 +118,8 @@ class WriteBundlesToFiles<DestinationT> @ProcessElement public void processElement(ProcessContext c) throws Exception { + String tempFilePrefix = resolveTempLocation( + c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); TableRowWriter writer = writers.get(c.element().getKey()); if (writer == null) { writer = new TableRowWriter(tempFilePrefix); @@ -147,12 +150,4 @@ class WriteBundlesToFiles<DestinationT> } writers.clear(); } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull( - DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index b299244..c480b42 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; + import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; @@ -41,7 +43,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; import org.apache.beam.sdk.util.GcsUtil; @@ -73,7 +74,7 @@ class WriteTables<DestinationT> private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; private final PCollectionView<Map<DestinationT, String>> schemasView; - private final String tempFilePrefix; + private final String stepUuid; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; private final DynamicDestinations<?, DestinationT> dynamicDestinations; @@ -83,7 +84,7 @@ class WriteTables<DestinationT> BigQueryServices bqServices, PCollectionView<String> jobIdToken, PCollectionView<Map<DestinationT, String>> schemasView, - String tempFilePrefix, + String stepUuid, WriteDisposition writeDisposition, CreateDisposition createDisposition, DynamicDestinations<?, DestinationT> dynamicDestinations) { @@ -91,7 +92,7 @@ class WriteTables<DestinationT> this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.schemasView = schemasView; - this.tempFilePrefix = tempFilePrefix; + this.stepUuid = stepUuid; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.dynamicDestinations = dynamicDestinations; @@ -113,6 +114,8 @@ class WriteTables<DestinationT> tableReference, tableDestination.getTableDescription()); } + String tempFilePrefix = resolveTempLocation( + c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); Integer partition = c.element().getKey().getShardNumber(); List<String> partitionFiles = Lists.newArrayList(c.element().getValue()); String jobIdPrefix = @@ -213,12 +216,4 @@ class WriteTables<DestinationT> throw new IOException("Unrecognized file system."); } } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull( - DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index e267dab..026afce 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -91,7 +91,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperati import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -136,7 +135,6 @@ import org.joda.time.Instant; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -946,7 +944,6 @@ public class BigQueryIOTest implements Serializable { } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); BigQueryIO.Read read = BigQueryIO.read() @@ -962,7 +959,6 @@ public class BigQueryIOTest implements Serializable { } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); BigQueryIO.Read read = BigQueryIO.read() @@ -988,40 +984,6 @@ public class BigQueryIOTest implements Serializable { } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") - public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException { - testWritePrimitiveDisplayData(/* streaming: */ false); - } - - @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") - public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException { - testWritePrimitiveDisplayData(/* streaming: */ true); - } - - private void testWritePrimitiveDisplayData(boolean streaming) throws IOException, - InterruptedException { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.as(StreamingOptions.class).setStreaming(streaming); - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - - BigQueryIO.Write write = BigQueryIO.writeTableRows() - .to("project:dataset.table") - .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) - .withTestServices(new FakeBigQueryServices() - .withDatasetService(new FakeDatasetService()) - .withJobService(new FakeJobService())) - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("BigQueryIO.Write should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("tableSpec"))); - - assertThat("BigQueryIO.Write should include the table schema in its primitive display data", - displayData, hasItem(hasDisplayItem("schema"))); - } - - @Test public void testBuildWriteWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. @@ -1360,9 +1322,10 @@ public class BigQueryIOTest implements Serializable { Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI"); String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices); + stepUuid, StaticValueProvider.of(table), fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(baseDir.toString()); Assert.assertThat( SourceTestUtils.readFromSource(bqSource, options), CoreMatchers.is(expected)); @@ -1399,9 +1362,8 @@ public class BigQueryIOTest implements Serializable { Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit"); String stepUuid = "testStepUuid"; - String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices); + stepUuid, StaticValueProvider.of(table), fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); @@ -1479,12 +1441,10 @@ public class BigQueryIOTest implements Serializable { String query = FakeBigQueryServices.encodeQuery(expected); - String extractDestinationDir = baseDir.toString(); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, - extractDestinationDir, fakeBqServices); - options.setTempLocation(extractDestinationDir); + true /* flattenResults */, true /* useLegacySql */, fakeBqServices); + options.setTempLocation(baseDir.toString()); TableReference queryTable = new TableReference() .setProjectId(bqOptions.getProject()) @@ -1571,7 +1531,7 @@ public class BigQueryIOTest implements Serializable { BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices); + true /* flattenResults */, true /* useLegacySql */, fakeBqServices); options.setTempLocation(baseDir.toString()); @@ -1845,7 +1805,7 @@ public class BigQueryIOTest implements Serializable { long numPartitions = 3; long numFilesPerPartition = 10; String jobIdToken = "jobIdToken"; - String tempFilePrefix = "tempFilePrefix"; + String stepUuid = "stepUuid"; Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap(); Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables"); @@ -1898,7 +1858,7 @@ public class BigQueryIOTest implements Serializable { fakeBqServices, jobIdTokenView, schemaMapView, - tempFilePrefix, + stepUuid, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, new IdentityDynamicTables()); @@ -1907,6 +1867,7 @@ public class BigQueryIOTest implements Serializable { KV<TableDestination, String>> tester = DoFnTester.of(writeTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of()); + tester.getPipelineOptions().setTempLocation("tempLocation"); for (KV<ShardedKey<String>, List<String>> partition : partitions) { tester.processElement(partition); }