Re: [BULK]Re: [SURVEY] Remove Mesos support
Hello Xintong,

Thanks for the insights and support. I browsed the Mesos backlog and didn't identify anything critical left there. I see that there were quite a lot of contributions to Flink on Mesos in recent versions: https://github.com/apache/flink/commits/master/flink-mesos. We plan to validate the current Flink master (or the release-1.12 branch) on our Mesos setup. In case of any issues, we will try to propose changes. My feeling is that our test results shouldn't affect the Flink 1.12 release cycle, and if any resulting commits land in 1.12.1, that should be totally fine. In the future, we would be glad to help you guys with any maintenance-related questions. One of the highest priorities around this component seems to be the development of a full e2e test.

Kind Regards
Oleksandr Nitavskyi

From: Xintong Song
Sent: Tuesday, October 27, 2020 7:14 AM
To: dev ; user
Cc: Piyush Narang
Subject: [BULK]Re: [SURVEY] Remove Mesos support

Hi Piyush,

Thanks a lot for sharing the information. It is a great relief that you are good with Flink on Mesos as is. As for the Jira issues, I believe the most essential ones have already been resolved. You may find some remaining open issues here [1], but not all of them are necessary if we decide to keep Flink on Mesos as is. At the moment and in the short term, I think help is mostly needed on testing the upcoming release 1.12 with Mesos use cases. The community is actively preparing the new release, and hopefully we can come up with a release candidate early next month. It would be greatly appreciated if your folks, as experienced Flink on Mesos users, can help with verifying the release candidates.
Thank you~
Xintong Song

[1] https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open

On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang <p.nar...@criteo.com> wrote:

Hi Xintong, Do you have any Jiras that cover any of the items in 1 or 2? I can reach out to folks internally and see if I can get some of them to commit to helping out. To cover the other questions:
* Yes, we have no plan at the moment to get off Mesos. We use Yarn for some of our Flink workloads when we can. Mesos is only used when we need streaming capabilities in our WW DCs (as our Yarn is centralized in one DC).
* We're currently on Flink 1.9 (old planner). We have a plan to bump to 1.11 / 1.12 this quarter.
* We typically upgrade once every 6 months to a year (not every release). We'd like to speed up the cadence but we're not there yet.
* We'd largely be good with keeping Flink on Mesos as-is and functional while missing out on some of the newer features. We understand the pain on the community's side, and if we see some improvement in Flink on Yarn / K8s that we want in Mesos, we can take on the work of requesting and porting it over.
Thanks,
-- Piyush

From: Xintong Song <tonysong...@gmail.com>
Date: Sunday, October 25, 2020 at 10:57 PM
To: dev <d...@flink.apache.org>, user <user@flink.apache.org>
Cc: Lasse Nedergaard <lassenedergaardfl...@gmail.com>, <p.nar...@criteo.com>
Subject: Re: [SURVEY] Remove Mesos support

Thanks for sharing the information with us, Piyush and Lasse.

@Piyush Thanks for offering the help. IMO, there are currently several problems that make supporting Flink on Mesos challenging for us.
1. Lack of Mesos experts. AFAIK, there are very few people (if any) among the active contributors in this community who are familiar with Mesos and can help with development on this component.
2. Absence of tests. Mesos does not provide a testing cluster like `MiniYARNCluster`, making it hard to test interactions between Flink and Mesos. We have only a few very simple e2e tests running against Mesos deployed in Docker, covering the most fundamental workflows. We are not sure how well those tests work, especially against potential corner cases.
3. Divergence from other deployments. Because of 1 and 2, new efforts (features, maintenance, refactors) tend to exclude Mesos if possible. When new efforts have to touch the Mesos-related components (e.g., changes to the common resource manag
Re: [BULK]Re: DisableGenericTypes is not compatible with Kafka
Thanks, guys, for the answers. Aljoscha, I have a question to ensure I get it right. Do I understand correctly that this newly created TypeSerializer should use Kryo under the hood, so that we keep the backward compatibility of the state and do not get an exception if generic types are disabled?

Thanks
Kind Regards
Oleksandr

From: Aljoscha Krettek
Sent: Tuesday, February 4, 2020 2:29 PM
To: user@flink.apache.org
Subject: [BULK]Re: DisableGenericTypes is not compatible with Kafka

Unfortunately, the fact that the Kafka sources use Kryo for state serialization is a very early design misstep that we cannot get rid of for now. We will get rid of it when the new source interface lands ([1]) and we have a new Kafka source based on that. As a workaround, we should change the Kafka consumer to go through a different constructor of ListStateDescriptor which directly takes a TypeSerializer instead of a TypeInformation here: [2]. This should sidestep the "no generic types" check. I created a Jira issue for this: https://issues.apache.org/jira/browse/FLINK-15904

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860

On 01.02.20 09:44, Guowei Ma wrote:
> Hi,
> I think there could be two workarounds for 'disableGenericTypes' in the case
> of the Kafka source:
> 1. adding the TypeInfo annotation [1] to KafkaTopicPartition.
> 2. using reflection to call the private method. :)
>
> Maybe we could add this TypeInfo annotation to the Kafka connector.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory
>
> Best,
> Guowei
>
> Oleksandr Nitavskyi wrote on Friday, January 31, 2020 at 12:40 AM:
>
>> Hi guys,
>>
>> We have encountered an issue related to the possibility to
>> 'disableGenericTypes' (disabling Kryo for the job). It seems a very nice
>> idea to ensure that nobody introduces some random change which penalizes
>> the performance of the job.
>>
>> The issue we have encountered is that Flink's KafkaSource stores
>> KafkaTopicPartition in the state for offset recovery, which is serialized
>> with Kryo.
>> For sure this feature itself is not penalizing performance, but it looks
>> like it reduces the usefulness of the possibility to 'disableGenericTypes'.
>> Also, on the Flink user's side there is no good tool to add
>> non-Kryo type information for KafkaTopicPartition.
>>
>> One of the related tickets I have found:
>> https://issues.apache.org/jira/browse/FLINK-12031
>>
>> Do you know any workaround for 'disableGenericTypes' in the case of Kafka
>> sources, or what do you think about making some development to address this issue?
>>
>> Kind Regards
>>
>> Oleksandr
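To illustrate Guowei's first workaround, here is a hedged sketch of a TypeInfoFactory registered via the @TypeInfo annotation. The class and field names are illustrative (a stand-in for KafkaTopicPartition); applying this to KafkaTopicPartition itself would require a change inside Flink, since the annotation goes on the class:

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Illustrative POJO standing in for KafkaTopicPartition.
@TypeInfo(PartitionTypeInfoFactory.class)
class Partition {
    public String topic;
    public int partition;
}

class PartitionTypeInfoFactory extends TypeInfoFactory<Partition> {
    @Override
    public TypeInformation<Partition> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Explicit type information, so the runtime never falls back to
        // Kryo for this class, even with disableGenericTypes() set.
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("topic", Types.STRING);
        fields.put("partition", Types.INT);
        return Types.POJO(Partition.class, fields);
    }
}
```

With the factory in place, TypeExtractor picks it up automatically wherever the class appears in state or stream types.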
DisableGenericTypes is not compatible with Kafka
Hi guys,

We have encountered an issue related to the possibility to 'disableGenericTypes' (disabling Kryo for the job). It seems a very nice idea to ensure that nobody introduces some random change which penalizes the performance of the job. The issue we have encountered is that Flink's KafkaSource stores KafkaTopicPartition in the state for offset recovery, which is serialized with Kryo. For sure this feature itself is not penalizing performance, but it looks like it reduces the usefulness of the possibility to 'disableGenericTypes'. Also, on the Flink user's side there is no good tool to add non-Kryo type information for KafkaTopicPartition. One of the related tickets I have found: https://issues.apache.org/jira/browse/FLINK-12031 Do you know any workaround for 'disableGenericTypes' in the case of Kafka sources, or what do you think about making some development to address this issue?

Kind Regards
Oleksandr
Re: Flink on Mesos
Hey guys. We have also made an implementation in the Flink on Mesos component to support network bandwidth configuration. Would somebody be able to have a look at our PR: https://github.com/apache/flink/pull/8652 There are for sure some details to clarify.

Cheers
Oleksandr

From: Till Rohrmann
Date: Friday 5 April 2019 at 16:46
To: Juan Gentile
Cc: "user@flink.apache.org" , Oleksandr Nitavskyi
Subject: Re: Flink on Mesos

Hi Juan, thanks for reporting this issue. If you could open an issue and also provide a fix for it, then this would be awesome. Please let me know the ticket number so that I can monitor it and give your PR a review.

Cheers, Till

On Fri, Apr 5, 2019 at 5:34 AM Juan Gentile <j.gent...@criteo.com> wrote:

Hello! We are having a small problem while trying to deploy Flink on Mesos using Marathon. In our setup of Mesos, we are required to specify the amount of disk space we want for the applications we deploy there. The current default value in Flink is 0, and it is currently not parameterizable. This means we ask for 0 disk space for our instances, so Flink can't work. I'd appreciate suggestions if you have any. Otherwise, since this is causing some problems on our side, I'd like to know if I can create a ticket on Flink and work on it; it looks like the fix should be quite easy to implement.

Thank you, Juan.
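For readers hitting the same problem: if memory serves, a configuration option along these lines was eventually introduced for requesting disk from Mesos. The option name and value below are assumptions taken from the Flink Mesos configuration reference and may differ by Flink version, so check the docs for your release:

```yaml
# flink-conf.yaml -- sketch; option name assumed from the Flink Mesos
# configuration reference, value (in MB) is illustrative.
mesos.resourcemanager.tasks.disk: 1024
```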
Re: 1.6 UI issues
Hello guys. Happy new year!

Context: we started to have some trouble with the UI after bumping our Flink version from 1.4 to 1.6.3. The UI couldn't render the Job details page, so inspecting jobs became impossible with the new version. It looks like we now have a workaround for our UI issue. After some investigation we realized that starting from Flink 1.5 we began to hit a timeout on the actor call restfulGateway.requestJob(jobId, timeout) in ExecutionGraphCache. So we increased the web.timeout parameter, and we stopped getting timeout exceptions on the JobManager side. Also, in SingleJobController on the Angular JS side, we needed to tweak web.refresh-interval to ensure that the front-end waits for the back-end request to finish. Otherwise the Angular JS side can issue another request in SingleJobController, and, for a reason we don't yet understand, the UI does not update when the older request finishes. We will have a closer look at this behavior. Does this ring a bell for you, perhaps?

Thank you
Kind Regards
Oleksandr

From: Till Rohrmann
Date: Wednesday 19 December 2018 at 16:52
To: Juan Gentile
Cc: "dwysakow...@apache.org" , Jeff Bean , Oleksandr Nitavskyi
Subject: Re: 1.6 UI issues

Hi Juan, thanks for the log. The log file does not contain anything suspicious. Are you sure that you sent me the right file? The timestamps don't seem to match. In the attached log, the job seems to run without problems.

Cheers, Till

On Wed, Dec 19, 2018 at 10:26 AM Juan Gentile <j.gent...@criteo.com> wrote:

Hello Till, Dawid, sorry for the late response on this issue, and thank you Jeff for helping us with this. Yes, we are using 1.6.2. I attach the logs from the Job Master. Also we noticed this exception:

2018-12-19 08:50:10,497 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation error: Unhandled exception.
java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
	at org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.getExecutionGraph(ExecutionGraphCache.java:124)
	at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:76)
	at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:78)
	at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:154)
	at org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:142)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

2018-12-19 08:50:17,977 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler - Implementation error: Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-760166654]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
	at java.lang.Thread.run(Thread.java:748)

For this we tested with the parameter -Dakka.ask.timeout=60s, but the issue remains.

Thank you
Juan

From: Till Rohrmann <trohrm...@apache.org>
Date: Thursday, 8 November 2018 at 1
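For reference, a sketch of the flink-conf.yaml changes described in the workaround above. The values are illustrative and should be tuned for your cluster:

```yaml
# flink-conf.yaml -- workaround sketch, values illustrative
web.timeout: 60000           # ms the REST handlers wait on gateway calls
web.refresh-interval: 10000  # ms between UI refresh requests; keep it above
                             # the typical response time so that requests
                             # don't pile up in SingleJobController
```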
Flink 1.6, User Interface response time
Hello! We are migrating to the latest 1.6.2 version, and all the jobs seem to work fine, but when we check individual jobs through the web interface we encounter the issue that after clicking on a job, either it takes too long to load the information of the job or it never loads at all. Has anyone had this issue? I know that UI responsiveness is quite subjective, but has anybody also noticed some degradation between Flink 1.4 and 1.6? Thank you, Juan
Re: Weird behaviour after change sources in a job.
Great! So I have created a ticket: https://issues.apache.org/jira/browse/FLINK-10342 with a test which reproduces the issue: https://github.com/apache/flink/pull/6691/files If it seems reasonable, I can create a fix for this.

Regards
Oleksandr

From: Dawid Wysakowicz
Date: Thursday, 13 September 2018 at 11:15
To: Oleksandr Nitavskyi
Cc: R/Product Engineering/PRIME/Delight , "gor...@data-artisans.com" , Juan Gentile , "user@flink.apache.org"
Subject: Re: Weird behaviour after change sources in a job.

Hi Oleksandr,

The mapping of state to operator is done based on the operator id, not on its name. That's why changing the source's name might not be enough. That actually might be a valuable addition: checking whether the restored partitions still match the provided topic/topic pattern. Would you like to open a Jira ticket for it?

Best, Dawid

On 13/09/18 11:06, Oleksandr Nitavskyi wrote:

Hello Dawid, Thank you for the answer. In our case we did change the name of the Kafka source, so we expected it shouldn't restore state for the Kafka source operator. Anyway, shouldn't FlinkKafkaConsumerBase have a safeguard which does not allow restoring KafkaTopicPartitions from topics different from the currently consumed one?

Thank you
Oleksandr

From: Dawid Wysakowicz <dwysakow...@apache.org>
Date: Thursday, 13 September 2018 at 09:59
To: Juan Gentile <j.gent...@criteo.com>, "user@flink.apache.org" <user@flink.apache.org>
Cc: R/Product Engineering/PRIME/Delight <deli...@criteo.com>, <gor...@data-artisans.com>
Subject: Re: Weird behaviour after change sources in a job.

Hi Juan,

I think this is somewhat expected behaviour. Flink, in order to provide proper processing semantics, keeps track of partition offsets internally and checkpoints those offsets. FlinkKafkaConsumer also supports new partition discovery.
With both of those features in mind, if you restart your job from a savepoint/checkpoint but with a changed topic, it will restore the old partitions with offsets from the checkpoint, and will also discover partitions from the new topic. This is why it consumes from both the old and the new topic. If you defined your source manually (you were not using Kafka010TableSource), what you can do is set a new uid for the source and enable allowNonRestoredState. This way you will keep the state of all other operators, but you will lose the information about offsets in Kafka. I also cc @Gordon, who might want to add something to this.

On 12/09/18 18:03, Juan Gentile wrote:

Hello! We have found a weird issue while replacing the source in one of our Flink SQL jobs. We have a job which was reading from a Kafka topic (with externalized checkpoints) and we needed to change the topic while keeping the same logic for the job/SQL. After we restarted the job, instead of consuming from the new Kafka topic, it consumed from both, duplicating the input of our job! We were able to reproduce the issue, but we don't understand if this is a bug or expected behavior, in which case we should have restarted from a clean state. We are using Flink 1.4 at the moment and Kafka 0.10.2.1.

Thank you, Juan
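A hedged sketch of Dawid's suggestion. Topic, group, and uid strings are illustrative; the connector class and the SimpleStringSchema package vary by Flink version (the imports below follow the 1.4-era layout):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SwitchTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // illustrative
        props.setProperty("group.id", "my-group");             // illustrative

        env.addSource(new FlinkKafkaConsumer010<>(
                        "new-topic", new SimpleStringSchema(), props))
           // A fresh uid: the offsets checkpointed under the old operator id
           // are no longer matched to this source.
           .uid("kafka-source-new-topic-v2")
           .print();

        // Resume with: flink run -s <savepointPath> --allowNonRestoredState job.jar
        env.execute("switch-topic");
    }
}
```

The --allowNonRestoredState flag tells Flink to ignore the savepoint state that no longer maps to any operator (here, the old source's offsets) instead of failing the restore.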
Re: Weird behaviour after change sources in a job.
Hello Dawid, Thank you for the answer. In our case we did change the name of the Kafka source, so we expected it shouldn't restore state for the Kafka source operator. Anyway, shouldn't FlinkKafkaConsumerBase have a safeguard which does not allow restoring KafkaTopicPartitions from topics different from the currently consumed one?

Thank you
Oleksandr

From: Dawid Wysakowicz
Date: Thursday, 13 September 2018 at 09:59
To: Juan Gentile , "user@flink.apache.org"
Cc: R/Product Engineering/PRIME/Delight ,
Subject: Re: Weird behaviour after change sources in a job.

Hi Juan,

I think this is somewhat expected behaviour. Flink, in order to provide proper processing semantics, keeps track of partition offsets internally and checkpoints those offsets. FlinkKafkaConsumer also supports new partition discovery. With both of those features in mind, if you restart your job from a savepoint/checkpoint but with a changed topic, it will restore the old partitions with offsets from the checkpoint, and will also discover partitions from the new topic. This is why it consumes from both the old and the new topic. If you defined your source manually (you were not using Kafka010TableSource), what you can do is set a new uid for the source and enable allowNonRestoredState. This way you will keep the state of all other operators, but you will lose the information about offsets in Kafka. I also cc @Gordon, who might want to add something to this.

On 12/09/18 18:03, Juan Gentile wrote:

Hello! We have found a weird issue while replacing the source in one of our Flink SQL jobs. We have a job which was reading from a Kafka topic (with externalized checkpoints) and we needed to change the topic while keeping the same logic for the job/SQL. After we restarted the job, instead of consuming from the new Kafka topic, it consumed from both, duplicating the input of our job!
We were able to reproduce the issue, but we don't understand if this is a bug or expected behavior, in which case we should have restarted from a clean state. We are using Flink 1.4 at the moment and Kafka 0.10.2.1. Thank you, Juan
Table API, custom window
Hello guys, I am curious: is there a way to define a custom window (assigner/trigger/evictor) for the Flink Table/SQL API? The documentation seems silent on this; are there plans for it? Or should we go with the DataStream API in case we need this kind of functionality? Thanks Oleksandr Nitavskyi
CoreOptions.TMP_DIRS bug
Hello guys,

We have discovered a minor issue with Flink 1.5, particularly on YARN, related to the way Flink manages temp paths (io.tmp.dirs) in configuration: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#io-tmp-dirs

1. From what we can see in the code, the default value from the documentation doesn't correspond to the reality of YARN or Mesos deployments. It appears to be equal to the env variable '_FLINK_TMP_DIR' on Mesos and to `LOCAL_DIRS` on YARN.
2. The issue on YARN is that it is impossible to have different LOCAL_DIRS on the JobManager and the TaskManager, even though the LOCAL_DIRS value depends on the container. CoreOptions.TMP_DIRS is set to its default value during JobManager initialization and added to the configuration object. When a TaskManager is launched, this configuration object is cloned with the LOCAL_DIRS that make sense only for the JobManager's container. So from the TaskManager's point of view, CoreOptions.TMP_DIRS is always equal either to the path in flink.yml or to the LOCAL_DIRS of the JobManager (default behaviour). If the TaskManager's container does not have access to the folders YARN allocated for the JobManager, the TaskManager cannot be started.

Could you please confirm that this is a bug, and I will create a Jira ticket to track it?

Thanks
Kind Regards
Oleksandr Nitavskyi
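One possible workaround until this is fixed: pin io.tmp.dirs explicitly in flink-conf.yaml, so that the JobManager's LOCAL_DIRS are never propagated to TaskManagers. The path below is illustrative; it must exist and be writable in every container:

```yaml
# flink-conf.yaml -- explicit temp dirs; path illustrative, multiple paths
# can be given separated by commas
io.tmp.dirs: /var/tmp/flink
```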
Flink long-running streaming job, Keytab authentication
Hello all,

I have a question about Kerberos authentication in a YARN environment for a long-running streaming job. According to the documentation ( https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/security-kerberos.html#yarnmesos-mode ), Flink's solution is to use a keytab to perform authentication in the YARN perimeter. If a keytab is configured, Flink uses the UserGroupInformation#loginUserFromKeytab method to authenticate. The YARN security documentation ( https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#keytabs-for-am-and-containers-distributed-via-yarn ) says this should be enough:

Launched containers must themselves log in via UserGroupInformation.loginUserFromKeytab(). UGI handles the login, and schedules a background thread to relogin the user periodically.

But in reality, if we check the source code of UGI, we can see that no background thread is created: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L1153. It just creates a javax.security.auth.login.LoginContext and performs the authentication. This appears to be true across Hadoop branches: 2.7, 2.8, 3.0, and trunk. Consequently, Flink also doesn't create any background threads: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L69. As a result, the job loses its credentials for the ResourceManager and HDFS after some time (12 hours in my case). It looks like UGI's code is not aligned with its documentation, and it doesn't relogin periodically. Do you think patching Flink with a background thread that performs UGI#reloginFromKeytab could be a solution?

P.S. We are running Flink as a single job on YARN.
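A minimal sketch of the proposed patch, assuming a daemon thread that periodically invokes a relogin action. In Flink this action would call UserGroupInformation#reloginFromKeytab (which re-acquires the TGT from the keytab and is documented as safe to call repeatedly); here it is passed in as a Runnable so the scheduling logic stands on its own:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class KeytabReloginThread {
    /**
     * Schedules reloginAction every periodMillis on a daemon thread.
     * In the real patch, reloginAction would be something like:
     *   () -> UserGroupInformation.getLoginUser().reloginFromKeytab()
     */
    static ScheduledExecutorService start(Runnable reloginAction,
                                          long periodMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "keytab-relogin");
                    t.setDaemon(true); // don't keep the JVM alive
                    return t;
                });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                reloginAction.run();
            } catch (Exception e) {
                // Log and retry on the next tick rather than killing
                // the relogin loop.
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

The period would need to be well below the TGT lifetime (hours, not days); the 12-hour credential loss above suggests the ticket lifetime to stay under.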