And it works now.
The problem was that I was setting jobmanager.rest.address instead of
jobmanager.rpc.address, which was creating the actor system on localhost.
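For reference, here is a minimal flink-conf.yaml sketch of the setting that fixed it. It assumes the JobManager Service renders to the hypothetical name "flink-jobmanager" (in the chart quoted below it is {{ template "fullname" . }}-jobmanager). The ports mirror the ones that Service exposes. Because the JobManager's actor system takes its address from jobmanager.rpc.address, leaving it at localhost makes Akka drop messages that TaskManagers address to the Service name. Those dropped messages are the "non-local recipient" errors quoted below.

```yaml
# flink-conf.yaml (sketch). "flink-jobmanager" is a hypothetical Service name;
# substitute whatever {{ template "fullname" . }}-jobmanager renders to.
# This is the RPC (Akka) address TaskManagers connect to; it must not be localhost.
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
blob.server.port: 6124
# The REST endpoint / web UI is configured separately (rest.address, rest.port);
# setting the REST address does not change the RPC address above.
rest.port: 8081
```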

I am still periodically getting the messages below in the job manager,
but they seem to be harmless:

ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - Caught exception
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
        at java.lang.Thread.run(Thread.java:748)
2019-02-22 16:49:22,016 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
        at java.lang.Thread.run(Thread.java:748)





Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 22, 2019, at 10:35 AM, Andrey Zagrebin <and...@ververica.com> wrote:
> 
> cc alek...@ververica.com
> On Fri, Feb 22, 2019 at 1:28 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> Adding the metric query port makes it a bit better, but there is still an error:
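A minimal sketch of pinning that port in flink-conf.yaml follows. It assumes Flink 1.7 or later, where the key metrics.internal.query-service.port is available. The value 6170 is taken from the flink-metrics@flink-taskmanager-1:6170 address in the logs further down this thread. How the chart's CONTAINER_METRIC_PORT variable is wired into the configuration depends on the image's entrypoint and is not shown here.

```yaml
# flink-conf.yaml (sketch, assumes Flink 1.7+).
# Port of the internal "flink-metrics" actor system that the JobManager
# queries for metrics; the default (0) picks a random port.
metrics.internal.query-service.port: 6170
```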
> 
> 
> 2019-02-22 00:03:56,173 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:16,213 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:36,253 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:56,293 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 
> in the task manager, and
> 
> 2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 
> in the job manager.
> 
> Port 6123 is open in both the JobManager Deployment:
> 
> apiVersion: extensions/v1beta1
> kind: Deployment
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   replicas: 1
>   template:
>     metadata:
>       annotations:
>         prometheus.io/scrape: 'true'
>         prometheus.io/port: '9249'
>       labels:
>         server: flink
>         app: {{ template "fullname" . }}
>         component: jobmanager
>     spec:
>       containers:
>       - name: jobmanager
>         image: {{ .Values.image }}:{{ .Values.imageTag }}
>         imagePullPolicy: {{ .Values.imagePullPolicy }}
>         args:
>         - jobmanager
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob
>         - containerPort: 8081
>           name: ui
>         env:
>         - name: CONTAINER_METRIC_PORT
>           value: '{{ .Values.flink.metric_query_port }}'
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: {{ template "fullname" . }}-jobmanager
>         livenessProbe:
>           httpGet:
>             path: /overview
>             port: 8081
>           initialDelaySeconds: 30
>           periodSeconds: 10
>         resources:
>           limits:
>             cpu: {{ .Values.resources.jobmanager.limits.cpu }}
>             memory: {{ .Values.resources.jobmanager.limits.memory }}
>           requests:
>             cpu: {{ .Values.resources.jobmanager.requests.cpu }}
>             memory: {{ .Values.resources.jobmanager.requests.memory }}
> 
> and the JobManager Service:
> 
> apiVersion: v1
> kind: Service
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob
>     port: 6124
>   - name: ui
>     port: 8081
>   selector:
>     app: {{ template "fullname" . }}
>     component: jobmanager
> 
> 
> 
> 
>> On Feb 21, 2019, at 6:13 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>> 
>> 
>>> On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>> 
>>> Hi Boris, 
>>> 
>>> the exact command depends on the docker-entrypoint.sh script and the image 
>>> you are using. For the example contained in the Flink repository it is 
>>> "task-manager", I think. The important thing is to pass "taskmanager.host" 
>>> to the Taskmanager process. You can verify by checking the Taskmanager 
>>> logs. These should contain lines like below:
>>> 
>>> 2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
>>> 2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173
>>> 
>>> In the Jobmanager logs you should see that the Taskmanager is registered 
>>> under the IP above in a line similar to:
>>> 
>>> 2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager
>>> 
>>> A service per Taskmanager is not required. The purpose of the config
>>> parameter is to make the Jobmanager address the Taskmanagers by IP instead
>>> of by hostname.
>>> 
>>> Hope this helps!
>>> 
>>> Cheers, 
>>> 
>>> Konstantin
>>> 
>>> 
>>> 
>>> On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>> Also, the suggested workaround does not quite work:
>>> 2019-02-20 15:27:43,928 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
>>> 2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - Caught exception
>>> 
>>> I think the problem is that it's trying to connect to flink-taskmanager-1.
>>> 
>>> Using busybox to experiment with nslookup, I can see:
>>> / # nslookup flink-taskmanager-1.flink-taskmanager
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> Name:      flink-taskmanager-1.flink-taskmanager
>>> Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-1
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> nslookup: can't resolve 'flink-taskmanager-1'
>>> / # nslookup flink-taskmanager-0.flink-taskmanager
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> Name:      flink-taskmanager-0.flink-taskmanager
>>> Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-0
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> nslookup: can't resolve 'flink-taskmanager-0'
>>> / # 
>>> 
>>> So the name should be suffixed with the service name. How do I force that?
>>> I suspect I am missing a config parameter.
>>> 
>>>  
>>>> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>>> 
>>>> Hi Boris, 
>>>> 
>>>> the solution is actually simpler than it sounds from the ticket. The only 
>>>> thing you need to do is to set the "taskmanager.host" to the Pod's IP 
>>>> address in the Flink configuration. The easiest way to do this is to pass 
>>>> this config dynamically via a command-line parameter. 
>>>> 
>>>> The Deployment spec could look something like this:
>>>> containers:
>>>> - name: taskmanager
>>>>   [...]
>>>>   args:
>>>>   - "taskmanager.sh"
>>>>   - "start-foreground"
>>>>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>>>>   [...]
>>>>   env:
>>>>   - name: K8S_POD_IP
>>>>     valueFrom:
>>>>       fieldRef:
>>>>         fieldPath: status.podIP
>>>> 
>>>> Hope this helps and let me know if this works. 
>>>> 
>>>> Best, 
>>>> 
>>>> Konstantin
>>>> 
>>>> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>>> I was looking at this issue:
>>>> https://issues.apache.org/jira/browse/FLINK-11127
>>>> Apparently there is a workaround for it.
>>>> Is it possible to provide the complete Helm chart for it?
>>>> Bits and pieces are in the ticket, but it would be nice to see the full
>>>> chart.
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Konstantin Knauf | Solutions Architect
>>>> +49 160 91394525
>>>> 
>>>>  <https://www.ververica.com/>
>>>> Follow us @VervericaData
>>>> --
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink 
>>>> Conference
>>>> Stream Processing | Event Driven | Real Time
>>>> --
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>> --
>>>> Data Artisans GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen    
>>> 
>>> 
>>> 
>> 
> 
