> java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>

This does not accept any other parameters than the port. That is the first part of this thread - the intent was to enable this to accept additional arguments, but there are (still waiting to be addressed unresolved) issues. There currently even seems to be no other way to adapt ExpansionService than to copy&paste the code and modify it, because it simply is not extensible. What would be enough is wrapping Pipeline.create() [1] call to a protected method, or add (protected) constructor that would accept PipelineOptions (probably better in this regard). That would make it more easy for users to create customized ExpansionService and it would (sort of) help solving described issues.

But even if we do that, we still need to deal with the expansion service on two places:

 a) run it (and stop it)

 b) specify it in the

Using the default expansion service is much, much easier, it is started and stopped automatically for the user. Morever, the JavaJarExpansionService actually even presumes that there can be additional arguments passed to the service ([2]), the ExpansionService only does not accept them (and kafka IO does not expose that - that could be worked-around by users by manually creating the JavaJarExpansionService from own jar, yes). I would find it natural to add the command-line parsing (somehow!) to the ExpansionService itself, so that it doesn't need end-user modifications and then to figure out how to most easily expose there command-line arguments to end-users.

> Or PROCESS mode.

Yes, I verified that Flink can use Python Kafka IO over PROCESS environment with some hacking of the ExpansionService as shown in one of the linked PRs (though there is probably still some bugs regarding SDF - [3]). Adding --experiments seems have the same issues, need expose that to the CLI of ExpansionService. And I'm not sure if this [4] is not in conflict with --experiments=use_deprecated_read. That is something I still need to investigate.

LOOPBACK is currently not supported by Flink. That is nice-to-have feature.

 Jan

[1] https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394

[2] https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481

[3] https://issues.apache.org/jira/browse/BEAM-11998

[4] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398

On 6/30/21 3:57 PM, Chamikara Jayalath wrote:


