I would absolutely understand this if it were mostly impossible, or at least really hard, to get the user-friendly behavior. But we are nearly there in this case. When we can quite simply pass the supported environment via a parameter, I think we should go for it.

I have created a sketch in [1] (I verified that it works once the ExpansionService is patched 'enough'). It is only a sketch, because we first need to settle how the ExpansionService should support a default execution environment.

[1] https://github.com/apache/beam/pull/15099/files
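
For illustration, usage under that sketch could look roughly like the following; the parameter name and value are assumptions of the sketch, not a settled API:

    # Hypothetical sketch only: pass the desired expansion environment
    # directly to the transform, which would forward it to the
    # automatically started expansion service. The keyword argument
    # below is illustrative.
    from apache_beam.io.kafka import ReadFromKafka

    records = pipeline | ReadFromKafka(
        consumer_config={'bootstrap.servers': 'kafka:9092'},  # example config
        topics=['my_topic'],
        expansion_service_environment='PROCESS')  # hypothetical parameter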

On 6/29/21 11:51 PM, Chamikara Jayalath wrote:


On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský <je...@seznam.cz> wrote:

    On 6/29/21 11:04 PM, Robert Bradshaw wrote:
    > You can configure the environment in the current state, you just
    > have to run your own expansion service that has a different
    > environment baked into it (or, makes this configurable).
    Yes, that is true. On the other hand, that lacks some
    user-friendliness, because ideally you don't want to worry about
    expansion services at all, especially when it comes to fairly standard
    IO. In the ideal case you either do not even know that you are using
    an external transform (which is probably the case when you can use
    Docker), or you can overcome the problem within the SDK (Python) by
    passing some argument to the transform.


Arguments passed at the pipeline level apply to the whole pipeline (not just one transform). So if you pass in a default environment (and configs) at the pipeline level, that would mean the default environment and configs used by the pipeline (so the Python SDK in this case), not a specific transform. I believe we have made usage of external transforms user-friendly for the general case, but we had to make some assumptions. For example, we assumed:
* the user will be using the default environment of the expansion service (Docker in this case)
* the user will be using the pre-specified dependency only (sdks:java:io:expansion-service:shadowJar for Kafka)
* the user will be in an environment where the jar can be downloaded.

I would consider any use case where these basic assumptions cannot be met an advanced use case. The solution in such a case would be to start a custom expansion service and pass its address as a parameter to the transform [1]. I'm fine with extending the capabilities of the Java expansion service by adding more parameters (for example, for overriding the environment, for specifying dependencies, for providing pipeline options).

Thanks,
Cham

[1] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
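
For example, with a custom expansion service already running (the address below is illustrative), the transform can be pointed at it explicitly:

    # Start a custom expansion service separately (e.g. one configured
    # for a PROCESS environment), then pass its address to the transform.
    from apache_beam.io.kafka import ReadFromKafka

    records = pipeline | ReadFromKafka(
        consumer_config={'bootstrap.servers': 'kafka:9092'},
        topics=['my_topic'],
        expansion_service='localhost:8097')  # address of the custom service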

    >
    > Is option (1) updating the default expansion service such that one
    > can override default environment properties on the command line? (You
    > would still have to start it up manually to use it.)
    Yes and no. :) Updating ExpansionService so that you can specify a
    default environment on the command line makes this accessible to
    JavaJarExpansionService, and that makes it possible to add an
    (optional) argument to Python's Kafka IO that would delegate this to
    the (automatically) started expansion service. It is important to note
    that both ReadFromKafka and WriteToKafka have an expansion that
    involves only a single external (Java) SDK. That simplifies things.
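
    A minimal sketch of the Python side, assuming ExpansionService learned
    such a flag (the --defaultEnvironmentType flag on the jar is the
    proposal here, not something the service accepts today):

        # JavaJarExpansionService can pass extra arguments to the jar;
        # the hypothetical flag would be forwarded to the started service.
        from apache_beam.io.kafka import ReadFromKafka
        from apache_beam.transforms.external import JavaJarExpansionService

        service = JavaJarExpansionService(
            'beam-sdks-java-io-expansion-service.jar',  # path is illustrative
            extra_args=['{{PORT}}', '--defaultEnvironmentType=PROCESS'])
        records = pipeline | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'kafka:9092'},
            topics=['my_topic'],
            expansion_service=service)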
    >
    > Maybe it would help to make things more concrete. Suppose I have a Go
    > pipeline that uses a library which invokes a Python external transform
    > to do ML (say, via TFX), and two Java IOs (which happen to have
    > mutually exclusive dependencies). The ML transform itself uses Java to
    > invoke some SQL.
    >
    > The way things work currently is each external transform will have an
    > associated fully specified environment and a runner can use docker to
    > start up the required workers at the expected time.
    >
    > Now, suppose one doesn't have docker on the workers. One wants to run
    > this with
    >
    >      ./my_pipeline --someFlag=someValue --someOtherFlag=someOtherValue ...
    >
    > such that docker is no longer needed. What someFlags would we need,
    > and what would their values be? (And how to make this feasible to
    > implement.)
    >
    > Are there meaningful intermediate points that extend to a general
    > solution (or at least aren't hostile to it)?
    I believe that in option 2) the best way would be to use each SDK's
    URN. The arguments could then look something like:

        --expansionEnvironments='{
            "apache:beam:go:2.33.0:latest": {"env": "DOCKER", "config": "<image>"},
            "apache:beam:python:2.33.0:latest": {"env": "PROCESS", "config": {...}}
        }'

    Yes, it would require a lot of "syntactic sugar" to configure that. :)
    (Sorry if I don't have the URNs for the SDKs 100% correct.)
    >
    >
    > I still think in the long run having runners understand environments,
    > and saying "oh, whenever I see 'apache:beam:java:2.33.0:latest' I'll
    > swap that out for 'path/to/my/java -cp ...'" is the right way to go
    > long-term. (I would put this in runners, not SDKs, though a common
    > runners library could be used.)
    Yes, I also agree that the expansion service should be runner-dependent
    (or at least runner-aware), as that brings optimizations. The runner
    could ignore the settings from the previous point when it can be *sure*
    it can do so.
    >
    >
    > On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský <je...@seznam.cz> wrote:
    >> Thanks for pointing to that thread.
    >>
    >> 1) I'm - as well as Kyle - fine with the approach that we use a
    >> "preferred environment" for the expansion service. We only need to
    >> pass it via the command line. Yes, the command line might be
    >> generally SDK-dependent, and that makes it expansion-dependent,
    >> because whether or not a particular transform is "external" is an
    >> implementation detail. That is the nasty part. The rest of my
    >> original question is about how exactly to do that, because it seems
    >> to be tricky, due to the fact that it is not possible to include a
    >> runtime dependency on DirectRunner (it fails many, many tests), and
    >> it is not possible to extract PipelineOptions as a Map either.
    >>
    >> 2) Regarding the SDK injecting the environment, I still think that
    >> is the correct way. The SDK (the driver code) owns the execution
    >> environment. It should be able to define (or at least prioritize)
    >> the runtime environments of all transforms. If we cannot know in
    >> advance which transform is going to expand to how many nested (and
    >> possibly external) transforms, I think the SDK could be fine with
    >> providing a Map(SDK -> environment). That is: "run Java using
    >> PROCESS", "run Python using DOCKER", and so on (see the sketch after
    >> point 3 below). A default mapping might exist on the expansion
    >> service as well (which might be passed through the command line, and
    >> that is point 1)). Yes, the Map approach is definitely not
    >> universal, because one can imagine that the SDK itself is not enough
    >> for specifying the environment, but it seems that the vast majority
    >> of cases would fit into it.
    >>
    >> 3) The best might be for the SDK to provide a list of supported
    >> environments with additional metrics, which the expansion service
    >> might choose from.
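    >>
    >> A sketch of the shape such a map from point 2) could take (the keys
    >> and values are illustrative assumptions, not an existing API):
    >>
    >>     # Hypothetical per-SDK environment mapping:
    >>     sdk_environments = {
    >>         'java': {'env': 'PROCESS', 'config': {'command': '/opt/apache/beam/boot'}},
    >>         'python': {'env': 'DOCKER', 'config': 'apache/beam_python3.8_sdk'},
    >>     }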
    >>
    >> These three approaches are all extensions to the current state. The
    >> current state has a predefined environment without the possibility
    >> to change it. Option 1) changes it to a single configurable
    >> environment, option 2) to N environments based on the SDK, and
    >> option 3) to M environments based on SDK-dependent metrics (and/or
    >> capabilities of a particular environment). These seem like gradual
    >> extensions of the current state, so maybe we can focus on the first
    >> one and add the others when there is a need?
    >>
    >> If this could be the first conclusion, then the next question would
    >> be what the preferred way to implement it should be.
    >>
    >> WDYT?
    >>
    >> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
    >>> +1, thanks for digging up that thread.
    >>>
    >>> I am still of the same opinion that I wrote there. To touch on some
    >>> things brought up here, copying something like
    >>> defaultEnvironmentConfig doesn't make sense from language to
    >>> language (e.g. the docker image name or CLI arguments for
    >>> subprocess mode just aren't going to work for all of Python, Java,
    >>> and Go, and clearly the embedded type is only going to work for
    >>> one.)
    >>>
    >>> In the short term, to change the environment (or anything else) of
    >>> the "default" expansion service, the thing to do is build and start
    >>> your own expansion service that sets up the environment for its
    >>> transforms in a custom way.
    >>>
    >>> FYI, in Python, one can use --beam_services to use a custom
    >>> expansion service. E.g.
    >>>
    >>>     --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar": "localhost:port"}'
    >>>
    >>> would override the default one when using SqlTransform.
    >>>
    >>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver <kcwea...@google.com> wrote:
    >>>> For context, there was a previous thread which touched on many of
    the same points:
    https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
    >>>>
    >>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>>> I would slightly disagree that this breaks the black-box nature
    of the expansion. The "how the transform expands" remains unknown to
    the SDK requesting the expansion; the "how the transform executes", on
    the other hand, is something that the SDK must cooperate on - it knows
    (or could or should know) what the environment the pipeline is going
    to be executed on looks like. That is why the expansion service on its
    own cannot correctly define the execution environment. It could, if it
    were bound to a runner (and its environment) - for instance, a
    FlinkRunnerExpansionService could probably expand KafkaIO to something
    more 'native'. But that requires knowledge of the target runner. If
    the expansion service is not dedicated to a runner, the only place
    where the environment can be defined is the SDK - and therefore the
    expansion request.
    >>>>>
    >>>>>> Power users can always modify the output produced by the
    expansion service as well.
    >>>>> I'm not sure if I follow this. Do you mean that power users, who
    run the expansion service, can modify the output? Or is the output
    (protobuf) of the expansion service easily transferable between
    different execution environments? I had the impression that execution
    environments do not necessarily have the same payloads associated with
    them, and that it is therefore impossible to 'post-process' the output
    of the expansion. Is that a wrong assumption?
    >>>>>
    >>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
    >>>>>
    >>>>> This would "break" the black box where the expansion service is
    supposed to hide the implementation internals from the caller, and it
    pushes compatibility of these kinds of environment overrides onto the
    expansion service and its implementer.
    >>>>>
    >>>>> Power users can always modify the output produced by the
    expansion service as well.
    >>>>>
    >>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>>>> The argument for being able to accept a (possibly ordered) list
    of execution environments is that this could make a single instance of
    the expansion service reusable by various clients with different
    requirements. Moreover, the two approaches are probably orthogonal -
    users could specify a 'defaultExecutionEnvironment' for the service,
    which could be used when there is no preference given by the client.
    >>>>>>
    >>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
    >>>>>>
    >>>>>> I would be much more inclined toward the user being able to
    configure the expansion service for their needs instead of changing
    the expansion service API.
    >>>>>>
    >>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>>>>> If I understand it correctly, there is currently no place to
    >>>>>>> set the defaultEnvironmentType - Python's KafkaIO uses either
    >>>>>>> an 'expansion_service' given by the user (which might be a
    >>>>>>> host:port, or an object that has an appropriate method), or
    >>>>>>> calls 'default_io_expansion_service' - which in turn runs
    >>>>>>> ExpansionService using gradle. Either way, it ends up in
    >>>>>>> ExpansionService#main [1]. It would be possible to adapt
    >>>>>>> ExpansionService and call it locally - providing a way to
    >>>>>>> extend it (e.g. via a protected method createPipeline()) seems
    >>>>>>> to be enough - but that is not very user-friendly. If we could
    >>>>>>> specify the defaultEnvironmentConfig when starting the
    >>>>>>> ExpansionService, it would be possible to add these parameters
    >>>>>>> in the Python SDK's KafkaIO, which would mean users would not
    >>>>>>> have to worry about the expansion service at all (leaving aside
    >>>>>>> that using too many ReadFromKafka or WriteToKafka transforms
    >>>>>>> would somewhat hurt performance during pipeline construction,
    >>>>>>> but that applies to pipeline build time only). I have created
    >>>>>>> [2] to track that.
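    >>>>>>>
    >>>>>>> For reference, the two resolution paths above look roughly like
    >>>>>>> this from the user's side (values are illustrative):
    >>>>>>>
    >>>>>>>     from apache_beam.io.kafka import ReadFromKafka
    >>>>>>>
    >>>>>>>     # Path 1: user supplies a running service (host:port, or an
    >>>>>>>     # object with an appropriate method):
    >>>>>>>     ReadFromKafka(
    >>>>>>>         consumer_config={'bootstrap.servers': 'kafka:9092'},
    >>>>>>>         topics=['my_topic'],
    >>>>>>>         expansion_service='localhost:8097')
    >>>>>>>
    >>>>>>>     # Path 2: nothing supplied; default_io_expansion_service()
    >>>>>>>     # starts the shadowJar, ending up in ExpansionService#main [1].
    >>>>>>>     ReadFromKafka(
    >>>>>>>         consumer_config={'bootstrap.servers': 'kafka:9092'},
    >>>>>>>         topics=['my_topic'])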
    >>>>>>>
    >>>>>>> Does that make sense, or is my analysis incorrect?
    >>>>>>>
    >>>>>>>     Jan
    >>>>>>>
    >>>>>>> [1]
    >>>>>>> https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
    >>>>>>>
    >>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-12539
    >>>>>>>
    >>>>>>>
    >>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
    >>>>>>>> I'm sorry if I missed something, but do you mean that
    PortablePipelineOptions.setDefaultEnvironmentType(String) doesn't work
    for you? Or is it only a specific case while using the portable
    KafkaIO?
    >>>>>>>>
    >>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:
    >>>>>>>>>
    >>>>>>>>> Hi,
    >>>>>>>>>
    >>>>>>>>> I have come across an issue with cross-language transforms.
    My setup is that I have a working environment type PROCESS and I
    cannot use DOCKER. When I use Python's KafkaIO, it unfortunately - by
    default - expands to the Docker environment, which then fails due to
    the missing 'docker' command. I haven't found a solution that does not
    involve tackling the expansion service yet.
    >>>>>>>>>
    >>>>>>>>> I see several possible solutions to that:
    >>>>>>>>>
    >>>>>>>>>     a) I would say that the cleanest solution would be to
    add the preferred environment type to the expansion request sent to
    the expansion service (probably along with additional flags, e.g.
    --experiments?). This requires deeper changes to the expansion RPC
    definition, probably serializing the PipelineOptions from the client
    environment into the ExpansionRequest.
    >>>>>>>>>
    >>>>>>>>>     b) Another option would be to allow specifying some of
    the command-line arguments when starting the expansion service, which
    currently accepts only a port on the command line, see [1]. The
    straightforward 'fix' (see [2]) unfortunately does not work, because
    it requires DirectRunner to be on the classpath, which then breaks
    other runners (see [3]). It seems possible to copy hand-selected
    options from the command line to the Pipeline, but that feels hackish.
    It would require either being able to construct the Pipeline without a
    runner specified (which seems possible when calling Pipeline.create(),
    but not when using PipelineOptions created by parsing command-line
    arguments), or being able to create a Map<String, String> from
    PipelineOptions and then copy all options into the Pipeline's options.
    >>>>>>>>>
    >>>>>>>>> My proposal would be to create a hackish shortcut and just
    copy the --defaultEnvironmentType, --defaultEnvironmentConfig and
    --experiments into the Pipeline's options for now, and create an issue
    for a proper solution (possibly a)?); see the sketch below.
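    >>>>>>>>>
    >>>>>>>>> To make that concrete, the shortcut would enable an
    invocation along these lines (the flags are the proposed ones, not
    something ExpansionService accepts today; the jar name and paths are
    illustrative):
    >>>>>>>>>
    >>>>>>>>>     java -jar beam-sdks-java-io-expansion-service.jar 8097 \
    >>>>>>>>>         --defaultEnvironmentType=PROCESS \
    >>>>>>>>>         --defaultEnvironmentConfig='{"command":"/path/to/boot"}' \
    >>>>>>>>>         --experiments=beam_fn_api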
    >>>>>>>>>
    >>>>>>>>> WDYT? Or did I miss a way to override the default expansion?
    >>>>>>>>>
    >>>>>>>>> Thanks for comments,
    >>>>>>>>>
    >>>>>>>>>     Jan
    >>>>>>>>>
    >>>>>>>>> [1]
    >>>>>>>>> https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
    >>>>>>>>>
    >>>>>>>>> [2] https://github.com/apache/beam/pull/15082
    >>>>>>>>>
    >>>>>>>>> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
    >>>>>>>>>
