ihji commented on a change in pull request #11039:
URL: https://github.com/apache/beam/pull/11039#discussion_r419832339



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
     }
   }
 
+  private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
+    ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+        }
+        DataflowPackage target = new DataflowPackage();
+        target.setLocation(stagingPayload.getStagedName());
+        if (!Strings.isNullOrEmpty(stagingPayload.getAliasName())) {
+          target.setName(stagingPayload.getAliasName());
+        }
+        filesToStageBuilder.add(StagedFile.of(filePayload.getPath(), target));
+      }
+    }
+    return options.getStager().stageFiles(filesToStageBuilder.build());
+  }
+
+  private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
+    ImmutableList.Builder<String> pathsToStageBuilder = ImmutableList.builder();
+    ImmutableMap.Builder<String, String> aliasMapBuilder = ImmutableMap.builder();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    String dataflowWorkerJar = options.getDataflowWorkerJar();
+    if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) {
+      // Put the user specified worker jar at the start of the classpath, to be consistent with the
+      // built in worker order.
+      pathsToStageBuilder.add(dataflowWorkerJar);
+      aliasMapBuilder.put(dataflowWorkerJar, "dataflow-worker.jar");
+    }
+    for (String path : options.getFilesToStage()) {
+      if (path.contains("=")) {

Review comment:
       Yes. This syntax is only supported by the Dataflow runner. `DataflowPackage` has a separate `name` field in addition to `location`, and the `=` separator lets users prefix the `name` to the location of the source, e.g. `dataflow.jar=/tmp/foo.jar`. I could remove this special syntax, but I decided to keep it because it is already exposed to users via the `--filesToStage` option, so removing it could break backward compatibility.
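A minimal sketch of how a `--filesToStage` entry in the `name=location` form could be split into a `DataflowPackage`-style `name` and `location`. The class `FileToStageEntry` and its methods are hypothetical, and splitting on the *first* `=` is an assumption: the hunk above stops at `if (path.contains("="))`, so the actual parsing logic in the PR is not shown here.

```java
import java.util.Optional;

/** Hypothetical helper: parses a --filesToStage entry of the form "name=location" or "location". */
final class FileToStageEntry {
  private final String location;       // local path of the file to upload
  private final Optional<String> name; // optional staged name (alias), e.g. "dataflow.jar"

  private FileToStageEntry(String location, Optional<String> name) {
    this.location = location;
    this.name = name;
  }

  /** Assumption: only the first '=' separates the name from the location. */
  static FileToStageEntry parse(String entry) {
    int idx = entry.indexOf('=');
    if (idx < 0) {
      // No alias given: the file is staged under its default name.
      return new FileToStageEntry(entry, Optional.empty());
    }
    return new FileToStageEntry(entry.substring(idx + 1), Optional.of(entry.substring(0, idx)));
  }

  String location() {
    return location;
  }

  Optional<String> name() {
    return name;
  }

  public static void main(String[] args) {
    FileToStageEntry aliased = parse("dataflow.jar=/tmp/foo.jar");
    FileToStageEntry plain = parse("/tmp/bar.jar");
    // prints "dataflow.jar <- /tmp/foo.jar"
    System.out.println(aliased.name().orElse("<none>") + " <- " + aliased.location());
    // prints "<none> <- /tmp/bar.jar"
    System.out.println(plain.name().orElse("<none>") + " <- " + plain.location());
  }
}
```

Entries without `=` keep their previous meaning, which is why keeping the syntax is backward compatible for existing `--filesToStage` users.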




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

