Hi,
+1 for empty string being interpreted as [auto] and anything else having
explicit notation.
One more reason that was not part of this discussion yet. In [1] there
was a discussion about LocalEnvironment (the one responsible for
spawning the in-process Flink cluster) not using the context
classloader, and therefore failing to load some user code (if that code
was added to the context classloader *after* the application was
started). LocalEnvironment also assumes that all classes can be loaded
by the application's classloader and doesn't accept any "client jars".
Therefore, when an application generates classes dynamically at
runtime, it is currently impossible to run those using the local Flink
runner. There is a nasty hack for JDK <= 8 (injecting a URL into the
application's URLClassLoader), but that fails hard on JDK >= 9
(obviously).
The conclusion from that thread is that it could be solved by manually
running a MiniCluster (which listens on localhost:8081 by default) and
then using its REST address for a RemoteEnvironment, so that the
application is submitted as if it were running against a remote
cluster, and a dynamically generated JAR can therefore be attached to
it. A rough sketch of that approach is below.
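A minimal sketch of the MiniCluster + RemoteEnvironment workaround,
assuming roughly the Flink 1.8/1.9 APIs; the JAR path and the cluster
sizing are placeholders:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class LocalOverRemote {
  public static void main(String[] args) throws Exception {
    // Pin the REST endpoint to a known port so a RemoteEnvironment can target it.
    Configuration conf = new Configuration();
    conf.setInteger(RestOptions.PORT, 8081);

    MiniCluster miniCluster = new MiniCluster(
        new MiniClusterConfiguration.Builder()
            .setConfiguration(conf)
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(4)
            .build());
    miniCluster.start();

    // Submitting through the REST endpoint ships the JAR to the (local) cluster,
    // so dynamically generated classes are loaded by a user-code classloader
    // rather than having to be on the application classpath.
    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "localhost", 8081, "/path/to/dynamically-generated.jar");

    // ... build the pipeline against `env` and call env.execute() ...

    miniCluster.close();
  }
}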
That would mean we could actually have two "local" modes - one using
LocalEnvironment and one with a manual MiniCluster + RemoteEnvironment
(if for whatever reason we would like to keep both modes of local
operation). That could mean two masters - e.g. [local] and
[local-over-remote] or something like that.
Jan
[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html
On 10/29/19 5:50 AM, Thomas Weise wrote:
The current semantics of flink_master are tied to the Flink Java API.
The Flink client / Java API isn't a "REST API". It now uses the REST
API somewhere deep in RemoteEnvironment when the flink_master value is
host:port, but it does a lot of other things as well, such as parsing
config files and running local clusters.
A REST client to me is a thin wrapper around
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
Different ways used in Beam to submit jobs to Flink:
- classic Flink runner, using Flink Java client
- job server, using Flink Java client
- generated jar, using Flink Java client
- Flink CLI with generated jar, using Flink Java client
- Python "FlinkRunner" using *REST API*
Since the last item does not submit through the Flink Java client,
there is a problem with the flink_master pipeline option. However, we
cannot change the option in a way that breaks existing setups. As
Robert suggests, we could allow an optional URL syntax so that the
option can be used with the REST API, in which case the code path that
goes through the Java client will have to disassemble such a URL before
handing it to Flink (see the sketch below).
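A minimal sketch of that disassembly on the Java side (hypothetical
helper, not existing Beam code; only http/https schemes are handled and
a missing port would need a default):

import java.net.URI;

final class FlinkMasterUrl {

  // Strip an optional http:// or https:// scheme so the Flink Java client
  // receives the plain "host:port" form it expects.
  static String toHostAndPort(String flinkMaster) {
    if (flinkMaster.startsWith("http://") || flinkMaster.startsWith("https://")) {
      URI uri = URI.create(flinkMaster);
      return uri.getHost() + ":" + uri.getPort();
    }
    // Already "host:port" (or a special value like [local] / [auto]).
    return flinkMaster;
  }

  // An https:// scheme signals that SSL settings should be loaded from the
  // Flink config before submission.
  static boolean useSsl(String flinkMaster) {
    return flinkMaster.startsWith("https://");
  }
}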
I would find it acceptable to interpret absence of the option as
"[auto]", which really means use CLI or REST API context when present,
or local. I would prefer to not have an empty string value default
(but rather None/null) and no additional magic values.
Thomas
On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com> wrote:
Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
issue I mentioned.
On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com> wrote:
> I'd like to see this issue resolved before 2.17 as changing
> the public API once it's released will be harder.
+1. In particular, I hadn't realized that [auto] is not supported by
`FlinkUberJarJobServer`. Since [auto] is now the default, it's broken
for Python 3.6+.
requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config
We definitely should fix that, if nothing else.
> One concern with this is that just supplying host:port is the existing
> behavior, so we can't start requiring the http://.
The user shouldn't have to specify a protocol for Python; I think it's
preferable and reasonable to handle that for them in order to maintain
existing behavior and align with the Java SDK.
> 2. Deprecate the "[auto]" and "[local]" values. It should be sufficient
> to have either a non-empty address string or an empty one. The empty
> string would either mean local execution or, in the context of the Flink
> CLI tool, loading the master address from the config. The non-empty
> string would be interpreted as a cluster address.
Looks like we also have a [collection] configuration value [1]. If
we're using [auto] as the default, I don't think this really makes so
much of a difference (as long as we're supporting and documenting these
properly, of course). I'm not sure there's a compelling reason to
change this?
> always run locally (the least surprising to me
I agree a local cluster should remain the default, whether
that is achieved through [local] or [auto] or some new
mechanism such as the above.
[1]
https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80
On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels <m...@apache.org> wrote:
Hi,
Robert and Kyle have been doing great work to simplify submitting
portable pipelines with the Flink Runner. Part of this is having a
Python "FlinkRunner" which handles bringing up a Beam job server and
submitting the pipeline directly via the Flink REST API. One building
block is the creation of "executable Jars" which contain the
materialized / translated Flink pipeline and do not require the Beam
job server or the Python driver anymore.
While unifying a newly introduced option "flink_master_url" with the
pre-existing "flink_master" [1][2], some questions came up about
Flink's execution modes. (The two options are meant to do the same
thing: provide the address of the Flink master to which the translated
pipeline is handed over.)
Historically, Flink had a proprietary protocol for submitting
pipelines, running on port 9091. This has since been replaced with a
REST protocol on port 8081. To this date, this has implications for how
you submit programs, e.g. the Flink client libraries expect the address
to be of the form "host:port", without a protocol scheme. On the other
hand, external REST libraries typically expect a protocol scheme.
But this is only half of the fun. There are also special addresses for
"flink_master" that influence submission of the pipeline. If you
specify "[local]" as the address, the pipeline won't be submitted but
executed in a local in-process Flink cluster. If you specify "[auto]"
and you use the CLI tool that comes bundled with Flink, then the master
address will be loaded from the Flink config, including any
configuration like SSL. If none is found, then it falls back to
"[local]".
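Roughly, the dispatch looks like this (a hypothetical illustration,
not the actual FlinkExecutionEnvironments code; error handling
omitted):

import org.apache.flink.api.java.ExecutionEnvironment;

class MasterDispatch {
  static ExecutionEnvironment forMaster(String flinkMaster, String userJar) {
    if ("[local]".equals(flinkMaster)) {
      // Nothing is submitted; the pipeline runs in an in-process Flink cluster.
      return ExecutionEnvironment.createLocalEnvironment();
    }
    if ("[auto]".equals(flinkMaster)) {
      // Inside the Flink CLI tool this picks up the master address (and any
      // SSL configuration) from the Flink config; otherwise it falls back to
      // a local environment.
      return ExecutionEnvironment.getExecutionEnvironment();
    }
    // Anything else is interpreted as "host:port" of a remote cluster.
    String[] hostAndPort = flinkMaster.split(":");
    return ExecutionEnvironment.createRemoteEnvironment(
        hostAndPort[0], Integer.parseInt(hostAndPort[1]), userJar);
  }
}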
This is a bit odd, and after a discussion with Robert and Thomas in
[1], we figured that this needs to be changed:
1. Make the master address a URL. Add "http://" to "flink_master" in
Python if no scheme is specified. Similarly, remove any "http://" in
Java, since the Java REST client does not expect a scheme. In case of
"http_s_://", we have special treatment to load the SSL settings from
the Flink config.
2. Deprecate the "[auto]" and "[local]" values. It should be
sufficient to have either a non-empty address string or an empty one.
The empty string would either mean local execution or, in the context
of the Flink CLI tool, loading the master address from the config. The
non-empty string would be interpreted as a cluster address.
Any opinions on this?
Thanks,
Max
[1] https://github.com/apache/beam/pull/9803
[2] https://github.com/apache/beam/pull/9844