[ https://issues.apache.org/jira/browse/BEAM-9383?focusedWorklogId=430494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-430494 ]
ASF GitHub Bot logged work on BEAM-9383:
----------------------------------------

Author: ASF GitHub Bot
Created on: 05/May/20 02:27
Start Date: 05/May/20 02:27
Worklog Time Spent: 10m
Work Description: ihji commented on a change in pull request #11039:
URL: https://github.com/apache/beam/pull/11039#discussion_r419832339

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -772,6 +783,88 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
     }
   }
 
+  private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
+    ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+        }
+        DataflowPackage target = new DataflowPackage();
+        target.setLocation(stagingPayload.getStagedName());
+        if (!Strings.isNullOrEmpty(stagingPayload.getAliasName())) {
+          target.setName(stagingPayload.getAliasName());
+        }
+        filesToStageBuilder.add(StagedFile.of(filePayload.getPath(), target));
+      }
+    }
+    return options.getStager().stageFiles(filesToStageBuilder.build());
+  }
+
+  private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
+    ImmutableList.Builder<String> pathsToStageBuilder = ImmutableList.builder();
+    ImmutableMap.Builder<String, String> aliasMapBuilder = ImmutableMap.builder();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    String dataflowWorkerJar = options.getDataflowWorkerJar();
+    if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) {
+      // Put the user specified worker jar at the start of the classpath, to be consistent with the
+      // built in worker order.
+      pathsToStageBuilder.add(dataflowWorkerJar);
+      aliasMapBuilder.put(dataflowWorkerJar, "dataflow-worker.jar");
+    }
+    for (String path : options.getFilesToStage()) {
+      if (path.contains("=")) {

Review comment:
Yes. This syntax is only supported in the Dataflow runner. `DataflowPackage` has a separate field, `name`, in addition to `location`, and the "=" separator lets users prefix a `name` to the location of the source, e.g. "dataflow.jar=/tmp/foo.jar". I could remove this special syntax, but I decided to keep it because it is already exposed to users via the `--filesToStage` option, so removing it could cause backward-compatibility issues.
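The `name=location` convention described in the review comment can be illustrated with a small, self-contained Java sketch. It is hypothetical and is not Beam's implementation: the class `FilesToStageSketch`, its `Entry` type, and the `parse`/`plan` methods are invented for this example, and it assumes that only the first `=` separates the name from the path. The worker-jar-first ordering and the `dataflow-worker.jar` alias come from the `getDefaultArtifacts` hunk; treating an empty prefix as "no name" mirrors the `Strings.isNullOrEmpty` check on the alias in `stageArtifacts`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of Beam or the Dataflow runner.
final class FilesToStageSketch {

  /** One file to stage: its source location and an optional package name (null if absent). */
  static final class Entry {
    final String location;
    final String name;

    Entry(String location, String name) {
      this.location = location;
      this.name = name;
    }

    @Override
    public String toString() {
      return name == null ? location : name + "=" + location;
    }
  }

  private FilesToStageSketch() {}

  /** Splits "name=location" on the first '='; a spec without '=' is a plain location. */
  static Entry parse(String spec) {
    int eq = spec.indexOf('=');
    if (eq < 0) {
      return new Entry(spec, null);
    }
    String name = spec.substring(0, eq);
    // Assumption: like the alias check in the diff, an empty prefix ("=/tmp/foo.jar")
    // is treated as "no name" rather than as an empty name.
    return new Entry(spec.substring(eq + 1), name.isEmpty() ? null : name);
  }

  /**
   * Orders the files to stage: the worker jar first (aliased "dataflow-worker.jar",
   * as in the diff), followed by each --filesToStage entry in its original order.
   */
  static List<Entry> plan(String workerJar, List<String> filesToStage) {
    List<Entry> result = new ArrayList<>();
    if (workerJar != null && !workerJar.isEmpty()) {
      result.add(new Entry(workerJar, "dataflow-worker.jar"));
    }
    for (String spec : filesToStage) {
      result.add(parse(spec));
    }
    return Collections.unmodifiableList(result);
  }
}
```

With this sketch, "dataflow.jar=/tmp/foo.jar" yields an entry whose name is `dataflow.jar` and whose location is `/tmp/foo.jar`, while a plain "/tmp/bar.jar" carries no name. Splitting on the first `=` keeps any later `=` characters as part of the path; whether the runner handles such paths the same way is an assumption here.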
Issue Time Tracking
-------------------

    Worklog Id:     (was: 430494)
    Time Spent: 7h 20m  (was: 7h 10m)

> Staging Dataflow artifacts from environment
> -------------------------------------------
>
>                 Key: BEAM-9383
>                 URL: https://issues.apache.org/jira/browse/BEAM-9383
>             Project: Beam
>          Issue Type: Sub-task
>          Components: java-fn-execution
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> Staging Dataflow artifacts from environment

--
This message was sent by Atlassian Jira
(v8.3.4#803005)