This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2300f45 add sdkContainerImage to Java WorkerPool PipelineOptions new 018f5e7 Merge pull request #14575 from [BEAM-12212] Adds --sdkContainerImage as new Java Dataflow PipelineOption 2300f45 is described below commit 2300f4547bbdc5c0e1cdcafa537d1a605bedc522 Author: Emily Ye <emil...@google.com> AuthorDate: Sun Apr 18 23:58:24 2021 -0700 add sdkContainerImage to Java WorkerPool PipelineOptions --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 4 +- runners/google-cloud-dataflow-java/build.gradle | 3 +- .../examples-streaming/build.gradle | 4 +- .../beam/runners/dataflow/DataflowRunner.java | 85 ++++++++++++++++++---- .../beam/runners/dataflow/DataflowRunnerInfo.java | 10 ++- .../options/DataflowPipelineWorkerPoolOptions.java | 38 +++++----- .../beam/runners/dataflow/dataflow.properties | 1 + .../dataflow/DataflowPipelineTranslatorTest.java | 46 ++++++++++-- .../runners/dataflow/DataflowRunnerInfoTest.java | 15 +++- .../beam/runners/dataflow/DataflowRunnerTest.java | 68 ++++++++++++++--- 10 files changed, 214 insertions(+), 60 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 465061d..4c4d41b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1583,13 +1583,15 @@ class BeamModulePlugin implements Plugin<Project> { if (pipelineOptionsString.contains('use_runner_v2')) { def dockerImageName = project.project(':runners:google-cloud-dataflow-java').ext.dockerImageName allOptionsList.addAll([ - "--workerHarnessContainerImage=${dockerImageName}", + "--sdkContainerImage=${dockerImageName}", "--region=${dataflowRegion}" ]) } else { def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project.project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath allOptionsList.addAll([ + // Keep as legacy flag to ensure via test this flag works for + // legacy pipeline. '--workerHarnessContainerImage=', "--dataflowWorkerJar=${dataflowWorkerJar}", "--region=${dataflowRegion}" diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 80941db..a3db5a2 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -46,6 +46,7 @@ processResources { 'dataflow.legacy_environment_major_version' : '8', 'dataflow.fnapi_environment_major_version' : '8', 'dataflow.container_version' : 'beam-master-20210419', + 'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3', ] } @@ -147,7 +148,7 @@ def runnerV2PipelineOptions = [ "--project=${dataflowProject}", "--region=${dataflowRegion}", "--tempRoot=${dataflowValidatesTempRoot}", - "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", + "--sdkContainerImage=${dockerImageContainer}:${dockerTag}", // TODO(BEAM-11779) remove shuffle_mode=appliance with runner v2 once issue is resolved. 
"--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=appliance", ] diff --git a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle index fd8705d..bc5f584d 100644 --- a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle +++ b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle @@ -40,8 +40,8 @@ task windmillPreCommit(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath - // Set workerHarnessContainerImage to empty to make Dataflow pick up the non-versioned container - // image, which handles a staged worker jar. + // Set workerHarnessContainerImage to empty to make Dataflow pick up the + // non-versioned container image, which handles a staged worker jar. def preCommitBeamTestPipelineOptions = [ "--project=${gcpProject}", "--region=${gcpRegion}", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 42d463e..cae1cf2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -269,7 +269,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { "Missing required pipeline options: " + Joiner.on(',').join(missing)); } - validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options)); + validateWorkerSettings( + PipelineOptionsValidator.validate(DataflowPipelineWorkerPoolOptions.class, options)); PathValidator validator = dataflowOptions.getPathValidator(); String 
gcpTempLocation; @@ -401,8 +402,39 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return Strings.isNullOrEmpty(endpoint) || Pattern.matches(ENDPOINT_REGEXP, endpoint); } + static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) { + // Check against null - empty string value for workerHarnessContainerImage + // must be preserved for legacy dataflowWorkerJar to work. + String sdkContainerOption = workerOptions.getSdkContainerImage(); + String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage(); + Preconditions.checkArgument( + sdkContainerOption == null + || workerHarnessOption == null + || sdkContainerOption.equals(workerHarnessOption), + "Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage."); + + // Default to new option, which may be null. + String containerImage = workerOptions.getSdkContainerImage(); + if (workerOptions.getWorkerHarnessContainerImage() != null + && workerOptions.getSdkContainerImage() == null) { + // Set image to old option if old option was set but new option is not set. + LOG.warn( + "Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage."); + containerImage = workerOptions.getWorkerHarnessContainerImage(); + } + + // Make sure both options have same value. 
+ workerOptions.setSdkContainerImage(containerImage); + workerOptions.setWorkerHarnessContainerImage(containerImage); + } + @VisibleForTesting - static void validateWorkerSettings(GcpOptions gcpOptions) { + static void validateWorkerSettings(DataflowPipelineWorkerPoolOptions workerOptions) { + DataflowPipelineOptions dataflowOptions = workerOptions.as(DataflowPipelineOptions.class); + + validateSdkContainerImageOptions(workerOptions); + + GcpOptions gcpOptions = workerOptions.as(GcpOptions.class); Preconditions.checkArgument( gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null, "Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion."); @@ -413,7 +445,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null, "workerRegion and workerZone options are mutually exclusive."); - DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class); boolean hasExperimentWorkerRegion = false; if (dataflowOptions.getExperiments() != null) { for (String experiment : dataflowOptions.getExperiments()) { @@ -1092,9 +1123,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Set the Docker container image that executes Dataflow worker harness, residing in Google // Container Registry. Translator is guaranteed to create a worker pool prior to this point. 
- String workerHarnessContainerImage = getContainerImageForJob(options); + String containerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { - workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); + workerPool.setWorkerHarnessContainerImage(containerImage); } configureSdkHarnessContainerImages(options, portablePipelineProto, newJob); @@ -2175,21 +2206,45 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting static String getContainerImageForJob(DataflowPipelineOptions options) { - String workerHarnessContainerImage = options.getWorkerHarnessContainerImage(); + String containerImage = options.getSdkContainerImage(); + + if (containerImage == null) { + // If not set, construct and return default image URL. + return getDefaultContainerImageUrl(options); + } else if (containerImage.contains("IMAGE")) { + // Replace placeholder with default image name + // TODO(emilymye): See if we can remove this placeholder + return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options)); + } else { + return containerImage; + } + } + + /** Construct the default Dataflow container full image URL. */ + static String getDefaultContainerImageUrl(DataflowPipelineOptions options) { + DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); + return String.format( + "%s/%s:%s", + dataflowRunnerInfo.getContainerImageBaseRepository(), + getDefaultContainerImageNameForJob(options), + dataflowRunnerInfo.getContainerVersion()); + } + /** + * Construct the default Dataflow container image name based on pipeline type and Environment Java + * version. + */ + static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) { Environments.JavaVersion javaVersion = Environments.getJavaVersion(); - String javaVersionId = + String legacyJavaVersionId = (javaVersion == Environments.JavaVersion.v8) ? 
"java" : javaVersion.toString(); - if (!workerHarnessContainerImage.contains("IMAGE")) { - return workerHarnessContainerImage; - } else if (useUnifiedWorker(options)) { - return workerHarnessContainerImage.replace("IMAGE", "java"); + + if (useUnifiedWorker(options)) { + return "java"; } else if (options.isStreaming()) { - return workerHarnessContainerImage.replace( - "IMAGE", String.format("beam-%s-streaming", javaVersionId)); + return String.format("beam-%s-streaming", legacyJavaVersionId); } else { - return workerHarnessContainerImage.replace( - "IMAGE", String.format("beam-%s-batch", javaVersionId)); + return String.format("beam-%s-batch", legacyJavaVersionId); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java index f54643f..4c5b272 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java @@ -42,6 +42,7 @@ public final class DataflowRunnerInfo extends ReleaseInfo { private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY = "legacy.environment.major.version"; private static final String CONTAINER_VERSION_KEY = "container.version"; + private static final String CONTAINER_BASE_REPOSITORY_KEY = "container.base_repository"; private static class LazyInit { private static final DataflowRunnerInfo INSTANCE; @@ -99,12 +100,19 @@ public final class DataflowRunnerInfo extends ReleaseInfo { return properties.get(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY); } - /** Provides the container version that will be used for constructing harness image paths. */ + /** Provides the version/tag for constructing the container image path. 
*/ public String getContainerVersion() { checkState(properties.containsKey(CONTAINER_VERSION_KEY), "Unknown container version"); return properties.get(CONTAINER_VERSION_KEY); } + /** Provides the base image repository for constructing the container image path. */ + public String getContainerImageBaseRepository() { + checkState( + properties.containsKey(CONTAINER_BASE_REPOSITORY_KEY), "Unknown container base repository"); + return properties.get(CONTAINER_BASE_REPOSITORY_KEY); + } + @Override public Map<String, String> getProperties() { return ImmutableMap.copyOf((Map) properties); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index 751bca6..1978b17 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -18,16 +18,12 @@ package org.apache.beam.runners.dataflow.options; import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.beam.runners.dataflow.DataflowRunnerInfo; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.FileStagingOptions; import org.apache.beam.sdk.options.Hidden; -import org.apache.beam.sdk.options.PipelineOptions; import org.checkerframework.checker.nullness.qual.Nullable; /** Options that are used to configure the Dataflow pipeline worker pool. 
*/ @@ -112,30 +108,32 @@ public interface DataflowPipelineWorkerPoolOptions extends GcpOptions, FileStagi void setDiskSizeGb(int value); - /** - * Docker container image that executes Dataflow worker harness, residing in Google Container - * Registry. - */ - @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class) + /** Container image used as Dataflow worker harness image. */ + /** @deprecated Use {@link #getSdkContainerImage} instead. */ @Description( - "Docker container image that executes Dataflow worker harness, residing in Google " - + " Container Registry.") + "Container image used to configure a Dataflow worker. " + + "Can only be used for official Dataflow container images. " + + "Prefer using sdkContainerImage instead.") + @Deprecated @Hidden String getWorkerHarnessContainerImage(); + /** @deprecated Use {@link #setSdkContainerImage} instead. */ + @Deprecated + @Hidden void setWorkerHarnessContainerImage(String value); /** - * Returns the default Docker container image that executes Dataflow worker harness, residing in - * Google Container Registry. + * Container image used to configure SDK execution environment on worker. Used for custom + * containers on portable pipelines only. */ - class WorkerHarnessContainerImageFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion(); - return String.format("gcr.io/cloud-dataflow/v1beta3/IMAGE:%s", containerVersion); - } - } + @Description( + "Container image used to configure the SDK execution environment of " + + "pipeline code on a worker. 
For non-portable pipelines, can only be " + + "used for official Dataflow container images.") + String getSdkContainerImage(); + + void setSdkContainerImage(String value); /** * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties index 7d4bdf0..58e92ee 100644 --- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties +++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties @@ -19,3 +19,4 @@ legacy.environment.major.version=@dataflow.legacy_environment_major_version@ fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@ container.version=@dataflow.container_version@ +container.base_repository=@dataflow.container_base_repository@ diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index a4834e2..c38037f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -141,10 +141,10 @@ public class DataflowPipelineTranslatorTest implements Serializable { private SdkComponents createSdkComponents(PipelineOptions options) { SdkComponents sdkComponents = SdkComponents.create(); - String workerHarnessContainerImageURL = + String containerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = - 
Environments.createDockerEnvironment(workerHarnessContainerImageURL); + Environments.createDockerEnvironment(containerImageURL); sdkComponents.registerEnvironment(defaultEnvironmentForDataflow); return sdkComponents; @@ -1260,15 +1260,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { } /** - * Tests that when {@link DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline - * option is set, {@link DataflowRunner} sets that value as the {@link - * DockerPayload#getContainerImage()} of the default {@link Environment} used when generating the - * model pipeline proto. + * Tests that when (deprecated) {@link + * DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline option is set, {@link + * DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of the default + * {@link Environment} used when generating the model pipeline proto. */ @Test public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); - String containerImage = "gcr.io/IMAGE/foo"; + String containerImage = "gcr.io/image:foo"; options.as(DataflowPipelineOptions.class).setWorkerHarnessContainerImage(containerImage); Pipeline p = Pipeline.create(options); @@ -1292,6 +1292,38 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage()); } + /** + * Tests that when {@link DataflowPipelineOptions#setSdkContainerImage(String)} pipeline option is + * set, {@link DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of + * the default {@link Environment} used when generating the model pipeline proto. 
+ */ + @Test + public void testSetSdkContainerImageInPipelineProto() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + String containerImage = "gcr.io/image:foo"; + options.as(DataflowPipelineOptions.class).setSdkContainerImage(containerImage); + + Pipeline p = Pipeline.create(options); + SdkComponents sdkComponents = createSdkComponents(options); + RunnerApi.Pipeline proto = PipelineTranslation.toProto(p, sdkComponents, true); + JobSpecification specification = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + proto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + RunnerApi.Pipeline pipelineProto = specification.getPipelineProto(); + + assertEquals(1, pipelineProto.getComponents().getEnvironmentsCount()); + Environment defaultEnvironment = + Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values()); + + DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload()); + assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage()); + } + @Test public void testDataflowServiceOptionsSet() throws IOException { final List<String> dataflowServiceOptions = diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java index 83e617f..fbb8ed2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java @@ -49,8 +49,19 @@ public class DataflowRunnerInfoTest { String.format("FnAPI environment major version number %s is not a number", version), version.matches("\\d+")); - // Validate container version does not contain a $ (indicating it was not filled in). 
- assertThat("container version invalid", info.getContainerVersion(), not(containsString("$"))); + // Validate container version does not contain the property name (indicating it was not filled + // in). + assertThat( + "container version invalid", + info.getContainerVersion(), + not(containsString("dataflow.container_version"))); + + // Validate container base repository does not contain the property name + // (indicating it was not filled in). + assertThat( + "container repository invalid", + info.getContainerImageBaseRepository(), + not(containsString("dataflow.container_base_repository"))); for (String property : new String[] {"java.vendor", "java.version", "os.arch", "os.name", "os.version"}) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index d7cf5ae..e3b426f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -584,7 +584,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testZoneAndWorkerRegionMutuallyExclusive() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); options.setZone("us-east1-b"); options.setWorkerRegion("us-east1"); assertThrows( @@ -593,7 +594,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testZoneAndWorkerZoneMutuallyExclusive() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); options.setZone("us-east1-b"); options.setWorkerZone("us-east1-c"); 
assertThrows( @@ -602,7 +604,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testExperimentRegionAndWorkerRegionMutuallyExclusive() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1"); options.setWorkerRegion("us-east1"); @@ -612,7 +615,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testExperimentRegionAndWorkerZoneMutuallyExclusive() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1"); options.setWorkerZone("us-east1-b"); @@ -622,7 +626,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testWorkerRegionAndWorkerZoneMutuallyExclusive() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); options.setWorkerRegion("us-east1"); options.setWorkerZone("us-east1-b"); assertThrows( @@ -631,7 +636,8 @@ public class DataflowRunnerTest implements Serializable { @Test public void testZoneAliasWorkerZone() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); options.setZone("us-east1-b"); DataflowRunner.validateWorkerSettings(options); assertNull(options.getZone()); @@ -639,6 +645,28 @@ public class DataflowRunnerTest implements Serializable 
{ } @Test + public void testAliasForLegacyWorkerHarnessContainerImage() { + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); + String testImage = "image.url:worker"; + options.setWorkerHarnessContainerImage(testImage); + DataflowRunner.validateWorkerSettings(options); + assertEquals(testImage, options.getWorkerHarnessContainerImage()); + assertEquals(testImage, options.getSdkContainerImage()); + } + + @Test + public void testAliasForSdkContainerImage() { + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); + String testImage = "image.url:sdk"; + options.setSdkContainerImage("image.url:sdk"); + DataflowRunner.validateWorkerSettings(options); + assertEquals(testImage, options.getWorkerHarnessContainerImage()); + assertEquals(testImage, options.getSdkContainerImage()); + } + + @Test public void testRegionRequiredForServiceRunner() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); options.setRegion(null); @@ -1591,15 +1619,33 @@ public class DataflowRunnerTest implements Serializable { } @Test - public void testWorkerHarnessContainerImage() { + public void testGetContainerImageForJobFromOption() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - // default image set - options.setWorkerHarnessContainerImage("some-container"); - assertThat(getContainerImageForJob(options), equalTo("some-container")); + String[] testCases = { + "some-container", + + // It is important that empty string is preserved, as + // dataflowWorkerJar relies on being passed an empty value vs + // not providing the container image option at all. + "", + }; + + for (String testCase : testCases) { + // When image option is set, should use that exact image. 
+ options.setSdkContainerImage(testCase); + assertThat(getContainerImageForJob(options), equalTo(testCase)); + } + } + + @Test + public void testGetContainerImageForJobFromOptionWithPlaceholder() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + // When image option contains placeholder, container image should + // have placeholder replaced with default image for job. + options.setSdkContainerImage("gcr.io/IMAGE/foo"); // batch, legacy - options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo"); options.setExperiments(null); options.setStreaming(false); System.setProperty("java.specification.version", "1.8");