This really does not match my experience. Passing the correct "use_deprecated_read" flag to the expansion service had the expected impact on the Flink's execution DAG and - most of all - it started to work (at least seems so). The UI in Flink also started to reflect that and stopped using SDF (no Impulse in the DAG).

On 7/1/21 10:26 PM, Luke Cwik wrote:
There is no implementation for executing UnboundedSource directly within the Java SDK harness, it only supports executing SDFs and UnboundedSource via the wrapper over SDF. The runner would have to execute the source directly itself. It could attempt to deserialize the SDF ptransform and see if there is an UnboundedSource inside and then do whatever it wants with it.

On Thu, Jul 1, 2021 at 11:39 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    I don't have complete comprehension of the topic, but from what I
    have observed, the runner gets (possibly cross-language) proto
    description of the pipeline, and the post-processing there might
    be limited.  That is mainly due to the fact, that we have inverted
    the expansion flow - we expand Read to SDF and only when
    "use_deprecated_read" is on, we revert it back to UnboundedSource.
    The portable runner cannot interfere with that.

    On 7/1/21 8:31 PM, Kyle Weaver wrote:
    I thought the runner was expected to
    call convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary.
    Why do we need to do that in the expansion service?

    On Thu, Jul 1, 2021 at 11:16 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        Hi,

        after today's experience I think I have some arguments about
        why we *should* pass (at least some) of the PipelineOptions
        from SDK to expansion service.

         1) there are lots, and lots, and lots of bugs around SDF and
        around the "use_deprecated_read", sorry, but the switch to
        SDF as the default *way* too premature

         2) therefore, the expansion *is* runner dependent (because
        whether to use "use_deprecated_read" or not is runner
        dependent), only the client of the expansion service (the
        SDK, the driver code) knows the target runner - i.e. if the
        target runner can use "new" Read or "deprecated" Read

         3) currently, my opinion is that we hold many portable Flink
        users on 2.24.0, because from 2.25.0, the combination of
        Kafka + Python SDK + Flink is simply not working - until now,
        there is no way to pass arguments to expansion service, and
        even after that, "use_deprecated_read" is simply ignored by
        the service (pretty much the same as was in DirectRunner, see
        [1])

        We should consider making use_deprecated_read the default for
        Flink (at least), not sure what is the state of other runners
        regarding that. It would be good to rename it, if we do not
        have plans to correctly support SDF (Read), including
        portability of other runners.

        Yes, this might be a temporary issue, but the fact, that
        expansion is runner dependent remains valid, because such
        situation might reappear.

         Jan

        [1]
        
https://github.com/apache/beam/pull/15082/commits/5a46664ceb9f03da3089925b30ecd0a802e8b3eb
        