On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath <chamik...@google.com <mailto:chamik...@google.com>> wrote:



    On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        On 6/30/21 1:16 AM, Robert Bradshaw wrote:
        > <rant>Why doesn't docker in docker just work, rather than
        having to do
        > ugly hacks when composing two technologies that both rely on
        > docker...</rant>
        >
        > Presumably you're setting up a node for Kafka and Flink; why
        not set
        > one up for the expansion service as well? The UX of
        >
        >
        ReadFromKafka(default_expansion_service_args={"defaultEnvironmentType":
        > "PROCESS", "defaultEnvironmentConfig": ""{\"os\": \"linux\",
        \"arch\":
        > \"amd64\", \"command\": \"/path/to/launcher/boot
        > cp=/some/other/long/path\" ...}")"})
        >
        > isn't that great either. Rather than pass arbitrary
        arguments to a
        > default expansion service, I still think once you get to
        this level
        > it's better to just start your own expansion service.

        Sure, that is possible (seems to me, that it would still
        require some
        changes to ExpansionService to be extendable, but yes, kind of
        tiny
        changes). The problem is not with Flink or Kafka - those are
        technologies you are actually expecting to set up, because you
        want to
        use them. The problem is what everything else you must set up
        for making
        something that seems as easy as "read a few messages from
        kafka in beam
        python" to work. You must have:

          a) Python SDK harness (OK, that is something that should be
        probably
        expected) - there are few problems with it, namely it is somewhat
        hardcoded that it must run in the same pod as Flink's
        taskmanager to be
        able to use EXTERNAL environment, but ok, let's go on

          b) Java SDK harness, at least installed in docker image of
        taskmanager
        (to be usable via PROCESS environment) - OK, that starts to be
        weird,
        taskmanager is java, right? Something like LOOPBACK would be
        cool there,
        but never mind. You create custom docker image for your Flink
        JM and TM
        and continue.

          c) Implement (extend) and deploy own expansion service -
        ouch, that
        starts to hurt, that is even going to be a pod that is running
        even
        though there is nothing using it (yes, can be scaled down).

        The complexity of a simple task starts to be somewhat
        extraordinary. And
        most of the users will not be willing to follow this path, I'm
        afraid.
        People generally don't like to set up complex environment for
        something
        that looks it should "just work".  There is non-trivial work
        necessary
        to make all of this working, mostly when you are starting to
        evaluate
        Beam and don't have much experience with it.


    I don't think we should expect end-users to implement or extend
    the expansion service. Everything should be already implemented
    and maybe we can even provide a script to easily startup a local
    Java expansion service with additional parameters.

    Today, to start a Java expansion service for Kafka users have to
    do the following.

    * Download expansion service jar released with Beam for Kafka. For
    example [1]

    * Run following command:
    java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>

    * To use this they just have to provide "localhost:<port>" to [2].

    This is a few extra steps but mostly a one time setup for the user
    and nothing to do with portability or other complexities of Beam.

    I'm all for simplifying the user-experience, but adding changes to
    the transform API that might have to be deprecated later sounds
    like a bad idea. I'd much rather provide additional
    scripts/documentation/examples to simplify such use-cases. I think
    that will be adequate for most users.

    BTW, slightly orthogonal, I don't think multi-language would work
    in LOOPBACK mode today without additional changes to portable
    runners (at least I've never tested this). Did you confirm that
    this works ?


Or PROCESS mode.


    Thanks,
    Cham

    [1]
    
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
    
<https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar>
    [2]
    
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
    
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>


        We can get rid of b) (implement LOOPBACK in Flink) and c)
        (enable Python
        SDK Kafka IO to spawn expansion service with the LOOPBACK
        environment
        when submitting to Flink). That is why I still think that this
        simplification matters a lot.

        >
        > On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >> I believe we could change that more or less the same as we
        can deprecate / stop supporting any other parameter of any
        method. If python starts to support natively Kafka IO, then we
        can simply log warning / raise exception (one after the
        other). That seems like natural development.
        >>
        >> Maybe I should have described the case - I'm trying to
        setup a "simple" use-case for users that want to try Python
        SDK to read using Flink from Kafka using Minikube (both Kafka
        and Flink are running inside Minikube). There are tons of
        problems to use docker from within Minkube and I would not say
        that is the "simple" way we would like to present to users.
        Setting up own expansion service is possibility - but that
        also lacks the UX approach. I pretty much think that
        understanding portability on it's own is already a burden we
        put on users (yes, we do that for a reason, but everything
        else should be as simple as possible).
        >>
        >> On 6/30/21 12:16 AM, Chamikara Jayalath wrote:
        >>
        >> So I think one downside to this PR is that we assume that
        the default expansion service used by the transform (Kafka in
        this case) will not change. Currently it's fully opaque. In
        the default case we just promise that the transform will work
        (if conditions I mentioned above are met). Nothing else.
        >> If we add a "param default_expansion_service_args", we leak
        the nature of the default expansion service to the API and it
        will be hard to change it in the future.
        >>
        >> Thanks,
        >> Cham
        >>
        >> On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>> I would absolutely understand this, if it would be mostly
        impossible or at least really hard to get the user friendly
        behavior. But we are mostly there in this case. When we can
        actually quite simply pass the supported environment via
        parameter, I think we should go for it.
        >>>
        >>> I have created a sketch (I verified that when the
        ExpansionService is patched 'enough' this works) in [1]. This
        is only a sketch, because we first must know how to support
        the default execution environment in ExpansionService.
        >>>
        >>> [1] https://github.com/apache/beam/pull/15099/files
        <https://github.com/apache/beam/pull/15099/files>
        >>>
        >>> On 6/29/21 11:51 PM, Chamikara Jayalath wrote:
        >>>
        >>>
        >>>
        >>> On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>> On 6/29/21 11:04 PM, Robert Bradshaw wrote:
        >>>>> You can configure the environment in the current state,
        you just have
        >>>>> to run your own expansion service that has a different
        environment
        >>>>> backed into it (or, makes this configurable).
        >>>> Yes, that is true. On the other hand that lacks some
        user-friendliness,
        >>>> because ideally, you don't want to worry about expansion
        services,
        >>>> mostly when it comes to some mostly standard IO. The
        ideal case is that
        >>>> you either do not basically know that you use external
        transform (which
        >>>> is probably the case when you can use docker), or you are
        able to
        >>>> overcome the problem within the SDK (Python) by passing
        some argument to
        >>>> the input transform.
        >>>
        >>> Arguments passed to the pipeline level apply to the whole
        pipeline (not just one transform). So if you pass in a default
        environment (and configs) at pipeline level, that would mean
        the default environment and configs used by the pipeline (so
        Python SDK in this case) not a specific transform.
        >>> I believe we have made usage of external transforms
        used-friendly for the general case. But we had to make some
        assumptions. For example we assumed,
        >>> * user will be using the default environment of the
        expansion service (Docker in this case)
        >>> * User will be using the pre-specified dependency only
        (sdks:java:io:expansion-service:shadowJar for Kafka)
        >>> * User will be in an environment where the jar can be
        downloaded.
        >>>
        >>> I would consider any use-case where these basic
        assumptions cannot be met as an advanced use-case. The
        solution in such a case would be to start a custom expansion
        service and pass the address of it as a parameter to the
        transform [1]. I'm fine with extending the capabilities of
        Java expansion service by adding more parameters (for example,
        for overriding the environment, for specifying dependencies,
        for providing pipeline options).
        >>>
        >>> Thanks,
        >>> Cham
        >>>
        >>> [1]
        
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
        
