Got it. We are definitely interested in a Java worker pool to support the Samza
runner use case, and I think we could help implement it if no one is
currently working on it.

Compared with what Python offers, the missing pieces I see are:

1. A main class/method to start ExternalWorkerService independently
2. Worker pool mode support in the JDK Docker container's boot.go

Is there anything else I missed?
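
To make the first missing piece concrete, here is a minimal in-process sketch of a pool that starts and stops SDK workers by id. It is a stdlib-only model, not Beam's API: `WorkerPool` and its methods are hypothetical names, each worker is assumed to run on its own thread inside the pool process (loosely like LOOPBACK), and a real standalone main class would expose these operations through the Fn API's BeamFnExternalWorkerPool gRPC service (StartWorker/StopWorker) rather than direct method calls.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical in-process worker pool (not Beam's API): starts and stops SDK workers by id,
// loosely mirroring the StartWorker/StopWorker calls of a worker pool service.
class WorkerPool {
  // Daemon threads so idle or stuck workers never keep the JVM alive on their own.
  private final ExecutorService executor =
      Executors.newCachedThreadPool(
          runnable -> {
            Thread thread = new Thread(runnable, "sdk-worker");
            thread.setDaemon(true);
            return thread;
          });
  private final Map<String, Future<?>> workers = new HashMap<>();

  /** Starts {@code harness} under {@code workerId}; returns false if that id is already taken. */
  synchronized boolean startWorker(String workerId, Runnable harness) {
    if (workers.containsKey(workerId)) {
      return false;
    }
    workers.put(workerId, executor.submit(harness));
    return true;
  }

  /** Interrupts and forgets the worker; returns false if no such worker exists. */
  synchronized boolean stopWorker(String workerId) {
    Future<?> worker = workers.remove(workerId);
    if (worker == null) {
      return false;
    }
    worker.cancel(true);
    return true;
  }

  /** True while the worker is registered and its harness has not finished. */
  synchronized boolean isRunning(String workerId) {
    Future<?> worker = workers.get(workerId);
    return worker != null && !worker.isDone();
  }

  /** Stops every worker, e.g. when the pool process shuts down. */
  synchronized void shutdown() {
    workers.clear();
    executor.shutdownNow();
  }
}
```

Keying workers by id and rejecting duplicates keeps a retried StartWorker call from launching a second harness for the same worker.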

Best,
Ke

> On Apr 29, 2021, at 12:54 PM, Kyle Weaver <[email protected]> wrote:
> 
> > Thanks for the info. To use the supported remote file systems, do the
> > files need to be passed in as FILE_ARTIFACT_URN, since neither
> > ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor
> > ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> > "beam:artifact:role:staging_to:v1" seems to be supported in getArtifact()?
> 
> Yes.
> 
> By the way, it seems the Python implementation of artifact_service does
> handle URLs [1], though it might not support them at every level of the
> stack [2].
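
For comparison with the Java getArtifact() quoted further down the thread, which only handles the file and embedded types, here is a stdlib-only model of what an extra URL case could look like in Java. It is not Beam code: `ArtifactOpener` is a hypothetical name, payloads are plain bytes instead of the RunnerApi protobuf messages, and the file case reads local paths only rather than going through Beam's FileSystems. The URN strings are assumed to match Beam's standard artifact types (the URL one is quoted in this thread).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical stdlib-only model of URN-based artifact dispatch (not Beam's code). Payloads are
// simplified to raw bytes: a UTF-8 path, a UTF-8 URL, or the artifact contents themselves.
class ArtifactOpener {
  static final String FILE_URN = "beam:artifact:type:file:v1";
  static final String URL_URN = "beam:artifact:type:url:v1";
  static final String EMBEDDED_URN = "beam:artifact:type:embedded:v1";

  static InputStream open(String typeUrn, byte[] payload) throws IOException {
    switch (typeUrn) {
      case FILE_URN:
        // Beam resolves this through FileSystems; this model only reads local paths.
        return Files.newInputStream(Paths.get(new String(payload, StandardCharsets.UTF_8)));
      case URL_URN:
        // The extra case: any scheme java.net.URL can open (file:, http:, https:, ...).
        return URI.create(new String(payload, StandardCharsets.UTF_8)).toURL().openStream();
      case EMBEDDED_URN:
        // Contents travel inline with the artifact description.
        return new ByteArrayInputStream(payload);
      default:
        throw new UnsupportedOperationException("Unexpected artifact type: " + typeUrn);
    }
  }
}
```

Opening arbitrary URLs from a worker also raises questions (authentication, network access) that the supported-file-system route avoids, which may be part of why it is not handled at every level of the stack.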
>  
> 
> > On the other hand, in cases such as the EXTERNAL environment type with
> > ExternalWorkerService, where artifacts are already available, what is the
> > expected way to disable the artifact staging phase in a portable pipeline?
> 
> I think you can just set --filesToStage to empty.
>  
> 
> > In addition, I noticed that the Python counterpart
> > worker_pool_main#BeamFnExternalWorkerPoolServicer
> > <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> > does invoke the artifact staging service to get artifacts from the artifact
> > endpoint specified in StartWorkerRequest, but the Java ExternalWorkerService
> > does not. Is this discrepancy expected because a Java worker pool process
> > likely does not want to start the worker with a different classpath/classloader?
> 
> 
> It looks like ExternalWorkerService is only used for LOOPBACK mode in Java,
> so I assume artifact staging/retrieval would be redundant. In Python, by
> contrast, the worker pool you linked to is started independently of job
> submission. But there's no inherent reason it has to be that way. For
> example, someday we may want to implement a Java worker pool [3].
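
The LOOPBACK-versus-standalone distinction above can be reduced to one decision per started worker: reuse the pool's own classpath, or fetch artifacts from the endpoint named in the start request. This is a stdlib-only sketch under that assumption; `StartRequest`, `WorkerClasspath`, and the fetcher function are hypothetical stand-ins (the real StartWorkerRequest carries an artifact endpoint descriptor, not a plain string).

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical subset of a worker start request; field names are illustrative, not Beam's.
final class StartRequest {
  final String workerId;
  final String artifactEndpoint; // empty when the worker can reuse local artifacts

  StartRequest(String workerId, String artifactEndpoint) {
    this.workerId = workerId;
    this.artifactEndpoint = artifactEndpoint;
  }
}

final class WorkerClasspath {
  /**
   * Chooses the classpath for a new worker: the pool's own classpath when no artifact endpoint
   * is given (LOOPBACK-style), otherwise whatever {@code fetchArtifacts} retrieves from the
   * endpoint (as described for the Python worker pool in this thread).
   */
  static List<String> resolve(
      StartRequest request,
      List<String> ownClasspath,
      Function<String, List<String>> fetchArtifacts) {
    if (request.artifactEndpoint == null || request.artifactEndpoint.isEmpty()) {
      return ownClasspath;
    }
    return fetchArtifacts.apply(request.artifactEndpoint);
  }
}
```

In a real Java worker pool, the fetched jars would then need their own classloader per worker, which is the classpath/classloader concern raised in the question.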
> 
> [1] https://github.com/apache/beam/blob/e0136ffc176d157d0928e7d501bca4daca3160a8/sdks/python/apache_beam/runners/portability/artifact_service.py#L81-L85
> [2] https://issues.apache.org/jira/browse/BEAM-11275
> [3] https://issues.apache.org/jira/browse/BEAM-8137
> On Wed, Apr 28, 2021 at 6:36 PM Ke Wu <[email protected]> wrote:
> Thank you, Kyle, for the prompt response.
> 
> > Yeah, that looks like a bug.
> 
> Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to 
> track the issue.
> 
> > Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> > Blobstore, HDFS). But arbitrary URLs are not supported.
> 
> Thanks for the info. To use the supported remote file systems, do the
> files need to be passed in as FILE_ARTIFACT_URN, since neither
> ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1" nor
> ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN =
> "beam:artifact:role:staging_to:v1" seems to be supported in getArtifact()?
> 
> On the other hand, in cases such as the EXTERNAL environment type with
> ExternalWorkerService, where artifacts are already available, what is the
> expected way to disable the artifact staging phase in a portable pipeline?
> 
> In addition, I noticed that the Python counterpart
> worker_pool_main#BeamFnExternalWorkerPoolServicer
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
> does invoke the artifact staging service to get artifacts from the artifact
> endpoint specified in StartWorkerRequest, but the Java ExternalWorkerService
> does not. Is this discrepancy expected because a Java worker pool process
> likely does not want to start the worker with a different classpath/classloader?
> 
> Best,
> Ke
> 
> 
>> On Apr 28, 2021, at 5:55 PM, Kyle Weaver <[email protected]> wrote:
>> 
>> > I expected FileStagingOptions#setFilesToStage in
>> > PortablePipelineOptions
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>> > to be the way to customize the artifacts staged and resolved in a portable
>> > pipeline. However, it looks like PortableRunner
>> > <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>> > does not add the preconfigured files to `filesToStageBuilder`, which is used
>> > in the final options to prepare the job. Is this the expected behavior or
>> > a bug?
>> 
>> Yeah, that looks like a bug.
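
The bug and the intended behavior can be compared side by side in a small model. This is a sketch, not PortableRunner's code: `StagingPlan` is a hypothetical class, and it assumes the runner falls back to classpath detection only when filesToStage is unset (null). Under that assumption the fixed version also explains the advice elsewhere in this thread that an empty filesToStage disables staging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of how a runner could choose the files to stage; not PortableRunner's code.
final class StagingPlan {
  /** Behavior described in the thread: user-configured files are never added. */
  static List<String> buggy(List<String> configured, Supplier<List<String>> detectClasspath) {
    List<String> filesToStage = new ArrayList<>();
    if (configured == null) {
      filesToStage.addAll(detectClasspath.get());
    }
    // Missing: a branch that adds the configured files.
    return filesToStage;
  }

  /** Intended behavior: detect the classpath only when nothing was configured. */
  static List<String> fixed(List<String> configured, Supplier<List<String>> detectClasspath) {
    List<String> filesToStage = new ArrayList<>();
    if (configured == null) {
      filesToStage.addAll(detectClasspath.get());
    } else {
      filesToStage.addAll(configured); // so an explicitly empty list stages nothing
    }
    return filesToStage;
  }
}
```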
>> 
>> > In addition, do we support specifying a URL in
>> > PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>> > retrieve artifacts from a remote address instead of from the JobServer (the
>> > default), which gets them from the SDK client? I am asking because I
>> > noticed
>> 
>> Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
>> Blobstore, HDFS). But arbitrary URLs are not supported.
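
Kyle's rule above could be applied as a client-side pre-check on filesToStage entries before submitting a job. This is a stdlib-only sketch; `StagingPathCheck` is hypothetical, and the scheme names (`gs`, `s3`, `azfs`, `hdfs`) are assumptions about how Beam's GCS, S3, Azure Blob Storage and HDFS file systems register themselves. Which schemes actually work at runtime depends on the file system modules on the classpath.

```java
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client-side pre-check (not a Beam API) for filesToStage entries.
final class StagingPathCheck {
  // Assumed scheme names for Beam's remote file systems; runtime support depends on which
  // file system registrars are actually on the classpath.
  private static final Set<String> REMOTE_SCHEMES = Set.of("gs", "s3", "azfs", "hdfs");
  private static final Pattern SCHEME = Pattern.compile("^([a-zA-Z][-a-zA-Z0-9+.]*)://");

  static boolean isSupported(String path) {
    Matcher matcher = SCHEME.matcher(path);
    if (!matcher.find()) {
      return true; // no scheme: treated as a local file path
    }
    String scheme = matcher.group(1).toLowerCase(Locale.ROOT);
    return scheme.equals("file") || REMOTE_SCHEMES.contains(scheme);
  }
}
```

An `https://` entry fails this check, matching the point that arbitrary URLs are not supported.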
>> 
>> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu <[email protected]> wrote:
>> Hello All,
>> 
>> I expected FileStagingOptions#setFilesToStage in PortablePipelineOptions
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>> to be the way to customize the artifacts staged and resolved in a portable
>> pipeline. However, it looks like PortableRunner
>> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>> does not add the preconfigured files to `filesToStageBuilder`, which is used in
>> the final options to prepare the job. Is this the expected behavior or
>> a bug?
>> 
>> In addition, do we support specifying a URL in
>> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can
>> retrieve artifacts from a remote address instead of from the JobServer (the
>> default), which gets them from the SDK client? I am asking because I noticed:
>> 
>> public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact)
>>     throws IOException {
>>   switch (artifact.getTypeUrn()) {
>>     case FILE_ARTIFACT_URN:
>>       // Open the path through Beam's FileSystems, so any registered scheme works.
>>       RunnerApi.ArtifactFilePayload payload =
>>           RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>>       return Channels.newInputStream(
>>           FileSystems.open(
>>               FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
>>     case EMBEDDED_ARTIFACT_URN:
>>       // The artifact bytes are carried inline in the payload.
>>       return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>>           .getData()
>>           .newInput();
>>     default:
>>       // Any other type, including URL_ARTIFACT_URN, is rejected.
>>       throw new UnsupportedOperationException(
>>           "Unexpected artifact type: " + artifact.getTypeUrn());
>>   }
>> }
>> This suggests that only file and embedded artifacts are currently supported.
>> 
>> Best,
>> Ke
> 
