I don't have complete comprehension of the topic, but from what I have observed, the runner gets (possibly cross-language) proto description of the pipeline, and the post-processing there might be limited.  That is mainly due to the fact, that we have inverted the expansion flow - we expand Read to SDF and only when "use_deprecated_read" is on, we revert it back to UnboundedSource. The portable runner cannot interfere with that.

On 7/1/21 8:31 PM, Kyle Weaver wrote:
I thought the runner was expected to call convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary. Why do we need to do that in the expansion service?

On Thu, Jul 1, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi,

    after today's experience I think I have some arguments about why
    we *should* pass (at least some) of the PipelineOptions from SDK
    to expansion service.

     1) there are lots, and lots, and lots of bugs around SDF and
    around the "use_deprecated_read", sorry, but the switch to SDF as
    the default *way* too premature

     2) therefore, the expansion *is* runner dependent (because
    whether to use "use_deprecated_read" or not is runner dependent),
    only the client of the expansion service (the SDK, the driver
    code) knows the target runner - i.e. if the target runner can use
    "new" Read or "deprecated" Read

     3) currently, my opinion is that we hold many portable Flink
    users on 2.24.0, because from 2.25.0, the combination of Kafka +
    Python SDK + Flink is simply not working - until now, there is no
    way to pass arguments to expansion service, and even after that,
    "use_deprecated_read" is simply ignored by the service (pretty
    much the same as was in DirectRunner, see [1])

    We should consider making use_deprecated_read the default for
    Flink (at least), not sure what is the state of other runners
    regarding that. It would be good to rename it, if we do not have
    plans to correctly support SDF (Read), including portability of
    other runners.

    Yes, this might be a temporary issue, but the fact, that expansion
    is runner dependent remains valid, because such situation might
    reappear.

     Jan

    [1]
    
https://github.com/apache/beam/pull/15082/commits/5a46664ceb9f03da3089925b30ecd0a802e8b3eb
    
