Hi Till,

Thank you very much for that. And thanks for your help. I have finally managed 
to get the multi-cloud setup on Docker Swarm working by tweaking the Flink 
image slightly to set these configuration options to known values. I have also 
used the Weave Net Docker plugin to create my cross-cloud network.


I am in the process of documenting my experience in a blog article, which I 
will share on this list so that others can hopefully benefit from it.


Thank you and the rest of the Flink team once again for all your help and 
support.


Best wishes,


Thalita

________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: 10 November 2017 12:15:00
To: Vergilio, Thalita
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes


Hi Thalita, yes you can use the mentioned configuration parameters to set the 
ports for the TaskManager and the BlobServer. However, you must make sure that 
there is at most one TM running on a host, otherwise you run into port 
collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.
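
As a rough sketch of what pinning these ports might look like (not 
necessarily the exact change made in this thread), the shell function below 
appends fixed values to a Flink configuration file. The function name 
pin_flink_ports, the file path argument and the port numbers are assumptions 
chosen for illustration: 6121 and 6122 mirror the docker-flink README defaults 
quoted further down, and 6124 is simply one of the ports published on the 
jobmanager service in the commands quoted below.

```shell
# Append fixed port settings to a Flink configuration file.
# Usage: pin_flink_ports /path/to/flink-conf.yaml   (the path is an assumption)
pin_flink_ports() {
    conf_file="$1"
    cat >> "$conf_file" <<'EOF'
# Illustrative fixed ports, so only known ports need to be opened.
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121
blob.server.port: 6124
EOF
}
```

In a customised image this could be called against the image's flink-conf.yaml 
before the TaskManager starts. Per the note above, taskmanager.rpc.port and 
blob.server.port could be given a range such as 50100-50200 instead of a 
single value.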

Cheers,
Till

On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita 
<t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

Hi All,


I just wanted to let you know that I have finally managed to get the 
multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker 
plugin called Weave to create the Swarm network, gave each node a public 
external IP address and opened a range of ports, and I can now get my Google 
Cloud machine to connect to the Azure machines.


There are still some minor issues: for example, I don't know exactly which 
ports to open for TaskManager communication in Flink. They seem to be assigned 
randomly at runtime, so I had to open a wide range of ports to allow the 
communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a 
constant value? Looking at the documentation, the suspects are:


  *   taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets 
the OS choose a free port).

  *   taskmanager.data.port: The task manager’s port used for data exchange 
operations (DEFAULT: 0, which lets the OS choose a free port).

  *   blob.server.port: Port definition for the blob server (serving user JARs) 
on the TaskManagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.

Many thanks,


Thalita


________________________________
From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes


Hi Till,


I have made some progress with the name resolution for machines that are not in 
the same subnet. The problem I am facing now is Flink-specific, so I wonder if 
you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and 
the taskmanager in the Google cloud. However, when I scale the taskmanager up 
and it starts running on Azure nodes as well, I get an Akka error, which I 
presume means the taskmanagers can't talk to each other when parallelising the 
task.


Do you know what the IP address and port below are? Are they assigned by Flink?


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
        at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
        at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
        at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
        at akka.dispatch.Recover.internal(Future.scala:268)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
        ... 1 more


________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

I'm not entirely sure how Docker Swarm works, but from the Flink perspective 
there mustn't be two TaskManagers running on the same host (meaning an entity 
where you share the same address) if you set the TaskManager data port to a 
fixed value (otherwise only one of them can be started due to port conflicts). 
If you can ensure that this is the case, then it should be safe to specify a 
port for the data transmission.
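
One way Docker Swarm could enforce the one-TaskManager-per-host condition is a 
global service, which schedules at most one task per eligible node, combined 
with host-mode port publishing. This was not confirmed anywhere in this 
thread; it is only a sketch that reuses the service name, network, constraint 
and image from the commands quoted further down. The function name 
create_taskmanager_service and the DOCKER override are hypothetical, and the 
sketch assumes the TaskManager data and RPC ports have already been pinned to 
6121 and 6122 in the Flink configuration.

```shell
# Sketch only: create the taskmanager service with one task per eligible node.
# $1 is the address TaskManagers use to reach the JobManager (an assumption).
# Set DOCKER=echo to print the command instead of running it.
create_taskmanager_service() {
    jobmanager_address="$1"
    "${DOCKER:-docker}" service create \
        --name taskmanager \
        --mode global \
        --env JOB_MANAGER_RPC_ADDRESS="$jobmanager_address" \
        --publish mode=host,target=6121,published=6121 \
        --publish mode=host,target=6122,published=6122 \
        --network overlay \
        --constraint 'node.hostname != ubuntu-swarm-manager' \
        flink taskmanager
}
```

With a global service, capacity grows by adding nodes to the swarm rather than 
by scaling the service to a replica count, so two TaskManagers never compete 
for the same fixed port on one host.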

Cheers,
Till

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita 
<t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

Hi Till,


Thanks a lot for your answer.


Is the taskmanager.data.port unique per TaskManager? The documentation says it 
is assigned at runtime by the OS. My thinking here is that you would need to 
know what that is at service creation time, which would go against the whole 
idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service 
create', the configuration that is used at that point is the same one that 
will be used by all instances of the service. If you then scale TaskManager to 
8 or 10 containers, each of them gets the same service configuration (the one 
used to create the service).


I have in fact tried to map specific ports in the TaskManager service 
configuration, but then I got "port already in use" when I tried to scale up 
the service.


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the 
docker-flink project would be able to shed some light?



________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC 
port and the Blob server port, and make sure that the TaskManagers can talk to 
each other by exposing the `taskmanager.data.port`. The query server port is 
only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of 
Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
Till, is there a list somewhere of the ports that need to be exposed that is 
more up to date than the docker-flink README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita 
<t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the 
JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get 
the TaskManagers from different nodes and even different subnets to talk to the 
JobManager.

This is how I created the services:


docker network create -d overlay overlay

docker service create --name jobmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 8081:8081 -p {{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 \
  --network overlay \
  --constraint 'node.hostname == ubuntu-swarm-manager' \
  flink jobmanager

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 6121:6121 -p 6122:6122 \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' \
  flink taskmanager

However, I am still encountering errors further down the line. When I submit a 
job using the Web UI, it fails because the JobManager can't talk to the 
TaskManager on port 35033. I presume this is the taskmanager.data.port, which 
needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?


Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)



________________________________
From: Piotr Nowojski <pi...@data-artisans.com>
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: user@flink.apache.org
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

Did you try exposing the required ports that are listed in the README when 
starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass 
<t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

I am creating the services with the following commands:
docker network create -d overlay overlay

docker service create --name jobmanager \
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  -p 8081:8081 \
  --network overlay \
  --constraint 'node.hostname == ubuntu-swarm-manager' \
  flink jobmanager

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' \
  flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

To view the terms under which this email is distributed, please go to:-
http://disclaimer.leedsbeckett.ac.uk/disclaimer/disclaimer.html