<https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133>
        >>>
        >>>
        >>>>> Is option (1) updating the default expansion service
        such that one can
        >>>>> override default environment properties on the command
        line? (You
        >>>>> would still have to start it up manually to use it.)
        >>>> Yes and no. :) Updating ExpansionService so that you can
        specify default
        >>>> environment on command like makes this accessible to
        >>>> JavaJarExpansionService, and that makes it possible to
        add (optional)
        >>>> argument to Python Kafka IO, that would delegate this to the
        >>>> (automatically) started expansion service. It is
        important to note that
        >>>> both ReadFromKafka and WriteToKafka have expansion that
        involves only
        >>>> single external (Java) SDK. That simplifies things.
        >>>>> Maybe it would help to make things more concrete.
        Suppose I have a Go
        >>>>> pipeline that uses a library which invokes a Python
        external transform
        >>>>> to do ML (say, via TFX), and two Java IOs (which happen
        to have
        >>>>> mutually exclusive dependencies). The ML transform
        itself uses Java to
        >>>>> invoke some SQL.
        >>>>>
        >>>>> The way things work currently is each external transform
        will have an
        >>>>> associated fully specified environment and a runner can
        use docker to
        >>>>> start up the required workers at the expected time.
        >>>>>
        >>>>> Now, suppose one doesn't have docker on the workers. One
        wants to run this with
        >>>>>
        >>>>>       ./my_pipeline --someFlag=someValue
        --someOtherFlag=someOtherValue ...
        >>>>>
        >>>>> such that docker is no longer needed. What someFlags
        would we need,
        >>>>> and what would their values be? (And how to make this
        feasible to
        >>>>> implement.)
        >>>>>
        >>>>> Are there meaningful intermediate points that extend to
        a general
        >>>>> solution (or at least aren't hostile to it)?
        >>>> I believe that in the option 2) the best way would to use
        each SDK's URN
        >>>> Then the arguments could be something like
        >>>>
        
