Thanks for clarifying. So when I run "./flink my_pipeline.jar"  or
upload the jar via the REST API (and its main method invoked on the
master) then [auto] reads the config and does the right thing, but if
I do java my_pipeline.jar it'll run locally.

Correct.

Python needs to know even whether to start up the loopback server, and
provide the address when submitting the pipeline.

I was thinking, it could do this anyway and tear down that server if the Runner does not need it. Clearly not the ideal solution.

If I understood
correctly above, the only time that the job server interprets [auto]
as something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right?

Just to clarify, we are talking about Beam's UberJar job server here. Yes, 'flink_master' will be set to '[auto]' in the case of outputting an uber jar. Everything else would result in a jar that you wouldn't run on a Flink cluster.

The reason for this is that 'flink_master' is _always_ respected. If it is set to '[local]', it will execute locally. If it is set to '[auto]', it will use the address from the config, or fall back to local execution if there is none. If it is set to an address, it will use that address. The latter will result in a failure if the Flink config has been loaded.

And baking anything in but [auto]
seems wrong...)

That's why we need to override it by '[auto]' in case we're outputting a jar.

So it seems we could guard using LOOPBACK it on this
flag + [local] or [auto].

Only problem being that this flag does not yet exist :) We can probably infer from the context if this is a one-shot jar creation or it is going to be reused. In the former case we can default to LOOPBACK.

On 30.10.19 23:52, Robert Bradshaw wrote:
On Wed, Oct 30, 2019 at 3:34 PM Maximilian Michels <m...@apache.org> wrote:

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

When you upload a Jar to Flink, it can be run by the Flink master.
Running a Jar on the job manager will just invoke the main method. The
same happens when you use the Flink CLI tool, the only difference being
that the Jar runs on the user machine vs on the Flink master. In both
cases, the Flink config file will be present in a standard location,
i.e. "<flink_installation>/conf/flink_conf.yaml".

What Flink users typically do, is to call this method to construct a
Flink dataflow (a.k.a. job):
    env = ExecutionEnvironment.getExecutionEnvironment()
    env.addSource().flatMap()
    env.execute()
This registers an environment which holds the Flink dataflow definition.

The Flink Runner also does this when flink_master is set to "[auto]".
When it is set to "[local]", it will attempt to execute the whole
pipeline locally (LocalEnvironment). When an address has been specified
it will submit against a remote Flink cluster (RemoteEnvironment). The
last two use cases do not make any sense for the user when they use the
Python FlinkRunner which uses the jar upload feature of Flink. That's
why "[auto]" is our choice for the FlinkUberJarJobServer feature of the
Python FlinkRunner.

In the case of the FlinkJarJobServer to use "[auto]" can also make sense
because you can even specify a Flink config directory for the Flink job
server to use. Without a config, auto will always fall back to local
execution.

Thanks for clarifying. So when I run "./flink my_pipeline.jar"  or
upload the jar via the REST API (and its main method invoked on the
master) then [auto] reads the config and does the right thing, but if
I do java my_pipeline.jar it'll run locally.

One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

Yes, it is valuable. I suspect we only want to enable it for local
execution?

Yes.

We could let the actual Runner handle this by falling back to
the default environment in case it detects that the execution will not
be local. It can simply tell Python then to shutdown the loopback
server, or it shuts itself down after a timeout.

Python needs to know even whether to start up the loopback server, and
provide the address when submitting the pipeline. If I understood
correctly above, the only time that the job server interprets [auto]
as something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right? And baking anything in but [auto]
seems wrong...) So it seems we could guard using LOOPBACK it on this
flag + [local] or [auto].

Another option would
be, to only support it when the mode is set to "[local]".

Well, I'd really like to support it by default...

On 30.10.19 21:05, Robert Bradshaw wrote:
One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw <rober...@google.com> wrote:

Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote:

tl;dr:

- I see consensus for inferring "http://"; in Python to align it with the
Java behavior which currently requires leaving out the protocol scheme.
Optionally, Java could also accept a scheme which gets removed as
required by the Flink Java Rest client.

- We won't support "https://"; in Python for now, because it requires
additional SSL setup, i.e. parsing the Flink config file and setting up
the truststore

