Got you. We are definitely interested in a Java worker pool to support the Samza runner use case, and I think we could help implement it if no one is currently working on it.
Compared with what Python offers, the pieces I see missing are:

1. A main class/method to start ExternalWorkerService independently
2. Worker pool mode support for the JDK Docker container in boot.go

Is there anything else I missed?

Best,
Ke

> On Apr 29, 2021, at 12:54 PM, Kyle Weaver <[email protected]> wrote:
>
> > Thanks for the info. In order to use supported remote file systems, does it
> > mean it needs to be passed in as FILE_ARTIFACT_URN since neither
> > ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor
> > ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> > "beam:artifact:role:staging_to:v1" seems to be supported in getArtifact()?
>
> Yes.
>
> By the way, it seems the Python implementation of artifact_service does
> handle URLs [1] - though it might not support them at every level of the
> stack [2].
>
> > On the other side, under circumstances, such as EXTERNAL environment type
> > with ExternalWorkerService, where artifacts are already available, what is
> > the expected usage to disable artifact staging phase in portable pipeline?
>
> I think you can just set --filesToStage to empty.
>
> > In addition, I noticed that the python counterpart
> > worker_pool_main#BeamFnExternalWorkerPoolServicer
> > <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> > does invoke artifact staging service to get artifacts from artifact endpoint
> > specified in StartWorkerRequest but not in the java ExternalWorkerService. Is
> > this discrepancy expected since java worker pool process does not likely want
> > to start the worker with different classpath/classloader?
>
> It looks like ExternalWorkerService is only used for LOOPBACK mode in Java,
> so I assume artifact staging/retrieval would be redundant. Whereas in Python,
> the worker pool you linked to is started independently of job submission. But
> there's no inherent reason it has to be that way.
> For example, someday we may want to implement a Java worker pool [3].
>
> [1] https://github.com/apache/beam/blob/e0136ffc176d157d0928e7d501bca4daca3160a8/sdks/python/apache_beam/runners/portability/artifact_service.py#L81-L85
> [2] https://issues.apache.org/jira/browse/BEAM-11275
> [3] https://issues.apache.org/jira/browse/BEAM-8137
>
> On Wed, Apr 28, 2021 at 6:36 PM Ke Wu <[email protected]> wrote:
> Thank you Kyle for the prompt response.
>
> > Yeah, that looks like a bug.
>
> Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to
> track the issue.
>
> > Files can use any of Beam's supported remote file systems (GCS, S3, Azure
> > Blobstore, HDFS). But arbitrary URLs are not supported.
>
> Thanks for the info. In order to use supported remote file systems, does it
> mean it needs to be passed in as FILE_ARTIFACT_URN since neither
> ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor
> ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> "beam:artifact:role:staging_to:v1" seems to be supported in getArtifact()?
>
> On the other side, under circumstances, such as EXTERNAL environment type
> with ExternalWorkerService, where artifacts are already available, what is
> the expected usage to disable artifact staging phase in portable pipeline?
>
> In addition, I noticed that the python counterpart
> worker_pool_main#BeamFnExternalWorkerPoolServicer
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> does invoke artifact staging service to get artifacts from artifact endpoint
> specified in StartWorkerRequest but not in the java ExternalWorkerService.
> Is this discrepancy expected since java worker pool process does not likely
> want to start the worker with different classpath/classloader?
>
> Best,
> Ke
>
>
>> On Apr 28, 2021, at 5:55 PM, Kyle Weaver <[email protected]> wrote:
>>
>> > I am expecting FileStagingOptions#setFilesToStage in
>> > PortablePipelineOptions
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>> > is the way to customize artifacts to be staged and resolved in portable
>> > pipeline, however, it looks like that PortableRunner
>> > <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>> > does not add preconfigured files to `filesToStageBuilder` which is used
>> > in the final options to prepare the job. Is this the expected behavior or
>> > maybe a bug?
>>
>> Yeah, that looks like a bug.
>>
>> > In addition, do we support specifying an URL in
>> > PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>> > retrieve artifacts from a remote address instead of default from
>> > JobServer, which got artifacts from SDK Client. I am asking because I
>> > noticed
>>
>> Files can use any of Beam's supported remote file systems (GCS, S3, Azure
>> Blobstore, HDFS). But arbitrary URLs are not supported.
>>
>> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu <[email protected]> wrote:
>> Hello All,
>>
>> I am expecting FileStagingOptions#setFilesToStage in PortablePipelineOptions
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>> is the way to customize artifacts to be staged and resolved in portable
>> pipeline, however, it looks like that PortableRunner
>> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>> does not add preconfigured files to `filesToStageBuilder` which is used in
>> the final options to prepare the job. Is this the expected behavior or maybe
>> a bug?
>>
>> In addition, do we support specifying an URL in
>> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>> retrieve artifacts from a remote address instead of default from JobServer,
>> which got artifacts from SDK Client. I am asking because I noticed
>>
>> public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact)
>>     throws IOException {
>>   switch (artifact.getTypeUrn()) {
>>     case FILE_ARTIFACT_URN:
>>       RunnerApi.ArtifactFilePayload payload =
>>           RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>>       return Channels.newInputStream(
>>           FileSystems.open(
>>               FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
>>     case EMBEDDED_ARTIFACT_URN:
>>       return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>>           .getData()
>>           .newInput();
>>     default:
>>       throw new UnsupportedOperationException(
>>           "Unexpected artifact type: " + artifact.getTypeUrn());
>>   }
>> }
>>
>> Which indicates that only File and Embed artifacts seem to be supported now.
>>
>> Best,
>> Ke
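
For anyone following along, the behavior asked about in the getArtifact() excerpt above can be reproduced outside Beam with a small self-contained sketch. This is not Beam's implementation: the Artifact class and the readUtf8 helper are hypothetical stand-ins, the payload is treated as raw bytes rather than a protobuf message, and the file and embedded URN strings are assumptions modeled on the URL URN quoted in the thread. It only illustrates why a URL-typed artifact falls through to the default branch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Simplified stand-in for the URN dispatch discussed in the thread; NOT Beam code.
final class ArtifactDispatchSketch {
  // Assumed URN strings for file and embedded artifacts (not quoted in the thread).
  static final String FILE_URN = "beam:artifact:type:file:v1";
  static final String EMBEDDED_URN = "beam:artifact:type:embedded:v1";
  // URN string quoted in the thread.
  static final String URL_URN = "beam:artifact:type:url:v1";

  /** Hypothetical replacement for RunnerApi.ArtifactInformation: a URN plus raw bytes. */
  static final class Artifact {
    final String typeUrn;
    final byte[] payload;

    Artifact(String typeUrn, byte[] payload) {
      this.typeUrn = typeUrn;
      this.payload = payload;
    }
  }

  /** Opens an artifact by type; any type other than file or embedded is rejected. */
  static InputStream getArtifact(Artifact artifact) throws IOException {
    switch (artifact.typeUrn) {
      case FILE_URN:
        // In this sketch the payload is simply a UTF-8 encoded local path.
        return Files.newInputStream(
            Paths.get(new String(artifact.payload, StandardCharsets.UTF_8)));
      case EMBEDDED_URN:
        // The payload bytes are the artifact contents themselves.
        return new ByteArrayInputStream(artifact.payload);
      default:
        throw new UnsupportedOperationException(
            "Unexpected artifact type: " + artifact.typeUrn);
    }
  }

  /** Hypothetical helper: drains a stream into a UTF-8 string. */
  static String readUtf8(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("artifact", ".txt");
    try {
      Files.write(tmp, "from file".getBytes(StandardCharsets.UTF_8));
      byte[] pathBytes = tmp.toString().getBytes(StandardCharsets.UTF_8);
      try (InputStream in = getArtifact(new Artifact(FILE_URN, pathBytes))) {
        System.out.println(readUtf8(in));
      }
      byte[] data = "embedded".getBytes(StandardCharsets.UTF_8);
      try (InputStream in = getArtifact(new Artifact(EMBEDDED_URN, data))) {
        System.out.println(readUtf8(in));
      }
      try {
        getArtifact(new Artifact(URL_URN, new byte[0]));
      } catch (UnsupportedOperationException e) {
        // URL-typed artifacts reach the default branch, matching the observation above.
        System.out.println(e.getMessage());
      }
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```

Under these assumptions, supporting another artifact type would mean adding a case to the switch; whether and how Beam should do that is the open question in the thread, not something this sketch settles.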