"--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
        >>>> config="<image>"},
        "apache:beam:python:2.33.0:latest"={env="PROCESS",
        >>>> config={...}}". Yes, it would require a lot of "syntactic
        sugar" to
        >>>> configure that. :) (sorry if I don't have URNs for SDKs
        100% correct)
        >>>>>
        >>>>> I still think in the long run having runners understand
        environments,
        >>>>> and saying "oh, whenever I see
        'apache:beam:java:2.33.0:latest' I'll
        >>>>> swap that out for 'path/to/my/java -cp ...' is the right
        way to go
        >>>>> long-term. (I would put this in runners, not SDKs,
        though a common
        >>>>> runners library could be used.)
        >>>> Yes, I also agree, that expansion service should be
        runner-dependent (or
        >>>> at least runner-aware), as that brings optimizations.
        Runner could
        >>>> ignore settings from previous point when it can be *sure*
        it can do so.
        >>>>>
        >>>>> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>> Thanks for pointing to that thread.
        >>>>>>
        >>>>>> 1) I'm - as well as Kyle - fine with the approach that
        we use a
        >>>>>> "preferred environment" for the expansion service. We
        only need to pass
        >>>>>> it via command line. Yes, the command line might be
        generally
        >>>>>> SDK-dependent, and that makes it expansion dependent,
        because whether or
        >>>>>> not particular transform is "external" or not is
        implementation detail.
        >>>>>> That is the nasty part. The rest of my original
        question is about, how
        >>>>>> exactly to do that, because it seems to be tricky, due
        to the fact, that
        >>>>>> it is not possible to include runtime dependency on
        DirectRunner (fails
        >>>>>> many, many tests) and it is not possible to extract
        PipelineOptions as a
        >>>>>> Map either.
        >>>>>>
        >>>>>> 2) Regarding SDK injecting environment, I still think
        that is the
        >>>>>> correct way. The SDK (the driver code) own the
        execution environment. It
        >>>>>> should be able to define (or at least prioritize)
        runtime environments
        >>>>>> of all transforms. If we cannot know in advance, which
        transform is
        >>>>>> going to expand to how many nested (and possibly
        external) transforms, I
        >>>>>> think that the SDK could be fine with providing a
        Map(SDK ->
        >>>>>> environment). That is: "Run Java using PROCESS", "Run
        Python using
        >>>>>> DOCKER", and so on. A default mapping might exist on
        the expansion
        >>>>>> service as well (which might be passed through command
        line and that is
        >>>>>> the point 1)). Yes, the Map approach is definitely not
        universal,
        >>>>>> because one can imagine that the SDK itself is not
        enough for specifying
        >>>>>> the environment, but seems that vast majority of cases
        would fit into that.
        >>>>>>
        >>>>>> 3) The best might be for the SDK to provide a list of
        supported
        >>>>>> environments with additional metrics which the
        expansion service might
        >>>>>> choose from.
        >>>>>>
        >>>>>> These three approaches are all extensions to the
        current state. Current
        >>>>>> state has predefined environment without possibility to
        change it.
        >>>>>> Option 1) changes it to single configurable
        environment, option 2) to N
        >>>>>> environments based on SDK and option 3) to M
        environments based on
        >>>>>> SDK-dependent metrics (and/or capabilitites of
        particular environment).
        >>>>>> Seems like gradual extensions of the current state, so
        maybe we can
        >>>>>> focus on the first one, and maybe add other, when there
        is a need?
        >>>>>>
        >>>>>> If this could be the first conclusion, then the next
        one would be, what
        >>>>>> should be the preferred way to implement it.
        >>>>>>
        >>>>>> WDYT?
        >>>>>>
        >>>>>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
        >>>>>>> +1, thanks for digging up that thread.
        >>>>>>>
        >>>>>>> I am still of the same opinion that I wrote there. To
        touch on some
        >>>>>>> things brought up here, copying something like
        >>>>>>> defaultEnvironmentConfig doesn't make sense from
        language to language
        >>>>>>> (e.g. the docker image name or CLI arguments for
        subprocess mode just
        >>>>>>> isn't going to work for all of Python, Java, and Go,
        and clearly
        >>>>>>> embedded type is only going to work for one.)
        >>>>>>>
        >>>>>>> In the short term, to change environment (or anything
        else) about the
        >>>>>>> "default" expansions service, the thing to do is build
        and start your
        >>>>>>> own expansion service that sets up the environment for
        its transforms
        >>>>>>> in a custom way.
        >>>>>>>
        >>>>>>> FYI, in Python, one can use --beam_services to use a
        custom expansion
        >>>>>>> service. E.g.
        >>>>>>>
        >>>>>>>
        
--beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
        >>>>>>> "localhost:port"}'
        >>>>>>>
        >>>>>>> would override the default one when using SqlTransform.
        >>>>>>>
        >>>>>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver
        <kcwea...@google.com <mailto:kcwea...@google.com>> wrote:
        >>>>>>>> For context, there was a previous thread which
        touched on many of the same points:
        
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
        
<https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E>
        >>>>>>>>
        >>>>>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>> I would slightly disagree that this breaks the black
        box nature of the expansion, the "how the transform expands"
        remains unknown to the SDK requesting the expansion, the "how
        the transform executes" - on the other hand - is something
        that the SDK must cooperate on - it knows (or could or should
        know) what is the environment that the pipeline is going to be
        executed on looks like. That is why expansion service on its
        own cannot correctly define the execution environment. It
        could, if it would be bound to runner (and its environemnt) -
        for instance FlinkRunnerExpansionService could probably expand
        KafkaIO to something more 'native'. But that requires
        knowledge of the target runner. If the expansion service is
        not dedicated to a runner, the only place where it can be
        defined, is the SDK - and therefore the expansion request.
        >>>>>>>>>
        >>>>>>>>>> Power users can always modify the output produced
        by the expansion service as well.
        >>>>>>>>> I'm not sure if I follow this, do you mean that
        power users, who run the expansion service can modify the
        output? Or is the output (protobuf) of the expansion service
        easily transferable between different execution environments?-
        I had the impression, that execution environments do not
        necessarily have to have the same payloads associated with
        them, and therefore it is impossible to 'postprocess' the
        output of the expansion. Is that wrong assumption?
        >>>>>>>>>
        >>>>>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
        >>>>>>>>>
        >>>>>>>>> This would "break" the black box where the expansion
        service is supposed to hide the implementation internals from
        the caller and pushes compatibility of these kinds of
        environment overrides on to the expansion service and its
        implementer.
        >>>>>>>>>
        >>>>>>>>> Power users can always modify the output produced by
        the expansion service as well.
        >>>>>>>>>
        >>>>>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>> The argument for being able to accept (possibly
        ordered list of) execution environments is in that this could
        make a single instance of execution service reusable by
        various clients with different requirements. Moreover, the two
        approaches are probably orthogonal - users could specify
        'defaultExecutionEnvironment' for the service which could be
        used in case when there is no preference given by the client.
        >>>>>>>>>>
        >>>>>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
        >>>>>>>>>>
        >>>>>>>>>> I would be much more inclined for the user being
        able to configure the expansion service for their needs
        instead of changing the expansion service API.
        >>>>>>>>>>
        >>>>>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >>>>>>>>>>> If I understand it correctly, there is currently
        no place to set the
        >>>>>>>>>>> defaultEnvironmentType - python's KafkaIO uses either
        >>>>>>>>>>> 'expansion_service' given by the user (which might
        be a host:port, or an
        >>>>>>>>>>> object that has appropriate method), or calls
        >>>>>>>>>>> 'default_io_expansion_service' - which in turn
        runs ExpansionService
        >>>>>>>>>>> using gradle. Either way, it ends up in
        ExpansionService#main [1]. It
        >>>>>>>>>>> could be possible to adapt ExpansionService and
        call it locally -
        >>>>>>>>>>> provided ExpansionService would provide a way to
        extend it (using
        >>>>>>>>>>> protected method createPipeline()) seems to be
        enough - but that is not
        >>>>>>>>>>> too much user-friendly. If we could specify the
        defaultEnvironmentConfig
        >>>>>>>>>>> when starting the ExpansionService, it would be
        possible to add these
        >>>>>>>>>>> parameters in the python SDK's KafkaIO, which
        would mean users do not
        >>>>>>>>>>> have to worry about the expansion service at all
        (leaving aside that
        >>>>>>>>>>> using too many ReafFromKafka or WriteToKafka
        transforms would somewhat
        >>>>>>>>>>> hurt performance during pipeline build, but that
        applies to the pipeline
        >>>>>>>>>>> build time only). I have created [2] to track that.
        >>>>>>>>>>>
        >>>>>>>>>>> Does that make sense, or is my analysis incorrect?
        >>>>>>>>>>>
        >>>>>>>>>>>      Jan
        >>>>>>>>>>>
        >>>>>>>>>>> [1]
        >>>>>>>>>>>
        
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
        
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
        >>>>>>>>>>>
        >>>>>>>>>>> [2]
        https://issues.apache.org/jira/browse/BEAM-12539
        <https://issues.apache.org/jira/browse/BEAM-12539>
        >>>>>>>>>>>
        >>>>>>>>>>>
        >>>>>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
        >>>>>>>>>>>> I’m sorry if I missed something but do you mean
        that PortablePipelineOptions.setDefaultEnvironmentType(String)
        doesn’t work for you? Or it’s only a specific case while using
        portable KafkaIO?
        >>>>>>>>>>>>
        >>>>>>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský
        <x666je...@gmail.com <mailto:x666je...@gmail.com>> wrote:
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> Hi,
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> I have come across an issue with cross-language
        transforms. My setup is I have working environment type
        PROCESS and I cannot use DOCKER. When I use Python's KafkaIO,
        it unfortunately - by default - expands to docker environment,
        which then fails due to missing 'docker' command. I didn't
        find a solution without tackling the expansion service, yet.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> I see several possible solutions to that:
        >>>>>>>>>>>>>
        >>>>>>>>>>>>>     a) I would say, that the cleanest solution
        would be to add preferred environment type to the expansion
        request to the expansion service (probably along with
        additional flags, probably --experiments?). This requires
        deeper changes to the expansion RPC defintion, probably
        serializing the PipelineOptions from the client environment
        into the ExpansionRequest.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>>     b) Another option would be to allow
        specifying some of the command-line arguments when starting
        the expansion service, which currently accepts only port on
        command line, see [1]. The straightforward 'fix' (see [2])
        unfortunately does not work, because it requires DirectRunner
        to be on the classpath, which then breaks other runners (see
        [3]). It seems possible to copy hand selected options from
        command line to the Pipeline, but that feels hackish. It would
        require to either be able to construct the Pipeline without a
        runner specified (which seems possible when calling
        Pipeline.create(), but not when using PipelineOptions create
        by parsing command-line arguments) or to be able to create a
        Map<String, String> from PIpelineOptions and then the ability
        to copy all options into the Pipeline's options.
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> My proposal would be to create a hackish
        shortcut and just copy the --defaultEnvironmentType,
        --defaultEnvironmentConfig and --experiments into Pipeline's
        options for now, and create an issue for a proper solution
        (possible a)?).
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> WDYT? Or did I miss a way to override the
        default expansion?
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> Thanks for comments,
        >>>>>>>>>>>>>
        >>>>>>>>>>>>>     Jan
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> [1]
        
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
        
<https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511>
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> [2] https://github.com/apache/beam/pull/15082
        <https://github.com/apache/beam/pull/15082>
        >>>>>>>>>>>>>
        >>>>>>>>>>>>> [3]
        https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
        <https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/>
        >>>>>>>>>>>>>

Reply via email to