Thank you Kyle for the prompt response.

> Yeah, that looks like a bug.

Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to track
the issue.

> Files can use any of Beam's supported remote file systems (GCS, S3, Azure
> Blobstore, HDFS). But arbitrary URLs are not supported.

Thanks for the info. To use one of the supported remote file systems, does the
artifact need to be passed in as FILE_ARTIFACT_URN, since neither
ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor
ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
"beam:artifact:role:staging_to:v1" seems to be supported in getArtifact()?

On the other hand, in circumstances where artifacts are already available,
such as the EXTERNAL environment type with ExternalWorkerService, what is the
expected way to disable the artifact staging phase in a portable pipeline?

In addition, I noticed that the Python counterpart,
worker_pool_main#BeamFnExternalWorkerPoolServicer
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>,
does invoke the artifact staging service to get artifacts from the artifact
endpoint specified in StartWorkerRequest, but the Java ExternalWorkerService
does not. Is this discrepancy expected, since the Java worker pool process is
unlikely to want to start the worker with a different classpath/classloader?

Best,
Ke

> On Apr 28, 2021, at 5:55 PM, Kyle Weaver <[email protected]> wrote:
>
> > I am expecting FileStagingOptions#setFilesToStage in
> > PortablePipelineOptions
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
> > is the way to customize artifacts to be staged and resolved in portable
> > pipeline, however, it looks like that PortableRunner
> > <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
> > does not add preconfigured files to `filesToStageBuilder` which is used in
> > the final options to prepare the job.
> > Is this the expected behavior or
> > maybe a bug?
>
> Yeah, that looks like a bug.
>
> > In addition, do we support specifying an URL in
> > PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
> > retrieve artifacts from a remote address instead of default from JobServer,
> > which got artifacts from SDK Client. I am asking because I noticed
>
> Files can use any of Beam's supported remote file systems (GCS, S3, Azure
> Blobstore, HDFS). But arbitrary URLs are not supported.
>
> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu <[email protected]
> <mailto:[email protected]>> wrote:
> Hello All,
>
> I am expecting FileStagingOptions#setFilesToStage in PortablePipelineOptions
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
> is the way to customize artifacts to be staged and resolved in portable
> pipeline, however, it looks like that PortableRunner
> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
> does not add preconfigured files to `filesToStageBuilder` which is used in
> the final options to prepare the job. Is this the expected behavior or maybe
> a bug?
>
> In addition, do we support specifying an URL in
> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
> retrieve artifacts from a remote address instead of default from JobServer,
> which got artifacts from SDK Client.
> I am asking because I noticed
>
> public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact)
>     throws IOException {
>   switch (artifact.getTypeUrn()) {
>     case FILE_ARTIFACT_URN:
>       RunnerApi.ArtifactFilePayload payload =
>           RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>       return Channels.newInputStream(
>           FileSystems.open(
>               FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
>     case EMBEDDED_ARTIFACT_URN:
>       return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>           .getData()
>           .newInput();
>     default:
>       throw new UnsupportedOperationException(
>           "Unexpected artifact type: " + artifact.getTypeUrn());
>   }
> }
>
> Which indicates that only File and Embed artifacts seem to be supported now.
>
> Best,
> Ke
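
For reference, the dispatch in the quoted getArtifact() can be modelled without any Beam dependencies. The sketch below is only an illustration: the class ArtifactDispatchSketch and its methods resolve() and isSupported() are hypothetical names, the returned strings stand in for the real InputStream handling, and the file and embedded URN values are assumed to follow the same pattern as the URL URN quoted earlier in this message. It shows that a URL-typed artifact falls through to the default branch.

```java
public class ArtifactDispatchSketch {
  // URN strings. The URL value is quoted in the message above; the file and
  // embedded values are assumptions that follow the same naming pattern.
  static final String FILE_ARTIFACT_URN = "beam:artifact:type:file:v1";
  static final String EMBEDDED_ARTIFACT_URN = "beam:artifact:type:embedded:v1";
  static final String URL_ARTIFACT_URN = "beam:artifact:type:url:v1";

  // Mirrors the shape of the quoted switch: two handled types, everything
  // else rejected. The return value is a placeholder description only.
  static String resolve(String typeUrn) {
    switch (typeUrn) {
      case FILE_ARTIFACT_URN:
        return "opened through the registered file systems";
      case EMBEDDED_ARTIFACT_URN:
        return "read from the inline payload";
      default:
        throw new UnsupportedOperationException("Unexpected artifact type: " + typeUrn);
    }
  }

  // Convenience wrapper: true when resolve() accepts the type URN.
  static boolean isSupported(String typeUrn) {
    try {
      resolve(typeUrn);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    String[] urns = {FILE_ARTIFACT_URN, EMBEDDED_ARTIFACT_URN, URL_ARTIFACT_URN};
    for (String urn : urns) {
      System.out.println(urn + " supported: " + isSupported(urn));
    }
  }
}
```

Under these assumptions, a path on GCS, S3, Azure Blobstore or HDFS would have to arrive as a file-typed artifact to be resolved, which is what the question about FILE_ARTIFACT_URN above is asking.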
