The argument for being able to accept a (possibly ordered) list of execution environments is that this could make a single instance of the expansion service reusable by various clients with different requirements. Moreover, the two approaches are probably orthogonal: users could specify a 'defaultExecutionEnvironment' for the service, which would be used when the client gives no preference.
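To make the idea concrete, here is a minimal sketch of how a service might resolve a client's ordered preference list against the environments it can actually run, falling back to a service-wide default. The class `EnvironmentSelection`, its `select` method and the "supported" set are hypothetical illustrations. Nothing like this exists in Beam's ExpansionService or the expansion RPC today, and environment types are plain strings here.

```java
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: choose the environment used for an expansion.
 * Not part of Beam's ExpansionService API.
 */
final class EnvironmentSelection {
  private EnvironmentSelection() {}

  /**
   * Returns the first client preference the service supports, or the
   * service-wide default when the client gave no usable preference.
   */
  static String select(List<String> clientPreferences,
                       Set<String> supported,
                       String serviceDefault) {
    if (clientPreferences != null) {
      for (String env : clientPreferences) {
        if (supported.contains(env)) {
          return env; // honour the client's ordering
        }
      }
    }
    return serviceDefault; // no preference given, or none supported
  }

  public static void main(String[] args) {
    // Assumed service that cannot run DOCKER (e.g. no 'docker' command).
    Set<String> supported = Set.of("PROCESS", "EXTERNAL");
    // Client prefers DOCKER, then PROCESS: PROCESS is chosen.
    System.out.println(select(List.of("DOCKER", "PROCESS"), supported, "PROCESS"));
    // Client states no preference: the service default is used.
    System.out.println(select(List.of(), supported, "PROCESS"));
  }
}
```

One design choice worth noting: the sketch falls back to the default even when the client listed only unsupported environments. A real service might prefer to reject such a request so the client learns that its requirements cannot be met.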

On 6/29/21 7:03 PM, Luke Cwik wrote:
I would be much more inclined toward letting the user configure the expansion service for their needs instead of changing the expansion service API.

On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <[email protected]> wrote:

    If I understand it correctly, there is currently no place to set the
    defaultEnvironmentType. Python's KafkaIO either uses the
    'expansion_service' given by the user (which might be a host:port or
    an object that has the appropriate method), or calls
    'default_io_expansion_service', which in turn runs ExpansionService
    using gradle. Either way, it ends up in ExpansionService#main [1]. It
    would be possible to adapt ExpansionService and call it locally; it
    seems enough for ExpansionService to provide a way to extend it (via
    a protected createPipeline() method), but that is not very
    user-friendly. If we could specify the defaultEnvironmentConfig when
    starting the ExpansionService, it would be possible to add these
    parameters to the Python SDK's KafkaIO, which would mean users do not
    have to worry about the expansion service at all (leaving aside that
    using too many ReadFromKafka or WriteToKafka transforms would
    somewhat hurt performance during pipeline build, but that applies to
    pipeline build time only). I have created [2] to track that.

    Does that make sense, or is my analysis incorrect?

      Jan

    [1]
    https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

    [2] https://issues.apache.org/jira/browse/BEAM-12539
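A rough sketch of what "specifying the defaultEnvironmentConfig when starting the ExpansionService" could look like on the command line, assuming a hypothetical `<port> [--option=value ...]` layout rather than the actual ExpansionService#main contract. `ExpansionServiceArgs` is an illustrative name and the PROCESS boot command path is made up. The remaining arguments would presumably be handed to Beam's `PipelineOptionsFactory.fromArgs(...)`, which is not shown here and which runs into the DirectRunner classpath problem described in option b).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical command line for an expansion service:
 *   <port> [--option=value ...]
 * This is NOT the actual ExpansionService#main contract.
 */
final class ExpansionServiceArgs {
  final int port;
  final List<String> pipelineArgs; // would be forwarded to options parsing

  private ExpansionServiceArgs(int port, List<String> pipelineArgs) {
    this.port = port;
    this.pipelineArgs = pipelineArgs;
  }

  static ExpansionServiceArgs parse(String[] args) {
    if (args.length < 1) {
      throw new IllegalArgumentException("usage: <port> [--option=value ...]");
    }
    int port = Integer.parseInt(args[0]); // first argument stays the port
    List<String> rest =
        new ArrayList<>(Arrays.asList(args).subList(1, args.length));
    for (String arg : rest) {
      if (!arg.startsWith("--")) {
        throw new IllegalArgumentException("expected --option=value, got: " + arg);
      }
    }
    return new ExpansionServiceArgs(port, List.copyOf(rest));
  }

  public static void main(String[] args) {
    // The boot command path below is a made-up example.
    ExpansionServiceArgs parsed = parse(new String[] {
        "8097",
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\": \"/opt/beam/boot\"}"});
    System.out.println(parsed.port + " " + parsed.pipelineArgs);
  }
}
```

Keeping the port as the first positional argument means existing invocations that pass only a port would keep working under this layout.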


    On 6/29/21 6:24 PM, Alexey Romanenko wrote:
    > I’m sorry if I missed something, but do you mean that
    > PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t
    > work for you? Or is it only a specific case when using the portable
    > KafkaIO?
    >
    >> On 29 Jun 2021, at 09:51, Jan Lukavský <[email protected]> wrote:
    >>
    >> Hi,
    >>
    >> I have come across an issue with cross-language transforms. My
    >> setup is that I have a working PROCESS environment type and I
    >> cannot use DOCKER. When I use Python's KafkaIO, it unfortunately
    >> expands (by default) to the DOCKER environment, which then fails
    >> due to the missing 'docker' command. So far, I haven't found a
    >> solution that does not involve changing the expansion service.
    >>
    >> I see several possible solutions to that:
    >>
    >>   a) I would say that the cleanest solution would be to add a
    >> preferred environment type to the expansion request sent to the
    >> expansion service (possibly along with additional flags, such as
    >> --experiments?). This requires deeper changes to the expansion RPC
    >> definition, probably serializing the PipelineOptions from the
    >> client environment into the ExpansionRequest.
    >>
    >>   b) Another option would be to allow specifying some of the
    >> command-line arguments when starting the expansion service, which
    >> currently accepts only the port on the command line, see [1]. The
    >> straightforward 'fix' (see [2]) unfortunately does not work,
    >> because it requires DirectRunner to be on the classpath, which
    >> then breaks other runners (see [3]). It seems possible to copy
    >> hand-selected options from the command line to the Pipeline, but
    >> that feels hackish. It would require either being able to
    >> construct the Pipeline without a runner specified (which seems
    >> possible when calling Pipeline.create(), but not when using
    >> PipelineOptions created by parsing command-line arguments), or
    >> being able to create a Map<String, String> from PipelineOptions
    >> and then copy all options into the Pipeline's options.
    >>
    >> My proposal would be to create a hackish shortcut and just copy
    >> the --defaultEnvironmentType, --defaultEnvironmentConfig and
    >> --experiments options into the Pipeline's options for now, and
    >> create an issue for a proper solution (possibly a)?).
    >>
    >> WDYT? Or did I miss a way to override the default expansion?
    >>
    >> Thanks for comments,
    >>
    >>   Jan
    >>
    >> [1]
    >> https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
    >>
    >> [2] https://github.com/apache/beam/pull/15082
    >>
    >> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
    >>
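The "hackish shortcut" from the proposal could be sketched as an allow-list filter over the command-line arguments: only the three options it names survive, their order is preserved, and repeated --experiments flags are kept. `AllowedOptions` is a hypothetical helper and the experiment values are placeholders. How the filtered arguments would then be merged into the expansion Pipeline's options is deliberately left out.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of copying only hand-selected options.
 * Not part of Beam; names are illustrative.
 */
final class AllowedOptions {
  // The three options named in the proposal.
  static final Set<String> ALLOWED =
      Set.of("defaultEnvironmentType", "defaultEnvironmentConfig", "experiments");

  private AllowedOptions() {}

  /** Extracts the option name from "--name=value" or "--name". */
  static String optionName(String arg) {
    String body = arg.startsWith("--") ? arg.substring(2) : arg;
    int eq = body.indexOf('='); // split at the first '=' only
    return eq < 0 ? body : body.substring(0, eq);
  }

  /** Keeps only allow-listed options, preserving order and repeats. */
  static List<String> filter(List<String> args) {
    return args.stream()
        .filter(a -> a.startsWith("--") && ALLOWED.contains(optionName(a)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> in = List.of(
        "--runner=FlinkRunner",            // dropped: not allow-listed
        "--defaultEnvironmentType=PROCESS",
        "--experiments=flag_a",            // placeholder experiment values
        "--experiments=flag_b");
    System.out.println(filter(in));
  }
}
```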