<https://github.com/apache/beam/pull/15082/commits/5a46664ceb9f03da3089925b30ecd0a802e8b3eb>

        On 7/1/21 9:33 AM, Jan Lukavský wrote:
        On 7/1/21 3:26 AM, Kyle Weaver wrote:

            I think it should accept complete list of
            PipelineOptions (or at least some defined subset -
            PortabilityPipelineOptions, ExperimentalOptions, ...?)


        I'm not totally opposed to redefining some options, either.
        Using PipelineOptions could be confusing because only very
        few options would actually be respected -- even
        PortablePipelineOptions includes many options that wouldn't
        make sense in this context. Maybe better to have a small
        list of options that are guaranteed to work.

        That makes sense. How would we define the subset? I think
        that would probably require some sort of annotation
        analogous to @Validation.Required, maybe
        @Validation.ExpansionSupported or similar. I'm fine with
        implementing that, but I would need now to get the 'hotfix'
        to upcoming 2.32.0 release. Could we make that for 2.33.0?
        Will you help me review the current PR [1]?

        [1] https://github.com/apache/beam/pull/15082
        <https://github.com/apache/beam/pull/15082>


        On Wed, Jun 30, 2021 at 8:48 AM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:

             > Not sure why we need the hacks with NoOpRunner

            As noted earlier (and that was why I started this
            thread in the first
            place :)), adding :runners:direct-java as runtime
            dependency of the
            expansion service causes something like 200 tests in
            pre-commit to fail.
            Looks like there is some kind of conflict among Flink
            and Direct runner.
            I didn't dig too deep into that, though.

             > You could use the Python utilities in your script to
            start/stop it
            manually.

            Yes, that is possible. I'll probably follow that path.

             > This is where the runner's ability to customize
            environments would
            come in handy--e.g. a Java runner could decide to swap
            out the Java
            docker environment for EMBEDDED or LOOPBACK (and a
            Python-based runner
            could do the same for the Python docker env).

            That would be just perfect, as that would make it
            possible to finally
            unify 'classical' and 'portable' runners. But that is a
            whole different
            story. :)

              Jan

            On 6/30/21 5:35 PM, Robert Bradshaw wrote:
            > On Wed, Jun 30, 2021 at 7:41 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>> java -jar
            beam-sdks-java-io-expansion-service-2.30.0.jar <port>
            >> This does not accept any other parameters than the
            port. That is the first part of this thread - the
            intent was to enable this to accept additional
            arguments, but there are (still waiting to be addressed
            unresolved) issues. There currently even seems to be no
            other way to adapt ExpansionService than to copy&paste
            the code and modify it, because it simply is not
            extensible. What would be enough is wrapping
            Pipeline.create() [1] call to a protected method, or
            add (protected) constructor that would accept
            PipelineOptions (probably better in this regard). That
            would make it more easy for users to create customized
            ExpansionService and it would (sort of) help solving
            described issues.
            > Yes, let's make it easy to extend/customize/start up
            a custom
            > ExpansionService, including adding optional command
            line arguments to
            > the "default" one. Not sure why we need the hacks
            with NoOpRunner
            > (IMHO, the direct runner should just be part of the
            SDK, but that's
            > not where we live now).
            >
            >> But even if we do that, we still need to deal with
            the expansion service on two places:
            >>
            >>   a) run it (and stop it)
            >>
            >>   b) specify it in the
            >>
            >> Using the default expansion service is much, much
            easier, it is started and stopped automatically for the
            user. Morever, the JavaJarExpansionService actually
            even presumes that there can be additional arguments
            passed to the service ([2]), the ExpansionService only
            does not accept them (and kafka IO does not expose that
            - that could be worked-around by users by manually
            creating the JavaJarExpansionService from own jar,
            yes). I would find it natural to add the command-line
            parsing (somehow!) to the ExpansionService itself, so
            that it doesn't need end-user modifications and then to
            figure out how to most easily expose there command-line
            arguments to end-users.
            > You could use the Python utilities in your script to
            start/stop it manually.
            >
            >> Yes, I verified that Flink can use Python Kafka IO
            over PROCESS environment with some hacking of the
            ExpansionService as shown in one of the linked PRs
            (though there is probably still some bugs regarding SDF
            - [3]). Adding --experiments seems have the same
            issues, need expose that to the CLI of
            ExpansionService. And I'm not sure if this [4] is not
            in conflict with --experiments=use_deprecated_read.
            That is something I still need to investigate.
            >>
            >> LOOPBACK is currently not supported by Flink. That
            is nice-to-have feature.
            > Local Flink does support LOOPBACK mode. If you just
            want to run
            > locally, just specifying "FlinkRunner" is enough.
            It's distributed
            > Flink that does not. It seems a lot of complexities
            are due to trying
            > to using minikube, which acts like it's distributed,
            but trying to
            > make it as easy as if it were all local (and the
            docker deficiencies
            > as well, which would make it just work...) Which is a
            worthy goal.
            >
            > This is where the runner's ability to customize
            environments would
            > come in handy--e.g. a Java runner could decide to
            swap out the Java
            > docker environment for EMBEDDED or LOOPBACK (and a
            Python-based runner
            > could do the same for the Python docker env).
            >
            >> [1]
            
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394
            
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394>
            >>
            >> [2]
            
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481
            
