And it works now. The problem was the way I was setting jobmanager.rest.address and jobmanager.rpc.address: it was creating the actor system on localhost.
I am still periodically getting the messages below in the job manager, but they seem to be harmless:

ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
2019-02-22 16:49:22,016 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 22, 2019, at 10:35 AM, Andrey Zagrebin <and...@ververica.com> wrote:
>
> cc alek...@ververica.com
> On Fri, Feb 22, 2019 at 1:28 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> Adding the metric query port makes it a bit better, but there is still an error:
>
> 2019-02-22 00:03:56,173 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:16,213 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:36,253 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:56,293 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
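The TaskExecutor's Identify request most likely times out because the job manager never accepts it. As Boris's latest message at the top of the thread explains, the job manager's actor system had been created on localhost, and Akka classic remoting only delivers an inbound message whose recipient address matches one of the receiving actor system's own addresses. The job manager's EndpointWriter entries in this same message show exactly that mismatch (recipient maudlin-ibis-fdp-flink-jobmanager:6123, inbound address 127.0.0.1:6123). The Python sketch below is a simplified model of that check, not Akka's code: it assumes addresses are compared as plain strings with no DNS resolution, and the names Address, parse_address and is_delivered are hypothetical.

```python
import re
from typing import Iterable, NamedTuple, Optional

# Simplified model (assumption): a remote message is accepted only if its
# recipient address equals one of the receiving actor system's own inbound
# addresses. Hosts are compared as plain strings; nothing is resolved via DNS.
_ADDRESS = re.compile(r"^([a-z.]+)://([^@/]+)@([^:/]+):(\d+)")


class Address(NamedTuple):
    protocol: str  # e.g. "akka.tcp"
    system: str    # actor system name, "flink" in the logs above
    host: str      # hostname or IP exactly as written in the URI
    port: int


def parse_address(uri: str) -> Optional[Address]:
    """Extract protocol, system, host and port from an akka URI (path ignored)."""
    match = _ADDRESS.match(uri)
    if match is None:
        return None
    protocol, system, host, port = match.groups()
    return Address(protocol, system, host, int(port))


def is_delivered(recipient: str, inbound: Iterable[str]) -> bool:
    """True if the recipient's address matches one of the local inbound addresses."""
    target = parse_address(recipient)
    return target is not None and any(parse_address(a) == target for a in inbound)


if __name__ == "__main__":
    recipient = ("akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123"
                 "/user/resourcemanager")
    # Actor system bound to localhost: host strings differ, so the message is dropped.
    print(is_delivered(recipient, ["akka.tcp://flink@127.0.0.1:6123"]))  # False
    # Actor system bound to the Service name the TaskManagers use: accepted.
    print(is_delivered(recipient,
                       ["akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123"]))  # True
```

In this model the first call corresponds to the "dropping message ... for non-local recipient" entries, and the second to the situation after the fix Boris reports, where the job manager's RPC address matches the name the TaskManagers dial.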
>
> The TaskExecutor messages above are in the task manager. In the job manager I get:
>
> 2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
>
> Port 6123 is open in both the Job Manager deployment:
>
> apiVersion: extensions/v1beta1
> kind: Deployment
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   replicas: 1
>   template:
>     metadata:
>       annotations:
>         prometheus.io/scrape: 'true'
>         prometheus.io/port: '9249'
>       labels:
>         server: flink
>         app: {{ template "fullname" . }}
>         component: jobmanager
>     spec:
>       containers:
>       - name: jobmanager
>         image: {{ .Values.image }}:{{ .Values.imageTag }}
>         imagePullPolicy: {{ .Values.imagePullPolicy }}
>         args:
>         - jobmanager
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob
>         - containerPort: 8081
>           name: ui
>         env:
>         - name: CONTAINER_METRIC_PORT
>           value: '{{ .Values.flink.metric_query_port }}'
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: {{ template "fullname" . }}-jobmanager
>         livenessProbe:
>           httpGet:
>             path: /overview
>             port: 8081
>           initialDelaySeconds: 30
>           periodSeconds: 10
>         resources:
>           limits:
>             cpu: {{ .Values.resources.jobmanager.limits.cpu }}
>             memory: {{ .Values.resources.jobmanager.limits.memory }}
>           requests:
>             cpu: {{ .Values.resources.jobmanager.requests.cpu }}
>             memory: {{ .Values.resources.jobmanager.requests.memory }}
>
> and the Job Manager service:
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob
>     port: 6124
>   - name: ui
>     port: 8081
>   selector:
>     app: {{ template "fullname" . }}
>     component: jobmanager
>
>> On Feb 21, 2019, at 6:13 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>
>>> On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>>
>>> Hi Boris,
>>>
>>> the exact command depends on the docker-entrypoint.sh script and the image you are using. For the example contained in the Flink repository it is "task-manager", I think. The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify this by checking the Taskmanager logs, which should contain lines like the following:
>>>
>>> 2019-02-21 08:03:00,004 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Program Arguments:
>>> 2019-02-21 08:03:00,008 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - -Dtaskmanager.host=10.12.10.173
>>>
>>> In the Jobmanager logs you should see that the Taskmanager is registered under the IP above, in a line similar to:
>>>
>>> 2019-02-21 08:03:26,874 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager
>>>
>>> A service per Taskmanager is not required. The purpose of the config parameter is that the Jobmanager addresses the Taskmanagers by IP instead of hostname.
>>>
>>> Hope this helps!
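Konstantin's two checks can also be scripted. The Python sketch below assumes log lines shaped like the samples quoted above (a TaskManagerRunner line carrying -Dtaskmanager.host=..., and a StandaloneResourceManager "Registering TaskManager" line with an akka.tcp address). The regular expressions and the helper names configured_host, registered_hosts and registered_by_ip are hypothetical and do not rely on any stable Flink log format.

```python
import re

# Assumed log shapes, taken from the sample lines in Konstantin's message.
HOST_ARG = re.compile(r"-Dtaskmanager\.host=(\S+)")
REGISTRATION = re.compile(
    r"Registering TaskManager with ResourceID \S+ \(akka\.tcp://[^@]+@([^:/]+):\d+/"
)


def configured_host(taskmanager_log: str):
    """Return the taskmanager.host value passed on the command line, or None."""
    match = HOST_ARG.search(taskmanager_log)
    return match.group(1) if match else None


def registered_hosts(jobmanager_log: str):
    """Return every host a TaskManager registered under at the ResourceManager."""
    return REGISTRATION.findall(jobmanager_log)


def registered_by_ip(taskmanager_log: str, jobmanager_log: str) -> bool:
    """True if the TaskManager's configured host appears among the registrations."""
    host = configured_host(taskmanager_log)
    return host is not None and host in registered_hosts(jobmanager_log)


if __name__ == "__main__":
    tm_log = ("2019-02-21 08:03:00,008 INFO org.apache.flink.runtime.taskexecutor."
              "TaskManagerRunner [] - -Dtaskmanager.host=10.12.10.173")
    jm_log = ("2019-02-21 08:03:26,874 INFO org.apache.flink.runtime.resourcemanager."
              "StandaloneResourceManager [] - Registering TaskManager with ResourceID "
              "a0513ba2c472d2d1efc07626da9c1bda "
              "(akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager")
    print(configured_host(tm_log))           # 10.12.10.173
    print(registered_hosts(jm_log))          # ['10.12.10.173']
    print(registered_by_ip(tm_log, jm_log))  # True
```

If the registration line shows a hostname such as flink-taskmanager-1 instead of the pod IP, registered_by_ip returns False, which would indicate that taskmanager.host did not reach the TaskManager process.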
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>> Also, the suggested workaround does not quite work.
>>> 2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
>>> 2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception
>>>
>>> I think the problem is that it's trying to connect to flink-taskmanager-1.
>>>
>>> Using busybox to experiment with nslookup, I can see:
>>> / # nslookup flink-taskmanager-1.flink-taskmanager
>>> Server: 10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>>
>>> Name: flink-taskmanager-1.flink-taskmanager
>>> Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-1
>>> Server: 10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>>
>>> nslookup: can't resolve 'flink-taskmanager-1'
>>> / # nslookup flink-taskmanager-0.flink-taskmanager
>>> Server: 10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>>
>>> Name: flink-taskmanager-0.flink-taskmanager
>>> Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-0
>>> Server: 10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>>
>>> nslookup: can't resolve 'flink-taskmanager-0'
>>> / #
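The nslookup output is consistent with the Kubernetes DNS scheme for StatefulSet pods behind a headless Service: a pod is reachable as <pod>.<service>.<namespace>.svc.<cluster-domain>, while the bare pod name, which the failing metrics address flink-taskmanager-1:6170 uses, has no DNS record. The Python sketch below builds that fully qualified name. The cluster domain cluster.local, the namespace flink and the Service name flink-taskmanager are read off the output above; that the TaskManagers run as a StatefulSet is an assumption, and the helper names stateful_pod_fqdn and resolves are hypothetical.

```python
import socket

# Assumption: the default cluster domain, as seen in the nslookup output above.
CLUSTER_DOMAIN = "cluster.local"


def stateful_pod_fqdn(pod: str, service: str, namespace: str,
                      domain: str = CLUSTER_DOMAIN) -> str:
    """Build <pod>.<service>.<namespace>.svc.<domain> for a StatefulSet pod
    whose governing Service is headless."""
    return f"{pod}.{service}.{namespace}.svc.{domain}"


def resolves(name: str) -> bool:
    """Return True if the local resolver can map the name to an address."""
    try:
        socket.getaddrinfo(name, None)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    fqdn = stateful_pod_fqdn("flink-taskmanager-1", "flink-taskmanager", "flink")
    print(fqdn)  # flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
    # Run inside the cluster, the qualified name is expected to resolve while
    # the bare pod name is not, mirroring the nslookup results above.
    for name in (fqdn, "flink-taskmanager-1"):
        print(name, resolves(name))
```

Konstantin's replies sidestep DNS entirely by having each TaskManager advertise its pod IP through taskmanager.host.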
>>>
>>> So the name should be suffixed with the service name. How do I force that? I suspect I am missing a config parameter.
>>>
>>>> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>>>
>>>> Hi Boris,
>>>>
>>>> the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter.
>>>>
>>>> The Deployment spec could look something like this:
>>>> containers:
>>>> - name: taskmanager
>>>>   [...]
>>>>   args:
>>>>   - "taskmanager.sh"
>>>>   - "start-foreground"
>>>>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>>>>   [...]
>>>>   env:
>>>>   - name: K8S_POD_IP
>>>>     valueFrom:
>>>>       fieldRef:
>>>>         fieldPath: status.podIP
>>>>
>>>> Hope this helps and let me know if this works.
>>>>
>>>> Best,
>>>>
>>>> Konstantin
>>>>
>>>> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>>> I was looking at this issue https://issues.apache.org/jira/browse/FLINK-11127
>>>> Apparently there is a workaround for it.
>>>> Is it possible to provide the complete Helm chart for it?
>>>> Bits and pieces are in the ticket, but it would be nice to see the full chart.
>>>>
>>>> --
>>>> Konstantin Knauf | Solutions Architect
>>>> +49 160 91394525
>>>>
>>>> https://www.ververica.com/
>>>> Follow us @VervericaData
>>>> --
>>>> Join Flink Forward (https://flink-forward.org/) - The Apache Flink Conference
>>>> Stream Processing | Event Driven | Real Time
>>>> --
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>> --
>>>> Data Artisans GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
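Konstantin's Feb 19 Deployment spec works without a shell because Kubernetes itself expands $(VAR_NAME) references in a container's args using that container's environment, and K8S_POD_IP is filled from the downward API field status.podIP. The Python sketch below models only the documented substitution rules: a reference to a defined variable is replaced, an unresolvable reference is left unchanged, and $$ yields a literal $. It is a simplified illustration rather than the kubelet's implementation; the helper names expand and expand_args are hypothetical, variable names are restricted to C-identifier characters for simplicity, and the IP 10.12.10.173 is borrowed from Konstantin's log sample.

```python
import re
from typing import List, Mapping, Sequence

# "$$" is an escaped dollar sign; "$(NAME)" is a variable reference.
# Simplification (assumption): NAME is restricted to C-identifier characters.
_REFERENCE = re.compile(r"\$\$|\$\(([A-Za-z_][A-Za-z0-9_]*)\)")


def expand(arg: str, env: Mapping[str, str]) -> str:
    """Expand $(NAME) references in one argument, Kubernetes style."""
    def substitute(match):
        name = match.group(1)
        if name is None:
            return "$"  # "$$" collapses to a literal "$"
        # A reference to an undefined variable is left unchanged.
        return env.get(name, match.group(0))

    return _REFERENCE.sub(substitute, arg)


def expand_args(args: Sequence[str], env: Mapping[str, str]) -> List[str]:
    """Expand every argument of a container's args list."""
    return [expand(arg, env) for arg in args]


if __name__ == "__main__":
    # args from the Feb 19 spec; the pod IP value is illustrative only.
    args = ["taskmanager.sh", "start-foreground", "-Dtaskmanager.host=$(K8S_POD_IP)"]
    env = {"K8S_POD_IP": "10.12.10.173"}
    print(expand_args(args, env))
    # ['taskmanager.sh', 'start-foreground', '-Dtaskmanager.host=10.12.10.173']
    print(expand("$$(K8S_POD_IP)", env))  # $(K8S_POD_IP)
```

Because the substitution happens before the container starts, the TaskManager receives the literal pod IP as its taskmanager.host, which is what Konstantin's log check looks for.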