> Thanks for the info. In order to use supported remote file systems, does
> it mean it needs to be passed in as FILE_ARTIFACT_URN, since neither
> *ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1"*
> nor *ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> "beam:artifact:role:staging_to:v1"* seems to be supported in
> getArtifact()?
>
Yes. By the way, it seems the Python implementation of artifact_service *does* handle URLs [1], though it might not support them at every level of the stack [2].

> On the other side, under circumstances such as the EXTERNAL environment
> type with ExternalWorkerService, where artifacts are already available,
> what is the expected way to disable the artifact staging phase in a
> portable pipeline?

I think you can just set --filesToStage to empty.

> In addition, I noticed that the Python counterpart
> worker_pool_main#BeamFnExternalWorkerPoolServicer
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> does invoke the artifact staging service to get artifacts from the artifact
> endpoint specified in StartWorkerRequest, but the Java ExternalWorkerService
> does not. Is this discrepancy expected, since the Java worker pool process
> likely does not want to start the worker with a different
> classpath/classloader?

It looks like ExternalWorkerService is only used for LOOPBACK mode in Java, so I assume artifact staging/retrieval would be redundant. In Python, by contrast, the worker pool you linked to is started independently of job submission. But there's no inherent reason it has to be that way. For example, someday we may want to implement a Java worker pool [3].

[1] https://github.com/apache/beam/blob/e0136ffc176d157d0928e7d501bca4daca3160a8/sdks/python/apache_beam/runners/portability/artifact_service.py#L81-L85
[2] https://issues.apache.org/jira/browse/BEAM-11275
[3] https://issues.apache.org/jira/browse/BEAM-8137

On Wed, Apr 28, 2021 at 6:36 PM Ke Wu wrote:

> Thank you Kyle for the prompt response.
>
>> Yeah, that looks like a bug.
>
> Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to
> track the issue.
>
>> Files can use any of Beam's supported remote file systems (GCS, S3,
>> Azure Blobstore, HDFS). But arbitrary URLs are not supported.
>
> Thanks for the info.
> In order to use supported remote file systems, does it mean it needs to be
> passed in as FILE_ARTIFACT_URN, since neither
> *ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1"*
> nor *ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> "beam:artifact:role:staging_to:v1"* seems to be supported in
> getArtifact()?
>
> On the other side, under circumstances such as the EXTERNAL environment
> type with ExternalWorkerService, where artifacts are already available,
> what is the expected way to disable the artifact staging phase in a
> portable pipeline?
>
> In addition, I noticed that the Python counterpart
> worker_pool_main#BeamFnExternalWorkerPoolServicer
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> does invoke the artifact staging service to get artifacts from the artifact
> endpoint specified in StartWorkerRequest, but the Java ExternalWorkerService
> does not. Is this discrepancy expected, since the Java worker pool process
> likely does not want to start the worker with a different
> classpath/classloader?
>
> Best,
> Ke
>
> On Apr 28, 2021, at 5:55 PM, Kyle Weaver wrote:
>
>>> I am expecting FileStagingOptions#setFilesToStage in
>>> PortablePipelineOptions
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>>> is the way to customize artifacts to be staged and resolved in a portable
>>> pipeline. However, it looks like PortableRunner
>>> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>>> does not add preconfigured files to `filesToStageBuilder`, which is used
>>> in the final options to prepare the job. Is this the expected behavior, or
>>> maybe a bug?
>>
>> Yeah, that looks like a bug.
>>
>>> In addition, do we support specifying a URL in
>>> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>>> retrieve artifacts from a remote address instead of from the JobServer
>>> (the default), which gets artifacts from the SDK client? I am asking
>>> because I noticed
>>
>> Files can use any of Beam's supported remote file systems (GCS, S3, Azure
>> Blobstore, HDFS). But arbitrary URLs are not supported.
>>
>> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu wrote:
>>
>>> Hello All,
>>>
>>> I am expecting FileStagingOptions#setFilesToStage in
>>> PortablePipelineOptions
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>>> is the way to customize artifacts to be staged and resolved in a portable
>>> pipeline. However, it looks like PortableRunner
>>> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>>> does not add preconfigured files to `filesToStageBuilder`, which is used
>>> in the final options to prepare the job. Is this the expected behavior, or
>>> maybe a bug?
>>>
>>> In addition, do we support specifying a URL in
>>> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>>> retrieve artifacts from a remote address instead of from the JobServer
>>> (the default), which gets artifacts from the SDK client?
>>> I am asking because I noticed:
>>>
>>>   public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact)
>>>       throws IOException {
>>>     switch (artifact.getTypeUrn()) {
>>>       case FILE_ARTIFACT_URN:
>>>         RunnerApi.ArtifactFilePayload payload =
>>>             RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>>>         return Channels.newInputStream(
>>>             FileSystems.open(
>>>                 FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
>>>       case EMBEDDED_ARTIFACT_URN:
>>>         return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>>>             .getData()
>>>             .newInput();
>>>       default:
>>>         throw new UnsupportedOperationException(
>>>             "Unexpected artifact type: " + artifact.getTypeUrn());
>>>     }
>>>   }
>>>
>>> This indicates that only file and embedded artifacts seem to be supported
>>> now.
>>>
>>> Best,
>>> Ke
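Here is a minimal, dependency-free Java sketch of the type dispatch in the quoted getArtifact(). It is not Beam's code. ArtifactSketch.Artifact is a hypothetical stand-in for RunnerApi.ArtifactInformation, with its payload already decoded. FILE paths are read with java.nio instead of Beam's FileSystems, so only local paths work in the sketch. In Beam, FileSystems is what lets a FILE artifact point at GCS, S3, Azure Blobstore, or HDFS. Any other type URN, including beam:artifact:type:url:v1, falls through to UnsupportedOperationException, matching the behavior discussed in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical, dependency-free model of the getArtifact() dispatch quoted in the thread.
// NOT Beam's implementation: payloads are pre-decoded, and FILE paths are read with
// java.nio instead of Beam's FileSystems, so only local paths work here.
final class ArtifactSketch {
  // Beam's standard artifact type URNs.
  static final String FILE_ARTIFACT_URN = "beam:artifact:type:file:v1";
  static final String EMBEDDED_ARTIFACT_URN = "beam:artifact:type:embedded:v1";
  static final String URL_ARTIFACT_URN = "beam:artifact:type:url:v1";

  // Hypothetical stand-in for RunnerApi.ArtifactInformation with a decoded payload.
  static final class Artifact {
    final String typeUrn;
    final String path; // FILE artifacts: a path (URL artifacts would carry a URL here)
    final byte[] data; // EMBEDDED artifacts: the raw bytes

    Artifact(String typeUrn, String path, byte[] data) {
      this.typeUrn = typeUrn;
      this.path = path;
      this.data = data;
    }
  }

  static InputStream getArtifact(Artifact artifact) throws IOException {
    switch (artifact.typeUrn) {
      case FILE_ARTIFACT_URN:
        // Beam would resolve this through FileSystems (gs://, s3://, hdfs://, ...).
        return Files.newInputStream(Paths.get(artifact.path));
      case EMBEDDED_ARTIFACT_URN:
        // The artifact bytes travel inside the payload itself.
        return new ByteArrayInputStream(artifact.data);
      default:
        // URL_ARTIFACT_URN and every other type are rejected, as in the quoted code.
        throw new UnsupportedOperationException(
            "Unexpected artifact type: " + artifact.typeUrn);
    }
  }

  public static void main(String[] args) throws IOException {
    Artifact embedded =
        new Artifact(EMBEDDED_ARTIFACT_URN, null, "hello".getBytes(StandardCharsets.UTF_8));
    try (InputStream in = getArtifact(embedded)) {
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
    try {
      getArtifact(new Artifact(URL_ARTIFACT_URN, "https://example.com/app.jar", null));
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The sketch supports only file and embedded artifacts. That is why a remote file placed in a supported file system has to be described as a FILE artifact, not as a URL artifact.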

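The filesToStage behavior discussed in this thread (BEAM-12251) can also be sketched without Beam on the classpath. resolveFilesToStage is a hypothetical helper, not PortableRunner's actual code. It assumes the intended logic is: if --filesToStage is unset (null), fall back to classpath detection; otherwise stage exactly what the user configured. The else branch is the step Ke reports PortableRunner as skipping. Under this assumption, an empty list stages nothing, which is consistent with Kyle's suggestion to set --filesToStage to empty. The gs:// path in the example is illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of how a runner could assemble the final list of files to stage.
// Not PortableRunner's code; names and signatures are illustrative only.
final class FilesToStageSketch {
  /**
   * @param configured value of --filesToStage, or null if the user did not set it
   * @param classpathDetector fallback that lists classpath resources to stage
   */
  static List<String> resolveFilesToStage(
      List<String> configured, Supplier<List<String>> classpathDetector) {
    List<String> filesToStage = new ArrayList<>();
    if (configured == null) {
      // Nothing configured: stage whatever classpath detection finds.
      filesToStage.addAll(classpathDetector.get());
    } else {
      // Honor the user's list as-is (the step reported missing in BEAM-12251).
      // An empty list therefore stages nothing.
      filesToStage.addAll(configured);
    }
    return Collections.unmodifiableList(filesToStage);
  }

  public static void main(String[] args) {
    Supplier<List<String>> detector = () -> List.of("/tmp/classes", "/tmp/lib/dep.jar");
    System.out.println(resolveFilesToStage(null, detector));
    System.out.println(resolveFilesToStage(List.of("gs://my-bucket/app.jar"), detector));
    System.out.println(resolveFilesToStage(List.of(), detector));
  }
}
```

Keeping "unset" (null) distinct from "set but empty" is the design choice that makes an empty list a way to opt out of staging, rather than a trigger for classpath detection.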