kennknowles commented on code in PR #36138:
URL: https://github.com/apache/beam/pull/36138#discussion_r2345000717


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -2634,48 +2637,96 @@ public Map<PCollection<?>, ReplacementOutput> 
mapOutputs(
   }
 
   @VisibleForTesting
-  static String getContainerImageForJob(DataflowPipelineOptions options) {
+  static String getV1WorkerContainerImageForJob(DataflowPipelineOptions 
options) {
+    String containerImage = options.getWorkerHarnessContainerImage();
+
+    if (containerImage == null) {
+      // If not set, construct and return default image URL.
+      return getDefaultV1WorkerContainerImageUrl(options);
+    } else if (containerImage.contains("IMAGE")) {
+      // Replace placeholder with default image name
+      return containerImage.replace("IMAGE", 
getDefaultV1WorkerContainerImageNameForJob(options));
+    } else {
+      return containerImage;
+    }
+  }
+
+  static String getV2SdkHarnessContainerImageForJob(DataflowPipelineOptions 
options) {
     String containerImage = options.getSdkContainerImage();
 
     if (containerImage == null) {
       // If not set, construct and return default image URL.
-      return getDefaultContainerImageUrl(options);
+      return getDefaultV2SdkHarnessContainerImageUrl(options);
     } else if (containerImage.contains("IMAGE")) {
       // Replace placeholder with default image name
-      return containerImage.replace("IMAGE", 
getDefaultContainerImageNameForJob(options));
+      return containerImage.replace("IMAGE", 
getDefaultV2SdkHarnessContainerImageNameForJob());
     } else {
       return containerImage;
     }
   }
 
-  /** Construct the default Dataflow container full image URL. */
-  static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
+  /** Construct the default Dataflow worker container full image URL. */
+  static String getDefaultV1WorkerContainerImageUrl(DataflowPipelineOptions 
options) {
     DataflowRunnerInfo dataflowRunnerInfo = 
DataflowRunnerInfo.getDataflowRunnerInfo();
     return String.format(
         "%s/%s:%s",
         dataflowRunnerInfo.getContainerImageBaseRepository(),
-        getDefaultContainerImageNameForJob(options),
-        getDefaultContainerVersion(options));
+        getDefaultV1WorkerContainerImageNameForJob(options),
+        getDefaultV1WorkerContainerVersion(options));
+  }
+
+  /** Construct the default Java SDK container full image URL. */
+  static String 
getDefaultV2SdkHarnessContainerImageUrl(DataflowPipelineOptions options) {
+    DataflowRunnerInfo dataflowRunnerInfo = 
DataflowRunnerInfo.getDataflowRunnerInfo();
+    return String.format(
+        "%s/%s:%s",
+        dataflowRunnerInfo.getContainerImageBaseRepository(),
+        getDefaultV2SdkHarnessContainerImageNameForJob(),
+        getDefaultV2SdkHarnessContainerVersion(options));
   }
 
   /**
-   * Construct the default Dataflow container image name based on pipeline 
type and Java version.
+   * Construct the default Dataflow V1 worker container image name based on 
pipeline type and Java
+   * version.
    */
-  static String getDefaultContainerImageNameForJob(DataflowPipelineOptions 
options) {
+  static String 
getDefaultV1WorkerContainerImageNameForJob(DataflowPipelineOptions options) {
     Environments.JavaVersion javaVersion = Environments.getJavaVersion();
-    if (useUnifiedWorker(options)) {
-      return String.format("beam_%s_sdk", javaVersion.name());
-    } else if (options.isStreaming()) {
+    if (options.isStreaming()) {
       return String.format("beam-%s-streaming", javaVersion.legacyName());
     } else {
       return String.format("beam-%s-batch", javaVersion.legacyName());
     }
   }
 
+  /**
+   * Construct the default Java SDK container image name based on the Java version, for use by
+   * Dataflow V2.
+   */
+  static String getDefaultV2SdkHarnessContainerImageNameForJob() {
+    Environments.JavaVersion javaVersion = Environments.getJavaVersion();
+    return String.format("beam_%s_sdk", javaVersion.name());
+  }
+
+  /**
+   * Construct the default Dataflow V1 worker container version, used as the tag of the default
+   * image URL.
+   */
+  static String getDefaultV1WorkerContainerVersion(DataflowPipelineOptions 
options) {
+    DataflowRunnerInfo dataflowRunnerInfo = 
DataflowRunnerInfo.getDataflowRunnerInfo();
+    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+    if (releaseInfo.isDevSdkVersion()) {
+      if (useUnifiedWorker(options)) {

Review Comment:
   I missed this earlier. We don't need this `useUnifiedWorker(options)` check in the V1 worker method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to