Thank you, Kyle, for the prompt response.

> Yeah, that looks like a bug.

I created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to track 
the issue.

> Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> Blobstore, HDFS). But arbitrary URLs are not supported.

Thanks for the info. To use one of the supported remote file systems, does the 
artifact need to be passed in with FILE_ARTIFACT_URN? I ask because neither 
ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor 
ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN = 
"beam:artifact:role:staging_to:v1" seems to be supported in getArtifact().
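
To make the question concrete, here is a minimal, stdlib-only sketch of the 
dispatch as I read it from the getArtifact() snippet quoted further down. It is 
not Beam's code: the class and method names are hypothetical, and the URN values 
for the file and embedded types are my assumption based on the naming pattern of 
the two URNs above. Going by its name, staging_to also looks like a role URN 
rather than a type URN, so a switch on getTypeUrn() would presumably never match 
it anyway.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the type-URN dispatch; not Beam's actual class.
class ArtifactUrnDispatchSketch {
  // URN values quoted in this thread.
  static final String URL_ARTIFACT_URN = "beam:artifact:type:url:v1";
  static final String STAGING_TO_ARTIFACT_URN = "beam:artifact:role:staging_to:v1";
  // ASSUMPTION: values inferred from the naming pattern; verify against the Beam source.
  static final String FILE_ARTIFACT_URN = "beam:artifact:type:file:v1";
  static final String EMBEDDED_ARTIFACT_URN = "beam:artifact:type:embedded:v1";

  // Mirrors the shape of the quoted switch: only two type URNs have a case.
  static String describeResolution(String typeUrn) {
    switch (typeUrn) {
      case FILE_ARTIFACT_URN:
        return "read from the payload path through FileSystems";
      case EMBEDDED_ARTIFACT_URN:
        return "read from the bytes embedded in the payload";
      default:
        throw new UnsupportedOperationException("Unexpected artifact type: " + typeUrn);
    }
  }

  // True when the URN has a case in the switch above.
  static boolean isSupported(String typeUrn) {
    try {
      describeResolution(typeUrn);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> urns =
        Arrays.asList(
            FILE_ARTIFACT_URN,
            EMBEDDED_ARTIFACT_URN,
            URL_ARTIFACT_URN,
            STAGING_TO_ARTIFACT_URN);
    for (String urn : urns) {
      System.out.println(urn + " supported: " + isSupported(urn));
    }
  }
}
```

If this reading is right, a remote path (GCS, S3, and so on) would have to 
travel as the path of a file-type artifact, which is what I would like to 
confirm.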

On the other hand, in cases where the artifacts are already available to the 
workers, such as the EXTERNAL environment type with ExternalWorkerService, what 
is the expected way to disable the artifact staging phase in a portable pipeline?

In addition, I noticed that the Python counterpart, 
worker_pool_main#BeamFnExternalWorkerPoolServicer 
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>, 
invokes the artifact staging service to get artifacts from the artifact endpoint 
specified in StartWorkerRequest, but the Java ExternalWorkerService does not. Is 
this discrepancy expected, given that a Java worker pool process is unlikely to 
want to start a worker with a different classpath/classloader?

Best,
Ke


> On Apr 28, 2021, at 5:55 PM, Kyle Weaver <[email protected]> wrote:
> 
> > I expected FileStagingOptions#setFilesToStage in 
> > PortablePipelineOptions 
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
> >  to be the way to customize the artifacts staged and resolved in a portable 
> > pipeline. However, it looks like PortableRunner 
> > <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
> >  does not add the preconfigured files to `filesToStageBuilder`, which is used 
> > in the final options to prepare the job. Is this the expected behavior, or 
> > is it a bug?
> 
> Yeah, that looks like a bug.
> 
> > In addition, do we support specifying a URL in 
> > PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can 
> > retrieve artifacts from a remote address instead of, by default, from the 
> > JobServer, which got the artifacts from the SDK client? I am asking because I noticed
> 
> Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> Blobstore, HDFS). But arbitrary URLs are not supported.
> 
> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu <[email protected] 
> <mailto:[email protected]>> wrote:
> Hello All,
> 
> I expected FileStagingOptions#setFilesToStage in PortablePipelineOptions 
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>  to be the way to customize the artifacts staged and resolved in a portable 
> pipeline. However, it looks like PortableRunner 
> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>  does not add the preconfigured files to `filesToStageBuilder`, which is used 
> in the final options to prepare the job. Is this the expected behavior, or is 
> it a bug?
> 
> In addition, do we support specifying a URL in 
> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can 
> retrieve artifacts from a remote address instead of, by default, from the 
> JobServer, which got the artifacts from the SDK client? I am asking because I 
> noticed:
> 
> public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact)
>     throws IOException {
>   switch (artifact.getTypeUrn()) {
>     case FILE_ARTIFACT_URN:
>       RunnerApi.ArtifactFilePayload payload =
>           RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>       return Channels.newInputStream(
>           FileSystems.open(
>               FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
>     case EMBEDDED_ARTIFACT_URN:
>       return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>           .getData()
>           .newInput();
>     default:
>       throw new UnsupportedOperationException(
>           "Unexpected artifact type: " + artifact.getTypeUrn());
>   }
> }
> 
> This suggests that only file and embedded artifacts are supported for now.
> 
> Best,
> Ke
