Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
Makes sense, thank you!

On Tue, Jan 31, 2023 at 10:48 AM Gyula Fóra  wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
Thanks @Anton Ippolitov 
At this stage I would highly recommend the native mode if you have the
liberty to try that.
I think it has better production characteristics and will work out of the
box with the autoscaler (the standalone mode won't).

Gyula

On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
anton.ippoli...@datadoghq.com> wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
I am indeed using the standalone mode; I should've mentioned it right away.
This fix looks exactly like what I need, thank you!!

On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
There is also a pending fix for the standalone + k8s HA case:
https://github.com/apache/flink-kubernetes-operator/pull/518

You could maybe try and review the fix :)

Gyula

On Tue, Jan 31, 2023 at 8:36 AM Yang Wang  wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-30 Thread Yang Wang
I assume you are using the standalone mode, right?

For the native K8s mode, the leader address should be
*akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1* when HA is enabled.


Best,
Yang

On Tue, Jan 31, 2023 at 00:21, Anton Ippolitov via user wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-30 Thread Anton Ippolitov via user
This is actually what I'm already doing: I'm only setting
high-availability: kubernetes myself. The other values are either defaults
or set by the Operator:
- jobmanager.rpc.port: 6123 is the default value (see the docs)
- high-availability.jobmanager.port: 6123 is set by the Operator here
- jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by the
Operator here (the actual code which gets executed is here)

Looking at what the Lyft Operator is doing here, I thought this would be a
common issue, but since you've never seen this error before, I'm not sure
what to do.

On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra  wrote:

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Gyula Fóra
We have never encountered this problem before, but we also don't configure
those settings.
Can you simply try:

high-availability: kubernetes

And remove the other configs? I think they can only cause problems and
should not achieve anything :)

Gyula

On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
user@flink.apache.org> wrote:


"Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Anton Ippolitov via user
Hi everyone,

I've been experimenting with Kubernetes HA and the Kubernetes Operator and
ran into the following issue, which happens regularly on TaskManagers
with Flink 1.16:

Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.

(The whole stack trace is quite long, so I put it in a GitHub Gist here.
Note that I replaced the Kubernetes Service name and the Namespace name
with placeholder values.)

The job configuration has the following values which should be relevant:
high-availability: kubernetes
high-availability.jobmanager.port: 6123
jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
jobmanager.rpc.port: 6123

Looking a bit more into the logs, I can see that the Akka Actor System is
started with an external address pointing to the Kubernetes Service defined
by jobmanager.rpc.address:
Trying to start actor system, external address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123

(I believe the external address for the Akka Actor System is set to
jobmanager.rpc.address from this place in the code, but I might be wrong.)
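Below is a minimal Python sketch for checking that hypothesis against the values quoted above. It is not Flink's code, and the helper names are hypothetical. It assumes flat "key: value" configuration lines. It also simply assumes that the external address is built from jobmanager.rpc.address and jobmanager.rpc.port.

```python
# Sketch (hypothetical helpers): derive the Akka address implied by the
# configuration and compare it with the address reported in the log.
# Assumes flat "key: value" lines and that the external address is built
# from jobmanager.rpc.address and jobmanager.rpc.port.
from typing import Dict

CONFIG_TEXT = """\
high-availability: kubernetes
high-availability.jobmanager.port: 6123
jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
jobmanager.rpc.port: 6123
"""

LOGGED_ADDRESS = "akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123"


def parse_flat_config(text: str) -> Dict[str, str]:
    """Parse 'key: value' lines, skipping blank lines and '#' comments."""
    config: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only, so values may themselves contain colons.
        key, _, value = line.partition(":")
        config[key.strip()] = value.strip()
    return config


def expected_actor_system_address(config: Dict[str, str]) -> str:
    """Build akka.tcp://flink@HOST:PORT from the RPC settings."""
    host = config["jobmanager.rpc.address"]
    port = config.get("jobmanager.rpc.port", "6123")  # 6123 is the default
    return f"akka.tcp://flink@{host}:{port}"


if __name__ == "__main__":
    derived = expected_actor_system_address(parse_flat_config(CONFIG_TEXT))
    print(derived)
    print("matches log:", derived == LOGGED_ADDRESS)
```

With the values from this message, the derived address equals the logged one. That is consistent with the hypothesis but does not prove it.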

I can also see these logs for the Dispatcher RPC endpoint:
Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
Successfully wrote leader information
LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

I confirmed that the HA ConfigMap contains an address which also uses the
Kubernetes Service defined by jobmanager.rpc.address:
$ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'
ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1
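The ConfigMap value above appears to have the form <leader session ID>,<leader address>. That layout is inferred only from this one output. The following Python sketch (hypothetical function names) does what the jq filter does. It then checks whether the leader host is an IP address or a DNS name such as the Service. Yang Wang's reply in this thread says the native mode uses an IP address (akka.tcp://flink@JM_POD_IP:6123/...).

```python
# Sketch: inspect the leader entry that Kubernetes HA stored in the ConfigMap.
# Assumption: the value has the form "<session-id>,<akka-address>", as in the
# kubectl/jq output above. Function names are hypothetical.
import ipaddress
import json
from urllib.parse import urlsplit

LEADER_KEY = "org.apache.flink.k8s.leader.dispatcher"


def leader_entry_from_configmap(configmap_json: str) -> str:
    """Equivalent of: jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'."""
    return json.loads(configmap_json)["data"][LEADER_KEY]


def parse_leader_entry(entry: str) -> dict:
    """Split the entry and break the Akka address into its parts."""
    session_id, _, address = entry.partition(",")
    parts = urlsplit(address)  # scheme "akka.tcp", netloc "flink@HOST:PORT"
    host = parts.hostname  # note: urlsplit lowercases the host name
    try:
        ipaddress.ip_address(host)
        host_kind = "ip"  # e.g. a JobManager pod IP
    except ValueError:
        host_kind = "dns"  # e.g. a Kubernetes Service name
    return {
        "session_id": session_id,
        "host": host,
        "port": parts.port,
        "path": parts.path,
        "host_kind": host_kind,
    }


if __name__ == "__main__":
    sample = json.dumps({"data": {LEADER_KEY: (
        "ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,"
        "akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1"
    )}})
    print(parse_leader_entry(leader_entry_from_configmap(sample)))
```

For the entry above, the sketch reports a DNS host: the leader address carries the Service name rather than a pod IP.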

When looking at the code of the Operator and Flink itself, I can see
that jobmanager.rpc.address is set automatically by the
InternalServiceDecorator and it points to the Kubernetes Service.
However, the comment above clearly says that "only the non-HA scenario
relies on this Service for internal communication, since in the HA mode,
the TaskManager(s) directly connects to the JobManager via IP address."
According to the docs, jobmanager.rpc.address "is ignored on setups with
high-availability where the leader election mechanism is used to discover
this automatically."
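The following is a simplified Python model of the behavior the docs describe. The names are hypothetical and this is not Flink's implementation. The model's rule is that with HA enabled, a TaskManager takes the dispatcher address from the HA store rather than from jobmanager.rpc.address. It illustrates that HA discovery can only return the address the leader published. If that address contains the Service name, as in the ConfigMap output above, the TaskManagers still end up at the Service.

```python
# Simplified model of dispatcher-address discovery. Hypothetical names;
# this is not Flink's implementation, only the behavior the docs describe.
from typing import Dict


def published_address(external_host: str, port: int) -> str:
    """Address a JobManager would publish for its dispatcher endpoint."""
    return f"akka.tcp://flink@{external_host}:{port}/user/rpc/dispatcher_1"


def discover_dispatcher(config: Dict[str, str], ha_store: Dict[str, str]) -> str:
    """Return the address a TaskManager would try to connect to."""
    # Model assumption: a missing high-availability key means "NONE" (no HA).
    if config.get("high-availability", "NONE").lower() != "none":
        # HA mode: jobmanager.rpc.address is not consulted; the address
        # is whatever the leader wrote into the HA store.
        return ha_store["dispatcher"]
    # Non-HA mode: connect to the configured RPC address directly.
    host = config["jobmanager.rpc.address"]
    port = int(config.get("jobmanager.rpc.port", "6123"))
    return published_address(host, port)


if __name__ == "__main__":
    cfg = {
        "high-availability": "kubernetes",
        "jobmanager.rpc.address": "SERVICE-NAME-HERE.NAMESPACE-HERE",
    }
    # Leader published an address built from the Service name:
    store = {"dispatcher": published_address("SERVICE-NAME-HERE.NAMESPACE-HERE", 6123)}
    print(discover_dispatcher(cfg, store))
    # Leader published its pod IP instead (10.0.0.12 is a made-up value):
    store = {"dispatcher": published_address("10.0.0.12", 6123)}
    print(discover_dispatcher(cfg, store))
```

The first call prints the Service-based address and the second prints the made-up pod IP address, although jobmanager.rpc.address is the same in both cases.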

This is not what I'm observing: despite HA being enabled, the TaskManagers
don't use IP addresses but still use this Kubernetes Service to communicate
with the JobManager.

Moreover, I've used the Lyft Kubernetes Operator before and it has these
interesting lines in the code:
https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
It explicitly sets jobmanager.rpc.address to the host IPs.
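For comparison, here is a Python sketch of the idea behind those Lyft lines: point jobmanager.rpc.address at an IP address rather than a Service name. This is not the Lyft operator's code. It assumes the IP is available in an environment variable. POD_IP is a hypothetical name; the variable could, for example, be filled from the Kubernetes Downward API field status.podIP.

```python
# Sketch: render a jobmanager.rpc.address override from an IP in the environment.
# Assumption: the IP is provided in an env var (hypothetical name POD_IP),
# e.g. populated via the Downward API field status.podIP.
import ipaddress
import os
from typing import Mapping


def rpc_address_override(env: Mapping[str, str], var: str = "POD_IP") -> str:
    """Return a flink-conf style line pointing jobmanager.rpc.address at the IP."""
    raw = env.get(var)
    if not raw:
        raise KeyError(f"environment variable {var} is not set")
    ip = ipaddress.ip_address(raw)  # raises ValueError if it is not an IP
    return f"jobmanager.rpc.address: {ip}"


if __name__ == "__main__":
    try:
        print(rpc_address_override(os.environ))
    except (KeyError, ValueError) as err:
        print(f"cannot build override: {err}")
```

The sketch validates the value as an IP address, so a Service name ends up as an explicit error instead of being written into the configuration.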

Am I misconfiguring or misunderstanding something? Is there any way to fix
these errors?

Thanks!
Anton