Repository: incubator-beam Updated Branches: refs/heads/master a7689466d -> ee1a3bcfb
[BEAM-430] Add GcpTempLocation, and remove defaulting tempLocation to stagingLocation in DataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3987cc1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3987cc1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3987cc1d Branch: refs/heads/master Commit: 3987cc1dac899e4465d4507ffaed623cb6ef300a Parents: a768946 Author: Pei He <pe...@google.com> Authored: Thu Jul 7 16:04:18 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Jul 12 22:49:09 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 46 +++++---------- .../options/DataflowPipelineOptions.java | 46 +++++++++++++-- .../dataflow/testing/TestDataflowRunner.java | 12 ++-- .../DataflowPipelineTranslatorTest.java | 1 + .../runners/dataflow/DataflowRunnerTest.java | 49 +++++++++++----- .../options/DataflowPipelineOptionsTest.java | 62 ++++++++++++++++++++ .../transforms/DataflowGroupByKeyTest.java | 2 +- .../dataflow/transforms/DataflowViewTest.java | 4 +- .../org/apache/beam/sdk/options/GcpOptions.java | 32 ++++++++++ .../apache/beam/sdk/options/GcpOptionsTest.java | 23 ++++++++ 10 files changed, 220 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2ba6c7b..1f2fdca 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -181,16 +181,15 @@ import java.util.TreeSet; import javax.annotation.Nullable; /** - * A {@link PipelineRunner} that executes the operations in the - * pipeline by first translating them to the Dataflow representation - * using the {@link DataflowPipelineTranslator} and then submitting + * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them + * to the Dataflow representation using the {@link DataflowPipelineTranslator} and then submitting * them to a Dataflow service for execution. * * <p><h3>Permissions</h3> - * When reading from a Dataflow source or writing to a Dataflow sink using - * {@code DataflowRunner}, the Google cloudservices account and the Google compute engine - * service account of the GCP project running the Dataflow Job will need access to the corresponding - * source/sink. + * + * When reading from a Dataflow source or writing to a Dataflow sink using {@code DataflowRunner}, + * the Google cloudservices account and the Google compute engine service account of the GCP project + * running the Dataflow Job will need access to the corresponding source/sink. * * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud * Dataflow Security and Permissions</a> for more details. 
@@ -259,27 +258,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } PathValidator validator = dataflowOptions.getPathValidator(); - checkArgument(!(isNullOrEmpty(dataflowOptions.getTempLocation()) - && isNullOrEmpty(dataflowOptions.getStagingLocation())), - "Missing required value: at least one of tempLocation or stagingLocation must be set."); - - if (dataflowOptions.getStagingLocation() != null) { - validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); - } - if (dataflowOptions.getTempLocation() != null) { - validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation()); - } - if (isNullOrEmpty(dataflowOptions.getTempLocation())) { - dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation()); - } else if (isNullOrEmpty(dataflowOptions.getStagingLocation())) { - try { - dataflowOptions.setStagingLocation( - IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging")); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation " - + "from PipelineOptions.tempLocation. 
Please set the staging location explicitly.", e); - } - } + checkArgument( + !isNullOrEmpty(dataflowOptions.getGcpTempLocation()), + "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions."); + validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation()); + checkArgument( + !isNullOrEmpty(dataflowOptions.getStagingLocation()), + "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions."); + validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); if (dataflowOptions.getFilesToStage() == null) { dataflowOptions.setFilesToStage(detectClassPathResourcesToStage( @@ -538,9 +524,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo()); // The Dataflow Service may write to the temporary directory directly, so // must be verified. - if (!isNullOrEmpty(options.getTempLocation())) { + if (!isNullOrEmpty(options.getGcpTempLocation())) { newJob.getEnvironment().setTempStoragePrefix( - dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); + dataflowOptions.getPathValidator().verifyPath(options.getGcpTempLocation())); } newJob.getEnvironment().setDataset(options.getTempDatasetId()); newJob.getEnvironment().setExperiments(options.getExperiments()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index f665a08..b69b6f9 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.dataflow.options; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; + import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.BigQueryOptions; @@ -29,6 +32,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import com.google.common.base.MoreObjects; @@ -37,6 +42,8 @@ import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.io.IOException; + /** * Options that can be used to configure the {@link DataflowRunner}. */ @@ -61,14 +68,14 @@ public interface DataflowPipelineOptions * * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://" * - * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()} - * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow - * pipeline defaults to using {@link PipelineOptions#getTempLocation()}. + * <p>If {@link #getStagingLocation()} is not set, it defaults to a {@code staging} + * directory under {@link GcpOptions#getGcpTempLocation()}, which must then be + * a valid GCS path. */ @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". " + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
" - + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, " - + "defaults to using tempLocation.") + + "If stagingLocation is unset, defaults to gcpTempLocation with \"/staging\" suffix.") + @Default.InstanceFactory(StagingLocationFactory.class) String getStagingLocation(); void setStagingLocation(String value); @@ -123,4 +130,33 @@ public interface DataflowPipelineOptions return normalizedAppName + "-" + normalizedUserName + "-" + datePart; } } + + /** + * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}. + */ + public static class StagingLocationFactory implements DefaultValueFactory<String> { + + @Override + public String create(PipelineOptions options) { + String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation(); + checkArgument(!isNullOrEmpty(gcpTempLocation), + "Error constructing default value for stagingLocation: gcpTempLocation is missing." + + "Either stagingLocation must be set explicitly or a valid value must be provided" + + "for gcpTempLocation."); + try { + GcsPath.fromUri(gcpTempLocation); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Error constructing default value for stagingLocation: gcpTempLocation is not" + + " a valid GCS path, %s. ", gcpTempLocation)); + } + try { + return IOChannelUtils.resolve(gcpTempLocation, "staging"); + } catch (IOException e) { + throw new IllegalArgumentException(String.format( + "Unable to resolve stagingLocation from gcpTempLocation: %s." 
+ + " Please set the staging location explicitly.", gcpTempLocation), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 1325cf3..6894a10 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -74,12 +74,14 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { /** * Constructs a runner from the provided options. 
*/ - public static TestDataflowRunner fromOptions( - PipelineOptions options) { + public static TestDataflowRunner fromOptions(PipelineOptions options) { TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); - dataflowOptions.setStagingLocation(Joiner.on("/").join( - new String[]{dataflowOptions.getTempRoot(), - dataflowOptions.getJobName(), "output", "results"})); + String tempLocation = Joiner.on("/").join( + dataflowOptions.getTempRoot(), + dataflowOptions.getJobName(), + "output", + "results"); + dataflowOptions.setTempLocation(tempLocation); return new TestDataflowRunner(dataflowOptions); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 48c757f..d4d571b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -198,6 +198,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner"); settings.put("jobName", "some-job-name"); settings.put("tempLocation", "gs://somebucket/some/path"); + settings.put("gcpTempLocation", "gs://somebucket/some/path"); settings.put("stagingLocation", "gs://somebucket/some/path/staging"); settings.put("stableUniqueNames", "WARNING"); settings.put("streaming", false); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 0cf1ade..f3cbb38 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -187,7 +187,14 @@ public class DataflowRunnerTest { return mockDataflowClient; } - private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException { + /** + * Builds a mock {@link GcsUtil} with stubbed {@code bucketExists} return values. + * + * @param bucketExist value returned by the first {@code bucketExists} call + * @param bucketExists values returned by subsequent {@code bucketExists} calls, in order + */ + private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean...
bucketExists) + throws IOException { GcsUtil mockGcsUtil = mock(GcsUtil.class); when(mockGcsUtil.create(any(GcsPath.class), anyString())) .then(new Answer<SeekableByteChannel>() { @@ -206,7 +213,7 @@ public class DataflowRunnerTest { return ImmutableList.of((GcsPath) invocation.getArguments()[0]); } }); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists); + when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists); return mockGcsUtil; } @@ -508,11 +515,11 @@ public class DataflowRunnerTest { } @Test - public void testInvalidTempLocation() throws IOException { + public void testInvalidGcpTempLocation() throws IOException { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setTempLocation("file://temp/location"); + options.setGcpTempLocation("file://temp/location"); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); @@ -521,6 +528,19 @@ public class DataflowRunnerTest { } @Test + public void testNonGcsTempLocation() throws IOException { + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + + DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + options.setTempLocation("file://temp/location"); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions."); + DataflowRunner.fromOptions(options); + } + + @Test public void testInvalidStagingLocation() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); options.setStagingLocation("file://my/staging/location"); @@ -543,7 +563,8 @@ public class DataflowRunnerTest { public void testNonExistentTempLocation() throws IOException { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* 
bucket exists */); + GcsUtil mockGcsUtil = + buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */); DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); options.setGcsUtil(mockGcsUtil); options.setTempLocation("gs://non-existent-bucket/location"); @@ -559,7 +580,8 @@ public class DataflowRunnerTest { public void testNonExistentStagingLocation() throws IOException { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */); + GcsUtil mockGcsUtil = + buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */); DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); options.setGcsUtil(mockGcsUtil); options.setStagingLocation("gs://non-existent-bucket/location"); @@ -593,7 +615,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setGcpTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); options.setGcpCredential(new TestCredential()); @@ -606,7 +628,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("google.com:some-project-12345"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setGcpTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); options.setGcpCredential(new TestCredential()); @@ -619,7 +641,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("12345"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setGcpTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); thrown.expect(IllegalArgumentException.class); @@ -635,7 +657,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("some 
project"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setGcpTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); thrown.expect(IllegalArgumentException.class); @@ -651,7 +673,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1); @@ -671,8 +693,7 @@ public class DataflowRunnerTest { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( - "Missing required value: at least one of tempLocation or stagingLocation must be set."); - + "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions."); DataflowRunner.fromOptions(options); } @@ -682,7 +703,7 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); options.setProject("foo-project"); - options.setStagingLocation("gs://spam/ham/eggs"); + options.setGcpTempLocation("gs://spam/ham/eggs"); options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); DataflowRunner.fromOptions(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index e7db40f..b5ee5e9 100644 --- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -17,15 +17,18 @@ */ package org.apache.beam.runners.dataflow.options; +import static com.google.common.base.Strings.isNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ResetDateTimeProvider; import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.sdk.util.IOChannelUtils; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,6 +38,7 @@ import org.junit.runners.JUnit4; public class DataflowPipelineOptionsTest { @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider(); + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testJobNameIsSet() { @@ -90,4 +94,62 @@ public class DataflowPipelineOptionsTest { options.setAppName("fÉËnÉtık ÉsoÊsiËeıÊn"); assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName()); } + + @Test + public void testStagingLocation() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + IOChannelUtils.registerStandardIOFactories(options); + options.setTempLocation("file://temp_location"); + options.setStagingLocation("gs://staging_location"); + assertTrue(isNullOrEmpty(options.getGcpTempLocation())); + assertEquals("gs://staging_location", options.getStagingLocation()); + } + + @Test + public void testDefaultToTempLocation() { + DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class); + IOChannelUtils.registerStandardIOFactories(options); + options.setTempLocation("gs://temp_location"); + assertEquals("gs://temp_location", options.getGcpTempLocation()); + assertEquals("gs://temp_location/staging", options.getStagingLocation()); + } + + @Test + public void testDefaultToGcpTempLocation() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + IOChannelUtils.registerStandardIOFactories(options); + options.setTempLocation("gs://temp_location"); + options.setGcpTempLocation("gs://gcp_temp_location"); + assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation()); + } + + @Test + public void testDefaultNoneGcsTempLocation() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setTempLocation("file://temp_location"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for stagingLocation: gcpTempLocation is missing."); + options.getStagingLocation(); + } + + @Test + public void testDefaultInvalidGcpTempLocation() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcpTempLocation("file://temp_location"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for stagingLocation: gcpTempLocation is not" + + " a valid GCS path"); + options.getStagingLocation(); + } + + @Test + public void testDefaultStagingLocationUnset() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for stagingLocation: gcpTempLocation is missing."); + options.getStagingLocation(); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index a44b8a7..b219ea2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -61,7 +61,7 @@ public class DataflowGroupByKeyTest { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setProject("someproject"); - options.setStagingLocation("gs://staging"); + options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); options.setDataflowClient(null); return Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java index 1b263d2..95cbaae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java +++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java @@ -54,7 +54,7 @@ public class DataflowViewTest { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setProject("someproject"); - options.setStagingLocation("gs://staging"); + options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); options.setDataflowClient(null); return Pipeline.create(options); @@ -65,7 +65,7 @@ public class DataflowViewTest { options.setRunner(DataflowRunner.class); options.setStreaming(true); options.setProject("someproject"); - options.setStagingLocation("gs://staging"); + options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); options.setDataflowClient(null); return Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java index 4585266..1bf4dd6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java @@ -20,10 +20,12 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.util.CredentialFactory; import org.apache.beam.sdk.util.GcpCredentialFactory; import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.io.Files; 
import com.fasterxml.jackson.annotation.JsonIgnore; @@ -290,4 +292,34 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { @Hidden String getAuthorizationServerEncodedUrl(); void setAuthorizationServerEncodedUrl(String value); + + /** + * A GCS path for storing temporary files in GCP. + * + * <p>Defaults to {@link PipelineOptions#getTempLocation} if that is a valid GCS path. + */ + @Description("A GCS path for storing temporary files in GCP.") + @Default.InstanceFactory(GcpTempLocationFactory.class) + String getGcpTempLocation(); + void setGcpTempLocation(String value); + + /** + * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. + */ + public static class GcpTempLocationFactory implements DefaultValueFactory<String> { + + @Override + public String create(PipelineOptions options) { + String tempLocation = options.getTempLocation(); + if (!Strings.isNullOrEmpty(tempLocation)) { + try { + GcsPath.fromUri(tempLocation); + } catch (Exception e) { + // Ignore the temp location because it is not a valid 'gs://' path.
+ return null; + } + } + return tempLocation; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987cc1d/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java index c179738..c0f65d8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.options; +import static com.google.common.base.Strings.isNullOrEmpty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -104,6 +106,27 @@ public class GcpOptionsTest { assertNull(projectFactory.create(PipelineOptionsFactory.create())); } + @Test + public void testEmptyGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + assertTrue(isNullOrEmpty(options.getGcpTempLocation())); + } + + @Test + public void testDefaultGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://bucket"; + options.setTempLocation(tempLocation); + assertEquals(tempLocation, options.getGcpTempLocation()); + } + + @Test + public void testDefaultGcpTempLocationInvalid() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setTempLocation("file://"); + assertTrue(isNullOrEmpty(options.getGcpTempLocation())); + } + private static void makePropertiesFileWithProject(File path, String projectId) throws IOException { String properties = String.format("[core]%n"