- We want to keep "[auto]"/"[local]" but fix the current broken behavior
via https://issues.apache.org/jira/browse/BEAM-8507


Additional comments below:

One concern with this is that just supplying host:port is the existing
behavior, so we can't start requiring the http://

I wouldn't require it but optionally support it, otherwise add it
automatically.

One question I have is if there are other
authentication parameters that may be required for speaking to a flink
endpoint that we should be aware of (that would normally be buried in
the config file).

Yes, essentially all the SSL configuration is in the config file,
including the location of the truststore, the password, certificates, etc.

For now, I would say we cannot properly support SSL in Python, unless we
find a way to load the truststore from Python.

I do like being explicit with something like [local] rather than
treating the empty string in a magical way.

Fine, we can keep "[local]" and throw an error in case the address is
empty. Let's also throw an error in case the Flink CLI tool is used with
local execution because that is clearly not what the user wants.

I'd like to see this issue resolved before 2.17 as changing the public API once 
it's released will be harder.

+1. In particular, I misunderstood that [auto] is not supported by 
`FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
Python 3.6+.

+1 Let's fix this in time for the release.

The user shouldn't have to specify a protocol for Python, I think it's 
preferable and reasonable to handle that for them in order to maintain existing 
behavior and align with Java SDK.

+1

Looks like we also have a [collection] configuration value [1].

Yeah, I think it is acceptable to remove this entirely. This has never
been used by anyone and is an unmaintained Flink feature.

I would find it acceptable to interpret absence of the option as "[auto]", 
which really means use CLI or REST API context when present, or local. I would prefer to 
not have an empty string value default (but rather None/null) and no additional magic 
values.

Let's keep "[auto]" then to keep it explicit. An empty string should
throw an error.


One more reason that was not part of this discussion yet.

@Jan: Supporting context classloaders in local mode is a new feature and
for keeping it simple, I'd start a new thread for it.


On 29.10.19 10:55, Jan Lukavský wrote:
Hi,

+1 for empty string being interpreted as [auto] and anything else having
explicit notation.

One more reason that was not part of this discussion yet. In [1] there
was a discussion about LocalEnvironment (that is the one that is
responsible for spawning in process Flink cluster) not using context
classloader and thus can fail loading some user code (if this code was
added to context classloader *after* application has been run).
LocalEnvironment on the other hand supposes that all classes can be
loaded by applicaiton's classloader and doesn't accept any "client
jars". Therefore - when application generates classes dynamically during
runtime it is currently impossible to run those using local flink
runner. There is a nasty hack for JDK <= 8 (injecting URL into
applications URLClassLoader), but that fails hard on JDK >= 9 (obviously).

The conclusion from that thread is that it could be solved by manually
running MiniCluster (which will run on localhost:8081 by default) and
then use this REST address for RemoteEnvironment so that the application
would be actually submitted as if it would be run on remote cluster and
therefore a dynamically generated JAR can be attached to it.

That would mean, that we can actually have two "local" modes - one using
LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
for whatever reason we would like to keep both mode of local operation).
That could mean two masters - e.g. [local] and [local-over-remote] or
something like that.

Jan

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html

On 10/29/19 5:50 AM, Thomas Weise wrote:
The current semantics of flink_master are tied to the Flink Java API.
The Flink client / Java API isn't a "REST API". It now uses the REST
API somewhere deep in RemoteEnvironment when the flink_master value is
host:port, but it does a lot of other things as well, such are parsing
config files and running local clusters.

A rest client to me is a thin wrapper around
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html


Different ways used in Beam to submit jobs to Flink:

- classic Flink runner, using Flink Java client
- job server, using Flink Java client
- generated jar, using Flink Java client
- Flink CLI with generated jar, using Flink Java client
- Python "FlinkRunner" using *REST API*

Since the last item does not submit through the Flink Java client,
there is a problem with the flink_master pipeline option. Though we
cannot change the option in a way that breaks existing setups. Like
Robert suggests, we could allow for optional URL syntax so that it can
be used with the REST API, in which case the code path that goes
through the Java client will have to disassemble such URL before
handing it to Flink.

I would find it acceptable to interpret absence of the option as
"[auto]", which really means use CLI or REST API context when present,
or local. I would prefer to not have an empty string value default
(but rather None/null) and no additional magic values.

Thomas


On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com
<mailto:kcwea...@google.com>> wrote:

      Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
      issue I mentioned.

      On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com
      <mailto:kcwea...@google.com>> wrote:

          > I'd like to see this issue resolved before 2.17 as changing
          the public API once it's released will be harder.

          +1. In particular, I misunderstood that [auto] is not
          supported by `FlinkUberJarJobServer`. Since [auto] is now the
          default, it's broken for Python 3.6+.

          requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config

          We definitely should fix that, if nothing else.

          > One concern with this is that just supplying host:port is
          the existing
          > behavior, so we can't start requiring the http://.

          The user shouldn't have to specify a protocol for Python, I
          think it's preferable and reasonable to handle that for them
          in order to maintain existing behavior and align with Java SDK.

          > 2. Deprecate the "[auto]" and "[local]" values. It should be
          sufficient
          > to have either a non-empty address string or an empty one.
          The empty
          > string would either mean local execution or, in the context
          of the Flink
          > CLI tool, loading the master address from the config. The
          non-empty
          > string would be interpreted as a cluster address.

          Looks like we also have a [collection] configuration value [1].

          If we're using [auto] as the default, I don't think this
          really makes so much of a difference (as long as we're
          supporting and documenting these properly, of course). I'm not
          sure there's a compelling reason to change this?

          > always run locally (the least surprising to me

          I agree a local cluster should remain the default, whether
          that is achieved through [local] or [auto] or some new
          mechanism such as the above.

          [1]
          
https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80

          On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels
          <m...@apache.org <mailto:m...@apache.org>> wrote:

              Hi,

              Robert and Kyle have been doing great work to simplify
              submitting
              portable pipelines with the Flink Runner. Part of this is
              having a
              Python "FlinkRunner" which handles bringing up a Beam job
              server and
              submitting the pipeline directly via the Flink REST API.
              One building
              block is the creation of "executable Jars" which contain the
              materialized / translated Flink pipeline and do not
              require the Beam job
              server or the Python driver anymore.

              While unifying a newly introduced option
              "flink_master_url" with the
              pre-existing "flink_master" [1][2], some questions came up
              about Flink's
              execution modes. (The two options are meant to do the same
              thing:
              provide the address of the Flink master to hand-over the
              translated
              pipeline.)

              Historically, Flink had a proprietary protocol for
              submitting pipelines,
              running on port 9091. This has since been replaced with a
              REST protocol
              at port 8081. To this date, this has implications how you
              submit
              programs, e.g. the Flink client libraries expects the
              address to be of
              form "host:port", without a protocol scheme. On the other
              hand, external
              Rest libraries typically expect a protocol scheme.

              But this is only half of the fun. There are also special
              addresses for
              "flink_master" that influence submission of the pipeline.
              If you specify
              "[local]" as the address, the pipeline won't be submitted
              but executed
              in a local in-process Flink cluster. If you specify
              "[auto]" and you use
              the CLI tool that comes bundled with Flink, then the
              master address will
              be loaded from the Flink config, including any
              configuration like SSL.
              If none is found, then it falls back to "[local]".

              This is a bit odd, and after a discussion with Robert and
              Thomas in [1],
              we figured that this needs to be changed:

              1. Make the master address a URL. Add "http://"; to
              "flink_master" in
              Python if no scheme is specified. Similarly, remove any
              "http://"; in
              Java, since the Java rest client does not expect a scheme.
              In case of
              "http_s_://", we have a special treatment to load the SSL
              settings from
              the Flink config.

              2. Deprecate the "[auto]" and "[local]" values. It should
              be sufficient
              to have either a non-empty address string or an empty one.
              The empty
              string would either mean local execution or, in the
              context of the Flink
              CLI tool, loading the master address from the config. The
              non-empty
              string would be interpreted as a cluster address.


              Any opinions on this?


              Thanks,
              Max


              [1] https://github.com/apache/beam/pull/9803
              [2] https://github.com/apache/beam/pull/9844

Reply via email to