We have never encountered this problem before, but we also don't configure
those settings.
Can you simply try:

high-availability: kubernetes

And remove the other configs? I think setting them can only cause problems
and should not achieve anything :)
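
For illustration, here is a rough sketch of what the trimmed-down Flink configuration might look like. It assumes the HA metadata still needs a durable storage location via high-availability.storageDir; the bucket path is a placeholder and not something taken from your setup.

```yaml
# Only the HA switch is set explicitly, as suggested above.
high-availability: kubernetes

# Assumption: Kubernetes HA still needs a durable location for JobManager
# metadata. The path below is a placeholder.
high-availability.storageDir: s3://BUCKET-NAME-HERE/flink-ha

# Removed from the original configuration:
#   high-availability.jobmanager.port: 6123
#   jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
#   jobmanager.rpc.port: 6123
```

With the Kubernetes Operator, these keys would typically go under the deployment's Flink configuration section rather than in a hand-edited config file.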

Gyula

On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
user@flink.apache.org> wrote:

> Hi everyone,
>
> I've been experimenting with Kubernetes HA and the Kubernetes Operator and
> ran into the following issue which is happening regularly on TaskManagers
> with Flink 1.16:
>
> Error while retrieving the leader gateway. Retrying to connect to 
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>
> (The whole stack trace is quite long, so I put it in a GitHub Gist here
> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>. Note
> that I put placeholder values for the Kubernetes Service name and the
> Namespace name)
>
> The job configuration has the following values which should be relevant:
> high-availability: kubernetes
> high-availability.jobmanager.port: 6123
> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
> jobmanager.rpc.port: 6123
>
> Looking a bit more into the logs, I can see that the Akka Actor System is
> started with an external address pointing to the Kubernetes Service defined
> by jobmanager.rpc.address:
> Trying to start actor system, external address
> SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
> Actor system started at
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>
> (I believe the external address for the Akka Actor System is set to
> jobmanager.rpc.address from this place
> <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>
> in the code but I might be wrong)
>
> I can also see these logs for the Dispatcher RPC endpoint:
> Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_1 .
> Successfully wrote leader information
> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
> for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.
>
> I confirmed that the HA ConfigMap contains an address which also uses the
> Kubernetes Service defined by jobmanager.rpc.address:
> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'
> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1
>
> When looking at the code of the Operator and Flink itself, I can see
> that jobmanager.rpc.address is set automatically by the
> InternalServiceDecorator
> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
> and it points to the Kubernetes Service.
> However, the comment
> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38>
> above clearly says that "only the non-HA scenario relies on this Service
> for internal communication, since in the HA mode, the TaskManager(s)
> directly connects to the JobManager via IP address." According to the docs
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>,
> jobmanager.rpc.address "is ignored on setups with high-availability where
> the leader election mechanism is used to discover this automatically."
>
> This is not what I'm observing as it seems that despite enabling HA, the
> TaskManagers don't use IP addresses but still use this Kubernetes Service
> for JM communication.
>
> Moreover, I've used the Lyft Kubernetes Operator before and it has these
> interesting lines in the code:
> https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
> It explicitly sets jobmanager.rpc.address to the host IPs.
>
> Am I misconfiguring or misunderstanding something? Is there any way to fix
> these errors?
>
> Thanks!
> Anton
>
