Hi Omer, could you share a bit more of the logs with us? I would be interested in what happened before "Stopping DefaultLeaderRetrievalService" is logged. One problem you might be running into is FLINK-20417 [1]. It should be fixed in Flink 1.12.2.
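In the meantime, it might also help to turn up logging for the Kubernetes HA code paths so that the interesting part is captured. Here is a minimal, untested sketch, assuming the log4j2 properties layout that the stock Flink 1.12 Docker images use for console logging (conf/log4j-console.properties); you would add this key to your existing flink-config ConfigMap and mount it next to flink-conf.yaml via a second subPath volumeMount:

data:
  log4j-console.properties: |
    # Console appender, matching the defaults of the stock Flink images
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # DEBUG only for the Kubernetes HA / leader election classes
    logger.k8sha.name = org.apache.flink.kubernetes
    logger.k8sha.level = DEBUG

If you already ship a log4j configuration, just add the two logger lines to it instead.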
[1] https://issues.apache.org/jira/browse/FLINK-20417 Cheers, Till On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <omeroz...@gmail.com> wrote: > Hey guys > It looks like the Flink cluster is deployed successfully; it starts with > no errors. > But when we try to deploy the jobs, some jobs start and some can't > find available slots for some reason, even when we have free ones. > It happens with different jobs every time. > Below are the exceptions thrown by the components. > I also attached an image showing the taskmanagers and the free slots. > > *The jobmanager throws this error:* > 2021-02-17 11:19:41,956 INFO > org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] > - Stopping DefaultLeaderRetrievalService. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver > [] - Stopping > KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher > [] - The watcher is closing. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Registration of job manager bec569547a4ab5be4e2068a28164415a > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Registration of job manager bec569547a4ab5be4e2068a28164415a > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Registration of job manager bec569547a4ab5be4e2068a28164415a > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed. > 2021-02-17 11:19:41,956 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Registration of job manager bec569547a4ab5be4e2068a28164415a > @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed. > 2021-02-17 11:19:41,956 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Registration at ResourceManager was declined: > java.lang.Exception: Job leader id service has been stopped. > > > java.util.concurrent.CompletionException: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) > ~[?:?] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) > ~[?:?] > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) > ~[?:?] > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > ~[?:?]
> at > org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > ~[?:?] > at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.11-1.12.1.jar:1.12.1] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.11-1.12.1.jar:1.12.1] > Caused by: > 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > ... 24 more > Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: > 300000 ms > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) > ~[flink-dist_2.11-1.12.1.jar:1.12.1] > ... 24 more > 2021-02-17 11:18:18,977 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > Discarding the results produced by task execution > 317da8a62cf57f74cf587e6bd733f5e7. > 2021-02-17 11:18:18,977 INFO > > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > [] - Calculating tasks to restart to recover the failed task > e1ff02ab7657079c9d2254f12c031b2a_0. > 2021-02-17 11:18:18,977 INFO > > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > [] - 13 tasks should be restarted to recover the failed task > e1ff02ab7657079c9d2254f12c031b2a_0. > > *The jobs throw this error:* > 2021-02-17 14:13:48 > java.util.concurrent.CompletionException: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at > org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) > at > org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) > at > org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) > ... 24 more > Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: > 300000 ms > ... 25 more > > Any suggestions? > > Thanks > Omer > > On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <trohrm...@apache.org> > wrote: > >> If you are running a session cluster, then Flink will create a config map >> for every submitted job. These config maps will unfortunately only be >> cleaned up when you shut down the cluster. This is a known limitation which >> we want to fix soon [1, 2]. >> >> If you can help us with updating the documentation properly (e.g. which >> role binding to use for the service account with minimal permissions), then >> we would highly appreciate your help. >> >> [1] https://issues.apache.org/jira/browse/FLINK-20695 >> [2] https://issues.apache.org/jira/browse/FLINK-21008 >> >> Cheers, >> Till >> >> On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <omeroz...@gmail.com> wrote: >> >>> Hey guys, >>> You are right, the documentation lacks this part, and Flink needs it >>> to start. >>> I'm not sure it has 100% solved our problem, because it creates endless >>> copies of the configmaps with random ids, and also our jobs can't be scheduled >>> for some reason. >>> I will investigate this further with Daniel and let you know. >>> Also, the access control granted by this document is vast, imprecise and >>> cluster-wide (it uses a default edit-all clusterRole), so when you create a >>> PR, make sure that whoever is in charge of the flink-k8s integration >>> documents the exact permissions to create and attach to Flink's >>> components. >>> >>> Thanks very much for your help! >>> We will keep you updated. >>> Omer >>> >>> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <trohrm...@apache.org> >>> wrote: >>> >>>> Hi Omer, >>>> >>>> I think Matthias is right. The K8s HA services create and edit config >>>> maps. Hence they need the rights to do this. In the native K8s >>>> documentation there is a section about how to create a service account with >>>> the right permissions [1].
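>>>> For a standalone setup like yours, a rough namespace-scoped sketch could look like the following. I have not verified that this is the exact minimal verb set, and the resource names are placeholders, but judging from your "cannot get resource configmaps" errors it should at least be much narrower than binding the cluster-wide edit role:
>>>>
>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>> kind: Role
>>>> metadata:
>>>>   name: flink-ha-configmaps
>>>>   namespace: intel360-beta
>>>> rules:
>>>>   # The K8s HA services create, read, watch, update and delete
>>>>   # the leader election ConfigMaps
>>>>   - apiGroups: [""]
>>>>     resources: ["configmaps"]
>>>>     verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>>>> ---
>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>> kind: RoleBinding
>>>> metadata:
>>>>   name: flink-ha-configmaps
>>>>   namespace: intel360-beta
>>>> subjects:
>>>>   - kind: ServiceAccount
>>>>     name: default   # or a dedicated service account for Flink
>>>>     namespace: intel360-beta
>>>> roleRef:
>>>>   kind: Role
>>>>   name: flink-ha-configmaps
>>>>   apiGroup: rbac.authorization.k8s.io
>>>>
>>>> If you use a dedicated service account instead of "default", you would also have to set serviceAccountName in the jobmanager and taskmanager pod templates.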
>>>> >>>> I think that our K8s HA documentation currently lacks this part. I will >>>> create a PR to update the documentation. >>>> >>>> [1] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac >>>> >>>> Cheers, >>>> Till >>>> >>>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <matth...@ververica.com> >>>> wrote: >>>> >>>>> I'm adding the Flink user ML to the conversation again. >>>>> >>>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <matth...@ververica.com> >>>>> wrote: >>>>> >>>>>> Hi Omer, >>>>>> thanks for sharing the configuration. You're right: using NFS for >>>>>> HA's storageDir is fine. >>>>>> >>>>>> About the error message you're referring to: I haven't worked with >>>>>> the HA k8s service yet. But the RBAC is a good hint. Flink's native >>>>>> Kubernetes documentation [1] points out that you can use a custom service >>>>>> account. This one needs special permissions to start/stop pods >>>>>> automatically (which does not apply in your case) but also to access >>>>>> ConfigMaps. You might want to try setting the permissions as described in >>>>>> [1]. >>>>>> >>>>>> Best, >>>>>> Matthias >>>>>> >>>>>> [1] >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac >>>>>> >>>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <omeroz...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hey Matthias. >>>>>>> My name is Omer, I am Daniel's DevOps engineer; I will elaborate on our >>>>>>> Flink situation. >>>>>>> These are our Flink resource definitions, as generated using >>>>>>> the helm template command (minus the log4j/metrics configuration and some >>>>>>> sensitive data): >>>>>>> --- >>>>>>> # Source: flink/templates/flink-configmap.yaml >>>>>>> apiVersion: v1 >>>>>>> kind: ConfigMap >>>>>>> metadata: >>>>>>> name: flink-config >>>>>>> labels: >>>>>>> app: flink >>>>>>> data: >>>>>>> flink-conf.yaml: | >>>>>>> jobmanager.rpc.address: flink-jobmanager >>>>>>> jobmanager.rpc.port: 6123 >>>>>>> jobmanager.execution.failover-strategy: region >>>>>>> jobmanager.memory.process.size: 8g >>>>>>> taskmanager.memory.process.size: 24g >>>>>>> taskmanager.memory.task.off-heap.size: 1g >>>>>>> taskmanager.numberOfTaskSlots: 4 >>>>>>> queryable-state.proxy.ports: 6125 >>>>>>> queryable-state.enable: true >>>>>>> blob.server.port: 6124 >>>>>>> parallelism.default: 1 >>>>>>> state.backend.incremental: true >>>>>>> state.backend: rocksdb >>>>>>> state.backend.rocksdb.localdir: /opt/flink/rocksdb >>>>>>> state.checkpoints.dir: file:///opt/flink/checkpoints >>>>>>> classloader.resolve-order: child-first >>>>>>> kubernetes.cluster-id: flink-cluster >>>>>>> kubernetes.namespace: intel360-beta >>>>>>> high-availability: >>>>>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory >>>>>>> high-availability.storageDir: file:///opt/flink/recovery >>>>>>> >>>>>>> --- >>>>>>> # Source: flink/templates/flink-service.yaml >>>>>>> apiVersion: v1 >>>>>>> kind: Service >>>>>>> metadata: >>>>>>> name: flink-jobmanager >>>>>>> labels: >>>>>>> {} >>>>>>> spec: >>>>>>> ports: >>>>>>> - name: http-ui >>>>>>> port: 8081 >>>>>>> targetPort: http-ui >>>>>>> - name: tcp-rpc >>>>>>> port: 6123 >>>>>>> targetPort: tcp-rpc >>>>>>> - name: tcp-blob >>>>>>> port: 6124 >>>>>>> targetPort: tcp-blob >>>>>>> selector: >>>>>>> app: flink >>>>>>> component: jobmanager >>>>>>> --- >>>>>>> # Source: flink/templates/flink-deployment.yaml >>>>>>> apiVersion: apps/v1 >>>>>>> kind: Deployment
>>>>>>> metadata: >>>>>>> name: flink-jobmanager >>>>>>> spec: >>>>>>> replicas: 1 >>>>>>> selector: >>>>>>> matchLabels: >>>>>>> app: flink >>>>>>> component: jobmanager >>>>>>> template: >>>>>>> metadata: >>>>>>> labels: >>>>>>> app: flink >>>>>>> component: jobmanager >>>>>>> annotations: >>>>>>> checksum/config: >>>>>>> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941 >>>>>>> spec: >>>>>>> containers: >>>>>>> - name: jobmanager >>>>>>> image: flink:1.12.1-scala_2.11-java11 >>>>>>> args: [ "jobmanager" ] >>>>>>> ports: >>>>>>> - name: http-ui >>>>>>> containerPort: 8081 >>>>>>> - name: tcp-rpc >>>>>>> containerPort: 6123 >>>>>>> - name: tcp-blob >>>>>>> containerPort: 6124 >>>>>>> resources: >>>>>>> {} >>>>>>> # Environment Variables >>>>>>> env: >>>>>>> - name: ENABLE_CHECKPOINTING >>>>>>> value: "true" >>>>>>> - name: JOB_MANAGER_RPC_ADDRESS >>>>>>> value: "flink-jobmanager" >>>>>>> volumeMounts: >>>>>>> - name: flink-config >>>>>>> mountPath: /opt/flink/conf/flink-conf.yaml >>>>>>> subPath: flink-conf.yaml >>>>>>> # NFS mounts >>>>>>> - name: flink-checkpoints >>>>>>> mountPath: "/opt/flink/checkpoints" >>>>>>> - name: flink-recovery >>>>>>> mountPath: "/opt/flink/recovery" >>>>>>> volumes: >>>>>>> - name: flink-config >>>>>>> configMap: >>>>>>> name: flink-config >>>>>>> # NFS volumes >>>>>>> - name: flink-checkpoints >>>>>>> nfs: >>>>>>> server: "my-nfs-server.my-org" >>>>>>> path: "/my-shared-nfs-dir/flink/checkpoints" >>>>>>> - name: flink-recovery >>>>>>> nfs: >>>>>>> server: "my-nfs-server.my-org" >>>>>>> path: "/my-shared-nfs-dir/flink/recovery" >>>>>>> --- >>>>>>> # Source: flink/templates/flink-deployment.yaml >>>>>>> apiVersion: apps/v1 >>>>>>> kind: Deployment >>>>>>> metadata: >>>>>>> name: flink-taskmanager >>>>>>> spec: >>>>>>> replicas: 7 >>>>>>> selector: >>>>>>> matchLabels: >>>>>>> app: flink >>>>>>> component: taskmanager >>>>>>> template: >>>>>>> metadata: >>>>>>> labels: >>>>>>> app: flink >>>>>>> component: taskmanager >>>>>>> annotations: >>>>>>> checksum/config: >>>>>>> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941 >>>>>>> spec: >>>>>>> containers: >>>>>>> - name: taskmanager >>>>>>> image: flink:1.12.1-scala_2.11-java11 >>>>>>> args: [ "taskmanager" ] >>>>>>> resources: >>>>>>> limits: >>>>>>> cpu: 6000m >>>>>>> memory: 24Gi >>>>>>> requests: >>>>>>> cpu: 6000m >>>>>>> memory: 24Gi >>>>>>> # Environment Variables >>>>>>> env: >>>>>>> - name: ENABLE_CHECKPOINTING >>>>>>> value: "true" >>>>>>> - name: JOB_MANAGER_RPC_ADDRESS >>>>>>> value: "flink-jobmanager" >>>>>>> volumeMounts: >>>>>>> - name: flink-config >>>>>>> mountPath: /opt/flink/conf/flink-conf.yaml >>>>>>> subPath: flink-conf.yaml >>>>>>> # NFS mounts >>>>>>> - name: flink-checkpoints >>>>>>> mountPath: "/opt/flink/checkpoints" >>>>>>> - name: flink-recovery >>>>>>> mountPath: "/opt/flink/recovery" >>>>>>> volumes: >>>>>>> - name: flink-config >>>>>>> configMap: >>>>>>> name: flink-config >>>>>>> # NFS volumes >>>>>>> - name: flink-checkpoints >>>>>>> nfs: >>>>>>> server: "my-nfs-server.my-org" >>>>>>> path: "/my-shared-nfs-dir/flink/checkpoints" >>>>>>> - name: flink-recovery >>>>>>> nfs: >>>>>>> server: "my-nfs-server.my-org" >>>>>>> path: "/my-shared-nfs-dir/flink/recovery" >>>>>>> --- >>>>>>> # Source: flink/templates/flink-ingress.yaml >>>>>>> apiVersion: extensions/v1beta1 >>>>>>> kind: Ingress >>>>>>> metadata: >>>>>>> name: jobmanager >>>>>>> spec: >>>>>>> rules: >>>>>>> - host: my.flink.job.manager.url >>>>>>> http: >>>>>>> paths: >>>>>>> - path: / >>>>>>> 
backend: >>>>>>> serviceName: flink-jobmanager >>>>>>> servicePort: 8081 >>>>>>> --- >>>>>>> >>>>>>> As you can see, we are using the skeleton of the standalone >>>>>>> configuration as documented here: >>>>>>> >>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html >>>>>>> with some per-company configuration obviously, but still within the >>>>>>> scope of this document. >>>>>>> >>>>>>> On a normal, beautiful day, without the HA configuration, >>>>>>> everything works fine. >>>>>>> When trying to configure Kubernetes HA using this document: >>>>>>> >>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html >>>>>>> with the following parameters: >>>>>>> kubernetes.cluster-id: flink-cluster >>>>>>> high-availability: >>>>>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory >>>>>>> high-availability.storageDir: file:///opt/flink/recovery >>>>>>> >>>>>>> the jobmanager fails with the following error: >>>>>>> 2021-02-14 16:57:19,103 ERROR >>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - >>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default - >>>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)' >>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure >>>>>>> executing: GET at: >>>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader. >>>>>>> Message: Forbidden!Configured service account doesn't have access. >>>>>>> Service >>>>>>> account may have been revoked. configmaps >>>>>>> "flink-cluster-restserver-leader" >>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot >>>>>>> get >>>>>>> resource "configmaps" in API group "" in the namespace "default". >>>>>>> >>>>>>> So we added this line as well (as you can see in the flink-config >>>>>>> configmap above): >>>>>>> kubernetes.namespace: intel360-beta >>>>>>> Although it is not part of the document, and I don't think Flink >>>>>>> should be aware of the namespace it resides in (it damages the modularity >>>>>>> of upper configuration layers), we added it regardless, and then got the >>>>>>> following error: >>>>>>> >>>>>>> 2021-02-14 17:00:57,086 ERROR >>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - >>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta - >>>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)' >>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure >>>>>>> executing: GET at: >>>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader. >>>>>>> Message: Forbidden!Configured service account doesn't have access. >>>>>>> Service >>>>>>> account may have been revoked. configmaps >>>>>>> "flink-cluster-restserver-leader" >>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot >>>>>>> get >>>>>>> resource "configmaps" in API group "" in the namespace "intel360-beta". >>>>>>> >>>>>>> which is basically the same error message, just directed at >>>>>>> Flink's namespace. >>>>>>> My question is: do I need to add RBAC to Flink's service >>>>>>> account? I got the impression from the official Flink documents and >>>>>>> some blog responses that it is designed to function without any special >>>>>>> permissions.
>>>>>>> If we do need RBAC, can you give an official documentation reference >>>>>>> for the exact permissions? >>>>>>> >>>>>>> NOTE: as you can see, our flink-checkpoints and recovery locations >>>>>>> point to a local directory mounted on a shared NFS between all >>>>>>> taskmanagers >>>>>>> and the jobmanager, since our infrastructure is bare-metal by design >>>>>>> (although this one is hosted in AWS). >>>>>>> >>>>>>> Thanks in advance >>>>>>> Omer >>>>>>> >>>>>>> >>>>>>> ---------- Forwarded message --------- >>>>>>> From: Daniel Peled <daniel.peled.w...@gmail.com> >>>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM >>>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working >>>>>>> To: <omeroz...@gmail.com> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> ---------- Forwarded message --------- >>>>>>> From: Matthias Pohl <matth...@ververica.com> >>>>>>> Date: Thu, Feb 11, 2021 at 5:37 PM >>>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working >>>>>>> To: Matthias Pohl <matth...@ververica.com> >>>>>>> Cc: Daniel Peled <daniel.peled.w...@gmail.com>, user < >>>>>>> user@flink.apache.org> >>>>>>> >>>>>>> >>>>>>> One other thing: It looks like you've set >>>>>>> high-availability.storageDir to a local path file:///opt/flink/recovery. >>>>>>> You should use a storage path that is accessible from all Flink cluster >>>>>>> components (e.g. using S3). Only references are stored in Kubernetes >>>>>>> ConfigMaps [1]. >>>>>>> >>>>>>> Best, >>>>>>> Matthias >>>>>>> >>>>>>> [1] >>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration >>>>>>> >>>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl < >>>>>>> matth...@ververica.com> wrote: >>>>>>> >>>>>>>> Hi Daniel, >>>>>>>> what's the exact configuration you used? Did you use the resource >>>>>>>> definitions provided in the Standalone Flink on Kubernetes docs [1]? Did >>>>>>>> you do certain things differently in comparison to the documentation? >>>>>>>> >>>>>>>> Best, >>>>>>>> Matthias >>>>>>>> >>>>>>>> [1] >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix >>>>>>>> >>>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled < >>>>>>>> daniel.peled.w...@gmail.com> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> Hey, >>>>>>>>> >>>>>>>>> We are using standalone Flink on Kubernetes, >>>>>>>>> and we have followed the instructions in the following link: >>>>>>>>> "Kubernetes HA Services" >>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html >>>>>>>>> We were unable to make it work. >>>>>>>>> We are facing a lot of problems. >>>>>>>>> For example, some of the jobs don't start, complaining that there >>>>>>>>> are not enough slots available - although there are enough slots, and it >>>>>>>>> seems as if the job manager is NOT aware of all the task managers. >>>>>>>>> In another scenario we were unable to run any job at all. >>>>>>>>> The Flink dashboard is unresponsive and we get the error >>>>>>>>> "flink service temporarily unavailable due to an ongoing leader >>>>>>>>> election. please refresh". >>>>>>>>> We believe we are missing some configurations. >>>>>>>>> Are there any more detailed instructions? >>>>>>>>> Any suggestions/tips? >>>>>>>>> Attached is the log of the job manager in one of the attempts. >>>>>>>>> >>>>>>>>> Please give me some advice. >>>>>>>>> BR, >>>>>>>>> Danny >>>>>>>>> >>>>>>>> >>>>>>>