Hi,

after today's experience I think I have some arguments about why we *should* pass (at least some) of the PipelineOptions from SDK to expansion service.

 1) there are lots, and lots, and lots of bugs around SDF and around the "use_deprecated_read", sorry, but the switch to SDF as the default *way* too premature

 2) therefore, the expansion *is* runner dependent (because whether to use "use_deprecated_read" or not is runner dependent), only the client of the expansion service (the SDK, the driver code) knows the target runner - i.e. if the target runner can use "new" Read or "deprecated" Read

 3) currently, my opinion is that we hold many portable Flink users on 2.24.0, because from 2.25.0, the combination of Kafka + Python SDK + Flink is simply not working - until now, there is no way to pass arguments to expansion service, and even after that, "use_deprecated_read" is simply ignored by the service (pretty much the same as was in DirectRunner, see [1])

We should consider making use_deprecated_read the default for Flink (at least), not sure what is the state of other runners regarding that. It would be good to rename it, if we do not have plans to correctly support SDF (Read), including portability of other runners.

Yes, this might be a temporary issue, but the fact, that expansion is runner dependent remains valid, because such situation might reappear.

 Jan

[1] https://github.com/apache/beam/pull/15082/commits/5a46664ceb9f03da3089925b30ecd0a802e8b3eb

On 7/1/21 9:33 AM, Jan Lukavský wrote:
On 7/1/21 3:26 AM, Kyle Weaver wrote:

    I think it should accept complete list of PipelineOptions (or at
    least some defined subset - PortabilityPipelineOptions,
    ExperimentalOptions, ...?)


I'm not totally opposed to redefining some options, either. Using PipelineOptions could be confusing because only very few options would actually be respected -- even PortablePipelineOptions includes many options that wouldn't make sense in this context. Maybe better to have a small list of options that are guaranteed to work.

That makes sense. How would we define the subset? I think that would probably require some sort of annotation analogous to @Validation.Required, maybe @Validation.ExpansionSupported or similar. I'm fine with implementing that, but I would need now to get the 'hotfix' to upcoming 2.32.0 release. Could we make that for 2.33.0? Will you help me review the current PR [1]?

[1] https://github.com/apache/beam/pull/15082


