robertwb commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r418651280
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1271,6 +1271,11 @@ message DeferredArtifactPayload {
 message ArtifactStagingToRolePayload {
   // A generated staged name (relative path under staging directory).
   string staged_name = 1;
+
+  // (Optional) An artifact name when a runner supports it.
+  // For example, DataflowRunner requires predefined names for some artifacts
+  // such as "dataflow-worker.jar", "windmill_main".
+  string alias_name = 2;

Review comment:
   Why does this have to be distinct from staged_name? (Also, eventually we hope that designated roles can remove the need for magic names.)

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
##########
@@ -210,56 +209,55 @@ public static Environment createProcessEnvironment(
     }
   }
-  private static List<ArtifactInformation> getArtifacts(List<String> stagingFiles) {
-    Set<String> pathsToStage = Sets.newHashSet(stagingFiles);
+  public static List<ArtifactInformation> getArtifacts(
+      List<String> stagingFiles, StagingFileNameGenerator generator) {
     ImmutableList.Builder<ArtifactInformation> artifactsBuilder = ImmutableList.builder();
-    for (String path : pathsToStage) {
+    for (String path : ImmutableSet.copyOf(stagingFiles)) {

Review comment:
   Isn't order important to preserve? (Also, why do we need to make a copy?)

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
-    List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
+    // Capture the sdkComponents for look up during step translations
+    SdkComponents sdkComponents = SdkComponents.create();
+
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+    RunnerApi.Environment defaultEnvironmentForDataflow =
+        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+    sdkComponents.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())

Review comment:
   How does this get invoked for cross-language pipelines?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
##########
@@ -336,25 +323,26 @@ public DataflowPackage stageToFile(
     final AtomicInteger numCached = new AtomicInteger(0);
     List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();
-    for (String classpathElement : classpathElements) {
-      DataflowPackage sourcePackage = new DataflowPackage();
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        sourcePackage.setName(components[0]);
-        sourcePackage.setLocation(components[1]);
-      } else {
-        sourcePackage.setName(null);
-        sourcePackage.setLocation(classpathElement);
+    for (StagedFile classpathElement : classpathElements) {
+      DataflowPackage targetPackage = classpathElement.getStagedPackage();
+      String source = classpathElement.getSource();
+      if (source.contains("=")) {

Review comment:
   Why do we have to handle this here and above?
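For context on the ordering question about `Environments.getArtifacts` above, here is a minimal, self-contained sketch (not Beam code) of de-duplicating staging files while keeping first-occurrence order; the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class StagingOrderSketch {
  /** Drops duplicate paths while preserving the order in which they first appear. */
  static List<String> dedupePreservingOrder(List<String> stagingFiles) {
    // LinkedHashSet keeps insertion order, so the classpath ordering survives de-duplication.
    return new ArrayList<>(new LinkedHashSet<>(stagingFiles));
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("a.jar", "b.jar", "a.jar", "c.jar");
    // Prints [a.jar, b.jar, c.jar]: duplicates dropped, first-occurrence order kept.
    System.out.println(dedupePreservingOrder(files));
  }
}
```

Guava's `ImmutableSet.copyOf` also iterates in first-occurrence order, so the copy in the diff should not itself reorder anything; the sketch is only meant to make the ordering concern concrete.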
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
     }
   }
+  private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
+    ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+        }
+        DataflowPackage target = new DataflowPackage();
+        target.setLocation(stagingPayload.getStagedName());
+        if (!Strings.isNullOrEmpty(stagingPayload.getAliasName())) {
+          target.setName(stagingPayload.getAliasName());
+        }
+        filesToStageBuilder.add(StagedFile.of(filePayload.getPath(), target));
+      }
+    }
+    return options.getStager().stageFiles(filesToStageBuilder.build());
+  }
+
+  private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
+    ImmutableList.Builder<String> pathsToStageBuilder = ImmutableList.builder();
+    ImmutableMap.Builder<String, String> aliasMapBuilder = ImmutableMap.builder();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    String dataflowWorkerJar = options.getDataflowWorkerJar();
+    if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) {
+      // Put the user specified worker jar at the start of the classpath, to be consistent with the
+      // built in worker order.
+      pathsToStageBuilder.add(dataflowWorkerJar);
+      aliasMapBuilder.put(dataflowWorkerJar, "dataflow-worker.jar");
+    }
+    for (String path : options.getFilesToStage()) {
+      if (path.contains("=")) {

Review comment:
   Is this = support a dataflow-only thing? Seems we don't support that in `Environments.getArtifacts()` (but if we did nearly all of this code could go away).
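To make the `=` discussion concrete: a hypothetical helper of the kind that could sit next to `Environments.getArtifacts()` so the `name=path` syntax is parsed in exactly one place. `StagedFileSpec` and `parse` are invented names for this sketch, not existing Beam API:

```java
import java.util.Objects;

/** Hypothetical value type for a "name=path" files-to-stage entry; not part of Beam. */
final class StagedFileSpec {
  final String name; // explicit staged name, or null if none was given
  final String path; // local file to upload

  private StagedFileSpec(String name, String path) {
    this.name = name;
    this.path = Objects.requireNonNull(path);
  }

  /** Splits "alias=/local/path" into (alias, path); a bare path gets a null name. */
  static StagedFileSpec parse(String entry) {
    if (entry.contains("=")) {
      String[] parts = entry.split("=", 2);
      return new StagedFileSpec(parts[0], parts[1]);
    }
    return new StagedFileSpec(null, entry);
  }
}
```

If something along these lines were applied once while the ArtifactInformation is built, neither `getDefaultArtifacts()` above nor the PackageUtil loop earlier in this thread would need to re-split on `=`.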
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
-    List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
+    // Capture the sdkComponents for look up during step translations
+    SdkComponents sdkComponents = SdkComponents.create();
+
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+    RunnerApi.Environment defaultEnvironmentForDataflow =
+        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+    sdkComponents.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .build());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
+
+    LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto));

Review comment:
   This is a really big log message, even for debug. (Even computing it could be expensive, for pipelines with 1000s of stages.)

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -462,7 +462,8 @@ def run_pipeline(self, pipeline, options):
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
     default_environment = environments.DockerEnvironment.from_container_image(
-        apiclient.get_container_image_from_options(options))
+        apiclient.get_container_image_from_options(options),
+        artifacts=environments.python_sdk_dependencies(options))

Review comment:
   Should we push populating artifacts into from_container_image?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -102,7 +102,8 @@ def setUp(self):
         '--staging_location=ignored',
         '--temp_location=/dev/null',
         '--no_auth',
-        '--dry_run=True'
+        '--dry_run=True',
+        '--sdk_location=container'

Review comment:
   Why this change?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
##########
@@ -442,45 +448,56 @@ public static StagingResult uploaded(PackageAttributes attributes) {
   /** Holds the metadata necessary to stage a file or confirm that a staged file has not changed. */
   @AutoValue
   abstract static class PackageAttributes {
-    public static PackageAttributes forFileToStage(File source, String stagingPath)
+    public static PackageAttributes forFileToStage(File file, String stagingPath)
         throws IOException {
+      return forFileToStage(file.getPath(), null, stagingPath);
+    }
+    public static PackageAttributes forFileToStage(

Review comment:
   This seems highly redundant with what we're already doing in Environments.getArtifacts. Can't we ensure we have a set of (existing, non-directory) files in the environment, and then have these utilities simply do the uploading?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org