Patrick Lucas created FLINK-6369:
------------------------------------

             Summary: Better support for overlay networks
                 Key: FLINK-6369
                 URL: https://issues.apache.org/jira/browse/FLINK-6369
             Project: Flink
          Issue Type: Improvement
          Components: Docker, Network
    Affects Versions: 1.2.0
            Reporter: Patrick Lucas


Running Flink in an environment that utilizes an overlay network (containerized 
environments like Kubernetes or Docker Compose, or cloud platforms like AWS 
VPC) poses various challenges related to networking.

The core problem is that in these environments, applications are frequently 
addressed by a name different from that with which the application sees itself.

For instance, it is plausible that the Flink UI (served by the Jobmanager) is 
accessed via an ELB. This poses a problem in HA mode, where the non-leader UI 
returns an HTTP redirect to the leader, but the user may not be able to connect 
directly to the leader.

Or, if a user is using [Docker 
Compose|https://github.com/apache/flink/blob/aa21f853ab0380ec1f68ae1d0b7c8d9268da4533/flink-contrib/docker-flink/docker-compose.yml],
 they cannot submit a job via the CLI since there is a mismatch between the 
name used to address the Jobmanager and what the Jobmanager perceives as its 
hostname. (see \[1] below for more detail)

----

h3. Problems and proposed solutions

There are four instances of this issue that I've run into so far:

h4. Jobmanagers must be addressed by the same name they are configured with due 
to limitations of Akka

Akka enforces that messages it receives are addressed with the hostname it is 
configured with. Versions of Akka newer than the one Flink uses (>= 2.4 versus 
2.3-custom) support accepting messages with the "wrong" hostname, but this is 
limited to a single "external" hostname.

In many environments, it is likely that not all parties that want to connect to 
the Jobmanager have the same way of addressing it (e.g. the ELB example above). 
Other similarly-used protocols like HTTP generally don't have this restriction: 
if you connect on a socket and send a well-formed message, the system assumes 
that it is the desired recipient.
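
To make the restriction concrete, here is a minimal Python sketch of the two behaviors described above. It is a simulation only, not Akka's actual code: the function names {{accepts_strict}} and {{accepts_permissive}} are hypothetical, and the addresses are taken from the Docker Compose example in footnote \[1].

```python
from urllib.parse import urlsplit


def host_port(address):
    """Extract (host, port) from an address such as akka.tcp://flink@jobmanager:6123."""
    parts = urlsplit(address)
    return parts.hostname, parts.port


def accepts_strict(configured, recipient):
    """Akka-like check: accept only if host and port match the configured address."""
    return host_port(configured) == host_port(recipient)


def accepts_permissive(configured, recipient):
    """HTTP-like check: whatever arrives on the right port is assumed to be for us."""
    return host_port(configured)[1] == host_port(recipient)[1]
```

With the Jobmanager configured as {{akka.tcp://flink@jobmanager:6123}}, a message addressed to {{akka.tcp://flink@localhost:6123/}} fails the strict check and passes the permissive one.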

One solution is to not use Akka at all when communicating with the cluster from 
the outside, perhaps using an HTTP API instead. This would be somewhat 
involved, and probably best left as a longer-term goal.

A more immediate solution would be to override this behavior within Flakka, the 
custom fork of Akka currently in use by Flink. I'm not sure how much effort 
this would take.

h4. The Jobmanager needs to be able to address the Taskmanagers for e.g. 
metrics collection

Having the Taskmanagers register themselves by IP is probably the best solution 
here. It's a reasonable assumption that IPs can always be used for 
communication between the nodes of a single cluster. Asking that each 
Taskmanager container have a resolvable hostname is unreasonable.
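
As an illustration of how a node could find the IP to register with, the following Python sketch asks the operating system which local address it would use to reach a given peer. This is an assumption about one possible approach, not a description of how Flink does it; {{local_ip_towards}} is a hypothetical helper.

```python
import socket


def local_ip_towards(host, port):
    """Return the local IP the OS would use as source address to reach host:port.

    Connecting a UDP socket sends no packets; it only makes the kernel
    select a route and therefore a source address.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.connect((host, port))
        return sock.getsockname()[0]
```

A Taskmanager could call this with the Jobmanager's address and port (for example {{jobmanager}} and {{6123}}) and register under the result, which requires only that the Jobmanager's name resolves inside the cluster network.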

h4. Jobmanagers in HA mode send HTTP redirects to URLs that aren't externally 
resolvable/routable

If multiple Jobmanagers are used in HA mode, HTTP requests to non-leaders (for 
example, when a Kubernetes Service sits in front of all Jobmanagers in a 
cluster) get redirected to the (supposed) hostname of the leader, but this is 
potentially unresolvable/unroutable externally.

Enabling non-leader Jobmanagers to proxy API calls to the leader would solve 
this. The non-leaders could even serve static asset requests (e.g. for css or 
js files) directly.
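
The routing decision behind this proposal can be sketched in a few lines of Python. This is only a sketch of the idea, not existing Flink code: {{route_request}}, the list of static suffixes, and the example path and address are all hypothetical.

```python
STATIC_SUFFIXES = (".css", ".js", ".html", ".png")


def route_request(path, is_leader, leader_address):
    """Decide how a Jobmanager web frontend could handle a request.

    Returns ("serve", None) to answer locally, or ("proxy", target_url) to
    forward the request to the leader and relay its response to the client.
    """
    if is_leader or path.endswith(STATIC_SUFFIXES):
        return ("serve", None)
    return ("proxy", "http://" + leader_address + path)
```

Because the non-leader relays the response instead of redirecting, the client never needs to resolve or route to the leader's internal address.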

h4. Queryable state requests involve direct communication with Taskmanagers

Currently, queryable state requests involve communication between the client 
and the Jobmanager (for key partitioning lookups) and between the client and 
all Taskmanagers.

If the client is inside the network (as would be common in production use-cases 
where high-volume lookups are required) this is a non-issue, but problems crop 
up if the client is outside the network.

For the communication with the Jobmanager, a solution similar to the one above can be 
used: if all Jobmanagers can service all key partitioning lookup requests (e.g. 
by proxying) then a simple Service can be used.

The story is a bit different for the Taskmanagers. The partitioning lookup to 
the Jobmanager would return the name of the particular Taskmanager that owned 
the desired data, but that name (likely an IP, as proposed in the second 
section above) is not necessarily resolvable/routable from the client.

In the context of Kubernetes, where individual containers are generally not 
addressable, a very ugly solution would involve creating a Service for each 
Taskmanager, then cleverly configuring things such that the same name could be 
used to address a specific Taskmanager both inside and outside the network. \[2]

A much nicer solution would be, like in the previous section, to enable 
Taskmanagers to proxy any queryable state lookup to the appropriate member of 
the cluster. Once again, the principle is for every node to be able to fulfill 
every request.

This is of course less efficient than addressing the "correct" Taskmanager 
directly, but it greatly simplifies the situation for users that want to make 
queryable state requests from outside the network.
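
The forwarding idea can be sketched with an in-memory model in Python. The {{TaskManager}} class below is a hypothetical stand-in and not Flink's API; {{owner_name}} represents the result of the partitioning lookup against the Jobmanager.

```python
class TaskManager:
    """Hypothetical stand-in for a Taskmanager holding part of the keyed state."""

    def __init__(self, name):
        self.name = name
        self.local_state = {}  # key -> value for state owned by this node
        self.peers = {}        # name -> TaskManager, reachable inside the network

    def lookup(self, key, owner_name):
        """Answer a state query, forwarding to the owning peer if needed."""
        if owner_name == self.name:
            return self.local_state[key]
        # One extra hop inside the cluster network replaces a direct
        # client-to-owner connection that may not be routable.
        return self.peers[owner_name].lookup(key, owner_name)
```

An external client can then send every query to whichever Taskmanager it is able to reach, at the cost of one additional hop whenever that node does not own the key.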

----

h3. Subtasks

Once there has been some discussion about the proposed solutions above, this 
issue can be used as an umbrella ticket for any relevant subtasks.

----

h3. Footnotes

\[1] In this example, the Jobmanager may be configured with 
{{jobmanager.rpc.address: jobmanager}} and indeed, within the Docker network 
containing the nodes of the cluster, the name {{jobmanager}} is resolvable. 
But outside the Docker network, the port is mapped to {{localhost}}. When the 
user runs {{$ flink run -m localhost:6123 ...}}, the CLI connects to the 
Jobmanager using Akka, but Akka enforces that received messages are addressed 
with the same name it is configured with. The result is that the CLI hangs 
until a timeout is reached, and warning messages appear in the Jobmanager's log 
like:

{noformat}dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient [Actor[akka.tcp://flink@localhost:6123/]] arriving at 
[akka.tcp://flink@localhost:6123] inbound addresses are 
[akka.tcp://flink@jobmanager:6123]
2017-04-24 09:47:52,560 WARN  akka.remote.ReliableDeliverySupervisor{noformat}

\[2] Another option is to use a Kubernetes StatefulSet, which gives you per-pod 
addressability. The downside is that currently all scaling operations on a 
StatefulSet (including initial creation) always create or delete pods in 
sequence instead of concurrently, making cluster launch time linear with the 
number of nodes in the cluster.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
