robertwb commented on a change in pull request #11039:
URL: https://github.com/apache/beam/pull/11039#discussion_r418651280



##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1271,6 +1271,11 @@ message DeferredArtifactPayload {
 message ArtifactStagingToRolePayload {
   // A generated staged name (relative path under staging directory).
   string staged_name = 1;
+
+  // (Optional) An artifact name when a runner supports it.
+  // For example, DataflowRunner requires predefined names for some artifacts
+  // such as "dataflow-worker.jar", "windmill_main".
+  string alias_name = 2;

Review comment:
       Why does this have to be distinct from staged_name? (Also, eventually we 
hope that designated roles can remove the need for magic names.)
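
       For context, a minimal sketch of how the two fields could differ for the worker jar named in the proto comment (the generated staged name below is a hypothetical example, not taken from the PR):

       ```java
       // staged_name: generated relative path under the staging directory.
       // alias_name: fixed name the runner looks for ("dataflow-worker.jar" per the proto comment).
       RunnerApi.ArtifactStagingToRolePayload payload =
           RunnerApi.ArtifactStagingToRolePayload.newBuilder()
               .setStagedName("beam-runners-dataflow-worker-abc123.jar") // hypothetical generated name
               .setAliasName("dataflow-worker.jar")
               .build();
       ```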

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
##########
@@ -210,56 +209,55 @@ public static Environment createProcessEnvironment(
     }
   }
 
-  private static List<ArtifactInformation> getArtifacts(List<String> stagingFiles) {
-    Set<String> pathsToStage = Sets.newHashSet(stagingFiles);
+  public static List<ArtifactInformation> getArtifacts(
+      List<String> stagingFiles, StagingFileNameGenerator generator) {
     ImmutableList.Builder<ArtifactInformation> artifactsBuilder = ImmutableList.builder();
-    for (String path : pathsToStage) {
+    for (String path : ImmutableSet.copyOf(stagingFiles)) {

Review comment:
       Isn't order important to preserve? (Also, why do we need to make a copy?)
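
       For reference, a small sketch of the Guava behavior in question: `ImmutableSet.copyOf` deduplicates while keeping first-occurrence iteration order, so the original staging order survives apart from duplicates (file names below are made up):

       ```java
       // Made-up file names; copyOf keeps the order in which elements first appear.
       ImmutableSet<String> deduped =
           ImmutableSet.copyOf(ImmutableList.of("a.jar", "b.jar", "a.jar"));
       // Iterates as "a.jar", "b.jar".
       ```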

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         "Executing pipeline on the Dataflow Service, which will have billing 
implications "
             + "related to Google Compute Engine usage and other Google Cloud 
Services.");
 
-    List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
+    // Capture the sdkComponents for look up during step translations
+    SdkComponents sdkComponents = SdkComponents.create();
+
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+    RunnerApi.Environment defaultEnvironmentForDataflow =
+        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+    sdkComponents.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())

Review comment:
       How does this get invoked for cross-language pipelines?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
##########
@@ -336,25 +323,26 @@ public DataflowPackage stageToFile(
     final AtomicInteger numCached = new AtomicInteger(0);
     List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();
 
-    for (String classpathElement : classpathElements) {
-      DataflowPackage sourcePackage = new DataflowPackage();
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        sourcePackage.setName(components[0]);
-        sourcePackage.setLocation(components[1]);
-      } else {
-        sourcePackage.setName(null);
-        sourcePackage.setLocation(classpathElement);
+    for (StagedFile classpathElement : classpathElements) {
+      DataflowPackage targetPackage = classpathElement.getStagedPackage();
+      String source = classpathElement.getSource();
+      if (source.contains("=")) {

Review comment:
       Why do we have to handle this here and above?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
     }
   }
 
+  private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
+    ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", 
info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", 
e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", 
info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role 
payload.", e);
+        }
+        DataflowPackage target = new DataflowPackage();
+        target.setLocation(stagingPayload.getStagedName());
+        if (!Strings.isNullOrEmpty(stagingPayload.getAliasName())) {
+          target.setName(stagingPayload.getAliasName());
+        }
+        filesToStageBuilder.add(StagedFile.of(filePayload.getPath(), target));
+      }
+    }
+    return options.getStager().stageFiles(filesToStageBuilder.build());
+  }
+
+  private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
+    ImmutableList.Builder<String> pathsToStageBuilder = ImmutableList.builder();
+    ImmutableMap.Builder<String, String> aliasMapBuilder = ImmutableMap.builder();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    String dataflowWorkerJar = options.getDataflowWorkerJar();
+    if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) {
+      // Put the user specified worker jar at the start of the classpath, to be consistent with the
+      // built in worker order.
+      pathsToStageBuilder.add(dataflowWorkerJar);
+      aliasMapBuilder.put(dataflowWorkerJar, "dataflow-worker.jar");
+    }
+    for (String path : options.getFilesToStage()) {
+      if (path.contains("=")) {

Review comment:
       Is this `=` support a Dataflow-only thing? It seems we don't support that in `Environments.getArtifacts()` (but if we did, nearly all of this code could go away).
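
       For reference, the `=` convention being parsed here appears to be `stagedName=localPath` (a sketch mirroring the split done in `PackageUtil` above; variable names and example values are illustrative only):

       ```java
       // "stagedName=localPath": the part before '=' overrides the staged file name.
       String[] components = path.split("=", 2);
       String stagedName = components[0]; // e.g. "dataflow-worker.jar"
       String localPath = components[1];  // e.g. "/tmp/worker.jar"
       ```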

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         "Executing pipeline on the Dataflow Service, which will have billing 
implications "
             + "related to Google Compute Engine usage and other Google Cloud 
Services.");
 
-    List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
+    // Capture the sdkComponents for look up during step translations
+    SdkComponents sdkComponents = SdkComponents.create();
+
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+    RunnerApi.Environment defaultEnvironmentForDataflow =
+        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+    sdkComponents.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .build());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
+
+    LOG.debug("Portable pipeline proto:\n{}", 
TextFormat.printToString(pipelineProto));

Review comment:
       This is a really big log message, even for debug. (Even computing it 
could be expensive, for pipelines with 1000s of stages.)
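
       If the message stays, one possible mitigation (a sketch only, not necessarily what the PR should do): guard the rendering behind an explicit level check so the proto is only formatted when debug logging is actually enabled.

       ```java
       // Only pay the cost of rendering the (potentially huge) proto when debug logging is on.
       if (LOG.isDebugEnabled()) {
         LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));
       }
       ```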

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -462,7 +462,8 @@ def run_pipeline(self, pipeline, options):
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
     default_environment = environments.DockerEnvironment.from_container_image(
-        apiclient.get_container_image_from_options(options))
+        apiclient.get_container_image_from_options(options),
+        artifacts=environments.python_sdk_dependencies(options))

Review comment:
       Should we push populating artifacts into from_container_image? 

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -102,7 +102,8 @@ def setUp(self):
         '--staging_location=ignored',
         '--temp_location=/dev/null',
         '--no_auth',
-        '--dry_run=True'
+        '--dry_run=True',
+        '--sdk_location=container'

Review comment:
       Why this change?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
##########
@@ -442,45 +448,56 @@ public static StagingResult uploaded(PackageAttributes attributes) {
   /** Holds the metadata necessary to stage a file or confirm that a staged file has not changed. */
   @AutoValue
   abstract static class PackageAttributes {
-
-    public static PackageAttributes forFileToStage(File source, String stagingPath)
+    public static PackageAttributes forFileToStage(File file, String stagingPath)
         throws IOException {
+      return forFileToStage(file.getPath(), null, stagingPath);
+    }
 
+    public static PackageAttributes forFileToStage(

Review comment:
       This seems highly redundant with what we're already doing in `Environments.getArtifacts()`. Can't we ensure we have a set of (existing, non-directory) files in the environment, and then have these utilities simply do the uploading?
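
       A sketch of the kind of up-front check that could live in `Environments` instead (a hypothetical helper, not part of the PR):

       ```java
       import static com.google.common.base.Preconditions.checkArgument;
       import java.io.File;

       // Hypothetical validation: ensure each artifact is an existing, non-directory file
       // before any staging utility is asked to upload it.
       private static void checkStageable(File file) {
         checkArgument(file.exists(), "Artifact to stage does not exist: %s", file);
         checkArgument(!file.isDirectory(), "Artifact to stage is a directory: %s", file);
       }
       ```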




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

