Re: Flink Metric isBackPressured not available
Thanks for your reply. I'm using Flink 1.12. I'm checking in Datadog and the metric is not available there. Datadog does show other task/operator metrics such as numRecordsIn/numRecordsOut, but not isBackPressured.

On Mon, Apr 12, 2021 at 8:40 AM Roman Khachatryan wrote:
> Hi,
>
> The metric is registered upon task deployment and reported periodically.
>
> Which Flink version are you using? The metric was added in 1.10.
> Are you checking it in the UI?
>
> Regards,
> Roman
>
> On Fri, Apr 9, 2021 at 8:50 PM Claude M wrote:
> >
> > Hello,
> >
> > The documentation here https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html states that an isBackPressured metric is available, yet I don't see it. Any ideas why?
> >
> > Thanks
Flink Metric isBackPressured not available
Hello,

The documentation here https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html states that an isBackPressured metric is available, yet I don't see it. Any ideas why?

Thanks
Flink Metrics emitted from a Kubernetes Application Cluster
Hello,

I've set up Flink as an Application Cluster in Kubernetes. Now I'm looking into monitoring the Flink cluster in Datadog. This is what is configured in the flink-conf.yaml to emit metrics:

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: {{ datadog_api_key }}
metrics.reporter.dghttp.tags: environment: {{ environment }}

When the metrics get to Datadog, though, the flink.jobmanager and flink.taskmanager metrics are filtered by host, which is the pod IP. However, I would like it to use the pod name. How can this be accomplished?

Thanks
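One approach is to expose the pod name to each container through the Kubernetes downward API and attach it as an extra Datadog tag, then filter/group by that tag instead of by host. This is only a sketch: it assumes flink-conf.yaml is rendered per pod from a template (the {{ }} placeholders above suggest some templating is in place), and pod_name is simply a tag key chosen for illustration:

    # In the jobmanager/taskmanager pod spec: make the pod name available to the container
    env:
      - name: POD_NAME
        valueFrom:
          fieldRef:
            fieldPath: metadata.name

    # In flink-conf.yaml: tags are a comma-separated list; key:value pairs work
    metrics.reporter.dghttp.tags: environment:{{ environment }},pod_name:{{ pod_name }}

How the POD_NAME value reaches the rendered flink-conf.yaml depends on the templating in use; if all pods share a single ConfigMap, the per-pod tag would need to be injected another way, for example by appending it to the config file in the container entrypoint at startup.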
Re: Restoring from Flink Savepoint in Kubernetes not working
Thanks for your reply. I'm using the flink docker image flink:1.12.2-scala_2.11-java8. Yes, the folder was created in S3. I took a look at the UI and it showed the following:

Latest Restore ID: 49
Restore Time: 2021-03-31 09:37:43
Type: Checkpoint
Path: s3:fcc82deebb4565f31a7f63989939c463/chk-49

However, this is different from the savepoint path I specified. I specified the following: s3:savepoint2/savepoint-9fe457-504c312ffabe

Is there anything specific you're looking for in the logs? I did not find any exceptions and there is a lot of sensitive information I would have to extract from it.

Also, this morning, I tried creating another savepoint. It first showed it was In Progress.

curl http://localhost:8081/jobs/fcc82deebb4565f31a7f63989939c463/savepoints/4d19307dd99337257c4738871b1c63d8
{"status":{"id":"IN_PROGRESS"},"operation":null}

Then later when I tried to check the status, I saw the attached exception. In the UI, I see the following:

Latest Failed Checkpoint ID: 50
Failure Time: 2021-03-31 09:34:43
Cause: Asynchronous task checkpoint failed.

What does this failure mean?

On Wed, Mar 31, 2021 at 9:22 AM Matthias Pohl wrote:
> Hi Claude,
> thanks for reaching out to the Flink community. Could you provide the
> Flink logs for this run to get a better understanding of what's going on?
> Additionally, what exact Flink 1.12 version are you using? Did you also
> verify that the snapshot was created by checking the actual folder?
>
> Best,
> Matthias
>
> On Wed, Mar 31, 2021 at 4:56 AM Claude M wrote:
>
>> Hello,
>>
>> I have Flink setup as an Application Cluster in Kubernetes, using Flink
>> version 1.12. I created a savepoint using the curl command and the status
>> indicated it was completed. I then tried to relaunch the job from that
>> savepoint using the following arguments as indicated in the doc found here:
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>
>> args: ["standalone-job", "--job-classname", "", "--job-id",
>> "", "--fromSavepoint", "s3:///",
>> "--allowNonRestoredState"]
>>
>> After the job launches, I check the offsets and they are not the same as
>> when the savepoint was created. The job id passed in also does not match
>> the job id that was launched. I even put an incorrect savepoint path to
>> see what happens and there were no errors in the logs and the job still
>> launches. It seems these arguments are not even being evaluated. Any
>> ideas about this?
>> >> >> Thanks >> > {"errors":["org.apache.flink.runtime.rest.NotFoundException: Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@4b261c41\n\tat org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest (AbstractAsynchronousOperationHandlers.java:182)\n\tat org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest (SavepointHandlers.java:219)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest (AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader (AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0 (LeaderRetrievalHandler.java:83)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelIn
Restoring from Flink Savepoint in Kubernetes not working
Hello,

I have Flink setup as an Application Cluster in Kubernetes, using Flink version 1.12. I created a savepoint using the curl command and the status indicated it was completed. I then tried to relaunch the job from that savepoint using the following arguments as indicated in the doc found here: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes

args: ["standalone-job", "--job-classname", "", "--job-id", "", "--fromSavepoint", "s3:///", "--allowNonRestoredState"]

After the job launches, I check the offsets and they are not the same as when the savepoint was created. The job id passed in also does not match the job id that was launched. I even put an incorrect savepoint path to see what happens and there were no errors in the logs and the job still launches. It seems these arguments are not even being evaluated. Any ideas about this?

Thanks
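For comparison, this is what the args would look like with concrete values filled in; the class name, job id, and bucket below are purely hypothetical (the empty strings above look like placeholders that were stripped from the original message):

    args: ["standalone-job",
           "--job-classname", "com.example.MyStreamingJob",
           "--job-id", "00000000000000000000000000000001",
           "--fromSavepoint", "s3://my-bucket/savepoints/savepoint-9fe457-504c312ffabe",
           "--allowNonRestoredState"]

One hedged observation, not confirmed in the thread: the reply above shows the latest restore as Type: Checkpoint, which is what one would expect if high availability recovered an already-registered job from its latest checkpoint, in which case the --fromSavepoint argument is not consulted.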
Flink failing to restore from checkpoint
Hello,

I executed a flink job in a Kubernetes Application cluster w/ four taskmanagers. The job was running fine for several hours but then crashed w/ the following exception, which seems to occur when restoring from a checkpoint. The UI shows the following for the checkpoint counts:

Triggered: 68
In Progress: 0
Completed: 67
Failed: 1
Restored: 292

Any ideas about this failure?

Thanks

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_233fe9791f870db6076db489fea576c1_(31/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.FileNotFoundException: /mnt/checkpoints/5dde50b6e70608c63708cbf979bce4aa/shared/47993871-c7eb-4fec-ae23-207d307c384a (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
    at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:82)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:63)
    at
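The root cause is the FileNotFoundException on /mnt/checkpoints/...: the RocksDB restore reads the checkpoint files through the local filesystem and the restoring task manager cannot find them, which typically means the checkpoint directory is either node-local or was cleaned up, rather than being on storage every pod can reach. A minimal flink-conf.yaml sketch pointing checkpoints at durable shared storage; using S3 here is an assumption (it is already used for savepoints elsewhere in this archive) and the bucket name is a placeholder:

    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
    state.savepoints.dir: s3://my-bucket/flink/savepoints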
Re: Kubernetes Application Cluster Not Working
This issue was resolved by adding the following environment variable to both the jobmanager and taskmanager: - name: JOB_MANAGER_RPC_ADDRESS value: jobmanager On Wed, Mar 24, 2021 at 1:33 AM Yang Wang wrote: > Are you sure that the JobManager akka address is binded to > "flink-jobmanager"? > You could set "jobmanager.rpc.address" to flink-jobmanager in the > ConfigMap. > > Best, > Yang > > Guowei Ma 于2021年3月24日周三 上午10:22写道: > >> Hi, M >> Could you give the full stack? This might not be the root cause. >> Best, >> Guowei >> >> >> On Wed, Mar 24, 2021 at 2:46 AM Claude M wrote: >> >>> Hello, >>> >>> I'm trying to setup Flink in Kubernetes using the Application Mode as >>> described here: >>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes >>> >>> The doc mentions that there needs to be a aervice exposing the >>> JobManager’s REST and UI ports. It then points to a link w/ the resource >>> definitions: >>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions >>> and I defined the following service along w/ the jobmanager, taskmanager, >>> and flink-conf. >>> >>> apiVersion: v1 >>> kind: Service >>> metadata: >>> name: flink-jobmanager >>> spec: >>> type: ClusterIP >>> ports: >>> - name: rpc >>> port: 6123 >>> - name: blob-server >>> port: 6124 >>> - name: webui >>> port: 8081 >>> selector: >>> app: flink >>> component: jobmanager >>> >>> >>> I am able to access the jobmanager UI but the taskmanagers are failing >>> w/ the following error: >>> Could not resolve ResourceManager address >>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_* >>> >>> Any ideas about this? >>> >>> >>> Thanks >>> >>
Kubernetes Application Cluster Not Working
Hello,

I'm trying to setup Flink in Kubernetes using the Application Mode as described here: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes

The doc mentions that there needs to be a service exposing the JobManager’s REST and UI ports. It then points to a link w/ the resource definitions: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions and I defined the following service along w/ the jobmanager, taskmanager, and flink-conf.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

I am able to access the jobmanager UI but the taskmanagers are failing w/ the following error:
Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*

Any ideas about this?

Thanks
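As the resolution earlier in this thread shows, the fix was to point both the jobmanager and the taskmanagers at the JobManager's service name explicitly. A sketch of two equivalent ways to do that; the value has to match whatever name the JobManager is actually reachable under (flink-jobmanager here, matching the Service above, whereas the resolution in the reply used "jobmanager" for their setup):

    # flink-conf.yaml
    jobmanager.rpc.address: flink-jobmanager

    # or as a container env var on both deployments (the official Flink image's
    # entrypoint supports JOB_MANAGER_RPC_ADDRESS)
    env:
      - name: JOB_MANAGER_RPC_ADDRESS
        value: flink-jobmanager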
Timeout Exception When Producing/Consuming Messages to Hundreds of Topics
Hello,

I'm trying to run an experiment w/ two flink jobs:
- A producer producing messages to hundreds of topics
- A consumer consuming the messages from all the topics

After the job runs for a few minutes, it fails w/ the following error:
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 6 ms

If I run the job w/ a few topics, it works. I have tried setting the following properties in the job but still encounter the problem:

properties.setProperty("retries", "20");
properties.setProperty("request.timeout.ms", "30");
properties.setProperty("metadata.fetch.timeout.ms", "30");

Any ideas about this?

Thanks
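The values above look truncated in the archive; for reference, these producer options take milliseconds, so "30" would literally mean 30 ms. Also note that, to my knowledge, metadata.fetch.timeout.ms belongs to the old Kafka producer; in current clients the time a send may block waiting for topic metadata is governed by max.block.ms, which tends to be the relevant knob when a job has to fetch metadata for hundreds of topics on startup. A sketch with illustrative values only; the broker address and topic are hypothetical and the numbers are not recommendations from the thread:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class ManyTopicsProducerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "kafka:9092");   // hypothetical address
            properties.setProperty("retries", "20");
            properties.setProperty("request.timeout.ms", "30000");       // 30 s, in milliseconds
            properties.setProperty("max.block.ms", "120000");            // how long send() may wait for metadata

            DataStream<String> stream = env.fromElements("a", "b", "c"); // stand-in for the real source
            stream.addSink(new FlinkKafkaProducer<>("some-topic", new SimpleStringSchema(), properties));

            env.execute("many-topics-producer-sketch");
        }
    }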
Re: Producer Configuration
Yes, the flink job also works in producing messages. It's just that after a short period of time, it fails w/ a timeout. That is why I'm trying to set a longer timeout period but it doesn't seem like the properties are being picked up. On Sat, Feb 27, 2021 at 1:17 PM Alexey Trenikhun wrote: > Can you produce messages using Kafka console producer connect using same > properties ? > > -- > *From:* Claude M > *Sent:* Saturday, February 27, 2021 8:05 AM > *To:* Alexey Trenikhun > *Cc:* user > *Subject:* Re: Producer Configuration > > Thanks for your reply, yes it was specified. Sorry I forgot to include > it: > properties.setProperty("bootstrap.servers", "localhost:9092"); > > On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun wrote: > > I believe bootstrap.servers is mandatory Kafka property, but it looks like > you didn’t set it > > -- > *From:* Claude M > *Sent:* Friday, February 26, 2021 12:02:10 PM > *To:* user > *Subject:* Producer Configuration > > Hello, > > I created a simple Producer and when the job ran, it was getting the > following error: > Caused by: org.apache.kafka.common.errors.TimeoutException > > I read about increasing the request.timeout.ms. Thus, I added the > following properties. > > Properties properties = new Properties(); > properties.setProperty("request.timeout.ms", "3"); > properties.setProperty("retries", "20"); > DataStream stream = env.addSource(new SimpleStringGenerator()); > stream.addSink(new FlinkKafkaProducer<>("flink-test", new > SimpleStringSchema(), properties)); > > However, after the job is submitted, the User Configuration is empty, > please see attached. > Therefore, it seems these properties are taking into effect since I still > have the same problem. > Any help on these issues are appreciated, thanks. > >
Re: Producer Configuration
Thanks for your reply, yes it was specified. Sorry I forgot to include it: properties.setProperty("bootstrap.servers", "localhost:9092"); On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun wrote: > I believe bootstrap.servers is mandatory Kafka property, but it looks like > you didn’t set it > > ------ > *From:* Claude M > *Sent:* Friday, February 26, 2021 12:02:10 PM > *To:* user > *Subject:* Producer Configuration > > Hello, > > I created a simple Producer and when the job ran, it was getting the > following error: > Caused by: org.apache.kafka.common.errors.TimeoutException > > I read about increasing the request.timeout.ms. Thus, I added the > following properties. > > Properties properties = new Properties(); > properties.setProperty("request.timeout.ms", "3"); > properties.setProperty("retries", "20"); > DataStream stream = env.addSource(new SimpleStringGenerator()); > stream.addSink(new FlinkKafkaProducer<>("flink-test", new > SimpleStringSchema(), properties)); > > However, after the job is submitted, the User Configuration is empty, > please see attached. > Therefore, it seems these properties are taking into effect since I still > have the same problem. > Any help on these issues are appreciated, thanks. >
Producer Configuration
Hello,

I created a simple Producer and when the job ran, it was getting the following error:
Caused by: org.apache.kafka.common.errors.TimeoutException

I read about increasing the request.timeout.ms. Thus, I added the following properties.

Properties properties = new Properties();
properties.setProperty("request.timeout.ms", "3");
properties.setProperty("retries", "20");
DataStream stream = env.addSource(new SimpleStringGenerator());
stream.addSink(new FlinkKafkaProducer<>("flink-test", new SimpleStringSchema(), properties));

However, after the job is submitted, the User Configuration is empty, please see attached. Therefore, it seems these properties are not taking effect since I still have the same problem. Any help on these issues is appreciated, thanks.
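One thing worth noting (a guess based on how the web UI works, not something confirmed in this thread): the "User Configuration" panel of a job shows the global job parameters registered on the ExecutionConfig, not the Properties object handed to the Kafka producer, so that panel being empty does not by itself mean the producer ignored the settings. A sketch of registering the same values as global job parameters so they become visible there; the values are illustrative (and in milliseconds):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ProducerConfigVisibilitySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Map<String, String> kafkaSettings = new HashMap<>();
            kafkaSettings.put("bootstrap.servers", "localhost:9092");
            kafkaSettings.put("request.timeout.ms", "30000");
            kafkaSettings.put("retries", "20");

            // The same values as a Properties object, to be passed to FlinkKafkaProducer
            // exactly as in the snippet above.
            Properties properties = new Properties();
            properties.putAll(kafkaSettings);

            // Registering them as global job parameters makes them show up under
            // the job's "User Configuration" in the web UI.
            env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(kafkaSettings));

            env.fromElements("hello", "flink").print();  // trivial pipeline so the job can run
            env.execute("producer-config-visibility-sketch");
        }
    }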
Flink Datadog Timeout
Hello,

I have a Flink jobmanager and taskmanagers deployed in a Kubernetes cluster. I integrated it with Datadog by having the following specified in the flink-conf.yaml:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey:

However, I'm seeing random timeouts in the log and don't know why this is occurring or how to solve it. Please see the attached file showing the error.

Thanks

WARN org.apache.flink.metrics.datadog.DatadogHttpClient - Failed sending request to Datadog
java.net.SocketTimeoutException: timeout
    at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
    at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
    at org.apache.flink.shaded.okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
    at org.apache.flink.shaded.okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
    at org.apache.flink.shaded.okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at org.apache.flink.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at org.apache.flink.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at org.apache.flink.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at org.apache.flink.shaded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at org.apache.flink.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at org.apache.flink.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
    at org.apache.flink.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
    at org.apache.flink.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Re: Error while retrieving the leader gateway after making Flink config changes
This issue had to do with the update strategy for the Flink deployment. When I changed it to the following, it will work: strategy: type: RollingUpdate rollingUpdate: maxSurge: 0 maxUnavailable: 1 On Tue, Nov 3, 2020 at 1:39 PM Robert Metzger wrote: > Thanks a lot for providing the logs. > > My theory of what is happening is the following: > 1. You are probably increasing the memory for the JobManager, when > changing the jobmanager.memory.flink.size configuration value > 2. Due to this changed memory configuration, Kubernetes, Docker or the > Linux kernel are killing your JobManager process because it allocates too > much memory. > > Flink should not stop like this. Fatal errors are logged explicitly, kill > signals are also logged. > Can you check Kubernetes, Docker, Linux for any signs that they are > killing your JobManager? > > > > On Tue, Nov 3, 2020 at 7:06 PM Claude M wrote: > >> Thanks for your reply Robert. Please see attached log from the job >> manager, the last line is the only thing I see different from a pod that >> starts up successfully. >> >> On Tue, Nov 3, 2020 at 10:41 AM Robert Metzger >> wrote: >> >>> Hi Claude, >>> >>> I agree that you should be able to restart individual pods with a >>> changed memory configuration. Can you share the full Jobmanager log of the >>> failed restart attempt? >>> >>> I don't think that the log statement you've posted explains a start >>> failure. >>> >>> Regards, >>> Robert >>> >>> On Tue, Nov 3, 2020 at 2:33 AM Claude M wrote: >>> >>>> >>>> Hello, >>>> >>>> I have Flink 1.10.2 installed in a Kubernetes cluster. >>>> Anytime I make a change to the flink.conf, the Flink jobmanager pod >>>> fails to restart. >>>> For example, I modified the following memory setting in the flink.conf: >>>> jobmanager.memory.flink.size. >>>> After I deploy the change, the pod fails to restart and the following >>>> is seen in the log: >>>> >>>> WARN >>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - >>>> Error while retrieving the leader gateway. Retrying to connect to >>>> akka.tcp://flink@flink-jobmanager:50010/user/dispatcher. >>>> >>>> The pod can be restored by doing one of the following but these are not >>>> acceptable solutions: >>>> >>>>- Revert the changes made to the flink.conf to the previous settings >>>>- Remove the Flink Kubernetes deployment before doing a deployment >>>>- Delete the flink cluster folder in Zookeeper >>>> >>>> I don't understand why making any changes in the flink.conf causes this >>>> problem. >>>> Any help is appreciated. >>>> >>>> >>>> Thank You >>>> >>>
Re: Error while retrieving the leader gateway after making Flink config changes
Thanks for your reply Robert. Please see attached log from the job manager, the last line is the only thing I see different from a pod that starts up successfully. On Tue, Nov 3, 2020 at 10:41 AM Robert Metzger wrote: > Hi Claude, > > I agree that you should be able to restart individual pods with a changed > memory configuration. Can you share the full Jobmanager log of the failed > restart attempt? > > I don't think that the log statement you've posted explains a start > failure. > > Regards, > Robert > > On Tue, Nov 3, 2020 at 2:33 AM Claude M wrote: > >> >> Hello, >> >> I have Flink 1.10.2 installed in a Kubernetes cluster. >> Anytime I make a change to the flink.conf, the Flink jobmanager pod fails >> to restart. >> For example, I modified the following memory setting in the flink.conf: >> jobmanager.memory.flink.size. >> After I deploy the change, the pod fails to restart and the following is >> seen in the log: >> >> WARN >> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - >> Error while retrieving the leader gateway. Retrying to connect to >> akka.tcp://flink@flink-jobmanager:50010/user/dispatcher. >> >> The pod can be restored by doing one of the following but these are not >> acceptable solutions: >> >>- Revert the changes made to the flink.conf to the previous settings >>- Remove the Flink Kubernetes deployment before doing a deployment >>- Delete the flink cluster folder in Zookeeper >> >> I don't understand why making any changes in the flink.conf causes this >> problem. >> Any help is appreciated. >> >> >> Thank You >> > Processing template /mnt/flink-conf/..2020_11_03_17_59_21.864132437/log4j-console.properties.tmpl to file /opt/flink/conf/log4j-console.properties Processing template /mnt/flink-conf/..2020_11_03_17_59_21.864132437/flink-conf.yaml.tmpl to file /opt/flink/conf/flink-conf.yaml Processing template /mnt/flink-conf/log4j-console.properties.tmpl to file /opt/flink/conf/log4j-console.properties Processing template /mnt/flink-conf/flink-conf.yaml.tmpl to file /opt/flink/conf/flink-conf.yaml Starting Job Manager FLINK-11843 zookeeper bug workaround start --- Processing cluster betacluster looking for orphans jobregistry will be listed in r.txt, and jobgraph j.txt FLINK-11843 zookeeper bug workaround end --- config file: blob.server.port: 6124 jobmanager.rpc.address: flink-betacluster-jobmanager jobmanager.rpc.port: 6123 query.server.port: 6125 high-availability: zookeeper high-availability.zookeeper.quorum: zookeeper-0.zk-quorum.default.svc.cluster.local:2181,zookeeper-1.zk-quorum.default.svc.cluster.local:2181,zookeeper-2.zk-quorum.default.svc.cluster.local:2181 high-availability.zookeeper.path.root: /flink high-availability.cluster-id: /betacluster high-availability.jobmanager.port: 50010 high-availability.zookeeper.client.connection-timeout: high-availability.zookeeper.client.session-timeout: akka.ask.timeout: 180s metrics.scope.jm: flink.jobmanager metrics.scope.jm.job: flink.jobmanager.job metrics.scope.tm: flink.taskmanager metrics.scope.tm.job: flink.taskmanager.job metrics.scope.task: flink.task metrics.scope.operator: flink.operator metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter metrics.reporter.dghttp.proxyHost: proxy.host metrics.reporter.dghttp.proxyPort: 3128 jobmanager.memory.flink.size: 1024m taskmanager.memory.flink.size: taskmanager.memory.jvm-metaspace.size: 128m cluster.evenly-spread-out-slots: true env.java.opts: -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log 
Starting standalonesession as a console application on host flink-betacluster-jobmanager-759cccbdf9-g2mhs. log4j:ERROR Could not find value for key log4j.appender.file log4j:ERROR Could not instantiate appender named "file". 2020-11-03 17:59:23,771 WARN org.apache.flink.configuration.GlobalConfiguration- Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:20: "high-availability.zookeeper.client.connection-timeout: " 2020-11-03 17:59:23,772 WARN org.apache.flink.configuration.GlobalConfiguration- Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:21: "high-availability.zookeeper.client.session-timeout: " 2020-11-03 17:59:23,773 WARN org.apache.flink.configuration.GlobalConfiguration- Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:53: "taskmanager.memory.flink.size: " 2020-11-03 17:59:24,223 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java clas
Error while retrieving the leader gateway after making Flink config changes
Hello,

I have Flink 1.10.2 installed in a Kubernetes cluster. Anytime I make a change to the flink.conf, the Flink jobmanager pod fails to restart. For example, I modified the following memory setting in the flink.conf: jobmanager.memory.flink.size. After I deploy the change, the pod fails to restart and the following is seen in the log:

WARN org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:50010/user/dispatcher.

The pod can be restored by doing one of the following, but these are not acceptable solutions:

- Revert the changes made to the flink.conf to the previous settings
- Remove the Flink Kubernetes deployment before doing a deployment
- Delete the flink cluster folder in Zookeeper

I don't understand why making any changes in the flink.conf causes this problem. Any help is appreciated.

Thank You
Re: metaspace out-of-memory & error while retrieving the leader gateway
I have 35 task managers, 1 slot on each. I'm running a total of 7 jobs in the cluster. All the slots are occupied. When you say that 33 instances of the ChildFirstClassLoader does not sound right, what should I be expecting? Could the number of jobs running in the cluster contribute to the out of memory? I used to have 26 task managers in this cluster w/ 5 jobs. I added 9 additional task managers and 2 jobs. I noticed this problem started occurring after I made these additions. If this is the cause of the problem, how can it be resolved? On Thu, Sep 24, 2020 at 1:06 AM Xintong Song wrote: > How many slots do you have on each task manager? > > Flink uses ChildFirstClassLoader for loading user codes, to avoid > dependency conflicts between user codes and Flink's framework. Ideally, > after a slot is freed and reassigned to a new job, the user class loaders > of the previous job should be unloaded. 33 instances of them does not > sound right. It might be worth looking into where the references that keep > these instances alive come from. > > Flink 1.10.3 is not released yet. If you want to try the unreleased > version, you would need to download the sources [1], build the flink > distribution [2] and build your custom image (from the 1.0.2 image and > replace the flink distribution with the one you built). > > Thank you~ > > Xintong Song > > > [1] https://github.com/apache/flink/tree/release-1.10 > > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/flinkDev/building.html > > > > On Wed, Sep 23, 2020 at 8:29 PM Claude M wrote: > >> It was mentioned that this issue may be fixed in 1.10.3 but there is no >> 1.10.3 docker image here: https://hub.docker.com/_/flink >> >> >> On Wed, Sep 23, 2020 at 7:14 AM Claude M wrote: >> >>> In regards to the metaspace memory issue, I was able to get a heap dump >>> and the following is the output: >>> >>> Problem Suspect 1 >>> One instance of *"java.lang.ref.Finalizer"* loaded by *">> loader>"* occupies *4,112,624 (11.67%)* bytes. The instance is >>> referenced by *sun.misc.Cleaner @ 0xb5d6b520* , loaded by *">> class loader>"*. The memory is accumulated in one instance of >>> *"java.lang.Object[]"* loaded by *""*. >>> >>> Problem Suspect 2 >>> 33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*, loaded >>> by *"sun.misc.Launcher$AppClassLoader @ 0xb4068680"* occupy *6,615,416 >>> (18.76%)*bytes. >>> >>> Based on this, I'm not clear on what needs to be done to solve this. >>> >>> >>> On Tue, Sep 22, 2020 at 3:10 PM Claude M wrote: >>> >>>> Thanks for your responses. >>>> 1. There were no job re-starts prior to the metaspace OEM. >>>> 2. I tried increasing the CPU request and still encountered the >>>> problem. Any configuration change I make to the job manager, whether it's >>>> in the flink-conf.yaml or increasing the pod's CPU/memory request, results >>>> with this problem. >>>> >>>> >>>> On Tue, Sep 22, 2020 at 12:04 AM Xintong Song >>>> wrote: >>>> >>>>> Thanks for the input, Brain. >>>>> >>>>> This looks like what we are looking for. The issue is fixed in 1.10.3, >>>>> which also matches this problem occurred in 1.10.2. >>>>> >>>>> Maybe Claude can further confirm it. >>>>> >>>>> Thank you~ >>>>> >>>>> Xintong Song >>>>> >>>>> >>>>> >>>>> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian wrote: >>>>> >>>>>> Hi Xintong and Claude, >>>>>> >>>>>> >>>>>> >>>>>> In our internal tests, we also encounter these two issues and we >>>>>> spent much time debugging them. 
There are two points I need to confirm if >>>>>> we share the same problem. >>>>>> >>>>>>1. Your job is using default restart strategy, which is >>>>>>per-second restart. >>>>>>2. Your CPU resource on jobmanager might be small >>>>>> >>>>>> >>>>>> >>>>>> Here is some findings I want to share. >>>>>> >>>>>> ## Metaspace OOM >>>>>> >>>>>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we >
Re: metaspace out-of-memory & error while retrieving the leader gateway
It was mentioned that this issue may be fixed in 1.10.3 but there is no 1.10.3 docker image here: https://hub.docker.com/_/flink On Wed, Sep 23, 2020 at 7:14 AM Claude M wrote: > In regards to the metaspace memory issue, I was able to get a heap dump > and the following is the output: > > Problem Suspect 1 > One instance of *"java.lang.ref.Finalizer"* loaded by *" loader>"* occupies *4,112,624 (11.67%)* bytes. The instance is referenced > by *sun.misc.Cleaner @ 0xb5d6b520* , loaded by *""*. > The memory is accumulated in one instance of *"java.lang.Object[]"* loaded > by *""*. > > Problem Suspect 2 > 33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*, loaded by > *"sun.misc.Launcher$AppClassLoader @ 0xb4068680"* occupy *6,615,416 > (18.76%)*bytes. > > Based on this, I'm not clear on what needs to be done to solve this. > > > On Tue, Sep 22, 2020 at 3:10 PM Claude M wrote: > >> Thanks for your responses. >> 1. There were no job re-starts prior to the metaspace OEM. >> 2. I tried increasing the CPU request and still encountered the >> problem. Any configuration change I make to the job manager, whether it's >> in the flink-conf.yaml or increasing the pod's CPU/memory request, results >> with this problem. >> >> >> On Tue, Sep 22, 2020 at 12:04 AM Xintong Song >> wrote: >> >>> Thanks for the input, Brain. >>> >>> This looks like what we are looking for. The issue is fixed in 1.10.3, >>> which also matches this problem occurred in 1.10.2. >>> >>> Maybe Claude can further confirm it. >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian wrote: >>> >>>> Hi Xintong and Claude, >>>> >>>> >>>> >>>> In our internal tests, we also encounter these two issues and we spent >>>> much time debugging them. There are two points I need to confirm if we >>>> share the same problem. >>>> >>>>1. Your job is using default restart strategy, which is per-second >>>>restart. >>>>2. Your CPU resource on jobmanager might be small >>>> >>>> >>>> >>>> Here is some findings I want to share. >>>> >>>> ## Metaspace OOM >>>> >>>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we >>>> have some job restarts, there will be some threads from the sourceFunction >>>> hanging, cause the class loader cannot close. New restarts would load new >>>> classes, then expand the metaspace, and finally OOM happens. >>>> >>>> >>>> >>>> ## Leader retrieving >>>> >>>> Constant restarts may be heavy for jobmanager, if JM CPU resources are >>>> not enough, the thread for leader retrieving may be stuck. >>>> >>>> >>>> >>>> Best Regards, >>>> >>>> Brian >>>> >>>> >>>> >>>> *From:* Xintong Song >>>> *Sent:* Tuesday, September 22, 2020 10:16 >>>> *To:* Claude M; user >>>> *Subject:* Re: metaspace out-of-memory & error while retrieving the >>>> leader gateway >>>> >>>> >>>> >>>> ## Metaspace OOM >>>> >>>> As the error message already suggested, the metaspace OOM you >>>> encountered is likely caused by a class loading leak. I think you are on >>>> the right direction trying to look into the heap dump and find out where >>>> the leak comes from. IIUC, after removing the ZK folder, you are now able >>>> to run Flink with the heap dump options. >>>> >>>> >>>> >>>> The problem does not occur in previous versions because Flink starts to >>>> set the metaspace limit since the 1.10 release. The class loading leak >>>> might have already been there, but is never discovered. This could lead to >>>> unpredictable stability and performance issues. 
That's why Flink updated >>>> its memory model and explicitly set the metaspace limit in the 1.10 >>>> release. >>>> >>>> >>>> >>>> ## Leader retrieving >>>> >>>> The command looks good to me. If this problem happens only once, it >>>> could be irrelevant to adding the options. If
Re: metaspace out-of-memory & error while retrieving the leader gateway
In regards to the metaspace memory issue, I was able to get a heap dump and the following is the output: Problem Suspect 1 One instance of *"java.lang.ref.Finalizer"* loaded by *""* occupies *4,112,624 (11.67%)* bytes. The instance is referenced by *sun.misc.Cleaner @ 0xb5d6b520* , loaded by *""*. The memory is accumulated in one instance of *"java.lang.Object[]"* loaded by *""*. Problem Suspect 2 33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*, loaded by *"sun.misc.Launcher$AppClassLoader @ 0xb4068680"* occupy *6,615,416 (18.76%)*bytes. Based on this, I'm not clear on what needs to be done to solve this. On Tue, Sep 22, 2020 at 3:10 PM Claude M wrote: > Thanks for your responses. > 1. There were no job re-starts prior to the metaspace OEM. > 2. I tried increasing the CPU request and still encountered the problem. > Any configuration change I make to the job manager, whether it's in the > flink-conf.yaml or increasing the pod's CPU/memory request, results > with this problem. > > > On Tue, Sep 22, 2020 at 12:04 AM Xintong Song > wrote: > >> Thanks for the input, Brain. >> >> This looks like what we are looking for. The issue is fixed in 1.10.3, >> which also matches this problem occurred in 1.10.2. >> >> Maybe Claude can further confirm it. >> >> Thank you~ >> >> Xintong Song >> >> >> >> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian wrote: >> >>> Hi Xintong and Claude, >>> >>> >>> >>> In our internal tests, we also encounter these two issues and we spent >>> much time debugging them. There are two points I need to confirm if we >>> share the same problem. >>> >>>1. Your job is using default restart strategy, which is per-second >>>restart. >>>2. Your CPU resource on jobmanager might be small >>> >>> >>> >>> Here is some findings I want to share. >>> >>> ## Metaspace OOM >>> >>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we have >>> some job restarts, there will be some threads from the sourceFunction >>> hanging, cause the class loader cannot close. New restarts would load new >>> classes, then expand the metaspace, and finally OOM happens. >>> >>> >>> >>> ## Leader retrieving >>> >>> Constant restarts may be heavy for jobmanager, if JM CPU resources are >>> not enough, the thread for leader retrieving may be stuck. >>> >>> >>> >>> Best Regards, >>> >>> Brian >>> >>> >>> >>> *From:* Xintong Song >>> *Sent:* Tuesday, September 22, 2020 10:16 >>> *To:* Claude M; user >>> *Subject:* Re: metaspace out-of-memory & error while retrieving the >>> leader gateway >>> >>> >>> >>> ## Metaspace OOM >>> >>> As the error message already suggested, the metaspace OOM you >>> encountered is likely caused by a class loading leak. I think you are on >>> the right direction trying to look into the heap dump and find out where >>> the leak comes from. IIUC, after removing the ZK folder, you are now able >>> to run Flink with the heap dump options. >>> >>> >>> >>> The problem does not occur in previous versions because Flink starts to >>> set the metaspace limit since the 1.10 release. The class loading leak >>> might have already been there, but is never discovered. This could lead to >>> unpredictable stability and performance issues. That's why Flink updated >>> its memory model and explicitly set the metaspace limit in the 1.10 release. >>> >>> >>> >>> ## Leader retrieving >>> >>> The command looks good to me. If this problem happens only once, it >>> could be irrelevant to adding the options. 
If that does not block you from >>> getting the heap dump, we can look into it later. >>> >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> >>> >>> On Mon, Sep 21, 2020 at 9:37 PM Claude M wrote: >>> >>> Hi Xintong, >>> >>> >>> >>> Thanks for your reply. Here is the command output w/ the java.opts: >>> >>> >>> >>> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC >>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log >>> -Dlog4j.c
Re: metaspace out-of-memory & error while retrieving the leader gateway
Thanks for your responses. 1. There were no job re-starts prior to the metaspace OEM. 2. I tried increasing the CPU request and still encountered the problem. Any configuration change I make to the job manager, whether it's in the flink-conf.yaml or increasing the pod's CPU/memory request, results with this problem. On Tue, Sep 22, 2020 at 12:04 AM Xintong Song wrote: > Thanks for the input, Brain. > > This looks like what we are looking for. The issue is fixed in 1.10.3, > which also matches this problem occurred in 1.10.2. > > Maybe Claude can further confirm it. > > Thank you~ > > Xintong Song > > > > On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian wrote: > >> Hi Xintong and Claude, >> >> >> >> In our internal tests, we also encounter these two issues and we spent >> much time debugging them. There are two points I need to confirm if we >> share the same problem. >> >>1. Your job is using default restart strategy, which is per-second >>restart. >>2. Your CPU resource on jobmanager might be small >> >> >> >> Here is some findings I want to share. >> >> ## Metaspace OOM >> >> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we have >> some job restarts, there will be some threads from the sourceFunction >> hanging, cause the class loader cannot close. New restarts would load new >> classes, then expand the metaspace, and finally OOM happens. >> >> >> >> ## Leader retrieving >> >> Constant restarts may be heavy for jobmanager, if JM CPU resources are >> not enough, the thread for leader retrieving may be stuck. >> >> >> >> Best Regards, >> >> Brian >> >> >> >> *From:* Xintong Song >> *Sent:* Tuesday, September 22, 2020 10:16 >> *To:* Claude M; user >> *Subject:* Re: metaspace out-of-memory & error while retrieving the >> leader gateway >> >> >> >> ## Metaspace OOM >> >> As the error message already suggested, the metaspace OOM you encountered >> is likely caused by a class loading leak. I think you are on the right >> direction trying to look into the heap dump and find out where the leak >> comes from. IIUC, after removing the ZK folder, you are now able to run >> Flink with the heap dump options. >> >> >> >> The problem does not occur in previous versions because Flink starts to >> set the metaspace limit since the 1.10 release. The class loading leak >> might have already been there, but is never discovered. This could lead to >> unpredictable stability and performance issues. That's why Flink updated >> its memory model and explicitly set the metaspace limit in the 1.10 release. >> >> >> >> ## Leader retrieving >> >> The command looks good to me. If this problem happens only once, it could >> be irrelevant to adding the options. If that does not block you from >> getting the heap dump, we can look into it later. >> >> >> Thank you~ >> >> Xintong Song >> >> >> >> >> >> On Mon, Sep 21, 2020 at 9:37 PM Claude M wrote: >> >> Hi Xintong, >> >> >> >> Thanks for your reply. 
Here is the command output w/ the java.opts: >> >> >> >> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC >> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log >> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties >> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml >> -classpath >> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf: >> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint >> --configDir /opt/flink/conf --executionMode cluster >> >> >> >> To answer your questions: >> >>- Correct, in order for the pod to start up, I have to remove the >>flink app folder from zookeeper. I only have to delete once after >> applying >>the java.opts arguments. It doesn't make sense though that I should have >>to do this just from adding a parameter. >>- I'm using the standalone deployment. >>- I'm using job cluster mode. >> >> A higher priority issue I'm trying to solve is this metaspace out of >> memory that is occ
metaspace out-of-memory & error while retrieving the leader gateway
Hello,

I upgraded from Flink 1.7.2 to 1.10.2. One of the jobs running on the task managers is periodically crashing w/ the following error:

java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown.

I found this issue regarding it: https://issues.apache.org/jira/browse/FLINK-16406

I have tried increasing the taskmanager.memory.jvm-metaspace.size to 256M & 512M and was still having the problem. I then added the following to the flink.conf to try to get more information about the error:

env.java.opts: -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log

When I deployed the change, which is in a Kubernetes cluster, the jobmanager pod fails to start up and the following message shows repeatedly:

2020-09-18 17:03:46,255 WARN org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:50010/user/dispatcher.

The only way I can resolve this is to delete the folder from zookeeper, which I shouldn't have to do. Any ideas on these issues?