On Wed, Oct 30, 2019 at 3:34 PM Maximilian Michels <m...@apache.org> wrote:
>
> > One thing I don't understand is what it means for "CLI or REST API
> > context [to be] present." Where does this context come from? A config
> > file in a standard location on the user's machine? Or is this
> > something that is only present when a user uploads a jar and then
> > Flink runs it in a specific context? Or both?
>
> When you upload a Jar to Flink, it can be run by the Flink master.
> Running a Jar on the job manager will just invoke the main method. The
> same happens when you use the Flink CLI tool, the only difference being
> that the Jar runs on the user machine vs on the Flink master. In both
> cases, the Flink config file will be present in a standard location,
> i.e. "<flink_installation>/conf/flink-conf.yaml".
>
> What Flink users typically do is call this method to construct a
> Flink dataflow (a.k.a. job):
>
>   env = ExecutionEnvironment.getExecutionEnvironment()
>   env.addSource().flatMap()
>   env.execute()
>
> This registers an environment which holds the Flink dataflow definition.
>
> The Flink Runner also does this when flink_master is set to "[auto]".
> When it is set to "[local]", it will attempt to execute the whole
> pipeline locally (LocalEnvironment). When an address has been specified,
> it will submit against a remote Flink cluster (RemoteEnvironment). The
> last two use cases do not make any sense for the user when they use the
> Python FlinkRunner, which uses the jar upload feature of Flink. That's
> why "[auto]" is our choice for the FlinkUberJarJobServer feature of the
> Python FlinkRunner.
>
> In the case of the FlinkJarJobServer, using "[auto]" can also make sense
> because you can even specify a Flink config directory for the Flink job
> server to use. Without a config, auto will always fall back to local
> execution.
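[Editor's note: the dispatch described above can be sketched roughly as follows. This is a hypothetical Python sketch, not actual Beam or Flink API; names like `resolve_environment` and `config_master` are invented for illustration.]

```python
def resolve_environment(flink_master, config_master=None):
    """Illustrative mapping of flink_master values to Flink environments.

    config_master stands in for a jobmanager address found in
    conf/flink-conf.yaml when a CLI or REST context is present.
    """
    if flink_master == "[local]":
        return ("LocalEnvironment", None)       # in-process execution
    if flink_master == "[auto]":
        if config_master:                       # config found: submit remotely
            return ("RemoteEnvironment", config_master)
        return ("LocalEnvironment", None)       # no config: fall back to local
    return ("RemoteEnvironment", flink_master)  # explicit cluster address
```

Under this reading, "[auto]" only differs from "[local]" when a Flink config with a master address is actually present.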
Thanks for clarifying. So when I run "./flink my_pipeline.jar" or upload
the jar via the REST API (and its main method is invoked on the master),
then [auto] reads the config and does the right thing, but if I do "java
my_pipeline.jar" it'll run locally.

> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
>
> Yes, it is valuable. I suspect we only want to enable it for local
> execution?

Yes.

> We could let the actual Runner handle this by falling back to
> the default environment in case it detects that the execution will not
> be local. It can simply tell Python then to shut down the loopback
> server, or it shuts itself down after a timeout.

Python even needs to know whether to start up the loopback server, and
to provide its address when submitting the pipeline. If I understood
correctly above, the only time the job server interprets [auto] as
something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right? And baking in anything but [auto] seems
wrong...) So it seems we could guard using LOOPBACK on this flag +
[local] or [auto].

> Another option would
> be to only support it when the mode is set to "[local]".

Well, I'd really like to support it by default...

> On 30.10.19 21:05, Robert Bradshaw wrote:
> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
> >
> > On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw <rober...@google.com>
> > wrote:
> >>
> >> Sounds good to me.
> >>
> >> One thing I don't understand is what it means for "CLI or REST API
> >> context [to be] present." Where does this context come from?
> >> A config
> >> file in a standard location on the user's machine? Or is this
> >> something that is only present when a user uploads a jar and then
> >> Flink runs it in a specific context? Or both?
> >>
> >> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote:
> >>>
> >>> tl;dr:
> >>>
> >>> - I see consensus for inferring "http://" in Python to align it with the
> >>> Java behavior, which currently requires leaving out the protocol scheme.
> >>> Optionally, Java could also accept a scheme which gets removed as
> >>> required by the Flink Java Rest client.
> >>>
> >>> - We won't support "https://" in Python for now, because it requires
> >>> additional SSL setup, i.e. parsing the Flink config file and setting up
> >>> the truststore.
> >>>
> >>> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> >>> via https://issues.apache.org/jira/browse/BEAM-8507
> >>>
> >>>
> >>> Additional comments below:
> >>>
> >>>> One concern with this is that just supplying host:port is the existing
> >>>> behavior, so we can't start requiring the http://
> >>>
> >>> I wouldn't require it but optionally support it, otherwise add it
> >>> automatically.
> >>>
> >>>> One question I have is if there are other
> >>>> authentication parameters that may be required for speaking to a flink
> >>>> endpoint that we should be aware of (that would normally be buried in
> >>>> the config file).
> >>>
> >>> Yes, essentially all the SSL configuration is in the config file,
> >>> including the location of the truststore, the password, certificates, etc.
> >>>
> >>> For now, I would say we cannot properly support SSL in Python, unless we
> >>> find a way to load the truststore from Python.
> >>>
> >>>> I do like being explicit with something like [local] rather than
> >>>> treating the empty string in a magical way.
> >>>
> >>> Fine, we can keep "[local]" and throw an error in case the address is
> >>> empty.
> >>> Let's also throw an error in case the Flink CLI tool is used with
> >>> local execution, because that is clearly not what the user wants.
> >>>
> >>>>> I'd like to see this issue resolved before 2.17 as changing the public
> >>>>> API once it's released will be harder.
> >>>>
> >>>> +1. In particular, I misunderstood that [auto] is not supported by
> >>>> `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken
> >>>> for Python 3.6+.
> >>>
> >>> +1 Let's fix this in time for the release.
> >>>
> >>>> The user shouldn't have to specify a protocol for Python; I think it's
> >>>> preferable and reasonable to handle that for them in order to maintain
> >>>> existing behavior and align with the Java SDK.
> >>>
> >>> +1
> >>>
> >>>> Looks like we also have a [collection] configuration value [1].
> >>>
> >>> Yeah, I think it is acceptable to remove this entirely. This has never
> >>> been used by anyone and is an unmaintained Flink feature.
> >>>
> >>>> I would find it acceptable to interpret absence of the option as
> >>>> "[auto]", which really means use CLI or REST API context when present,
> >>>> or local. I would prefer to not have an empty string value default (but
> >>>> rather None/null) and no additional magic values.
> >>>
> >>> Let's keep "[auto]" then to keep it explicit. An empty string should
> >>> throw an error.
> >>>
> >>>
> >>>> One more reason that was not part of this discussion yet.
> >>>
> >>> @Jan: Supporting context classloaders in local mode is a new feature
> >>> and, for keeping it simple, I'd start a new thread for it.
> >>>
> >>>
> >>> On 29.10.19 10:55, Jan Lukavský wrote:
> >>>> Hi,
> >>>>
> >>>> +1 for empty string being interpreted as [auto] and anything else having
> >>>> explicit notation.
> >>>>
> >>>> One more reason that was not part of this discussion yet.
> >>>> In [1] there
> >>>> was a discussion about LocalEnvironment (that is the one that is
> >>>> responsible for spawning an in-process Flink cluster) not using the
> >>>> context classloader, so it can fail to load some user code (if this
> >>>> code was added to the context classloader *after* the application has
> >>>> been run). LocalEnvironment, on the other hand, assumes that all
> >>>> classes can be loaded by the application's classloader and doesn't
> >>>> accept any "client jars". Therefore, when an application generates
> >>>> classes dynamically at runtime, it is currently impossible to run
> >>>> those using the local Flink runner. There is a nasty hack for JDK <= 8
> >>>> (injecting a URL into the application's URLClassLoader), but that
> >>>> fails hard on JDK >= 9 (obviously).
> >>>>
> >>>> The conclusion from that thread is that it could be solved by manually
> >>>> running a MiniCluster (which will run on localhost:8081 by default) and
> >>>> then using this REST address for RemoteEnvironment, so that the
> >>>> application would actually be submitted as if it were run on a remote
> >>>> cluster, and therefore a dynamically generated JAR can be attached to it.
> >>>>
> >>>> That would mean that we can actually have two "local" modes - one using
> >>>> LocalEnvironment and one with a manual MiniCluster + RemoteEnvironment
> >>>> (if for whatever reason we would like to keep both modes of local
> >>>> operation). That could mean two masters - e.g. [local] and
> >>>> [local-over-remote] or something like that.
> >>>>
> >>>> Jan
> >>>>
> >>>> [1]
> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html
> >>>>
> >>>> On 10/29/19 5:50 AM, Thomas Weise wrote:
> >>>>> The current semantics of flink_master are tied to the Flink Java API.
> >>>>> The Flink client / Java API isn't a "REST API".
> >>>>> It now uses the REST
> >>>>> API somewhere deep in RemoteEnvironment when the flink_master value is
> >>>>> host:port, but it does a lot of other things as well, such as parsing
> >>>>> config files and running local clusters.
> >>>>>
> >>>>> A REST client to me is a thin wrapper around
> >>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
> >>>>>
> >>>>> Different ways used in Beam to submit jobs to Flink:
> >>>>>
> >>>>> - classic Flink runner, using the Flink Java client
> >>>>> - job server, using the Flink Java client
> >>>>> - generated jar, using the Flink Java client
> >>>>> - Flink CLI with generated jar, using the Flink Java client
> >>>>> - Python "FlinkRunner", using the *REST API*
> >>>>>
> >>>>> Since the last item does not submit through the Flink Java client,
> >>>>> there is a problem with the flink_master pipeline option. Though we
> >>>>> cannot change the option in a way that breaks existing setups. Like
> >>>>> Robert suggests, we could allow for optional URL syntax so that it can
> >>>>> be used with the REST API, in which case the code path that goes
> >>>>> through the Java client will have to disassemble such a URL before
> >>>>> handing it to Flink.
> >>>>>
> >>>>> I would find it acceptable to interpret absence of the option as
> >>>>> "[auto]", which really means use CLI or REST API context when present,
> >>>>> or local. I would prefer to not have an empty string value default
> >>>>> (but rather None/null) and no additional magic values.
> >>>>>
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>> On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com
> >>>>> <mailto:kcwea...@google.com>> wrote:
> >>>>>
> >>>>> Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
> >>>>> issue I mentioned.
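[Editor's note: the REST submission path discussed above is exactly where the magic values bite. A minimal sketch of the needed guard, in hypothetical Python (not Beam's actual code; `rest_url` is an invented helper): magic values like "[auto]" must be resolved or rejected before a URL is built, and a missing scheme inferred, otherwise the request library fails on strings like "[auto]/v1/config".]

```python
def rest_url(flink_master, path="/v1/config"):
    """Hypothetical guard for a REST submission path.

    Bracketed magic values have no REST endpoint, so reject them early
    instead of letting the HTTP library choke on "[auto]/v1/config".
    """
    if flink_master.startswith("[") and flink_master.endswith("]"):
        raise ValueError(
            "%r is not a REST address; resolve it before submission"
            % flink_master)
    if "://" not in flink_master:
        flink_master = "http://" + flink_master  # infer scheme for the user
    return flink_master + path
```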
> >>>>>
> >>>>> On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com
> >>>>> <mailto:kcwea...@google.com>> wrote:
> >>>>>
> >>>>> > I'd like to see this issue resolved before 2.17 as changing
> >>>>> > the public API once it's released will be harder.
> >>>>>
> >>>>> +1. In particular, I misunderstood that [auto] is not
> >>>>> supported by `FlinkUberJarJobServer`. Since [auto] is now the
> >>>>> default, it's broken for Python 3.6+.
> >>>>>
> >>>>>     requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config
> >>>>>
> >>>>> We definitely should fix that, if nothing else.
> >>>>>
> >>>>> > One concern with this is that just supplying host:port is
> >>>>> > the existing behavior, so we can't start requiring the http://.
> >>>>>
> >>>>> The user shouldn't have to specify a protocol for Python; I
> >>>>> think it's preferable and reasonable to handle that for them
> >>>>> in order to maintain existing behavior and align with the Java SDK.
> >>>>>
> >>>>> > 2. Deprecate the "[auto]" and "[local]" values. It should be
> >>>>> > sufficient to have either a non-empty address string or an
> >>>>> > empty one. The empty string would either mean local execution
> >>>>> > or, in the context of the Flink CLI tool, loading the master
> >>>>> > address from the config. The non-empty string would be
> >>>>> > interpreted as a cluster address.
> >>>>>
> >>>>> Looks like we also have a [collection] configuration value [1].
> >>>>>
> >>>>> If we're using [auto] as the default, I don't think this
> >>>>> really makes so much of a difference (as long as we're
> >>>>> supporting and documenting these properly, of course). I'm not
> >>>>> sure there's a compelling reason to change this?
> >>>>>
> >>>>> > always run locally (the least surprising to me)
> >>>>>
> >>>>> I agree a local cluster should remain the default, whether
> >>>>> that is achieved through [local] or [auto] or some new
> >>>>> mechanism such as the above.
> >>>>>
> >>>>> [1]
> >>>>> https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80
> >>>>>
> >>>>> On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels
> >>>>> <m...@apache.org <mailto:m...@apache.org>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> Robert and Kyle have been doing great work to simplify submitting
> >>>>> portable pipelines with the Flink Runner. Part of this is having a
> >>>>> Python "FlinkRunner" which handles bringing up a Beam job server
> >>>>> and submitting the pipeline directly via the Flink REST API. One
> >>>>> building block is the creation of "executable Jars" which contain
> >>>>> the materialized / translated Flink pipeline and do not require
> >>>>> the Beam job server or the Python driver anymore.
> >>>>>
> >>>>> While unifying a newly introduced option "flink_master_url" with
> >>>>> the pre-existing "flink_master" [1][2], some questions came up
> >>>>> about Flink's execution modes. (The two options are meant to do
> >>>>> the same thing: provide the address of the Flink master to hand
> >>>>> over the translated pipeline.)
> >>>>>
> >>>>> Historically, Flink had a proprietary protocol for submitting
> >>>>> pipelines, running on port 9091. This has since been replaced
> >>>>> with a REST protocol at port 8081. To this date, this has
> >>>>> implications for how you submit programs, e.g. the Flink client
> >>>>> libraries expect the address to be of the form "host:port",
> >>>>> without a protocol scheme. On the other hand, external REST
> >>>>> libraries typically expect a protocol scheme.
> >>>>>
> >>>>> But this is only half of the fun. There are also special
> >>>>> addresses for "flink_master" that influence submission of the
> >>>>> pipeline.
> >>>>> If you specify
> >>>>> "[local]" as the address, the pipeline won't be submitted but
> >>>>> executed in a local in-process Flink cluster. If you specify
> >>>>> "[auto]" and you use the CLI tool that comes bundled with Flink,
> >>>>> then the master address will be loaded from the Flink config,
> >>>>> including any configuration like SSL. If none is found, then it
> >>>>> falls back to "[local]".
> >>>>>
> >>>>> This is a bit odd, and after a discussion with Robert and Thomas
> >>>>> in [1], we figured that this needs to be changed:
> >>>>>
> >>>>> 1. Make the master address a URL. Add "http://" to "flink_master"
> >>>>> in Python if no scheme is specified. Similarly, remove any
> >>>>> "http://" in Java, since the Java REST client does not expect a
> >>>>> scheme. In case of "http_s_://", we have special treatment to
> >>>>> load the SSL settings from the Flink config.
> >>>>>
> >>>>> 2. Deprecate the "[auto]" and "[local]" values. It should be
> >>>>> sufficient to have either a non-empty address string or an empty
> >>>>> one. The empty string would either mean local execution or, in
> >>>>> the context of the Flink CLI tool, loading the master address
> >>>>> from the config. The non-empty string would be interpreted as a
> >>>>> cluster address.
> >>>>>
> >>>>>
> >>>>> Any opinions on this?
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Max
> >>>>>
> >>>>>
> >>>>> [1] https://github.com/apache/beam/pull/9803
> >>>>> [2] https://github.com/apache/beam/pull/9844
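[Editor's note: proposal 1 above amounts to a pair of normalizations, sketched here in hypothetical Python under the assumption that only http/https schemes occur. Function names are invented; the Java-side change would really live in the runner's Java code, and the https-triggered SSL-config loading is not modeled.]

```python
def add_scheme(flink_master):
    """Python side: ensure the REST client gets a full URL (proposal 1)."""
    if flink_master.startswith(("http://", "https://")):
        return flink_master
    return "http://" + flink_master


def strip_scheme(flink_master):
    """Java-client side (shown in Python for brevity): the Flink Java
    client expects bare host:port, so any scheme is removed before the
    address is handed to it."""
    for scheme in ("https://", "http://"):
        if flink_master.startswith(scheme):
            return flink_master[len(scheme):]
    return flink_master
```

Applying `strip_scheme(add_scheme(x))` to a bare "host:port" round-trips to the same string, which is what keeps the existing behavior intact while allowing the optional URL syntax.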