On Wed, Jun 30, 2021 at 8:48 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

     > Not sure why we need the hacks with NoOpRunner

    As noted earlier (and that was why I started this thread in the
    first
    place :)), adding :runners:direct-java as runtime dependency of the
    expansion service causes something like 200 tests in pre-commit
    to fail.
    Looks like there is some kind of conflict among Flink and Direct
    runner.
    I didn't dig too deep into that, though.

     > You could use the Python utilities in your script to
    start/stop it
    manually.

    Yes, that is possible. I'll probably follow that path.

     > This is where the runner's ability to customize environments
    would
    come in handy--e.g. a Java runner could decide to swap out the Java
    docker environment for EMBEDDED or LOOPBACK (and a Python-based
    runner
    could do the same for the Python docker env).

    That would be just perfect, as that would make it possible to
    finally
    unify 'classical' and 'portable' runners. But that is a whole
    different
    story. :)

      Jan

    On 6/30/21 5:35 PM, Robert Bradshaw wrote:
    > On Wed, Jun 30, 2021 at 7:41 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:
    >>> java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>
    >> This does not accept any other parameters than the port. That
    is the first part of this thread - the intent was to enable this
    to accept additional arguments, but there are (still waiting to
    be addressed unresolved) issues. There currently even seems to be
    no other way to adapt ExpansionService than to copy&paste the
    code and modify it, because it simply is not extensible. What
    would be enough is wrapping Pipeline.create() [1] call to a
    protected method, or add (protected) constructor that would
    accept PipelineOptions (probably better in this regard). That
    would make it more easy for users to create customized
    ExpansionService and it would (sort of) help solving described
    issues.
    > Yes, let's make it easy to extend/customize/start up a custom
    > ExpansionService, including adding optional command line
    arguments to
    > the "default" one. Not sure why we need the hacks with NoOpRunner
    > (IMHO, the direct runner should just be part of the SDK, but that's
    > not where we live now).
    >
    >> But even if we do that, we still need to deal with the
    expansion service on two places:
    >>
    >>   a) run it (and stop it)
    >>
    >>   b) specify it in the
    >>
    >> Using the default expansion service is much, much easier, it
    is started and stopped automatically for the user. Morever, the
    JavaJarExpansionService actually even presumes that there can be
    additional arguments passed to the service ([2]), the
    ExpansionService only does not accept them (and kafka IO does not
    expose that - that could be worked-around by users by manually
    creating the JavaJarExpansionService from own jar, yes). I would
    find it natural to add the command-line parsing (somehow!) to the
    ExpansionService itself, so that it doesn't need end-user
    modifications and then to figure out how to most easily expose
    there command-line arguments to end-users.
    > You could use the Python utilities in your script to start/stop
    it manually.
    >
    >> Yes, I verified that Flink can use Python Kafka IO over
    PROCESS environment with some hacking of the ExpansionService as
    shown in one of the linked PRs (though there is probably still
    some bugs regarding SDF - [3]). Adding --experiments seems have
    the same issues, need expose that to the CLI of ExpansionService.
    And I'm not sure if this [4] is not in conflict with
    --experiments=use_deprecated_read. That is something I still need
    to investigate.
    >>
    >> LOOPBACK is currently not supported by Flink. That is
    nice-to-have feature.
    > Local Flink does support LOOPBACK mode. If you just want to run
    > locally, just specifying "FlinkRunner" is enough. It's distributed
    > Flink that does not. It seems a lot of complexities are due to
    trying
    > to using minikube, which acts like it's distributed, but trying to
    > make it as easy as if it were all local (and the docker
    deficiencies
    > as well, which would make it just work...) Which is a worthy goal.
    >
    > This is where the runner's ability to customize environments would
    > come in handy--e.g. a Java runner could decide to swap out the Java
    > docker environment for EMBEDDED or LOOPBACK (and a Python-based
    runner
    > could do the same for the Python docker env).
    >
    >> [1]
    
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394
    
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394>
    >>
    >> [2]
    
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481
    
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481>
    >>
    >> [3] https://issues.apache.org/jira/browse/BEAM-11998
    <https://issues.apache.org/jira/browse/BEAM-11998>
    >>
    >> [4]
    
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398
    
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398>
    >>
    >> On 6/30/21 3:57 PM, Chamikara Jayalath wrote:
    >>
    >>
    >>
    >> On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath
    <chamik...@google.com <mailto:chamik...@google.com>> wrote:
    >>>
    >>>
    >>> On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:
    >>>> On 6/30/21 1:16 AM, Robert Bradshaw wrote:
    >>>>> <rant>Why doesn't docker in docker just work, rather than
    having to do
    >>>>> ugly hacks when composing two technologies that both rely on
    >>>>> docker...</rant>
    >>>>>
    >>>>> Presumably you're setting up a node for Kafka and Flink;
    why not set
    >>>>> one up for the expansion service as well? The UX of
    >>>>>
    >>>>>
     ReadFromKafka(default_expansion_service_args={"defaultEnvironmentType":
    >>>>> "PROCESS", "defaultEnvironmentConfig": ""{\"os\":
    \"linux\", \"arch\":
    >>>>> \"amd64\", \"command\": \"/path/to/launcher/boot
    >>>>> cp=/some/other/long/path\" ...}")"})
    >>>>>
    >>>>> isn't that great either. Rather than pass arbitrary
    arguments to a
    >>>>> default expansion service, I still think once you get to
    this level
    >>>>> it's better to just start your own expansion service.
    >>>> Sure, that is possible (seems to me, that it would still
    require some
    >>>> changes to ExpansionService to be extendable, but yes, kind
    of tiny
    >>>> changes). The problem is not with Flink or Kafka - those are
    >>>> technologies you are actually expecting to set up, because
    you want to
    >>>> use them. The problem is what everything else you must set
    up for making
    >>>> something that seems as easy as "read a few messages from
    kafka in beam
    >>>> python" to work. You must have:
    >>>>
    >>>>    a) Python SDK harness (OK, that is something that should
    be probably
    >>>> expected) - there are few problems with it, namely it is
    somewhat
    >>>> hardcoded that it must run in the same pod as Flink's
    taskmanager to be
    >>>> able to use EXTERNAL environment, but ok, let's go on
    >>>>
    >>>>    b) Java SDK harness, at least installed in docker image
    of taskmanager
    >>>> (to be usable via PROCESS environment) - OK, that starts to
    be weird,
    >>>> taskmanager is java, right? Something like LOOPBACK would be
    cool there,
    >>>> but never mind. You create custom docker image for your
    Flink JM and TM
    >>>> and continue.
    >>>>
    >>>>    c) Implement (extend) and deploy own expansion service -
    ouch, that
    >>>> starts to hurt, that is even going to be a pod that is
    running even
    >>>> though there is nothing using it (yes, can be scaled down).
    >>>>
    >>>> The complexity of a simple task starts to be somewhat
    extraordinary. And
    >>>> most of the users will not be willing to follow this path,
    I'm afraid.
    >>>> People generally don't like to set up complex environment
    for something
    >>>> that looks it should "just work".  There is non-trivial work
    necessary
    >>>> to make all of this working, mostly when you are starting to
    evaluate
    >>>> Beam and don't have much experience with it.
    >>>
    >>> I don't think we should expect end-users to implement or
    extend the expansion service. Everything should be already
    implemented and maybe we can even provide a script to easily
    startup a local Java expansion service with additional parameters.
    >>>
    >>> Today, to start a Java expansion service for Kafka users have
    to do the following.
    >>>
    >>> * Download expansion service jar released with Beam for
    Kafka. For example [1]
    >>>
    >>> * Run following command:
    >>> java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>
    >>>
    >>> * To use this they just have to provide "localhost:<port>" to
    [2].
    >>>
    >>> This is a few extra steps but mostly a one time setup for the
    user and nothing to do with portability or other complexities of
    Beam.
    >>>
    >>> I'm all for simplifying the user-experience, but adding
    changes to the transform API that might have to be deprecated
    later sounds like a bad idea. I'd much rather provide additional
    scripts/documentation/examples to simplify such use-cases. I
    think that will be adequate for most users.
    >>>
    >>> BTW, slightly orthogonal, I don't think multi-language would
    work in LOOPBACK mode today without additional changes to
    portable runners (at least I've never tested this). Did you
    confirm that this works ?
    >>
    >> Or PROCESS mode.
    >>
    >>>
    >>> Thanks,
    >>> Cham
    >>>
    >>> [1]
    
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
    
<https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar>
    >>> [2]
    
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
    
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
    >>>
    >>>
    >>>>
    >>>> We can get rid of b) (implement LOOPBACK in Flink) and c)
    (enable Python
    >>>> SDK Kafka IO to spawn expansion service with the LOOPBACK
    environment
    >>>> when submitting to Flink). That is why I still think that this
    >>>> simplification matters a lot.
    >>>>
    >>>>> On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>> I believe we could change that more or less the same as we
    can deprecate / stop supporting any other parameter of any
    method. If python starts to support natively Kafka IO, then we
    can simply log warning / raise exception (one after the other).
    That seems like natural development.
    >>>>>>
    >>>>>> Maybe I should have described the case - I'm trying to
    setup a "simple" use-case for users that want to try Python SDK
    to read using Flink from Kafka using Minikube (both Kafka and
    Flink are running inside Minikube). There are tons of problems to
    use docker from within Minkube and I would not say that is the
    "simple" way we would like to present to users. Setting up own
    expansion service is possibility - but that also lacks the UX
    approach. I pretty much think that understanding portability on
    it's own is already a burden we put on users (yes, we do that for
    a reason, but everything else should be as simple as possible).
    >>>>>>
    >>>>>> On 6/30/21 12:16 AM, Chamikara Jayalath wrote:
    >>>>>>
    >>>>>> So I think one downside to this PR is that we assume that
    the default expansion service used by the transform (Kafka in
    this case) will not change. Currently it's fully opaque. In the
    default case we just promise that the transform will work (if
    conditions I mentioned above are met). Nothing else.
    >>>>>> If we add a "param default_expansion_service_args", we
    leak the nature of the default expansion service to the API and
    it will be hard to change it in the future.
    >>>>>>
    >>>>>> Thanks,
    >>>>>> Cham
    >>>>>>
    >>>>>> On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>> I would absolutely understand this, if it would be mostly
    impossible or at least really hard to get the user friendly
    behavior. But we are mostly there in this case. When we can
    actually quite simply pass the supported environment via
    parameter, I think we should go for it.
    >>>>>>>
    >>>>>>> I have created a sketch (I verified that when the
    ExpansionService is patched 'enough' this works) in [1]. This is
    only a sketch, because we first must know how to support the
    default execution environment in ExpansionService.
    >>>>>>>
    >>>>>>> [1] https://github.com/apache/beam/pull/15099/files
    <https://github.com/apache/beam/pull/15099/files>
    >>>>>>>
    >>>>>>> On 6/29/21 11:51 PM, Chamikara Jayalath wrote:
    >>>>>>>
    >>>>>>>
    >>>>>>>
    >>>>>>> On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>>> On 6/29/21 11:04 PM, Robert Bradshaw wrote:
    >>>>>>>>> You can configure the environment in the current state,
    you just have
    >>>>>>>>> to run your own expansion service that has a different
    environment
    >>>>>>>>> backed into it (or, makes this configurable).
    >>>>>>>> Yes, that is true. On the other hand that lacks some
    user-friendliness,
    >>>>>>>> because ideally, you don't want to worry about expansion
    services,
    >>>>>>>> mostly when it comes to some mostly standard IO. The
    ideal case is that
    >>>>>>>> you either do not basically know that you use external
    transform (which
    >>>>>>>> is probably the case when you can use docker), or you
    are able to
    >>>>>>>> overcome the problem within the SDK (Python) by passing
    some argument to
    >>>>>>>> the input transform.
    >>>>>>> Arguments passed to the pipeline level apply to the whole
    pipeline (not just one transform). So if you pass in a default
    environment (and configs) at pipeline level, that would mean the
    default environment and configs used by the pipeline (so Python
    SDK in this case) not a specific transform.
    >>>>>>> I believe we have made usage of external transforms
    used-friendly for the general case. But we had to make some
    assumptions. For example we assumed,
    >>>>>>> * user will be using the default environment of the
    expansion service (Docker in this case)
    >>>>>>> * User will be using the pre-specified dependency only
    (sdks:java:io:expansion-service:shadowJar for Kafka)
    >>>>>>> * User will be in an environment where the jar can be
    downloaded.
    >>>>>>>
    >>>>>>> I would consider any use-case where these basic
    assumptions cannot be met as an advanced use-case. The solution
    in such a case would be to start a custom expansion service and
    pass the address of it as a parameter to the transform [1]. I'm
    fine with extending the capabilities of Java expansion service by
    adding more parameters (for example, for overriding the
    environment, for specifying dependencies, for providing pipeline
    options).
    >>>>>>>
    >>>>>>> Thanks,
    >>>>>>> Cham
    >>>>>>>
    >>>>>>> [1]
    
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
    
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
    >>>>>>>
    >>>>>>>
    >>>>>>>>> Is option (1) updating the default expansion service
    such that one can
    >>>>>>>>> override default environment properties on the command
    line? (You
    >>>>>>>>> would still have to start it up manually to use it.)
    >>>>>>>> Yes and no. :) Updating ExpansionService so that you can
    specify default
    >>>>>>>> environment on command like makes this accessible to
    >>>>>>>> JavaJarExpansionService, and that makes it possible to
    add (optional)
    >>>>>>>> argument to Python Kafka IO, that would delegate this to the
    >>>>>>>> (automatically) started expansion service. It is
    important to note that
    >>>>>>>> both ReadFromKafka and WriteToKafka have expansion that
    involves only
    >>>>>>>> single external (Java) SDK. That simplifies things.
    >>>>>>>>> Maybe it would help to make things more concrete.
    Suppose I have a Go
    >>>>>>>>> pipeline that uses a library which invokes a Python
    external transform
    >>>>>>>>> to do ML (say, via TFX), and two Java IOs (which happen
    to have
    >>>>>>>>> mutually exclusive dependencies). The ML transform
    itself uses Java to
    >>>>>>>>> invoke some SQL.
    >>>>>>>>>
    >>>>>>>>> The way things work currently is each external
    transform will have an
    >>>>>>>>> associated fully specified environment and a runner can
    use docker to
    >>>>>>>>> start up the required workers at the expected time.
    >>>>>>>>>
    >>>>>>>>> Now, suppose one doesn't have docker on the workers.
    One wants to run this with
    >>>>>>>>>
    >>>>>>>>>        ./my_pipeline --someFlag=someValue
    --someOtherFlag=someOtherValue ...
    >>>>>>>>>
    >>>>>>>>> such that docker is no longer needed. What someFlags
    would we need,
    >>>>>>>>> and what would their values be? (And how to make this
    feasible to
    >>>>>>>>> implement.)
    >>>>>>>>>
    >>>>>>>>> Are there meaningful intermediate points that extend to
    a general
    >>>>>>>>> solution (or at least aren't hostile to it)?
    >>>>>>>> I believe that in the option 2) the best way would to
    use each SDK's URN
    >>>>>>>> Then the arguments could be something like
    >>>>>>>>
    "--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
    >>>>>>>> config="<image>"},
    "apache:beam:python:2.33.0:latest"={env="PROCESS",
    >>>>>>>> config={...}}". Yes, it would require a lot of
    "syntactic sugar" to
    >>>>>>>> configure that. :) (sorry if I don't have URNs for SDKs
    100% correct)
    >>>>>>>>> I still think in the long run having runners understand
    environments,
    >>>>>>>>> and saying "oh, whenever I see
    'apache:beam:java:2.33.0:latest' I'll
    >>>>>>>>> swap that out for 'path/to/my/java -cp ...' is the
    right way to go
    >>>>>>>>> long-term. (I would put this in runners, not SDKs,
    though a common
    >>>>>>>>> runners library could be used.)
    >>>>>>>> Yes, I also agree, that expansion service should be
    runner-dependent (or
    >>>>>>>> at least runner-aware), as that brings optimizations.
    Runner could
    >>>>>>>> ignore settings from previous point when it can be
    *sure* it can do so.
    >>>>>>>>> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>>>>> Thanks for pointing to that thread.
    >>>>>>>>>>
    >>>>>>>>>> 1) I'm - as well as Kyle - fine with the approach that
    we use a
    >>>>>>>>>> "preferred environment" for the expansion service. We
    only need to pass
    >>>>>>>>>> it via command line. Yes, the command line might be
    generally
    >>>>>>>>>> SDK-dependent, and that makes it expansion dependent,
    because whether or
    >>>>>>>>>> not particular transform is "external" or not is
    implementation detail.
    >>>>>>>>>> That is the nasty part. The rest of my original
    question is about, how
    >>>>>>>>>> exactly to do that, because it seems to be tricky, due
    to the fact, that
    >>>>>>>>>> it is not possible to include runtime dependency on
    DirectRunner (fails
    >>>>>>>>>> many, many tests) and it is not possible to extract
    PipelineOptions as a
    >>>>>>>>>> Map either.
    >>>>>>>>>>
    >>>>>>>>>> 2) Regarding SDK injecting environment, I still think
    that is the
    >>>>>>>>>> correct way. The SDK (the driver code) own the
    execution environment. It
    >>>>>>>>>> should be able to define (or at least prioritize)
    runtime environments
    >>>>>>>>>> of all transforms. If we cannot know in advance, which
    transform is
    >>>>>>>>>> going to expand to how many nested (and possibly
    external) transforms, I
    >>>>>>>>>> think that the SDK could be fine with providing a
    Map(SDK ->
    >>>>>>>>>> environment). That is: "Run Java using PROCESS", "Run
    Python using
    >>>>>>>>>> DOCKER", and so on. A default mapping might exist on
    the expansion
    >>>>>>>>>> service as well (which might be passed through command
    line and that is
    >>>>>>>>>> the point 1)). Yes, the Map approach is definitely not
    universal,
    >>>>>>>>>> because one can imagine that the SDK itself is not
    enough for specifying
    >>>>>>>>>> the environment, but seems that vast majority of cases
    would fit into that.
    >>>>>>>>>>
    >>>>>>>>>> 3) The best might be for the SDK to provide a list of
    supported
    >>>>>>>>>> environments with additional metrics which the
    expansion service might
    >>>>>>>>>> choose from.
    >>>>>>>>>>
    >>>>>>>>>> These three approaches are all extensions to the
    current state. Current
    >>>>>>>>>> state has predefined environment without possibility
    to change it.
    >>>>>>>>>> Option 1) changes it to single configurable
    environment, option 2) to N
    >>>>>>>>>> environments based on SDK and option 3) to M
    environments based on
    >>>>>>>>>> SDK-dependent metrics (and/or capabilitites of
    particular environment).
    >>>>>>>>>> Seems like gradual extensions of the current state, so
    maybe we can
    >>>>>>>>>> focus on the first one, and maybe add other, when
    there is a need?
    >>>>>>>>>>
    >>>>>>>>>> If this could be the first conclusion, then the next
    one would be, what
    >>>>>>>>>> should be the preferred way to implement it.
    >>>>>>>>>>
    >>>>>>>>>> WDYT?
    >>>>>>>>>>
    >>>>>>>>>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
    >>>>>>>>>>> +1, thanks for digging up that thread.
    >>>>>>>>>>>
    >>>>>>>>>>> I am still of the same opinion that I wrote there. To
    touch on some
    >>>>>>>>>>> things brought up here, copying something like
    >>>>>>>>>>> defaultEnvironmentConfig doesn't make sense from
    language to language
    >>>>>>>>>>> (e.g. the docker image name or CLI arguments for
    subprocess mode just
    >>>>>>>>>>> isn't going to work for all of Python, Java, and Go,
    and clearly
    >>>>>>>>>>> embedded type is only going to work for one.)
    >>>>>>>>>>>
    >>>>>>>>>>> In the short term, to change environment (or anything
    else) about the
    >>>>>>>>>>> "default" expansions service, the thing to do is
    build and start your
    >>>>>>>>>>> own expansion service that sets up the environment
    for its transforms
    >>>>>>>>>>> in a custom way.
    >>>>>>>>>>>
    >>>>>>>>>>> FYI, in Python, one can use --beam_services to use a
    custom expansion
    >>>>>>>>>>> service. E.g.
    >>>>>>>>>>>
    >>>>>>>>>>>
    --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
    >>>>>>>>>>> "localhost:port"}'
    >>>>>>>>>>>
    >>>>>>>>>>> would override the default one when using SqlTransform.
    >>>>>>>>>>>
    >>>>>>>>>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver
    <kcwea...@google.com <mailto:kcwea...@google.com>> wrote:
    >>>>>>>>>>>> For context, there was a previous thread which
    touched on many of the same points:
    
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
    