<https://github.com/apache/beam/pull/15082/commits/5a46664ceb9f03da3089925b30ecd0a802e8b3eb>

    On 7/1/21 9:33 AM, Jan Lukavský wrote:
    On 7/1/21 3:26 AM, Kyle Weaver wrote:

        I think it should accept complete list of PipelineOptions
        (or at least some defined subset -
        PortabilityPipelineOptions, ExperimentalOptions, ...?)


    I'm not totally opposed to redefining some options, either.
    Using PipelineOptions could be confusing because only very few
    options would actually be respected -- even
    PortablePipelineOptions includes many options that wouldn't
    make sense in this context. Maybe better to have a small list of
    options that are guaranteed to work.

    That makes sense. How would we define the subset? I think that
    would probably require some sort of annotation analogous to
    @Validation.Required, maybe @Validation.ExpansionSupported or
    similar. I'm fine with implementing that, but I would need now to
    get the 'hotfix' to upcoming 2.32.0 release. Could we make that
    for 2.33.0? Will you help me review the current PR [1]?

    [1] https://github.com/apache/beam/pull/15082
    <https://github.com/apache/beam/pull/15082>


    On Wed, Jun 30, 2021 at 8:48 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

         > Not sure why we need the hacks with NoOpRunner

        As noted earlier (and that was why I started this thread in
        the first
        place :)), adding :runners:direct-java as runtime dependency
        of the
        expansion service causes something like 200 tests in
        pre-commit to fail.
        Looks like there is some kind of conflict among Flink and
        Direct runner.
        I didn't dig too deep into that, though.

         > You could use the Python utilities in your script to
        start/stop it
        manually.

        Yes, that is possible. I'll probably follow that path.

         > This is where the runner's ability to customize
        environments would
        come in handy--e.g. a Java runner could decide to swap out
        the Java
        docker environment for EMBEDDED or LOOPBACK (and a
        Python-based runner
        could do the same for the Python docker env).

        That would be just perfect, as that would make it possible
        to finally
        unify 'classical' and 'portable' runners. But that is a
        whole different
        story. :)

          Jan

        On 6/30/21 5:35 PM, Robert Bradshaw wrote:
        > On Wed, Jun 30, 2021 at 7:41 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>> java -jar beam-sdks-java-io-expansion-service-2.30.0.jar
        <port>
        >> This does not accept any other parameters than the port.
        That is the first part of this thread - the intent was to
        enable this to accept additional arguments, but there are
        (still waiting to be addressed unresolved) issues. There
        currently even seems to be no other way to adapt
        ExpansionService than to copy&paste the code and modify it,
        because it simply is not extensible. What would be enough is
        wrapping Pipeline.create() [1] call to a protected method,
        or add (protected) constructor that would accept
        PipelineOptions (probably better in this regard). That would
        make it more easy for users to create customized
        ExpansionService and it would (sort of) help solving
        described issues.
        > Yes, let's make it easy to extend/customize/start up a custom
        > ExpansionService, including adding optional command line
        arguments to
        > the "default" one. Not sure why we need the hacks with
        NoOpRunner
        > (IMHO, the direct runner should just be part of the SDK,
        but that's
        > not where we live now).
        >
        >> But even if we do that, we still need to deal with the
        expansion service on two places:
        >>
        >>   a) run it (and stop it)
        >>
        >>   b) specify it in the
        >>
        >> Using the default expansion service is much, much easier,
        it is started and stopped automatically for the user.
        Morever, the JavaJarExpansionService actually even presumes
        that there can be additional arguments passed to the service
        ([2]), the ExpansionService only does not accept them (and
        kafka IO does not expose that - that could be worked-around
        by users by manually creating the JavaJarExpansionService
        from own jar, yes). I would find it natural to add the
        command-line parsing (somehow!) to the ExpansionService
        itself, so that it doesn't need end-user modifications and
        then to figure out how to most easily expose there
        command-line arguments to end-users.
        > You could use the Python utilities in your script to
        start/stop it manually.
        >
        >> Yes, I verified that Flink can use Python Kafka IO over
        PROCESS environment with some hacking of the
        ExpansionService as shown in one of the linked PRs (though
        there is probably still some bugs regarding SDF - [3]).
        Adding --experiments seems have the same issues, need expose
        that to the CLI of ExpansionService. And I'm not sure if
        this [4] is not in conflict with
        --experiments=use_deprecated_read. That is something I still
        need to investigate.
        >>
        >> LOOPBACK is currently not supported by Flink. That is
        nice-to-have feature.
        > Local Flink does support LOOPBACK mode. If you just want
        to run
        > locally, just specifying "FlinkRunner" is enough. It's
        distributed
        > Flink that does not. It seems a lot of complexities are
        due to trying
        > to using minikube, which acts like it's distributed, but
        trying to
        > make it as easy as if it were all local (and the docker
        deficiencies
        > as well, which would make it just work...) Which is a
        worthy goal.
        >
        > This is where the runner's ability to customize
        environments would
        > come in handy--e.g. a Java runner could decide to swap out
        the Java
        > docker environment for EMBEDDED or LOOPBACK (and a
        Python-based runner
        > could do the same for the Python docker env).
        >
        >> [1]
        
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394
        
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394>
        >>
        >> [2]
        
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481
        
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481>
        >>
        >> [3] https://issues.apache.org/jira/browse/BEAM-11998
        <https://issues.apache.org/jira/browse/BEAM-11998>
        >>
        >> [4]
        
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398
        
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398>
        >>
        >> On 6/30/21 3:57 PM, Chamikara Jayalath wrote:
        >>
        >>
        >>
        >> On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath
        <chamik...@google.com <mailto:chamik...@google.com>> wrote:
        >>>
        >>>
        >>> On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>> On 6/30/21 1:16 AM, Robert Bradshaw wrote:
        >>>>> <rant>Why doesn't docker in docker just work, rather
        than having to do
        >>>>> ugly hacks when composing two technologies that both
        rely on
        >>>>> docker...</rant>
        >>>>>
        >>>>> Presumably you're setting up a node for Kafka and
        Flink; why not set
        >>>>> one up for the expansion service as well? The UX of
        >>>>>
        >>>>>
         ReadFromKafka(default_expansion_service_args={"defaultEnvironmentType":
        >>>>> "PROCESS", "defaultEnvironmentConfig": ""{\"os\":
        \"linux\", \"arch\":
        >>>>> \"amd64\", \"command\": \"/path/to/launcher/boot
        >>>>> cp=/some/other/long/path\" ...}")"})
        >>>>>
        >>>>> isn't that great either. Rather than pass arbitrary
        arguments to a
        >>>>> default expansion service, I still think once you get
        to this level
        >>>>> it's better to just start your own expansion service.
        >>>> Sure, that is possible (seems to me, that it would
        still require some
        >>>> changes to ExpansionService to be extendable, but yes,
        kind of tiny
        >>>> changes). The problem is not with Flink or Kafka -
        those are
        >>>> technologies you are actually expecting to set up,
        because you want to
        >>>> use them. The problem is what everything else you must
        set up for making
        >>>> something that seems as easy as "read a few messages
        from kafka in beam
        >>>> python" to work. You must have:
        >>>>
        >>>>    a) Python SDK harness (OK, that is something that
        should be probably
        >>>> expected) - there are few problems with it, namely it
        is somewhat
        >>>> hardcoded that it must run in the same pod as Flink's
        taskmanager to be
        >>>> able to use EXTERNAL environment, but ok, let's go on
        >>>>
        >>>>    b) Java SDK harness, at least installed in docker
        image of taskmanager
        >>>> (to be usable via PROCESS environment) - OK, that
        starts to be weird,
        >>>> taskmanager is java, right? Something like LOOPBACK
        would be cool there,
        >>>> but never mind. You create custom docker image for your
        Flink JM and TM
        >>>> and continue.
        >>>>
        >>>>    c) Implement (extend) and deploy own expansion
        service - ouch, that
        >>>> starts to hurt, that is even going to be a pod that is
        running even
        >>>> though there is nothing using it (yes, can be scaled down).
        >>>>
        >>>> The complexity of a simple task starts to be somewhat
        extraordinary. And
        >>>> most of the users will not be willing to follow this
        path, I'm afraid.
        >>>> People generally don't like to set up complex
        environment for something
        >>>> that looks it should "just work". There is non-trivial
        work necessary
        >>>> to make all of this working, mostly when you are
        starting to evaluate
        >>>> Beam and don't have much experience with it.
        >>>
        >>> I don't think we should expect end-users to implement or
        extend the expansion service. Everything should be already
        implemented and maybe we can even provide a script to easily
        startup a local Java expansion service with additional
        parameters.
        >>>
        >>> Today, to start a Java expansion service for Kafka users
        have to do the following.
        >>>
        >>> * Download expansion service jar released with Beam for
        Kafka. For example [1]
        >>>
        >>> * Run following command:
        >>> java -jar beam-sdks-java-io-expansion-service-2.30.0.jar
        <port>
        >>>
        >>> * To use this they just have to provide
        "localhost:<port>" to [2].
        >>>
        >>> This is a few extra steps but mostly a one time setup
        for the user and nothing to do with portability or other
        complexities of Beam.
        >>>
        >>> I'm all for simplifying the user-experience, but adding
        changes to the transform API that might have to be
        deprecated later sounds like a bad idea. I'd much rather
        provide additional scripts/documentation/examples to
        simplify such use-cases. I think that will be adequate for
        most users.
        >>>
        >>> BTW, slightly orthogonal, I don't think multi-language
        would work in LOOPBACK mode today without additional changes
        to portable runners (at least I've never tested this). Did
        you confirm that this works ?
        >>
        >> Or PROCESS mode.
        >>
        >>>
        >>> Thanks,
        >>> Cham
        >>>
        >>> [1]
        
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
        
<https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar>
        >>> [2]
        
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
        
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
        >>>
        >>>
        >>>>
        >>>> We can get rid of b) (implement LOOPBACK in Flink) and
        c) (enable Python
        >>>> SDK Kafka IO to spawn expansion service with the
        LOOPBACK environment
        >>>> when submitting to Flink). That is why I still think
        that this
        >>>> simplification matters a lot.
        >>>>
        >>>>> On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>> I believe we could change that more or less the same
        as we can deprecate / stop supporting any other parameter of
        any method. If python starts to support natively Kafka IO,
        then we can simply log warning / raise exception (one after
        the other). That seems like natural development.
        >>>>>>
        >>>>>> Maybe I should have described the case - I'm trying
        to setup a "simple" use-case for users that want to try
        Python SDK to read using Flink from Kafka using Minikube
        (both Kafka and Flink are running inside Minikube). There
        are tons of problems to use docker from within Minkube and I
        would not say that is the "simple" way we would like to
        present to users. Setting up own expansion service is
        possibility - but that also lacks the UX approach. I pretty
        much think that understanding portability on it's own is
        already a burden we put on users (yes, we do that for a
        reason, but everything else should be as simple as possible).
        >>>>>>
        >>>>>> On 6/30/21 12:16 AM, Chamikara Jayalath wrote:
        >>>>>>
        >>>>>> So I think one downside to this PR is that we assume
        that the default expansion service used by the transform
        (Kafka in this case) will not change. Currently it's fully
        opaque. In the default case we just promise that the
        transform will work (if conditions I mentioned above are
        met). Nothing else.
        >>>>>> If we add a "param default_expansion_service_args",
        we leak the nature of the default expansion service to the
        API and it will be hard to change it in the future.
        >>>>>>
        >>>>>> Thanks,
        >>>>>> Cham
        >>>>>>
        >>>>>> On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>> I would absolutely understand this, if it would be
        mostly impossible or at least really hard to get the user
        friendly behavior. But we are mostly there in this case.
        When we can actually quite simply pass the supported
        environment via parameter, I think we should go for it.
        >>>>>>>
        >>>>>>> I have created a sketch (I verified that when the
        ExpansionService is patched 'enough' this works) in [1].
        This is only a sketch, because we first must know how to
        support the default execution environment in ExpansionService.
        >>>>>>>
        >>>>>>> [1] https://github.com/apache/beam/pull/15099/files
        <https://github.com/apache/beam/pull/15099/files>
        >>>>>>>
        >>>>>>> On 6/29/21 11:51 PM, Chamikara Jayalath wrote:
        >>>>>>>
        >>>>>>>
        >>>>>>>
        >>>>>>> On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>> On 6/29/21 11:04 PM, Robert Bradshaw wrote:
        >>>>>>>>> You can configure the environment in the current
        state, you just have
        >>>>>>>>> to run your own expansion service that has a
        different environment
        >>>>>>>>> backed into it (or, makes this configurable).
        >>>>>>>> Yes, that is true. On the other hand that lacks
        some user-friendliness,
        >>>>>>>> because ideally, you don't want to worry about
        expansion services,
        >>>>>>>> mostly when it comes to some mostly standard IO.
        The ideal case is that
        >>>>>>>> you either do not basically know that you use
        external transform (which
        >>>>>>>> is probably the case when you can use docker), or
        you are able to
        >>>>>>>> overcome the problem within the SDK (Python) by
        passing some argument to
        >>>>>>>> the input transform.
        >>>>>>> Arguments passed to the pipeline level apply to the
        whole pipeline (not just one transform). So if you pass in a
        default environment (and configs) at pipeline level, that
        would mean the default environment and configs used by the
        pipeline (so Python SDK in this case) not a specific transform.
        >>>>>>> I believe we have made usage of external transforms
        used-friendly for the general case. But we had to make some
        assumptions. For example we assumed,
        >>>>>>> * user will be using the default environment of the
        expansion service (Docker in this case)
        >>>>>>> * User will be using the pre-specified dependency
        only (sdks:java:io:expansion-service:shadowJar for Kafka)
        >>>>>>> * User will be in an environment where the jar can
        be downloaded.
        >>>>>>>
        >>>>>>> I would consider any use-case where these basic
        assumptions cannot be met as an advanced use-case. The
        solution in such a case would be to start a custom expansion
        service and pass the address of it as a parameter to the
        transform [1]. I'm fine with extending the capabilities of
        Java expansion service by adding more parameters (for
        example, for overriding the environment, for specifying
        dependencies, for providing pipeline options).
        >>>>>>>
        >>>>>>> Thanks,
        >>>>>>> Cham
        >>>>>>>
        >>>>>>> [1]
        
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
        
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
        >>>>>>>
        >>>>>>>
        >>>>>>>>> Is option (1) updating the default expansion
        service such that one can
        >>>>>>>>> override default environment properties on the
        command line? (You
        >>>>>>>>> would still have to start it up manually to use it.)
        >>>>>>>> Yes and no. :) Updating ExpansionService so that
        you can specify default
        >>>>>>>> environment on command like makes this accessible to
        >>>>>>>> JavaJarExpansionService, and that makes it possible
        to add (optional)
        >>>>>>>> argument to Python Kafka IO, that would delegate
        this to the
        >>>>>>>> (automatically) started expansion service. It is
        important to note that
        >>>>>>>> both ReadFromKafka and WriteToKafka have expansion
        that involves only
        >>>>>>>> single external (Java) SDK. That simplifies things.
        >>>>>>>>> Maybe it would help to make things more concrete.
        Suppose I have a Go
        >>>>>>>>> pipeline that uses a library which invokes a
        Python external transform
        >>>>>>>>> to do ML (say, via TFX), and two Java IOs (which
        happen to have
        >>>>>>>>> mutually exclusive dependencies). The ML transform
        itself uses Java to
        >>>>>>>>> invoke some SQL.
        >>>>>>>>>
        >>>>>>>>> The way things work currently is each external
        transform will have an
        >>>>>>>>> associated fully specified environment and a
        runner can use docker to
        >>>>>>>>> start up the required workers at the expected time.
        >>>>>>>>>
        >>>>>>>>> Now, suppose one doesn't have docker on the
        workers. One wants to run this with
        >>>>>>>>>
        >>>>>>>>> ./my_pipeline --someFlag=someValue
        --someOtherFlag=someOtherValue ...
        >>>>>>>>>
        >>>>>>>>> such that docker is no longer needed. What
        someFlags would we need,
        >>>>>>>>> and what would their values be? (And how to make
        this feasible to
        >>>>>>>>> implement.)
        >>>>>>>>>
        >>>>>>>>> Are there meaningful intermediate points that
        extend to a general
        >>>>>>>>> solution (or at least aren't hostile to it)?
        >>>>>>>> I believe that in the option 2) the best way would
        to use each SDK's URN
        >>>>>>>> Then the arguments could be something like
        >>>>>>>>
        
