Hi,

+1 for empty string being interpreted as [auto] and anything else having explicit notation.

One more reason that was not part of this discussion yet. In [1] there was a discussion about LocalEnvironment (the one responsible for spawning an in-process Flink cluster) not using the context classloader, and thus failing to load some user code (if that code was added to the context classloader *after* the application was run). LocalEnvironment assumes that all classes can be loaded by the application's classloader and doesn't accept any "client jars". Therefore, when an application generates classes dynamically at runtime, it is currently impossible to run those using the local Flink runner. There is a nasty hack for JDK <= 8 (injecting a URL into the application's URLClassLoader), but that fails hard on JDK >= 9 (obviously).

The conclusion from that thread is that it could be solved by manually running a MiniCluster (which listens on localhost:8081 by default) and then using its REST address for RemoteEnvironment, so that the application would actually be submitted as if it were run on a remote cluster, and a dynamically generated JAR could therefore be attached to it.

That would mean we could actually have two "local" modes - one using LocalEnvironment and one with a manual MiniCluster + RemoteEnvironment (if for whatever reason we would like to keep both modes of local operation). That could mean two masters - e.g. [local] and [local-over-remote] or something like that.

Jan

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html

On 10/29/19 5:50 AM, Thomas Weise wrote:
The current semantics of flink_master are tied to the Flink Java API. The Flink client / Java API isn't a "REST API". It now uses the REST API somewhere deep in RemoteEnvironment when the flink_master value is host:port, but it does a lot of other things as well, such as parsing config files and running local clusters.

A rest client to me is a thin wrapper around https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

Different ways used in Beam to submit jobs to Flink:

- classic Flink runner, using Flink Java client
- job server, using Flink Java client
- generated jar, using Flink Java client
- Flink CLI with generated jar, using Flink Java client
- Python "FlinkRunner" using *REST API*

Since the last item does not submit through the Flink Java client, there is a problem with the flink_master pipeline option. However, we cannot change the option in a way that breaks existing setups. As Robert suggests, we could allow for optional URL syntax so that it can be used with the REST API, in which case the code path that goes through the Java client will have to disassemble such a URL before handing it to Flink.
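That disassembly could be sketched roughly like this in Python (a hedged sketch, not Beam's actual code; the function name is made up for illustration). A bare "host:port" is first given a default "http://" scheme, then split, so the Java-client path can re-assemble "host:port" while the REST path keeps the full URL:

```python
from urllib.parse import urlsplit

def split_flink_master(flink_master):
    """Split a flink_master value into (scheme, host, port).

    Accepts both the legacy bare "host:port" form and a full URL such
    as "http://host:port". The scheme defaults to "http" when absent,
    since the Java client expects bare "host:port" while REST
    libraries expect a full URL.
    """
    if "://" not in flink_master:
        flink_master = "http://" + flink_master
    parts = urlsplit(flink_master)
    return parts.scheme, parts.hostname, parts.port

# The Java-client code path would hand Flink f"{host}:{port}" without
# the scheme; the REST code path would use the full URL as-is.
```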

I would find it acceptable to interpret absence of the option as "[auto]", which really means use CLI or REST API context when present, or local. I would prefer to not have an empty string value default (but rather None/null) and no additional magic values.
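The resolution described above could be sketched as follows (again a hedged sketch with hypothetical names, not Beam's actual implementation; `cli_config_address` stands in for whatever address the Flink CLI context would load from its config):

```python
def resolve_master(flink_master=None, cli_config_address=None):
    """Resolve the flink_master option.

    Absence of the option (None) means "auto": use the address from
    the CLI / config context when present, otherwise run locally.
    Any explicit non-empty value is taken as a cluster address; no
    additional magic values are recognized.
    """
    if flink_master is None:
        return cli_config_address if cli_config_address else "local"
    return flink_master
```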

Thomas


On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com> wrote:

    Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
    issue I mentioned.

    On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com> wrote:

        > I'd like to see this issue resolved before 2.17 as changing the
        > public API once it's released will be harder.

        +1. In particular, I hadn't realized that [auto] is not supported by
        `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken
        for Python 3.6+.

        requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config

        We definitely should fix that, if nothing else.

        > One concern with this is that just supplying host:port is the
        > existing behavior, so we can't start requiring the http://.

        The user shouldn't have to specify a protocol for Python. I think it's
        preferable and reasonable to handle that for them in order to maintain
        existing behavior and align with the Java SDK.

        > 2. Deprecate the "[auto]" and "[local]" values. It should be
        > sufficient to have either a non-empty address string or an empty
        > one. The empty string would either mean local execution or, in the
        > context of the Flink CLI tool, loading the master address from the
        > config. The non-empty string would be interpreted as a cluster
        > address.

        Looks like we also have a [collection] configuration value [1].

        If we're using [auto] as the default, I don't think this really makes
        much of a difference (as long as we're supporting and documenting
        these properly, of course). I'm not sure there's a compelling reason
        to change this?

        > always run locally (the least surprising to me

        I agree a local cluster should remain the default, whether that is
        achieved through [local] or [auto] or some new mechanism such as the
        above.

        [1] https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80

        On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels <m...@apache.org> wrote:

            Hi,

            Robert and Kyle have been doing great work to simplify submitting
            portable pipelines with the Flink Runner. Part of this is having a
            Python "FlinkRunner" which handles bringing up a Beam job server
            and submitting the pipeline directly via the Flink REST API. One
            building block is the creation of "executable Jars" which contain
            the materialized / translated Flink pipeline and do not require
            the Beam job server or the Python driver anymore.

            While unifying a newly introduced option "flink_master_url" with
            the pre-existing "flink_master" [1][2], some questions came up
            about Flink's execution modes. (The two options are meant to do
            the same thing: provide the address of the Flink master to hand
            over the translated pipeline.)

            Historically, Flink had a proprietary protocol for submitting
            pipelines, running on port 9091. This has since been replaced
            with a REST protocol at port 8081. To this day, this has
            implications for how you submit programs, e.g. the Flink client
            libraries expect the address to be of the form "host:port",
            without a protocol scheme. On the other hand, external REST
            libraries typically expect a protocol scheme.

            But this is only half of the fun. There are also special
            addresses for "flink_master" that influence submission of the
            pipeline. If you specify "[local]" as the address, the pipeline
            won't be submitted but executed in a local in-process Flink
            cluster. If you specify "[auto]" and you use the CLI tool that
            comes bundled with Flink, then the master address will be loaded
            from the Flink config, including any configuration like SSL. If
            none is found, then it falls back to "[local]".

            This is a bit odd, and after a discussion with Robert and Thomas
            in [1], we figured that this needs to be changed:

            1. Make the master address a URL. Add "http://" to "flink_master"
            in Python if no scheme is specified. Similarly, remove any
            "http://" in Java, since the Java REST client does not expect a
            scheme. In case of "https://", we have special treatment to load
            the SSL settings from the Flink config.

            2. Deprecate the "[auto]" and "[local]" values. It should be
            sufficient to have either a non-empty address string or an empty
            one. The empty string would either mean local execution or, in
            the context of the Flink CLI tool, loading the master address
            from the config. The non-empty string would be interpreted as a
            cluster address.


            Any opinions on this?


            Thanks,
            Max


            [1] https://github.com/apache/beam/pull/9803
            [2] https://github.com/apache/beam/pull/9844