<https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E>
    >>>>>>>>>>>>
    >>>>>>>>>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>>>>>>>> I would slightly disagree that this breaks the
    black box nature of the expansion, the "how the transform
    expands" remains unknown to the SDK requesting the expansion, the
    "how the transform executes" - on the other hand - is something
    that the SDK must cooperate on - it knows (or could or should
    know) what is the environment that the pipeline is going to be
    executed on looks like. That is why expansion service on its own
    cannot correctly define the execution environment. It could, if
    it would be bound to runner (and its environemnt) - for instance
    FlinkRunnerExpansionService could probably expand KafkaIO to
    something more 'native'. But that requires knowledge of the
    target runner. If the expansion service is not dedicated to a
    runner, the only place where it can be defined, is the SDK - and
    therefore the expansion request.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>>> Power users can always modify the output produced
    by the expansion service as well.
    >>>>>>>>>>>>> I'm not sure if I follow this, do you mean that
    power users, who run the expansion service can modify the output?
    Or is the output (protobuf) of the expansion service easily
    transferable between different execution environments?- I had the
    impression, that execution environments do not necessarily have
    to have the same payloads associated with them, and therefore it
    is impossible to 'postprocess' the output of the expansion. Is
    that wrong assumption?
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> This would "break" the black box where the
    expansion service is supposed to hide the implementation
    internals from the caller and pushes compatibility of these kinds
    of environment overrides on to the expansion service and its
    implementer.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> Power users can always modify the output produced
    by the expansion service as well.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>>>>>>>>> The argument for being able to accept (possibly
    ordered list of) execution environments is in that this could
    make a single instance of execution service reusable by various
    clients with different requirements. Moreover, the two approaches
    are probably orthogonal - users could specify
    'defaultExecutionEnvironment' for the service which could be used
    in case when there is no preference given by the client.
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> I would be much more inclined for the user being
    able to configure the expansion service for their needs instead
    of changing the expansion service API.
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský
    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
    >>>>>>>>>>>>>>> If I understand it correctly, there is currently
    no place to set the
    >>>>>>>>>>>>>>> defaultEnvironmentType - python's KafkaIO uses either
    >>>>>>>>>>>>>>> 'expansion_service' given by the user (which
    might be a host:port, or an
    >>>>>>>>>>>>>>> object that has appropriate method), or calls
    >>>>>>>>>>>>>>> 'default_io_expansion_service' - which in turn
    runs ExpansionService
    >>>>>>>>>>>>>>> using gradle. Either way, it ends up in
    ExpansionService#main [1]. It
    >>>>>>>>>>>>>>> could be possible to adapt ExpansionService and
    call it locally -
    >>>>>>>>>>>>>>> provided ExpansionService would provide a way to
    extend it (using
    >>>>>>>>>>>>>>> protected method createPipeline()) seems to be
    enough - but that is not
    >>>>>>>>>>>>>>> too much user-friendly. If we could specify the
    defaultEnvironmentConfig
    >>>>>>>>>>>>>>> when starting the ExpansionService, it would be
    possible to add these
    >>>>>>>>>>>>>>> parameters in the python SDK's KafkaIO, which
    would mean users do not
    >>>>>>>>>>>>>>> have to worry about the expansion service at all
    (leaving aside that
    >>>>>>>>>>>>>>> using too many ReafFromKafka or WriteToKafka
    transforms would somewhat
    >>>>>>>>>>>>>>> hurt performance during pipeline build, but that
    applies to the pipeline
    >>>>>>>>>>>>>>> build time only). I have created [2] to track that.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Does that make sense, or is my analysis incorrect?
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>       Jan
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> [1]
    >>>>>>>>>>>>>>>
    
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
    
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> [2]
    https://issues.apache.org/jira/browse/BEAM-12539
    <https://issues.apache.org/jira/browse/BEAM-12539>
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
    >>>>>>>>>>>>>>>> I’m sorry if I missed something but do you mean
    that PortablePipelineOptions.setDefaultEnvironmentType(String)
    doesn’t work for you? Or it’s only a specific case while using
    portable KafkaIO?
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský
    <x666je...@gmail.com <mailto:x666je...@gmail.com>> wrote:
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> Hi,
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> I have come across an issue with cross-language
    transforms. My setup is I have working environment type PROCESS
    and I cannot use DOCKER. When I use Python's KafkaIO, it
    unfortunately - by default - expands to docker environment, which
    then fails due to missing 'docker' command. I didn't find a
    solution without tackling the expansion service, yet.
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> I see several possible solutions to that:
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    a) I would say, that the cleanest solution
    would be to add preferred environment type to the expansion
    request to the expansion service (probably along with additional
    flags, probably --experiments?). This requires deeper changes to
    the expansion RPC defintion, probably serializing the
    PipelineOptions from the client environment into the
    ExpansionRequest.
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    b) Another option would be to allow
    specifying some of the command-line arguments when starting the
    expansion service, which currently accepts only port on command
    line, see [1]. The straightforward 'fix' (see [2]) unfortunately
    does not work, because it requires DirectRunner to be on the
    classpath, which then breaks other runners (see [3]). It seems
    possible to copy hand selected options from command line to the
    Pipeline, but that feels hackish. It would require to either be
    able to construct the Pipeline without a runner specified (which
    seems possible when calling Pipeline.create(), but not when using
    PipelineOptions create by parsing command-line arguments) or to
    be able to create a Map<String, String> from PIpelineOptions and
    then the ability to copy all options into the Pipeline's options.
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> My proposal would be to create a hackish
    shortcut and just copy the --defaultEnvironmentType,
    --defaultEnvironmentConfig and --experiments into Pipeline's
    options for now, and create an issue for a proper solution
    (possible a)?).
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> WDYT? Or did I miss a way to override the
    default expansion?
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> Thanks for comments,
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    Jan
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> [1]
    
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
    
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> [2] https://github.com/apache/beam/pull/15082
    <https://github.com/apache/beam/pull/15082>
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> [3]
    https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
    <https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/>
    >>>>>>>>>>>>>>>>>

Reply via email to