"--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
        >>>>>>>> config="<image>"},
        "apache:beam:python:2.33.0:latest"={env="PROCESS",
        >>>>>>>> config={...}}". Yes, it would require a lot of
        "syntactic sugar" to
        >>>>>>>> configure that. :) (sorry if I don't have URNs for
        SDKs 100% correct)
        >>>>>>>>> I still think in the long run having runners
        understand environments,
        >>>>>>>>> and saying "oh, whenever I see
        'apache:beam:java:2.33.0:latest' I'll
        >>>>>>>>> swap that out for 'path/to/my/java -cp ...' is the
        right way to go
        >>>>>>>>> long-term. (I would put this in runners, not SDKs,
        though a common
        >>>>>>>>> runners library could be used.)
        >>>>>>>> Yes, I also agree, that expansion service should be
        runner-dependent (or
        >>>>>>>> at least runner-aware), as that brings
        optimizations. Runner could
        >>>>>>>> ignore settings from previous point when it can be
        *sure* it can do so.
        >>>>>>>>> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>> Thanks for pointing to that thread.
        >>>>>>>>>>
        >>>>>>>>>> 1) I'm - as well as Kyle - fine with the approach
        that we use a
        >>>>>>>>>> "preferred environment" for the expansion
        service. We only need to pass
        >>>>>>>>>> it via command line. Yes, the command line might
        be generally
        >>>>>>>>>> SDK-dependent, and that makes it expansion
        dependent, because whether or
        >>>>>>>>>> not particular transform is "external" or not is
        implementation detail.
        >>>>>>>>>> That is the nasty part. The rest of my original
        question is about, how
        >>>>>>>>>> exactly to do that, because it seems to be
        tricky, due to the fact, that
        >>>>>>>>>> it is not possible to include runtime dependency
        on DirectRunner (fails
        >>>>>>>>>> many, many tests) and it is not possible to
        extract PipelineOptions as a
        >>>>>>>>>> Map either.
        >>>>>>>>>>
        >>>>>>>>>> 2) Regarding SDK injecting environment, I still
        think that is the
        >>>>>>>>>> correct way. The SDK (the driver code) own the
        execution environment. It
        >>>>>>>>>> should be able to define (or at least prioritize)
        runtime environments
        >>>>>>>>>> of all transforms. If we cannot know in advance,
        which transform is
        >>>>>>>>>> going to expand to how many nested (and possibly
        external) transforms, I
        >>>>>>>>>> think that the SDK could be fine with providing a
        Map(SDK ->
        >>>>>>>>>> environment). That is: "Run Java using PROCESS",
        "Run Python using
        >>>>>>>>>> DOCKER", and so on. A default mapping might exist
        on the expansion
        >>>>>>>>>> service as well (which might be passed through
        command line and that is
        >>>>>>>>>> the point 1)). Yes, the Map approach is
        definitely not universal,
        >>>>>>>>>> because one can imagine that the SDK itself is
        not enough for specifying
        >>>>>>>>>> the environment, but seems that vast majority of
        cases would fit into that.
        >>>>>>>>>>
        >>>>>>>>>> 3) The best might be for the SDK to provide a
        list of supported
        >>>>>>>>>> environments with additional metrics which the
        expansion service might
        >>>>>>>>>> choose from.
        >>>>>>>>>>
        >>>>>>>>>> These three approaches are all extensions to the
        current state. Current
        >>>>>>>>>> state has predefined environment without
        possibility to change it.
        >>>>>>>>>> Option 1) changes it to single configurable
        environment, option 2) to N
        >>>>>>>>>> environments based on SDK and option 3) to M
        environments based on
        >>>>>>>>>> SDK-dependent metrics (and/or capabilitites of
        particular environment).
        >>>>>>>>>> Seems like gradual extensions of the current
        state, so maybe we can
        >>>>>>>>>> focus on the first one, and maybe add other, when
        there is a need?
        >>>>>>>>>>
        >>>>>>>>>> If this could be the first conclusion, then the
        next one would be, what
        >>>>>>>>>> should be the preferred way to implement it.
        >>>>>>>>>>
        >>>>>>>>>> WDYT?
        >>>>>>>>>>
        >>>>>>>>>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
        >>>>>>>>>>> +1, thanks for digging up that thread.
        >>>>>>>>>>>
        >>>>>>>>>>> I am still of the same opinion that I wrote
        there. To touch on some
        >>>>>>>>>>> things brought up here, copying something like
        >>>>>>>>>>> defaultEnvironmentConfig doesn't make sense from
        language to language
        >>>>>>>>>>> (e.g. the docker image name or CLI arguments for
        subprocess mode just
        >>>>>>>>>>> isn't going to work for all of Python, Java, and
        Go, and clearly
        >>>>>>>>>>> embedded type is only going to work for one.)
        >>>>>>>>>>>
        >>>>>>>>>>> In the short term, to change environment (or
        anything else) about the
        >>>>>>>>>>> "default" expansions service, the thing to do is
        build and start your
        >>>>>>>>>>> own expansion service that sets up the
        environment for its transforms
        >>>>>>>>>>> in a custom way.
        >>>>>>>>>>>
        >>>>>>>>>>> FYI, in Python, one can use --beam_services to
        use a custom expansion
        >>>>>>>>>>> service. E.g.
        >>>>>>>>>>>
        >>>>>>>>>>>
        
--beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
        >>>>>>>>>>> "localhost:port"}'
        >>>>>>>>>>>
        >>>>>>>>>>> would override the default one when using
        SqlTransform.
        >>>>>>>>>>>
        >>>>>>>>>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver
        <kcwea...@google.com <mailto:kcwea...@google.com>> wrote:
        >>>>>>>>>>>> For context, there was a previous thread which
        touched on many of the same points:
        
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
        
<https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E>
        >>>>>>>>>>>>
        >>>>>>>>>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>>>>> I would slightly disagree that this breaks the
        black box nature of the expansion, the "how the transform
        expands" remains unknown to the SDK requesting the
        expansion, the "how the transform executes" - on the other
        hand - is something that the SDK must cooperate on - it
        knows (or could or should know) what is the environment that
        the pipeline is going to be executed on looks like. That is
        why expansion service on its own cannot correctly define the
        execution environment. It could, if it would be bound to
        runner (and its environemnt) - for instance
        FlinkRunnerExpansionService could probably expand KafkaIO to
        something more 'native'. But that requires knowledge of the
        target runner. If the expansion service is not dedicated to
        a runner, the only place where it can be defined, is the SDK
        - and therefore the expansion request.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>>> Power users can always modify the output
        produced by the expansion service as well.
        >>>>>>>>>>>>> I'm not sure if I follow this, do you mean
        that power users, who run the expansion service can modify
        the output? Or is the output (protobuf) of the expansion
        service easily transferable between different execution
        environments?- I had the impression, that execution
        environments do not necessarily have to have the same
        payloads associated with them, and therefore it is
        impossible to 'postprocess' the output of the expansion. Is
        that wrong assumption?
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> This would "break" the black box where the
        expansion service is supposed to hide the implementation
        internals from the caller and pushes compatibility of these
        kinds of environment overrides on to the expansion service
        and its implementer.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> Power users can always modify the output
        produced by the expansion service as well.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>>>>>> The argument for being able to accept
        (possibly ordered list of) execution environments is in that
        this could make a single instance of execution service
        reusable by various clients with different requirements.
        Moreover, the two approaches are probably orthogonal - users
        could specify 'defaultExecutionEnvironment' for the service
        which could be used in case when there is no preference
        given by the client.
        >>>>>>>>>>>>>>
        >>>>>>>>>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
        >>>>>>>>>>>>>>
        >>>>>>>>>>>>>> I would be much more inclined for the user
        being able to configure the expansion service for their
        needs instead of changing the expansion service API.
        >>>>>>>>>>>>>>
        >>>>>>>>>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>>>>>>> If I understand it correctly, there is
        currently no place to set the
        >>>>>>>>>>>>>>> defaultEnvironmentType - python's KafkaIO
        uses either
        >>>>>>>>>>>>>>> 'expansion_service' given by the user (which
        might be a host:port, or an
        >>>>>>>>>>>>>>> object that has appropriate method), or calls
        >>>>>>>>>>>>>>> 'default_io_expansion_service' - which in
        turn runs ExpansionService
        >>>>>>>>>>>>>>> using gradle. Either way, it ends up in
        ExpansionService#main [1]. It
        >>>>>>>>>>>>>>> could be possible to adapt ExpansionService
        and call it locally -
        >>>>>>>>>>>>>>> provided ExpansionService would provide a
        way to extend it (using
        >>>>>>>>>>>>>>> protected method createPipeline()) seems to
        be enough - but that is not
        >>>>>>>>>>>>>>> too much user-friendly. If we could specify
        the defaultEnvironmentConfig
        >>>>>>>>>>>>>>> when starting the ExpansionService, it would
        be possible to add these
        >>>>>>>>>>>>>>> parameters in the python SDK's KafkaIO,
        which would mean users do not
        >>>>>>>>>>>>>>> have to worry about the expansion service at
        all (leaving aside that
        >>>>>>>>>>>>>>> using too many ReafFromKafka or WriteToKafka
        transforms would somewhat
        >>>>>>>>>>>>>>> hurt performance during pipeline build, but
        that applies to the pipeline
        >>>>>>>>>>>>>>> build time only). I have created [2] to
        track that.
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>> Does that make sense, or is my analysis
        incorrect?
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>       Jan
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>> [1]
        >>>>>>>>>>>>>>>
        
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
        
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>> [2]
        https://issues.apache.org/jira/browse/BEAM-12539
        <https://issues.apache.org/jira/browse/BEAM-12539>
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
        >>>>>>>>>>>>>>>> I’m sorry if I missed something but do you
        mean that
        PortablePipelineOptions.setDefaultEnvironmentType(String)
        doesn’t work for you? Or it’s only a specific case while
        using portable KafkaIO?
        >>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský
        <x666je...@gmail.com <mailto:x666je...@gmail.com>> wrote:
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> Hi,
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> I have come across an issue with
        cross-language transforms. My setup is I have working
        environment type PROCESS and I cannot use DOCKER. When I use
        Python's KafkaIO, it unfortunately - by default - expands to
        docker environment, which then fails due to missing 'docker'
        command. I didn't find a solution without tackling the
        expansion service, yet.
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> I see several possible solutions to that:
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>>    a) I would say, that the cleanest
        solution would be to add preferred environment type to the
        expansion request to the expansion service (probably along
        with additional flags, probably --experiments?). This
        requires deeper changes to the expansion RPC defintion,
        probably serializing the PipelineOptions from the client
        environment into the ExpansionRequest.
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>>    b) Another option would be to allow
        specifying some of the command-line arguments when starting
        the expansion service, which currently accepts only port on
        command line, see [1]. The straightforward 'fix' (see [2])
        unfortunately does not work, because it requires
        DirectRunner to be on the classpath, which then breaks other
        runners (see [3]). It seems possible to copy hand selected
        options from command line to the Pipeline, but that feels
        hackish. It would require to either be able to construct the
        Pipeline without a runner specified (which seems possible
        when calling Pipeline.create(), but not when using
        PipelineOptions create by parsing command-line arguments) or
        to be able to create a Map<String, String> from
        PIpelineOptions and then the ability to copy all options
        into the Pipeline's options.
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> My proposal would be to create a hackish
        shortcut and just copy the --defaultEnvironmentType,
        --defaultEnvironmentConfig and --experiments into Pipeline's
        options for now, and create an issue for a proper solution
        (possible a)?).
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> WDYT? Or did I miss a way to override the
        default expansion?
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> Thanks for comments,
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>>    Jan
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> [1]
        
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
        
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> [2]
        https://github.com/apache/beam/pull/15082
        <https://github.com/apache/beam/pull/15082>
        >>>>>>>>>>>>>>>>>
        >>>>>>>>>>>>>>>>> [3]
        https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
        <https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/>
        >>>>>>>>>>>>>>>>>

Reply via email to