Hi Omer,

could you share a bit more of the logs with us? I would be interested in
what happened before "Stopping DefaultLeaderRetrievalService" was
logged. One problem you might be running into is FLINK-20417 [1]. This
problem should be fixed with Flink 1.12.2.

[1] https://issues.apache.org/jira/browse/FLINK-20417
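
If you want to try the upgrade once 1.12.2 is available, a minimal sketch of
the change would be to bump the image in both the jobmanager and the
taskmanager containers. This assumes the Deployments quoted further down in
this thread, and it assumes that the official image gets a 1.12.2 tag with
the same naming scheme as the 1.12.1 tag used there:

```yaml
# Fragment of the jobmanager Deployment; apply the same change to the taskmanager.
# The 1.12.2 tag is an assumption derived from the 1.12.1 tag used in this thread.
spec:
  template:
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.2-scala_2.11-java11
```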

Cheers,
Till

On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <omeroz...@gmail.com> wrote:

> Hey guys,
> It looks like the flink cluster is deployed successfully; it starts with
> no errors.
> But when we try to deploy the jobs, some jobs start and some can't
> find available slots for some reason, even when we have free ones.
> It happens with different jobs every time.
> Below are the exceptions thrown by the components.
> I also attached an image showing the taskmanagers and the free slots.
>
> *jobManager throws this error:*
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
> - Stopping DefaultLeaderRetrievalService.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver
> [] - Stopping
> KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
> [] - The watcher is closing.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registration of job manager bec569547a4ab5be4e2068a28164415a
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registration of job manager bec569547a4ab5be4e2068a28164415a
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registration of job manager bec569547a4ab5be4e2068a28164415a
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
> 2021-02-17 11:19:41,956 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Registration of job manager bec569547a4ab5be4e2068a28164415a
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                 [] - Registration at ResourceManager was declined:
> java.lang.Exception: Job leader id service has been stopped.
>
>
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> ~[?:?]
> at
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> ... 24 more
> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> 300000 ms
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> ... 24 more
> 2021-02-17 11:18:18,977 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> Discarding the results produced by task execution
> 317da8a62cf57f74cf587e6bd733f5e7.
> 2021-02-17 11:18:18,977 INFO
>  
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - Calculating tasks to restart to recover the failed task
> e1ff02ab7657079c9d2254f12c031b2a_0.
> 2021-02-17 11:18:18,977 INFO
>  
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - 13 tasks should be restarted to recover the failed task
> e1ff02ab7657079c9d2254f12c031b2a_0.
>
> *The jobs throw this error:*
> 2021-02-17 14:13:48
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
> at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
> at
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
> at
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> ... 24 more
> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> 300000 ms
> ... 25 more
>
> Any suggestions?
>
> Thanks
> Omer
>
> On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> If you are running a session cluster, then Flink will create a config map
>> for every submitted job. These config maps will unfortunately only be
>> cleaned up when you shut down the cluster. This is a known limitation which
>> we want to fix soon [1, 2].
>>
>> If you can help us with updating the documentation properly (e.g. which
>> role binding to use for the service account with minimal permissions), then
>> we would highly appreciate your help.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20695
>> [2] https://issues.apache.org/jira/browse/FLINK-21008
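
As a starting point for the minimal-permissions question, here is a rough
sketch of a namespaced Role and RoleBinding that only covers ConfigMaps. It
is not taken from the Flink documentation: the name flink-ha-configmaps is
hypothetical, the verb list is an assumption about what the HA services need
in order to create, edit, watch and delete their ConfigMaps, and the
namespace and service account are the ones from the error message further
down in this thread.

```yaml
# Hypothetical names; the verb list is an assumption, not an official Flink recommendation.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps
  namespace: intel360-beta
rules:
- apiGroups: [""]            # core API group, where ConfigMaps live
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
# Binds the Role to the service account the Flink pods run as.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
  namespace: intel360-beta
subjects:
- kind: ServiceAccount
  name: default              # the account named in the "forbidden" error
  namespace: intel360-beta
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
```

Compared to binding the cluster-wide edit role, this keeps the grant inside
one namespace and one resource type; the verbs can be trimmed further once
it is clear which ones are actually used.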
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <omeroz...@gmail.com> wrote:
>>
>>> Hey guys,
>>> You are right, the documentation lacks this part, and flink needs it
>>> to start.
>>> I'm not sure it has 100% solved our problem, because it creates endless
>>> copies of the configmaps with random ids, and our jobs also can't be
>>> scheduled for some reason.
>>> I will investigate this further with Daniel and let you know.
>>> Also, the access control granted by this document is vast, imprecise and
>>> cluster-wide (it uses a default edit-all clusterRole), so when you create a
>>> PR, please make sure that whoever is in charge of the flink-k8s integration
>>> documents the exact permissions to create and attach to flink's
>>> components.
>>>
>>> Thanks very much for your help!
>>> we will keep you updated.
>>> Omer
>>>
>>> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Omer,
>>>>
>>>> I think Matthias is right. The K8s HA services create and edit config
>>>> maps. Hence they need the rights to do this. In the native K8s
>>>> documentation there is a section about how to create a service account with
>>>> the right permissions [1].
>>>>
>>>> I think that our K8s HA documentation currently lacks this part. I will
>>>> create a PR to update the documentation.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <matth...@ververica.com>
>>>> wrote:
>>>>
>>>>> I'm adding the Flink user ML to the conversation again.
>>>>>
>>>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Omer,
>>>>>> thanks for sharing the configuration. You're right: Using NFS for
>>>>>> HA's storageDir is fine.
>>>>>>
>>>>>> About the error message you're referring to: I haven't worked with
>>>>>> the HA k8s service, yet. But the RBAC is a good hint. Flink's native
>>>>>> Kubernetes documentation [1] points out that you can use a custom service
>>>>>> account. This one needs special permissions to start/stop pods
>>>>>> automatically (which does not apply in your case) but also to access
>>>>>> ConfigMaps. You might want to try setting the permission as described in
>>>>>> [1].
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>>>
>>>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <omeroz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Matthias.
>>>>>>> My name is Omer, I am Daniel's devops, and I will elaborate on our
>>>>>>> flink situation.
>>>>>>> These are our flink resource definitions, as they are generated using
>>>>>>> the helm template command (minus log4j, metrics configuration and some
>>>>>>> sensitive data):
>>>>>>> ---
>>>>>>> # Source: flink/templates/flink-configmap.yaml
>>>>>>> apiVersion: v1
>>>>>>> kind: ConfigMap
>>>>>>> metadata:
>>>>>>>   name: flink-config
>>>>>>>   labels:
>>>>>>>     app: flink
>>>>>>> data:
>>>>>>>   flink-conf.yaml: |
>>>>>>>     jobmanager.rpc.address: flink-jobmanager
>>>>>>>     jobmanager.rpc.port: 6123
>>>>>>>     jobmanager.execution.failover-strategy: region
>>>>>>>     jobmanager.memory.process.size: 8g
>>>>>>>     taskmanager.memory.process.size: 24g
>>>>>>>     taskmanager.memory.task.off-heap.size: 1g
>>>>>>>     taskmanager.numberOfTaskSlots: 4
>>>>>>>     queryable-state.proxy.ports: 6125
>>>>>>>     queryable-state.enable: true
>>>>>>>     blob.server.port: 6124
>>>>>>>     parallelism.default: 1
>>>>>>>     state.backend.incremental: true
>>>>>>>     state.backend: rocksdb
>>>>>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>>>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>>>>>     classloader.resolve-order: child-first
>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>     kubernetes.namespace: intel360-beta
>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>
>>>>>>> ---
>>>>>>> # Source: flink/templates/flink-service.yaml
>>>>>>> apiVersion: v1
>>>>>>> kind: Service
>>>>>>> metadata:
>>>>>>>   name: flink-jobmanager
>>>>>>>   labels:
>>>>>>>     {}
>>>>>>> spec:
>>>>>>>   ports:
>>>>>>>   - name: http-ui
>>>>>>>     port: 8081
>>>>>>>     targetPort: http-ui
>>>>>>>   - name: tcp-rpc
>>>>>>>     port: 6123
>>>>>>>     targetPort: tcp-rpc
>>>>>>>   - name: tcp-blob
>>>>>>>     port: 6124
>>>>>>>     targetPort: tcp-blob
>>>>>>>   selector:
>>>>>>>     app: flink
>>>>>>>     component: jobmanager
>>>>>>> ---
>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>> apiVersion: apps/v1
>>>>>>> kind: Deployment
>>>>>>> metadata:
>>>>>>>   name: flink-jobmanager
>>>>>>> spec:
>>>>>>>   replicas: 1
>>>>>>>   selector:
>>>>>>>     matchLabels:
>>>>>>>       app: flink
>>>>>>>       component: jobmanager
>>>>>>>   template:
>>>>>>>     metadata:
>>>>>>>       labels:
>>>>>>>         app: flink
>>>>>>>         component: jobmanager
>>>>>>>       annotations:
>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>     spec:
>>>>>>>       containers:
>>>>>>>       - name: jobmanager
>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>         args: [ "jobmanager" ]
>>>>>>>         ports:
>>>>>>>         - name: http-ui
>>>>>>>           containerPort: 8081
>>>>>>>         - name: tcp-rpc
>>>>>>>           containerPort: 6123
>>>>>>>         - name: tcp-blob
>>>>>>>           containerPort: 6124
>>>>>>>         resources:
>>>>>>>           {}
>>>>>>>         # Environment Variables
>>>>>>>         env:
>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>           value: "true"
>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>           value: "flink-jobmanager"
>>>>>>>         volumeMounts:
>>>>>>>         - name: flink-config
>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>           subPath: flink-conf.yaml
>>>>>>>         # NFS mounts
>>>>>>>         - name: flink-checkpoints
>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>         - name: flink-recovery
>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>       volumes:
>>>>>>>       - name: flink-config
>>>>>>>         configMap:
>>>>>>>           name: flink-config
>>>>>>>       # NFS volumes
>>>>>>>       - name: flink-checkpoints
>>>>>>>         nfs:
>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>       - name: flink-recovery
>>>>>>>         nfs:
>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>> ---
>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>> apiVersion: apps/v1
>>>>>>> kind: Deployment
>>>>>>> metadata:
>>>>>>>   name: flink-taskmanager
>>>>>>> spec:
>>>>>>>   replicas: 7
>>>>>>>   selector:
>>>>>>>     matchLabels:
>>>>>>>       app: flink
>>>>>>>       component: taskmanager
>>>>>>>   template:
>>>>>>>     metadata:
>>>>>>>       labels:
>>>>>>>         app: flink
>>>>>>>         component: taskmanager
>>>>>>>       annotations:
>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>     spec:
>>>>>>>       containers:
>>>>>>>       - name: taskmanager
>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>         args: [ "taskmanager" ]
>>>>>>>         resources:
>>>>>>>           limits:
>>>>>>>             cpu: 6000m
>>>>>>>             memory: 24Gi
>>>>>>>           requests:
>>>>>>>             cpu: 6000m
>>>>>>>             memory: 24Gi
>>>>>>>         # Environment Variables
>>>>>>>         env:
>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>           value: "true"
>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>           value: "flink-jobmanager"
>>>>>>>         volumeMounts:
>>>>>>>         - name: flink-config
>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>           subPath: flink-conf.yaml
>>>>>>>         # NFS mounts
>>>>>>>         - name: flink-checkpoints
>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>         - name: flink-recovery
>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>       volumes:
>>>>>>>       - name: flink-config
>>>>>>>         configMap:
>>>>>>>           name: flink-config
>>>>>>>       # NFS volumes
>>>>>>>       - name: flink-checkpoints
>>>>>>>         nfs:
>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>       - name: flink-recovery
>>>>>>>         nfs:
>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>> ---
>>>>>>> # Source: flink/templates/flink-ingress.yaml
>>>>>>> apiVersion: extensions/v1beta1
>>>>>>> kind: Ingress
>>>>>>> metadata:
>>>>>>>   name: jobmanager
>>>>>>> spec:
>>>>>>>   rules:
>>>>>>>     - host: my.flink.job.manager.url
>>>>>>>       http:
>>>>>>>         paths:
>>>>>>>           - path: /
>>>>>>>             backend:
>>>>>>>               serviceName: flink-jobmanager
>>>>>>>               servicePort: 8081
>>>>>>> ---
>>>>>>>
>>>>>>> As you can see, we are using the skeleton of the standalone
>>>>>>> configuration as it is documented here:
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>>>>> with some per-company configuration obviously, but still under the
>>>>>>> scope of this document..
>>>>>>>
>>>>>>> on a normal beautiful day and without the HA configuration,
>>>>>>> everything works fine.
>>>>>>> when trying to configure kubernetes HA using this document:
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>> with the following parameters:
>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>
>>>>>>> the jobmanager fails with the following error:
>>>>>>> 2021-02-14 16:57:19,103 ERROR
>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>> executing: GET at:
>>>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>> resource "configmaps" in API group "" in the namespace "default".
>>>>>>>
>>>>>>> So we added this line as well (as you can see in the flink-config
>>>>>>> configmap above):
>>>>>>> kubernetes.namespace: intel360-beta
>>>>>>> It is not part of the document, and I don't think flink should be
>>>>>>> aware of the namespace it resides in, since that damages the modularity
>>>>>>> of upper layers of configuration. Regardless, we added it and then got
>>>>>>> the following error:
>>>>>>>
>>>>>>> 2021-02-14 17:00:57,086 ERROR
>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>> executing: GET at:
>>>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>> resource "configmaps" in API group "" in the namespace "intel360-beta".
>>>>>>>
>>>>>>> which is basically the same error message, just directed at
>>>>>>> flink's namespace.
>>>>>>> My question is: do I need to add RBAC to flink's service account?
>>>>>>> I got the impression from the official flink documents and some blog
>>>>>>> responses that it is designed to function without any special
>>>>>>> permissions.
>>>>>>> If we do need RBAC, can you point us to an official documentation
>>>>>>> reference for the exact permissions?
>>>>>>>
>>>>>>> NOTE: as you can see, our flink-checkpoints and recovery locations
>>>>>>> are directed to a local directory mounted from an NFS share between all
>>>>>>> task managers and the job manager, since our infrastructure is
>>>>>>> bare-metal by design (although this one is hosted in AWS).
>>>>>>>
>>>>>>> thanks in advance
>>>>>>> Omer
>>>>>>>
>>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Daniel Peled <daniel.peled.w...@gmail.com>
>>>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>>>>>> To: <omeroz...@gmail.com>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Matthias Pohl <matth...@ververica.com>
>>>>>>> Date: Thu, Feb 11, 2021 at 17:37
>>>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>>>>>> To: Matthias Pohl <matth...@ververica.com>
>>>>>>> Cc: Daniel Peled <daniel.peled.w...@gmail.com>, user <
>>>>>>> user@flink.apache.org>
>>>>>>>
>>>>>>>
>>>>>>> One other thing: It looks like you've set
>>>>>>> high-availability.storageDir to a local path file:///opt/flink/recovery.
>>>>>>> You should use a storage path that is accessible from all Flink cluster
>>>>>>> components (e.g. using S3). Only references are stored in Kubernetes
>>>>>>> ConfigMaps [1].
>>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <
>>>>>>> matth...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Daniel,
>>>>>>>> what's the exact configuration you used? Did you use the resource
>>>>>>>> definitions provided in the Standalone Flink on Kubernetes docs [1]?
>>>>>>>> Did you do certain things differently in comparison to the
>>>>>>>> documentation?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <
>>>>>>>> daniel.peled.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> We are using standalone flink on kubernetes.
>>>>>>>>> And we have followed the instructions in the following link,
>>>>>>>>> "Kubernetes HA Services":
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>> We were unable to make it work.
>>>>>>>>> We are facing a lot of problems.
>>>>>>>>> For example, some of the jobs don't start, complaining that there
>>>>>>>>> are not enough slots available, although there are enough slots, and
>>>>>>>>> it seems as if the job manager is NOT aware of all the task managers.
>>>>>>>>> In another scenario we were unable to run any job at all.
>>>>>>>>> The flink dashboard is unresponsive and we get the error
>>>>>>>>> "flink service temporarily unavailable due to an ongoing leader
>>>>>>>>> election. please refresh"
>>>>>>>>> We believe we are missing some configurations.
>>>>>>>>> Are there any more detailed instructions?
>>>>>>>>> And suggestions/tips?
>>>>>>>>> Attached is the log of the job manager in one of the attempts.
>>>>>>>>>
>>>>>>>>> Please give me some advice.
>>>>>>>>> BR,
>>>>>>>>> Danny
>>>>>>>>>
>>>>>>>>
>>>>>>>