<https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481>
            >>
            >> [3] https://issues.apache.org/jira/browse/BEAM-11998
            <https://issues.apache.org/jira/browse/BEAM-11998>
            >>
            >> [4]
            
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398
            
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398>
            >>
            >> On 6/30/21 3:57 PM, Chamikara Jayalath wrote:
            >>
            >>
            >>
            >> On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath
            <[email protected] <mailto:[email protected]>> wrote:
            >>>
            >>>
            >>> On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>>> On 6/30/21 1:16 AM, Robert Bradshaw wrote:
            >>>>> <rant>Why doesn't docker in docker just work,
            rather than having to do
            >>>>> ugly hacks when composing two technologies that
            both rely on
            >>>>> docker...</rant>
            >>>>>
            >>>>> Presumably you're setting up a node for Kafka and
            Flink; why not set
            >>>>> one up for the expansion service as well? The UX of
            >>>>>
            >>>>>
             
ReadFromKafka(default_expansion_service_args={"defaultEnvironmentType":
            >>>>> "PROCESS", "defaultEnvironmentConfig": ""{\"os\":
            \"linux\", \"arch\":
            >>>>> \"amd64\", \"command\": \"/path/to/launcher/boot
            >>>>> cp=/some/other/long/path\" ...}")"})
            >>>>>
            >>>>> isn't that great either. Rather than pass
            arbitrary arguments to a
            >>>>> default expansion service, I still think once you
            get to this level
            >>>>> it's better to just start your own expansion service.
            >>>> Sure, that is possible (seems to me, that it would
            still require some
            >>>> changes to ExpansionService to be extendable, but
            yes, kind of tiny
            >>>> changes). The problem is not with Flink or Kafka -
            those are
            >>>> technologies you are actually expecting to set up,
            because you want to
            >>>> use them. The problem is what everything else you
            must set up for making
            >>>> something that seems as easy as "read a few
            messages from kafka in beam
            >>>> python" to work. You must have:
            >>>>
            >>>>    a) Python SDK harness (OK, that is something
            that should be probably
            >>>> expected) - there are few problems with it, namely
            it is somewhat
            >>>> hardcoded that it must run in the same pod as
            Flink's taskmanager to be
            >>>> able to use EXTERNAL environment, but ok, let's go on
            >>>>
            >>>>    b) Java SDK harness, at least installed in
            docker image of taskmanager
            >>>> (to be usable via PROCESS environment) - OK, that
            starts to be weird,
            >>>> taskmanager is java, right? Something like
            LOOPBACK would be cool there,
            >>>> but never mind. You create custom docker image for
            your Flink JM and TM
            >>>> and continue.
            >>>>
            >>>>    c) Implement (extend) and deploy own expansion
            service - ouch, that
            >>>> starts to hurt, that is even going to be a pod
            that is running even
            >>>> though there is nothing using it (yes, can be
            scaled down).
            >>>>
            >>>> The complexity of a simple task starts to be
            somewhat extraordinary. And
            >>>> most of the users will not be willing to follow
            this path, I'm afraid.
            >>>> People generally don't like to set up complex
            environment for something
            >>>> that looks it should "just work".  There is
            non-trivial work necessary
            >>>> to make all of this working, mostly when you are
            starting to evaluate
            >>>> Beam and don't have much experience with it.
            >>>
            >>> I don't think we should expect end-users to
            implement or extend the expansion service. Everything
            should be already implemented and maybe we can even
            provide a script to easily startup a local Java
            expansion service with additional parameters.
            >>>
            >>> Today, to start a Java expansion service for Kafka
            users have to do the following.
            >>>
            >>> * Download expansion service jar released with Beam
            for Kafka. For example [1]
            >>>
            >>> * Run following command:
            >>> java -jar
            beam-sdks-java-io-expansion-service-2.30.0.jar <port>
            >>>
            >>> * To use this they just have to provide
            "localhost:<port>" to [2].
            >>>
            >>> This is a few extra steps but mostly a one time
            setup for the user and nothing to do with portability
            or other complexities of Beam.
            >>>
            >>> I'm all for simplifying the user-experience, but
            adding changes to the transform API that might have to
            be deprecated later sounds like a bad idea. I'd much
            rather provide additional
            scripts/documentation/examples to simplify such
            use-cases. I think that will be adequate for most users.
            >>>
            >>> BTW, slightly orthogonal, I don't think
            multi-language would work in LOOPBACK mode today
            without additional changes to portable runners (at
            least I've never tested this). Did you confirm that
            this works ?
            >>
            >> Or PROCESS mode.
            >>
            >>>
            >>> Thanks,
            >>> Cham
            >>>
            >>> [1]
            
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
            
<https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar>
            >>> [2]
            
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
            
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
            >>>
            >>>
            >>>>
            >>>> We can get rid of b) (implement LOOPBACK in Flink)
            and c) (enable Python
            >>>> SDK Kafka IO to spawn expansion service with the
            LOOPBACK environment
            >>>> when submitting to Flink). That is why I still
            think that this
            >>>> simplification matters a lot.
            >>>>
            >>>>> On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>>>>> I believe we could change that more or less the
            same as we can deprecate / stop supporting any other
            parameter of any method. If python starts to support
            natively Kafka IO, then we can simply log warning /
            raise exception (one after the other). That seems like
            natural development.
            >>>>>>
            >>>>>> Maybe I should have described the case - I'm
            trying to setup a "simple" use-case for users that want
            to try Python SDK to read using Flink from Kafka using
            Minikube (both Kafka and Flink are running inside
            Minikube). There are tons of problems to use docker
            from within Minkube and I would not say that is the
            "simple" way we would like to present to users. Setting
            up own expansion service is possibility - but that also
            lacks the UX approach. I pretty much think that
            understanding portability on it's own is already a
            burden we put on users (yes, we do that for a reason,
            but everything else should be as simple as possible).
            >>>>>>
            >>>>>> On 6/30/21 12:16 AM, Chamikara Jayalath wrote:
            >>>>>>
            >>>>>> So I think one downside to this PR is that we
            assume that the default expansion service used by the
            transform (Kafka in this case) will not change.
            Currently it's fully opaque. In the default case we
            just promise that the transform will work (if
            conditions I mentioned above are met). Nothing else.
            >>>>>> If we add a "param
            default_expansion_service_args", we leak the nature of
            the default expansion service to the API and it will be
            hard to change it in the future.
            >>>>>>
            >>>>>> Thanks,
            >>>>>> Cham
            >>>>>>
            >>>>>> On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>> I would absolutely understand this, if it would
            be mostly impossible or at least really hard to get the
            user friendly behavior. But we are mostly there in this
            case. When we can actually quite simply pass the
            supported environment via parameter, I think we should
            go for it.
            >>>>>>>
            >>>>>>> I have created a sketch (I verified that when
            the ExpansionService is patched 'enough' this works) in
            [1]. This is only a sketch, because we first must know
            how to support the default execution environment in
            ExpansionService.
            >>>>>>>
            >>>>>>> [1]
            https://github.com/apache/beam/pull/15099/files
            <https://github.com/apache/beam/pull/15099/files>
            >>>>>>>
            >>>>>>> On 6/29/21 11:51 PM, Chamikara Jayalath wrote:
            >>>>>>>
            >>>>>>>
            >>>>>>>
            >>>>>>> On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>>> On 6/29/21 11:04 PM, Robert Bradshaw wrote:
            >>>>>>>>> You can configure the environment in the
            current state, you just have
            >>>>>>>>> to run your own expansion service that has a
            different environment
            >>>>>>>>> backed into it (or, makes this configurable).
            >>>>>>>> Yes, that is true. On the other hand that
            lacks some user-friendliness,
            >>>>>>>> because ideally, you don't want to worry about
            expansion services,
            >>>>>>>> mostly when it comes to some mostly standard
            IO. The ideal case is that
            >>>>>>>> you either do not basically know that you use
            external transform (which
            >>>>>>>> is probably the case when you can use docker),
            or you are able to
            >>>>>>>> overcome the problem within the SDK (Python)
            by passing some argument to
            >>>>>>>> the input transform.
            >>>>>>> Arguments passed to the pipeline level apply to
            the whole pipeline (not just one transform). So if you
            pass in a default environment (and configs) at pipeline
            level, that would mean the default environment and
            configs used by the pipeline (so Python SDK in this
            case) not a specific transform.
            >>>>>>> I believe we have made usage of external
            transforms used-friendly for the general case. But we
            had to make some assumptions. For example we assumed,
            >>>>>>> * user will be using the default environment of
            the expansion service (Docker in this case)
            >>>>>>> * User will be using the pre-specified
            dependency only
            (sdks:java:io:expansion-service:shadowJar for Kafka)
            >>>>>>> * User will be in an environment where the jar
            can be downloaded.
            >>>>>>>
            >>>>>>> I would consider any use-case where these basic
            assumptions cannot be met as an advanced use-case. The
            solution in such a case would be to start a custom
            expansion service and pass the address of it as a
            parameter to the transform [1]. I'm fine with extending
            the capabilities of Java expansion service by adding
            more parameters (for example, for overriding the
            environment, for specifying dependencies, for providing
            pipeline options).
            >>>>>>>
            >>>>>>> Thanks,
            >>>>>>> Cham
            >>>>>>>
            >>>>>>> [1]
            
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
            
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
            >>>>>>>
            >>>>>>>
            >>>>>>>>> Is option (1) updating the default expansion
            service such that one can
            >>>>>>>>> override default environment properties on
            the command line? (You
            >>>>>>>>> would still have to start it up manually to
            use it.)
            >>>>>>>> Yes and no. :) Updating ExpansionService so
            that you can specify default
            >>>>>>>> environment on command like makes this
            accessible to
            >>>>>>>> JavaJarExpansionService, and that makes it
            possible to add (optional)
            >>>>>>>> argument to Python Kafka IO, that would
            delegate this to the
            >>>>>>>> (automatically) started expansion service. It
            is important to note that
            >>>>>>>> both ReadFromKafka and WriteToKafka have
            expansion that involves only
            >>>>>>>> single external (Java) SDK. That simplifies
            things.
            >>>>>>>>> Maybe it would help to make things more
            concrete. Suppose I have a Go
            >>>>>>>>> pipeline that uses a library which invokes a
            Python external transform
            >>>>>>>>> to do ML (say, via TFX), and two Java IOs
            (which happen to have
            >>>>>>>>> mutually exclusive dependencies). The ML
            transform itself uses Java to
            >>>>>>>>> invoke some SQL.
            >>>>>>>>>
            >>>>>>>>> The way things work currently is each
            external transform will have an
            >>>>>>>>> associated fully specified environment and a
            runner can use docker to
            >>>>>>>>> start up the required workers at the expected
            time.
            >>>>>>>>>
            >>>>>>>>> Now, suppose one doesn't have docker on the
            workers. One wants to run this with
            >>>>>>>>>
            >>>>>>>>> ./my_pipeline --someFlag=someValue
            --someOtherFlag=someOtherValue ...
            >>>>>>>>>
            >>>>>>>>> such that docker is no longer needed. What
            someFlags would we need,
            >>>>>>>>> and what would their values be? (And how to
            make this feasible to
            >>>>>>>>> implement.)
            >>>>>>>>>
            >>>>>>>>> Are there meaningful intermediate points that
            extend to a general
            >>>>>>>>> solution (or at least aren't hostile to it)?
            >>>>>>>> I believe that in the option 2) the best way
            would to use each SDK's URN
            >>>>>>>> Then the arguments could be something like
            >>>>>>>>
            
