I would slightly disagree that this breaks the black-box nature of the expansion. "How the transform expands" remains unknown to the SDK requesting the expansion; "how the transform executes", on the other hand, is something the SDK must cooperate on, because it knows (or could, or should, know) what the environment the pipeline is going to be executed in looks like. That is why the expansion service on its own cannot correctly define the execution environment. It could if it were bound to a runner (and its environment); for instance, a FlinkRunnerExpansionService could probably expand KafkaIO to something more 'native'. But that requires knowledge of the target runner. If the expansion service is not dedicated to a runner, the only place where the environment can be defined is the SDK, and therefore the expansion request.

> Power users can always modify the output produced by the expansion service as well.

I'm not sure I follow this. Do you mean that power users who run the expansion service can modify the output? Or is the output (protobuf) of the expansion service easily transferable between different execution environments? I had the impression that execution environments do not necessarily have the same payloads associated with them, and therefore it is impossible to 'postprocess' the output of the expansion. Is that a wrong assumption?

On 6/29/21 7:55 PM, Luke Cwik wrote:
This would "break" the black box where the expansion service is supposed to hide the implementation internals from the caller, and it would push compatibility of these kinds of environment overrides onto the expansion service and its implementer.

Power users can always modify the output produced by the expansion service as well.
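Jan's question about post-processing can be made concrete with a small model. The sketch below represents the expansion output as plain Python dicts instead of Beam's real protobuf messages; the dict layout, the `rewrite_docker_to_process` helper, and the image and command values are hypothetical, and only the two environment URNs come from Beam's standard environment definitions. It shows that swapping an environment is more than changing the URN: a DOCKER payload names a container image, a PROCESS payload needs a boot command, and the latter cannot be derived from the former, so whoever post-processes the output has to supply it.

```python
from typing import Any, Dict

# Standard Beam environment URNs; everything else in this sketch is a
# simplified, hypothetical model of the expansion output.
DOCKER_URN = "beam:env:docker:v1"
PROCESS_URN = "beam:env:process:v1"


def rewrite_docker_to_process(
    components: Dict[str, Any], process_payload: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a copy of `components` in which every DOCKER environment
    is replaced by a PROCESS environment built from `process_payload`."""
    new_envs = {}
    for env_id, env in components["environments"].items():
        if env["urn"] == DOCKER_URN:
            # The DOCKER payload only carries a container image; the
            # PROCESS payload needs a boot command, which the caller must
            # provide because it cannot be derived from the image.
            new_envs[env_id] = {"urn": PROCESS_URN, "payload": dict(process_payload)}
        else:
            new_envs[env_id] = env
    # Transforms keep their environment ids, so they now point at the
    # rewritten environments.
    return {"environments": new_envs, "transforms": dict(components["transforms"])}


# Hypothetical expansion output for a single cross-language transform.
expanded = {
    "environments": {
        "java-env": {"urn": DOCKER_URN, "payload": {"container_image": "example/java-sdk-image"}},
    },
    "transforms": {"ReadFromKafka": {"environment_id": "java-env"}},
}
rewritten = rewrite_docker_to_process(expanded, {"command": "/path/to/java/boot"})
```

The rewrite happens after expansion, so the expansion service never learns which environment is finally used.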

On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <[email protected]> wrote:

    The argument for accepting a (possibly ordered) list of execution
    environments is that this could make a single instance of the
    expansion service reusable by various clients with different
    requirements. Moreover, the two approaches are probably orthogonal:
    users could specify a 'defaultExecutionEnvironment' for the service,
    which would be used when the client gives no preference.
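A minimal sketch of how a service could combine the two approaches, assuming the client sends an ordered list of environment types and the service is started with a default. The function name and the set of supported types are hypothetical; the type names match those used by Beam's `defaultEnvironmentType` option.

```python
from typing import Optional, Sequence


def choose_environment(
    client_preferences: Sequence[str],
    supported: Sequence[str],
    service_default: Optional[str],
) -> str:
    """Select the environment type for one expansion request.

    - No client preference: use the service-wide default.
    - Otherwise: the first preference the service supports wins.
    """
    if not client_preferences:
        if service_default is None:
            raise ValueError("no client preference and no service default")
        return service_default
    for env_type in client_preferences:
        if env_type in supported:
            return env_type
    raise ValueError(f"none of {list(client_preferences)} is supported")


# Hypothetical set of environment types this service can produce.
SUPPORTED = ("DOCKER", "PROCESS", "EXTERNAL")
```

Raising an error when none of the client's preferences is supported, rather than silently falling back to the default, follows the wording above that the default applies only when the client gives no preference; a real service might choose differently.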

    On 6/29/21 7:03 PM, Luke Cwik wrote:
    I would be much more inclined toward letting the user configure
    the expansion service for their needs instead of changing the
    expansion service API.

    On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <[email protected]> wrote:

        If I understand it correctly, there is currently no place to
        set the defaultEnvironmentType. Python's KafkaIO either uses
        the 'expansion_service' given by the user (which might be a
        host:port string or an object that has the appropriate
        method), or calls 'default_io_expansion_service', which in
        turn runs the ExpansionService using Gradle. Either way, it
        ends up in ExpansionService#main [1]. It would be possible to
        adapt the ExpansionService and call it locally; making the
        ExpansionService extensible (via a protected createPipeline()
        method) seems to be enough, but that is not very
        user-friendly. If we could specify the
        defaultEnvironmentConfig when starting the ExpansionService,
        it would be possible to add these parameters in the Python
        SDK's KafkaIO, which would mean users do not have to worry
        about the expansion service at all (leaving aside that using
        too many ReadFromKafka or WriteToKafka transforms would
        somewhat hurt performance, but only at pipeline construction
        time). I have created [2] to track that.

        Does that make sense, or is my analysis incorrect?

          Jan

        [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

        [2] https://issues.apache.org/jira/browse/BEAM-12539
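If the ExpansionService accepted these options at startup, as proposed in [2], the Python side could pass them along when it launches the service. The helper below only assembles such an argument list. It assumes a service whose main method takes the port as its first argument followed by `--name=value` pipeline options; the helper name, the port, and the boot command path are hypothetical. The option names are those of Java's `PortablePipelineOptions`, and a JSON object with a `command` key is assumed as the config format for PROCESS environments.

```python
import json
from typing import Dict, List, Optional


def expansion_service_args(
    port: int,
    environment_type: Optional[str] = None,
    environment_config: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build the argument list for starting an expansion service that
    (hypothetically) accepts default-environment options after the port."""
    args = [str(port)]
    if environment_type is not None:
        args.append(f"--defaultEnvironmentType={environment_type}")
    if environment_config is not None:
        # Assumed: for PROCESS environments the config is a JSON object.
        args.append("--defaultEnvironmentConfig=" + json.dumps(environment_config))
    return args


args = expansion_service_args(8097, "PROCESS", {"command": "/path/to/java/boot"})
```

This only helps once the service side reads these options; the port-only main method discussed in this thread does not.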


        On 6/29/21 6:24 PM, Alexey Romanenko wrote:
        > I’m sorry if I missed something, but do you mean that
        > PortablePipelineOptions.setDefaultEnvironmentType(String)
        > doesn’t work for you? Or is it only a specific case when
        > using portable KafkaIO?
        >
        >> On 29 Jun 2021, at 09:51, Jan Lukavský <[email protected]> wrote:
        >>
        >> Hi,
        >>
        >> I have come across an issue with cross-language
        >> transforms. In my setup, the PROCESS environment type works
        >> and I cannot use DOCKER. When I use Python's KafkaIO, it
        >> unfortunately expands by default to a DOCKER environment,
        >> which then fails due to the missing 'docker' command. I
        >> haven't yet found a solution that doesn't involve tackling
        >> the expansion service.
        >>
        >> I see several possible solutions to that:
        >>
        >>   a) I would say that the cleanest solution would be to
        >>      add a preferred environment type to the expansion
        >>      request sent to the expansion service (probably along
        >>      with additional flags, such as --experiments?). This
        >>      requires deeper changes to the expansion RPC
        >>      definition, probably serializing the PipelineOptions
        >>      from the client environment into the ExpansionRequest.
        >>
        >>   b) Another option would be to allow specifying some of
        >>      the command-line arguments when starting the expansion
        >>      service, which currently accepts only a port on the
        >>      command line, see [1]. The straightforward 'fix' (see
        >>      [2]) unfortunately does not work, because it requires
        >>      DirectRunner to be on the classpath, which then breaks
        >>      other runners (see [3]). It seems possible to copy
        >>      hand-selected options from the command line to the
        >>      Pipeline, but that feels hackish. It would require
        >>      either being able to construct the Pipeline without a
        >>      runner specified (which seems possible when calling
        >>      Pipeline.create(), but not when using PipelineOptions
        >>      created by parsing command-line arguments) or being
        >>      able to create a Map<String, String> from
        >>      PipelineOptions and then copy all options into the
        >>      Pipeline's options.
        >>
        >> My proposal would be to create a hackish shortcut and just
        >> copy --defaultEnvironmentType, --defaultEnvironmentConfig
        >> and --experiments into the Pipeline's options for now, and
        >> create an issue for a proper solution (possibly a)?).
        >>
        >> WDYT? Or did I miss a way to override the default expansion?
        >>
        >> Thanks for comments,
        >>
        >>   Jan
        >>
        >> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
        >>
        >> [2] https://github.com/apache/beam/pull/15082
        >>
        >> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
        >>
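The 'hackish shortcut' in the proposal amounts to an allow-list over the command line. A minimal sketch, assuming options are passed in `--name=value` form (a space-separated `--name value` form is not handled); the function name and the example arguments are hypothetical, and the three option names are the ones listed in the proposal.

```python
from typing import Iterable, List, Tuple

# The three options the proposal suggests copying into the Pipeline's options.
FORWARDED_OPTIONS = frozenset(
    {"--defaultEnvironmentType", "--defaultEnvironmentConfig", "--experiments"}
)


def split_forwarded_options(argv: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Split argv into (options to copy into the Pipeline, everything else)."""
    forwarded: List[str] = []
    rest: List[str] = []
    for arg in argv:
        # Only the "--name=value" form is recognized; bare values such as
        # the port never match an allow-listed name.
        name = arg.split("=", 1)[0]
        (forwarded if name in FORWARDED_OPTIONS else rest).append(arg)
    return forwarded, rest


forwarded, rest = split_forwarded_options(
    ["8097", "--defaultEnvironmentType=PROCESS", "--runner=FlinkRunner", "--experiments=some_experiment"]
)
```

Copying only these three values onto the options the service already creates avoids interpreting the rest of the command line; anything beyond them is left for the proper solution the proposal defers to a separate issue.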