"--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
            >>>>>>>> config="<image>"},
            "apache:beam:python:2.33.0:latest"={env="PROCESS",
            >>>>>>>> config={...}}". Yes, it would require a lot of
            "syntactic sugar" to
            >>>>>>>> configure that. :) (sorry if I don't have URNs
            for SDKs 100% correct)
            >>>>>>>>> I still think in the long run having runners
            understand environments,
            >>>>>>>>> and saying "oh, whenever I see
            'apache:beam:java:2.33.0:latest' I'll
            >>>>>>>>> swap that out for 'path/to/my/java -cp ...'
            is the right way to go
            >>>>>>>>> long-term. (I would put this in runners, not
            SDKs, though a common
            >>>>>>>>> runners library could be used.)
            >>>>>>>> Yes, I also agree, that expansion service
            should be runner-dependent (or
            >>>>>>>> at least runner-aware), as that brings
            optimizations. Runner could
            >>>>>>>> ignore settings from previous point when it
            can be *sure* it can do so.
            >>>>>>>>> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>>>>> Thanks for pointing to that thread.
            >>>>>>>>>>
            >>>>>>>>>> 1) I'm - as well as Kyle - fine with the
            approach that we use a
            >>>>>>>>>> "preferred environment" for the expansion
            service. We only need to pass
            >>>>>>>>>> it via command line. Yes, the command line
            might be generally
            >>>>>>>>>> SDK-dependent, and that makes it expansion
            dependent, because whether or
            >>>>>>>>>> not particular transform is "external" or
            not is implementation detail.
            >>>>>>>>>> That is the nasty part. The rest of my
            original question is about, how
            >>>>>>>>>> exactly to do that, because it seems to be
            tricky, due to the fact, that
            >>>>>>>>>> it is not possible to include runtime
            dependency on DirectRunner (fails
            >>>>>>>>>> many, many tests) and it is not possible to
            extract PipelineOptions as a
            >>>>>>>>>> Map either.
            >>>>>>>>>>
            >>>>>>>>>> 2) Regarding SDK injecting environment, I
            still think that is the
            >>>>>>>>>> correct way. The SDK (the driver code) own
            the execution environment. It
            >>>>>>>>>> should be able to define (or at least
            prioritize) runtime environments
            >>>>>>>>>> of all transforms. If we cannot know in
            advance, which transform is
            >>>>>>>>>> going to expand to how many nested (and
            possibly external) transforms, I
            >>>>>>>>>> think that the SDK could be fine with
            providing a Map(SDK ->
            >>>>>>>>>> environment). That is: "Run Java using
            PROCESS", "Run Python using
            >>>>>>>>>> DOCKER", and so on. A default mapping might
            exist on the expansion
            >>>>>>>>>> service as well (which might be passed
            through command line and that is
            >>>>>>>>>> the point 1)). Yes, the Map approach is
            definitely not universal,
            >>>>>>>>>> because one can imagine that the SDK itself
            is not enough for specifying
            >>>>>>>>>> the environment, but seems that vast
            majority of cases would fit into that.
            >>>>>>>>>>
            >>>>>>>>>> 3) The best might be for the SDK to provide
            a list of supported
            >>>>>>>>>> environments with additional metrics which
            the expansion service might
            >>>>>>>>>> choose from.
            >>>>>>>>>>
            >>>>>>>>>> These three approaches are all extensions to
            the current state. Current
            >>>>>>>>>> state has predefined environment without
            possibility to change it.
            >>>>>>>>>> Option 1) changes it to single configurable
            environment, option 2) to N
            >>>>>>>>>> environments based on SDK and option 3) to M
            environments based on
            >>>>>>>>>> SDK-dependent metrics (and/or capabilitites
            of particular environment).
            >>>>>>>>>> Seems like gradual extensions of the current
            state, so maybe we can
            >>>>>>>>>> focus on the first one, and maybe add other,
            when there is a need?
            >>>>>>>>>>
            >>>>>>>>>> If this could be the first conclusion, then
            the next one would be, what
            >>>>>>>>>> should be the preferred way to implement it.
            >>>>>>>>>>
            >>>>>>>>>> WDYT?
            >>>>>>>>>>
            >>>>>>>>>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
            >>>>>>>>>>> +1, thanks for digging up that thread.
            >>>>>>>>>>>
            >>>>>>>>>>> I am still of the same opinion that I wrote
            there. To touch on some
            >>>>>>>>>>> things brought up here, copying something like
            >>>>>>>>>>> defaultEnvironmentConfig doesn't make sense
            from language to language
            >>>>>>>>>>> (e.g. the docker image name or CLI
            arguments for subprocess mode just
            >>>>>>>>>>> isn't going to work for all of Python,
            Java, and Go, and clearly
            >>>>>>>>>>> embedded type is only going to work for one.)
            >>>>>>>>>>>
            >>>>>>>>>>> In the short term, to change environment
            (or anything else) about the
            >>>>>>>>>>> "default" expansions service, the thing to
            do is build and start your
            >>>>>>>>>>> own expansion service that sets up the
            environment for its transforms
            >>>>>>>>>>> in a custom way.
            >>>>>>>>>>>
            >>>>>>>>>>> FYI, in Python, one can use --beam_services
            to use a custom expansion
            >>>>>>>>>>> service. E.g.
            >>>>>>>>>>>
            >>>>>>>>>>>
            
--beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
            >>>>>>>>>>> "localhost:port"}'
            >>>>>>>>>>>
            >>>>>>>>>>> would override the default one when using
            SqlTransform.
            >>>>>>>>>>>
            >>>>>>>>>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle
            Weaver <[email protected]
            <mailto:[email protected]>> wrote:
            >>>>>>>>>>>> For context, there was a previous thread
            which touched on many of the same points:
            
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
            
<https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E>
            >>>>>>>>>>>>
            >>>>>>>>>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan
            Lukavský <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>>>>>>>> I would slightly disagree that this
            breaks the black box nature of the expansion, the "how
            the transform expands" remains unknown to the SDK
            requesting the expansion, the "how the transform
            executes" - on the other hand - is something that the
            SDK must cooperate on - it knows (or could or should
            know) what is the environment that the pipeline is
            going to be executed on looks like. That is why
            expansion service on its own cannot correctly define
            the execution environment. It could, if it would be
            bound to runner (and its environemnt) - for instance
            FlinkRunnerExpansionService could probably expand
            KafkaIO to something more 'native'. But that requires
            knowledge of the target runner. If the expansion
            service is not dedicated to a runner, the only place
            where it can be defined, is the SDK - and therefore the
            expansion request.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>>> Power users can always modify the output
            produced by the expansion service as well.
            >>>>>>>>>>>>> I'm not sure if I follow this, do you
            mean that power users, who run the expansion service
            can modify the output? Or is the output (protobuf) of
            the expansion service easily transferable between
            different execution environments?- I had the
            impression, that execution environments do not
            necessarily have to have the same payloads associated
            with them, and therefore it is impossible to
            'postprocess' the output of the expansion. Is that
            wrong assumption?
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> This would "break" the black box where
            the expansion service is supposed to hide the
            implementation internals from the caller and pushes
            compatibility of these kinds of environment overrides
            on to the expansion service and its implementer.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> Power users can always modify the output
            produced by the expansion service as well.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan
            Lukavský <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>>>>>>>>> The argument for being able to accept
            (possibly ordered list of) execution environments is in
            that this could make a single instance of execution
            service reusable by various clients with different
            requirements. Moreover, the two approaches are probably
            orthogonal - users could specify
            'defaultExecutionEnvironment' for the service which
            could be used in case when there is no preference given
            by the client.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> I would be much more inclined for the
            user being able to configure the expansion service for
            their needs instead of changing the expansion service API.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan
            Lukavský <[email protected] <mailto:[email protected]>> wrote:
            >>>>>>>>>>>>>>> If I understand it correctly, there is
            currently no place to set the
            >>>>>>>>>>>>>>> defaultEnvironmentType - python's
            KafkaIO uses either
            >>>>>>>>>>>>>>> 'expansion_service' given by the user
            (which might be a host:port, or an
            >>>>>>>>>>>>>>> object that has appropriate method), or
            calls
            >>>>>>>>>>>>>>> 'default_io_expansion_service' - which
            in turn runs ExpansionService
            >>>>>>>>>>>>>>> using gradle. Either way, it ends up in
            ExpansionService#main [1]. It
            >>>>>>>>>>>>>>> could be possible to adapt
            ExpansionService and call it locally -
            >>>>>>>>>>>>>>> provided ExpansionService would provide
            a way to extend it (using
            >>>>>>>>>>>>>>> protected method createPipeline())
            seems to be enough - but that is not
            >>>>>>>>>>>>>>> too much user-friendly. If we could
            specify the defaultEnvironmentConfig
            >>>>>>>>>>>>>>> when starting the ExpansionService, it
            would be possible to add these
            >>>>>>>>>>>>>>> parameters in the python SDK's KafkaIO,
            which would mean users do not
            >>>>>>>>>>>>>>> have to worry about the expansion
            service at all (leaving aside that
            >>>>>>>>>>>>>>> using too many ReafFromKafka or
            WriteToKafka transforms would somewhat
            >>>>>>>>>>>>>>> hurt performance during pipeline build,
            but that applies to the pipeline
            >>>>>>>>>>>>>>> build time only). I have created [2] to
            track that.
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Does that make sense, or is my analysis
            incorrect?
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>       Jan
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> [1]
            >>>>>>>>>>>>>>>
            
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
            
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> [2]
            https://issues.apache.org/jira/browse/BEAM-12539
            <https://issues.apache.org/jira/browse/BEAM-12539>
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
            >>>>>>>>>>>>>>>> I’m sorry if I missed something but do
            you mean that
            PortablePipelineOptions.setDefaultEnvironmentType(String)
            doesn’t work for you? Or it’s only a specific case
            while using portable KafkaIO?
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> On 29 Jun 2021, at 09:51, Jan
            Lukavský <[email protected]
            <mailto:[email protected]>> wrote:
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> Hi,
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> I have come across an issue with
            cross-language transforms. My setup is I have working
            environment type PROCESS and I cannot use DOCKER. When
            I use Python's KafkaIO, it unfortunately - by default -
            expands to docker environment, which then fails due to
            missing 'docker' command. I didn't find a solution
            without tackling the expansion service, yet.
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> I see several possible solutions to that:
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>>    a) I would say, that the cleanest
            solution would be to add preferred environment type to
            the expansion request to the expansion service
            (probably along with additional flags, probably
            --experiments?). This requires deeper changes to the
            expansion RPC defintion, probably serializing the
            PipelineOptions from the client environment into the
            ExpansionRequest.
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>>    b) Another option would be to
            allow specifying some of the command-line arguments
            when starting the expansion service, which currently
            accepts only port on command line, see [1]. The
            straightforward 'fix' (see [2]) unfortunately does not
            work, because it requires DirectRunner to be on the
            classpath, which then breaks other runners (see [3]).
            It seems possible to copy hand selected options from
            command line to the Pipeline, but that feels hackish.
            It would require to either be able to construct the
            Pipeline without a runner specified (which seems
            possible when calling Pipeline.create(), but not when
            using PipelineOptions create by parsing command-line
            arguments) or to be able to create a Map<String,
            String> from PIpelineOptions and then the ability to
            copy all options into the Pipeline's options.
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> My proposal would be to create a
            hackish shortcut and just copy the
            --defaultEnvironmentType, --defaultEnvironmentConfig
            and --experiments into Pipeline's options for now, and
            create an issue for a proper solution (possible a)?).
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> WDYT? Or did I miss a way to override
            the default expansion?
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> Thanks for comments,
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>>    Jan
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> [1]
            
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
            
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> [2]
            https://github.com/apache/beam/pull/15082
            <https://github.com/apache/beam/pull/15082>
            >>>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>> [3]
            https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
            <https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/>
            >>>>>>>>>>>>>>>>>

Reply via email to