[RESULT][VOTE] FLIP-211: Kerberos delegation token framework
Hi devs,

FLIP-211 [1] has been accepted.

There were 3 binding votes and 2 non-binding votes in favor, none against. Votes are in the order of arrival:

Binding:
Gyula Fora
Marton Balassi
Chesnay Schepler

Non-binding:
Junfan Zhang
David Moravek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework

BR,
G
[jira] [Created] (FLINK-25998) Flink akka runs into NoClassDefFoundError on shutdown
Robert Metzger created FLINK-25998:
--------------------------------------

             Summary: Flink akka runs into NoClassDefFoundError on shutdown
                 Key: FLINK-25998
                 URL: https://issues.apache.org/jira/browse/FLINK-25998
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.15.0
            Reporter: Robert Metzger


When trying to start a standalone jobmanager on an unavailable port, I see the following unexpected exception:

{code}
2022-02-08 08:07:18,299 INFO  akka.remote.Remoting [] - Starting remoting
2022-02-08 08:07:18,357 ERROR akka.remote.transport.netty.NettyTransport [] - failed to bind to /0.0.0.0:6123, shutting down Netty transport
2022-02-08 08:07:18,373 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics java.net.BindException: Could not start actor system on any port in port range 6123
	at org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.startRemoteActorSystem(AkkaBootstrapTools.java:133)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:358)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:327)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:247)
	at org.apache.flink.runtime.rpc.RpcUtils.createRemoteRpcService(RpcUtils.java:191)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:334)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:253)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:203)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:684)
	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
2022-02-08 08:07:18,377 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2022-02-08 08:07:18,377 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'flink-akka.remote.default-remote-dispatcher-6' produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: akka/actor/dungeon/FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
	at akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException(FaultHandling.scala:334) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException$(FaultHandling.scala:334) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.actor.ActorCell.handleNonFatalOrInterruptedException(ActorCell.scala:411) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.actor.ActorCell.invoke(ActorCell.scala:551) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_312]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_312]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_312]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_312]
Caused by: java.lang.ClassNotFoundException: akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_312]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_312]
	at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
	at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader
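For context, the `NoClassDefFoundError` presumably arises because flink-rpc-akka is loaded through a dedicated `ComponentClassLoader` that is torn down during shutdown while Akka threads are still handling the bind failure; Scala's synthetic `$$anonfun` classes are loaded lazily, so the first exception handled after teardown hits a loader that can no longer resolve anything. A minimal sketch of that failure pattern (a hypothetical loader, not Flink's actual `ComponentClassLoader`):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch: a component classloader that is "closed" during shutdown.
 * Classes loaded lazily afterwards (like Scala's synthetic anonfun
 * classes in the log above) fail with ClassNotFoundException, which
 * surfaces as NoClassDefFoundError in the still-running threads.
 */
public class ClosableComponentLoader extends ClassLoader {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Simulates releasing the component jar on shutdown. */
    public void close() {
        closed.set(true);
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        if (closed.get()) {
            // mirrors the ClassNotFoundException seen in the stack trace
            throw new ClassNotFoundException(name + " (component classloader closed)");
        }
        // for this sketch, delegate to the system classloader
        return Class.forName(name, false, ClassLoader.getSystemClassLoader());
    }

    /** True if the given class can still be resolved through this loader. */
    public boolean canLoad(String name) {
        try {
            findClass(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

The fix direction would be to keep the component loader alive until all threads using it have terminated, or to load such classes eagerly before shutdown begins.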
Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
Thanks @Thomas and @Gyula. +1 to only introduce necessary and reasonable shorthand proxy parameters. Best, Yangze Guo On Tue, Feb 8, 2022 at 12:47 PM Thomas Weise wrote: > > @Yangze thanks for bringing up the configuration priority. This is > quite important indeed and should be mentioned in the FLIP. > > I agree with the sentiment that whenever possible we should use the > native configuration directly (either Flink native settings or k8s pod > template), rather than introducing proxy parameters in the CRD. That > certainly applies to taskManager.taskSlots which can be specified > directly under flinkConfiguration. > > Thanks @Alexis for the pointers! > > Regarding memory: I'm leaning towards starting from total memory at > the k8s resource level and let Flink derive components by default. For > many users that would be a more intuitive approach than specifying the > components. So container memory -> taskmanager.memory.process.size -> > [1] > > With that approach we could also extract the resource spec from the > pod template. Although setting memory is something necessary pretty > much always and defining the pod template not necessarily. Having the > shorthand proxy parameter may be a good compromise. > > Cheers, > Thomas > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/ > > On Mon, Feb 7, 2022 at 4:33 AM Alexis Sarda-Espinosa > wrote: > > > > Danny Cranmer mentioned they are interested in standalone mode, and I am > > too, so I just wanted to say that if that development starts in parallel, I > > might be able to contribute a little. > > > > Regarding the CRD, I agree it would be nice to avoid as many "duplications" > > as possible if pod templates are to be used. In my PoC I even tried to make > > use of existing configuration options like kubernetes.container.image & > > pipeline.jars [1]. For CPU/Memory resources, the discussion in [2] might be > > relevant. 
> > > > [1] > > https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml > > [2] https://issues.apache.org/jira/browse/FLINK-24150 > > > > Regards, > > Alexis. > > > > -Original Message- > > From: K Fred > > Sent: Montag, 7. Februar 2022 09:36 > > To: dev@flink.apache.org > > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator > > > > Hi Gyula! > > > > You are right. I think some common flink config options can be put in the > > CR, other expert settings continue to be overwritten by flink, and then the > > user can choose to customize the configuration. > > > > Best Wishes, > > Peng Yuan > > > > On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra wrote: > > > > > Hi Yangze! > > > > > > This is not set in stone at the moment but the way I think it should > > > work is that first class config options in the CR should always take > > > precedence over the Flink config. > > > > > > In general we should not introduce too many arbitrary config options > > > that duplicate the flink configs without good reasons but the ones we > > > introduce should overwrite flink configs. > > > > > > We should discuss and decide together what config options to keep in > > > the flink conf and what to bring on the CR level. Resource related > > > ones are difficult because on one hand they are integral to every > > > application, on the other hand there are many expert settings that we > > > should probably leave in the conf. > > > > > > Cheers, > > > Gyula > > > > > > > > > > > > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo wrote: > > > > > > > Thanks everyone for the great effort. The FLIP looks really good. > > > > > > > > I just want to make sure the configuration priority in the CR example. > > > > It seems the requests resources or "taskManager. taskSlots" will be > > > > transferred to Flink internal config, e.g. 
> > > > "taskmanager.memory.process.size" and > > > > "taskmanager.numberOfTaskSlots", and override the one in > > > > "flinkConfiguration". Am I understanding this correctly? > > > > > > > > Best, > > > > Yangze Guo > > > > > > > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song > > > > wrote: > > > > > > > > > > Sorry for the late reply. We were out due to the public holidays > > > > > in > > > > China. > > > > > > > > > > @Thomas, > > > > > > > > > > The intention is to support application management through > > > > > operator and > > > > CR, > > > > > > which means there won't be any 2 step submission process, which > > > > > > as > > > you > > > > > > allude to would defeat the purpose of this project. The CR > > > > > > example > > > > shows > > > > > > the application part. Please note that the bare cluster support > > > > > > is an > > > > > > *additional* feature for scenarios that require external job > > > > management. Is > > > > > > there anything on the FLIP page that creates a different impression? > > > > > > > > > > > > > > > > Sounds good to me. I don't remember what created the impression of > > > > > 2 > > > step > >
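The precedence Gyula and Yangze discuss (first-class CR options overriding entries in flinkConfiguration) amounts to a simple overlay. A hedged sketch under the assumption that the operator resolves shorthand fields last; the config keys mirror the thread, but the merger class and field names are hypothetical, not the actual operator CRD:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: overlay first-class CR shorthand fields onto flinkConfiguration. */
public class CrConfigMerger {
    public static Map<String, String> effectiveConfig(
            Map<String, String> flinkConfiguration,
            Integer taskSlots,                 // hypothetical CR shorthand
            String taskManagerProcessMemory) { // hypothetical CR shorthand
        Map<String, String> merged = new HashMap<>(flinkConfiguration);
        // First-class CR options take precedence over flinkConfiguration.
        if (taskSlots != null) {
            merged.put("taskmanager.numberOfTaskSlots", String.valueOf(taskSlots));
        }
        if (taskManagerProcessMemory != null) {
            merged.put("taskmanager.memory.process.size", taskManagerProcessMemory);
        }
        return merged;
    }
}
```

With Thomas's suggestion, `taskManagerProcessMemory` would come from the k8s container resource level and Flink would derive the memory components from it.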
[jira] [Created] (FLINK-25997) FsStateChangelogWriter cannot work well when there is no change appended
Hangxiang Yu created FLINK-25997: Summary: FsStateChangelogWriter cannot work well when there is no change appended Key: FLINK-25997 URL: https://issues.apache.org/jira/browse/FLINK-25997 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Hangxiang Yu If there is no change appended, we should use the old sqn, but we always use sqn.next() currently. -- This message was sent by Atlassian Jira (v8.20.1#820001)
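The fix the ticket describes can be sketched as follows; the class and method names are hypothetical stand-ins for the real FsStateChangelogWriter/SequenceNumber API:

```java
/** Sketch: advance the sequence number only when changes were appended. */
public class SqnTracker {
    private long sqn = 0;                       // last persisted sequence number
    private boolean changesSinceLastPersist = false;

    public void append() {
        changesSinceLastPersist = true;
    }

    /**
     * Returns the sqn for this persist: the old sqn if nothing was appended
     * (the fix), otherwise sqn.next() — here modeled as sqn + 1.
     */
    public long persist() {
        if (changesSinceLastPersist) {
            sqn = sqn + 1;                      // sqn.next()
            changesSinceLastPersist = false;
        }
        return sqn;                             // unchanged when no change appended
    }
}
```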
Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
@Yangze thanks for bringing up the configuration priority. This is quite important indeed and should be mentioned in the FLIP. I agree with the sentiment that whenever possible we should use the native configuration directly (either Flink native settings or k8s pod template), rather than introducing proxy parameters in the CRD. That certainly applies to taskManager.taskSlots which can be specified directly under flinkConfiguration. Thanks @Alexis for the pointers! Regarding memory: I'm leaning towards starting from total memory at the k8s resource level and let Flink derive components by default. For many users that would be a more intuitive approach than specifying the components. So container memory -> taskmanager.memory.process.size -> [1] With that approach we could also extract the resource spec from the pod template. Although setting memory is something necessary pretty much always and defining the pod template not necessarily. Having the shorthand proxy parameter may be a good compromise. Cheers, Thomas [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/ On Mon, Feb 7, 2022 at 4:33 AM Alexis Sarda-Espinosa wrote: > > Danny Cranmer mentioned they are interested in standalone mode, and I am too, > so I just wanted to say that if that development starts in parallel, I might > be able to contribute a little. > > Regarding the CRD, I agree it would be nice to avoid as many "duplications" > as possible if pod templates are to be used. In my PoC I even tried to make > use of existing configuration options like kubernetes.container.image & > pipeline.jars [1]. For CPU/Memory resources, the discussion in [2] might be > relevant. > > [1] > https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml > [2] https://issues.apache.org/jira/browse/FLINK-24150 > > Regards, > Alexis. > > -Original Message- > From: K Fred > Sent: Montag, 7. 
Februar 2022 09:36 > To: dev@flink.apache.org > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator > > Hi Gyula! > > You are right. I think some common flink config options can be put in the CR, > other expert settings continue to be overwritten by flink, and then the user > can choose to customize the configuration. > > Best Wishes, > Peng Yuan > > On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra wrote: > > > Hi Yangze! > > > > This is not set in stone at the moment but the way I think it should > > work is that first class config options in the CR should always take > > precedence over the Flink config. > > > > In general we should not introduce too many arbitrary config options > > that duplicate the flink configs without good reasons but the ones we > > introduce should overwrite flink configs. > > > > We should discuss and decide together what config options to keep in > > the flink conf and what to bring on the CR level. Resource related > > ones are difficult because on one hand they are integral to every > > application, on the other hand there are many expert settings that we > > should probably leave in the conf. > > > > Cheers, > > Gyula > > > > > > > > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo wrote: > > > > > Thanks everyone for the great effort. The FLIP looks really good. > > > > > > I just want to make sure the configuration priority in the CR example. > > > It seems the requests resources or "taskManager. taskSlots" will be > > > transferred to Flink internal config, e.g. > > > "taskmanager.memory.process.size" and > > > "taskmanager.numberOfTaskSlots", and override the one in > > > "flinkConfiguration". Am I understanding this correctly? > > > > > > Best, > > > Yangze Guo > > > > > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song > > > wrote: > > > > > > > > Sorry for the late reply. We were out due to the public holidays > > > > in > > > China. 
> > > > > > > > @Thomas, > > > > > > > > The intention is to support application management through > > > > operator and > > > CR, > > > > > which means there won't be any 2 step submission process, which > > > > > as > > you > > > > > allude to would defeat the purpose of this project. The CR > > > > > example > > > shows > > > > > the application part. Please note that the bare cluster support > > > > > is an > > > > > *additional* feature for scenarios that require external job > > > management. Is > > > > > there anything on the FLIP page that creates a different impression? > > > > > > > > > > > > > Sounds good to me. I don't remember what created the impression of > > > > 2 > > step > > > > submission back then. I revisited the latest version of this FLIP > > > > and > > it > > > > looks good to me. > > > > > > > > @Gyula, > > > > > > > > Versioning: > > > > > Versioning will be independent from Flink and the operator will > > depend > > > on a > > > > > fixed flink version (in every given operator version). > > > > > This should be the exact same setup as with Stateful Functions ( > > > > > https://git
[jira] [Created] (FLINK-25996) Introduce job property isDynamicGraph to ExecutionConfig
Zhu Zhu created FLINK-25996: --- Summary: Introduce job property isDynamicGraph to ExecutionConfig Key: FLINK-25996 URL: https://issues.apache.org/jira/browse/FLINK-25996 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.15.0 To enable FLINK-25995 and FLINK-25046 only in dynamic graph scenario, we need a property ExecutionConfig#isDynamicGraph. In the first step, the property will be decided automatically, true iff config {{jobmanager.scheduler}} is {{AdaptiveBatch}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
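A hedged sketch of the automatic derivation described in the ticket, using a plain map as a stand-in for ExecutionConfig/Configuration:

```java
import java.util.Map;

/** Sketch: derive the proposed isDynamicGraph property from configuration. */
public class DynamicGraphFlag {
    // True iff the configured scheduler is the adaptive batch scheduler.
    public static boolean isDynamicGraph(Map<String, String> conf) {
        return "AdaptiveBatch".equals(conf.get("jobmanager.scheduler"));
    }
}
```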
[jira] [Created] (FLINK-25995) Make implicit assumption of SQL local keyBy/groupBy explicit
Zhu Zhu created FLINK-25995: --- Summary: Make implicit assumption of SQL local keyBy/groupBy explicit Key: FLINK-25995 URL: https://issues.apache.org/jira/browse/FLINK-25995 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.15.0 Reporter: Zhu Zhu Fix For: 1.15.0 If there are multiple consecutive identical groupBy operations (i.e. keyBy), the SQL planner changes all but the first one to use a forward partitioner, so that these operators can be chained to avoid unnecessary shuffles. However, sometimes the local keyBy operators are not chained (e.g. with multiple inputs), and this kind of forward partitioner turns into a forward job edge. These forward edges still carry the local keyBy assumption, so they cannot be changed into rescale/rebalance edges; otherwise the results could be incorrect. This prevents the adaptive batch scheduler from determining the parallelism of downstream job vertices of other forward edges (see FLINK-25046). To solve this, I propose to introduce a new {{ForwardForRescalePartitioner}}. When the SQL planner optimizes consecutive identical groupBy operations, it should use the proposed partitioner, so that the runtime framework can further decide whether the partitioner can be changed to rescale or not. -- This message was sent by Atlassian Jira (v8.20.1#820001)
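The idea behind the proposed partitioner can be sketched as an edge that behaves like forward when parallelisms match but may be demoted to rescale by the runtime, unlike a hard forward edge that pins the parallelisms together. The modulo distribution below is a simplified stand-in for Flink's actual rescale mapping, and the class name is only illustrative:

```java
/**
 * Sketch: an edge that is "forward for rescale" — forward when upstream
 * and downstream parallelism match, otherwise demotable to rescale.
 */
public class ForwardForRescaleEdge {
    /** Target subtask for a record emitted by upstream subtask i. */
    public static int target(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism == downstreamParallelism) {
            return upstreamSubtask;                  // behaves like forward
        }
        // demoted to rescale: simple modulo stand-in for the real distribution
        return upstreamSubtask % downstreamParallelism;
    }
}
```

A hard forward edge would instead have to reject differing parallelisms, which is exactly what blocks the adaptive batch scheduler today.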
Re: [DISCUSS] Checkpointing (partially) failing jobs
Hi, I'm thinking about Yuan's case. Let's assume that the case is running in current Flink: 1. CP8 finishes 2. For some reason, PR2 stops consuming records from the source (but is not stuck), and PR1 continues consuming new records. 3. CP9 and CP10 finish 4. PR2 starts to consume quickly to catch up with PR1, and reaches the same final status as in Yuan's case before CP11 starts. I suppose that in this case, the status of the job can be the same as in Yuan's case, and the snapshot (including source states) taken at CP10 should be the same as the composed global snapshot in Yuan's case, which is combining CP10 of PR1 and CP8 of PR2. This should be true if neither failed checkpointing nor uncommitted consuming have side effects, both of which can break the exactly-once semantics when replaying. So I think there should be no difference between rescaling the combined global snapshot or the globally taken one, i.e. if the input partitions are not independent, we are probably not able to rescale the source state in the current Flink either. And @Thomas, I do agree that the operational burden is significantly reduced, while I'm a little afraid that checkpointing the subgraphs individually may increase most of the runtime overhead back again. Maybe we can find a better way to implement this. On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise wrote: > Hi, > > Thanks for opening this discussion! The proposed enhancement would be > interesting for use cases in our infrastructure as well. > > There are scenarios where it makes sense to have multiple disconnected > subgraphs in a single job because it can significantly reduce the > operational burden as well as the runtime overhead. Since we allow > subgraphs to recover independently, then why not allow them to make > progress independently also, which would imply that checkpointing must > succeed for non affected subgraphs as certain behavior is tied to > checkpoint completion, like Kafka offset commit, file output etc. 
> > As for source offset redistribution, offset/position needs to be tied > to splits (in FLIP-27) and legacy sources. (It applies to both Kafka > and Kinesis legacy sources and FLIP-27 Kafka source.). With the new > source framework, it would be hard to implement a source with correct > behavior that does not track the position along with the split. > > In Yuan's example, is there a reason why CP8 could not be promoted to > CP10 by the coordinator for PR2 once it receives the notification that > CP10 did not complete? It appears that should be possible since in its > effect it should be no different than no data processed between CP8 > and CP10? > > Thanks, > Thomas > > On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann wrote: > > > > Thanks for the clarification Yuan and Gen, > > > > I agree that the checkpointing of the sources needs to support the > > rescaling case, otherwise it does not work. Is there currently a source > > implementation where this wouldn't work? For Kafka it should work because > > we store the offset per assigned partition. For Kinesis it is probably > the > > same. For the Filesource we store the set of unread input splits in the > > source coordinator and the state of the assigned splits in the sources. > > This should probably also work since new splits are only handed out to > > running tasks. > > > > Cheers, > > Till > > > > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei wrote: > > > > > Hey Till, > > > > > > > Why rescaling is a problem for pipelined regions/independent > execution > > > subgraphs: > > > > > > Take a simplified example : > > > job graph : source (2 instances) -> sink (2 instances) > > > execution graph: > > > source (1/2) -> sink (1/2) [pieplined region 1] > > > source (2/2) -> sink (2/2) [pieplined region 2] > > > > > > Let's assume checkpoints are still triggered globally, meaning > different > > > pipelined regions share the global checkpoint id (PR1 CP1 matches with > PR2 > > > CP1). 
> > > > > > Now let's assume PR1 completes CP10 and PR2 completes CP8. > > > > > > Let's say we want to rescale to parallelism 3 due to increased input. > > > > > > - Notice that we can not simply rescale based on the latest completed > > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output > > > externally. > > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on > how the > > > source's offset redistribution is implemented. > > >The answer is yes if we treat each input partition as independent > from > > > each other, *but I am not sure whether we can make that assumption*. > > > > > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with > CPs. > > > > > > Best > > > -Yuan > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann > wrote: > > > > > > > Hi everyone, > > > > > > > > Yuan and Gen could you elaborate why rescaling is a problem if we say > > > that > > > > separate pipelined regions can take checkpoints indepen
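The composition discussed in this thread (e.g. combining CP10 of PR1 with CP8 of PR2 into one global snapshot) can be sketched as picking, per pipelined region, the newest completed checkpoint not exceeding the target checkpoint id. This is a hypothetical coordinator-side helper, assuming per-region checkpoint tracking and independent source partitions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;

/**
 * Sketch: compose a global snapshot from per-pipelined-region completed
 * checkpoints, taking the last completed checkpoint of each region.
 */
public class SnapshotComposer {
    public static Map<String, Long> compose(
            Map<String, NavigableSet<Long>> completedPerRegion, long targetCp) {
        Map<String, Long> chosen = new HashMap<>();
        for (Map.Entry<String, NavigableSet<Long>> e : completedPerRegion.entrySet()) {
            // newest completed checkpoint of this region, at or before the target
            Long cp = e.getValue().floor(targetCp);
            if (cp == null) {
                throw new IllegalStateException("No usable checkpoint for region " + e.getKey());
            }
            chosen.put(e.getKey(), cp);
        }
        return chosen;
    }
}
```

This is exactly Thomas's "promote CP8 to CP10" observation: for a region that made no progress, reusing its last completed checkpoint is indistinguishable from having taken CP10 there, provided source state is tracked per split/partition.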
[jira] [Created] (FLINK-25994) Implement FileStoreExpire
Caizhi Weng created FLINK-25994: --- Summary: Implement FileStoreExpire Key: FLINK-25994 URL: https://issues.apache.org/jira/browse/FLINK-25994 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Caizhi Weng Fix For: 1.15.0 Currently FileStoreExpire does not have an implementation. We need an implementation to clean up old snapshots and related files. -- This message was sent by Atlassian Jira (v8.20.1#820001)
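A minimal sketch of the retention policy such an implementation might use: keep the newest N snapshots and report the older ones for deletion. The class and method names are hypothetical, and actual file cleanup (deleting files referenced only by expired snapshots) is out of scope here:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: retain only the newest maxRetained snapshots. */
public class FileStoreExpireSketch {
    /** Returns the snapshot ids to expire, given ids in ascending order. */
    public static List<Long> expired(List<Long> snapshotIdsAscending, int maxRetained) {
        int toDrop = Math.max(0, snapshotIdsAscending.size() - maxRetained);
        // the oldest snapshots (and, in a real impl, files only they reference) go away
        return new ArrayList<>(snapshotIdsAscending.subList(0, toDrop));
    }
}
```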
[jira] [Created] (FLINK-25993) Option to disable Kryo.setRegistrationRequired(false)
Shane Bishop created FLINK-25993: Summary: Option to disable Kryo.setRegistrationRequired(false) Key: FLINK-25993 URL: https://issues.apache.org/jira/browse/FLINK-25993 Project: Flink Issue Type: New Feature Affects Versions: 1.14.3 Reporter: Shane Bishop I would like to request a mechanism that a Flink library user could use to optionally disable Kryo.setRegistrationRequired(false). The motivation is that Kryo.setRegistrationRequired(true) was made the safe default in [this commit|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00] (specifically the change was [this line|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130] in the commit). This default is overridden in the 1.14.3 Flink release (see [KryoSerializer.java|https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492] and [FlinkScalaKryoInstantiator.scala|https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46]). Reference to thread in mailing list: https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk -- This message was sent by Atlassian Jira (v8.20.1#820001)
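The safety distinction behind `setRegistrationRequired` can be illustrated without Kryo itself: with registration required, an unregistered class is rejected up front instead of silently falling back to a default serializer. A plain-Java sketch of that policy (not Kryo's actual implementation):

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of the registration-required idea, independent of Kryo. */
public class RegistrationPolicy {
    private final boolean registrationRequired;
    private final Set<Class<?>> registered = new HashSet<>();

    public RegistrationPolicy(boolean registrationRequired) {
        this.registrationRequired = registrationRequired;
    }

    public void register(Class<?> type) {
        registered.add(type);
    }

    /** Mirrors Kryo's behavior: unknown classes are rejected when required. */
    public boolean canSerialize(Class<?> type) {
        return !registrationRequired || registered.contains(type);
    }
}
```

The feature request is essentially a switch so Flink users can keep the strict `registrationRequired = true` behavior instead of Flink forcing it back to `false`.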
Re: [DISCUSS] Checkpointing (partially) failing jobs
Hi, Thanks for opening this discussion! The proposed enhancement would be interesting for use cases in our infrastructure as well. There are scenarios where it makes sense to have multiple disconnected subgraphs in a single job because it can significantly reduce the operational burden as well as the runtime overhead. Since we allow subgraphs to recover independently, then why not allow them to make progress independently also, which would imply that checkpointing must succeed for non affected subgraphs as certain behavior is tied to checkpoint completion, like Kafka offset commit, file output etc. As for source offset redistribution, offset/position needs to be tied to splits (in FLIP-27) and legacy sources. (It applies to both Kafka and Kinesis legacy sources and FLIP-27 Kafka source.). With the new source framework, it would be hard to implement a source with correct behavior that does not track the position along with the split. In Yuan's example, is there a reason why CP8 could not be promoted to CP10 by the coordinator for PR2 once it receives the notification that CP10 did not complete? It appears that should be possible since in its effect it should be no different than no data processed between CP8 and CP10? Thanks, Thomas On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann wrote: > > Thanks for the clarification Yuan and Gen, > > I agree that the checkpointing of the sources needs to support the > rescaling case, otherwise it does not work. Is there currently a source > implementation where this wouldn't work? For Kafka it should work because > we store the offset per assigned partition. For Kinesis it is probably the > same. For the Filesource we store the set of unread input splits in the > source coordinator and the state of the assigned splits in the sources. > This should probably also work since new splits are only handed out to > running tasks. 
> > Cheers, > Till > > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei wrote: > > > Hey Till, > > > > > Why rescaling is a problem for pipelined regions/independent execution > > subgraphs: > > > > Take a simplified example : > > job graph : source (2 instances) -> sink (2 instances) > > execution graph: > > source (1/2) -> sink (1/2) [pieplined region 1] > > source (2/2) -> sink (2/2) [pieplined region 2] > > > > Let's assume checkpoints are still triggered globally, meaning different > > pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2 > > CP1). > > > > Now let's assume PR1 completes CP10 and PR2 completes CP8. > > > > Let's say we want to rescale to parallelism 3 due to increased input. > > > > - Notice that we can not simply rescale based on the latest completed > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output > > externally. > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the > > source's offset redistribution is implemented. > >The answer is yes if we treat each input partition as independent from > > each other, *but I am not sure whether we can make that assumption*. > > > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs. > > > > Best > > -Yuan > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann wrote: > > > > > Hi everyone, > > > > > > Yuan and Gen could you elaborate why rescaling is a problem if we say > > that > > > separate pipelined regions can take checkpoints independently? > > > Conceptually, I somehow think that a pipelined region that is failed and > > > cannot create a new checkpoint is more or less the same as a pipelined > > > region that didn't get new input or a very very slow pipelined region > > which > > > couldn't read new records since the last checkpoint (assuming that the > > > checkpoint coordinator can create a global checkpoint by combining > > > individual checkpoints (e.g. 
taking the last completed checkpoint from > > each > > > pipelined region)). If this comparison is correct, then this would mean > > > that we have rescaling problems under the latter two cases. > > > > > > Cheers, > > > Till > > > > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo wrote: > > > > > > > Hi Gyula, > > > > > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss > > > this > > > > within two scopes. One is the job subgraph, the other is the execution > > > > subgraph, which I suppose is the same as PipelineRegion. > > > > > > > > An idea is to individually checkpoint the PipelineRegions, for the > > > > recovering in a single run. > > > > > > > > Flink has now supported PipelineRegion based failover, with a subset > > of a > > > > global checkpoint snapshot. The checkpoint barriers are spread within a > > > > PipelineRegion, so the checkpointing of individual PipelineRegions is > > > > actually independent. Since in a single run of a job, the > > PipelineRegions > > > > are fixed, we can individually checkpoint separated PipelineRegions, > > > > despite what status the other Pi
[jira] [Created] (FLINK-25992) JobDispatcherITCase.testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure
Roman Khachatryan created FLINK-25992:
-----------------------------------------

             Summary: JobDispatcherITCase.testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure
                 Key: FLINK-25992
                 URL: https://issues.apache.org/jira/browse/FLINK-25992
             Project: Flink
          Issue Type: Bug
          Components: Tests
    Affects Versions: 1.15.0
            Reporter: Roman Khachatryan
             Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9154

{code}
19:41:35,515 [flink-akka.actor.default-dispatcher-9] WARN  org.apache.flink.runtime.taskmanager.Task [] - jobVertex (1/1)#0 (7efdea21f5f95490e02117063ce8a314) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Error while notify checkpoint ABORT.
	at org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1457)
	at org.apache.flink.runtime.taskmanager.Task.notifyCheckpointAborted(Task.java:1407)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.abortCheckpoint(TaskExecutor.java:1021)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.UnsupportedOperationException: notifyCheckpointAbortAsync not supported by org.apache.flink.runtime.dispatcher.JobDispatcherITCase$AtLeastOneCheckpointInvokable
	at org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable.notifyCheckpointAbortAsync(AbstractInvokable.java:205)
	at org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1430)
	... 31 more
{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25991) Allow custom metadata to be committed alongside the offsets
Frederic Hemery created FLINK-25991: --- Summary: Allow custom metadata to be committed alongside the offsets Key: FLINK-25991 URL: https://issues.apache.org/jira/browse/FLINK-25991 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Frederic Hemery A Kafka Consumer can commit metadata alongside its offsets. This can be used for different purposes but, in our case, we use the metadata to provide consistent lag (in seconds, not offsets) reporting across our stack. -- This message was sent by Atlassian Jira (v8.20.1#820001)
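For context, the Kafka consumer API this refers to is OffsetAndMetadata: a commit such as consumer.commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) can carry an arbitrary metadata string per partition. The stdlib-only sketch below shows one way such a string could encode a commit timestamp for time-based (seconds, not offsets) lag reporting; the "ts=" encoding scheme and the class/method names are illustrative assumptions, not Flink or Kafka code.

```java
// Sketch: encoding a commit timestamp into the metadata string that a Kafka
// consumer could attach to an offset commit via OffsetAndMetadata(offset, metadata).
// The encoding format here is a hypothetical example, not an established convention.
public class OffsetMetadataSketch {

    // Encode the wall-clock (or event-time) timestamp of the committed position.
    static String encode(long commitTimestampMs) {
        return "ts=" + commitTimestampMs;
    }

    // Parse it back when computing time-based lag: now - committedTimestamp.
    static long parse(String metadata) {
        return Long.parseLong(metadata.substring("ts=".length()));
    }

    public static void main(String[] args) {
        String meta = encode(1644220800000L);       // what would go into OffsetAndMetadata
        long lagMs = 1644220805000L - parse(meta);  // consumer lag in milliseconds
        System.out.println(lagMs); // prints 5000
    }
}
```

With real Kafka clients, the same string would be read back via the OffsetAndMetadata returned from the consumer's committed() lookup.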
[jira] [Created] (FLINK-25990) Expose uid generator for DataStream/Transformation providers
Timo Walther created FLINK-25990: Summary: Expose uid generator for DataStream/Transformation providers Key: FLINK-25990 URL: https://issues.apache.org/jira/browse/FLINK-25990 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther The following providers can create more than one transformation and are thus kind of a black box for the planner: - TransformationScanProvider - DataStreamScanProvider - DataStreamSinkProvider - TransformationSinkProvider We should add a context to `TransformationScanProvider#createTransformation` and `DataStreamScanProvider#produceDataStream`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25989) EventTimeWindowCheckpointingITCase failed with exit code 137
Yun Gao created FLINK-25989: --- Summary: EventTimeWindowCheckpointingITCase failed with exit code 137 Key: FLINK-25989 URL: https://issues.apache.org/jira/browse/FLINK-25989 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.13.5 Reporter: Yun Gao {code:java} Feb 07 10:35:00 Starting org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase#testSlidingTimeWindow[statebackend type =ROCKSDB_FULLY_ASYNC]. ##[error]Exit code 137 returned from process: file name '/bin/docker', arguments 'exec -i -u 1006 -w /home/agent07_azpcontainer cc88843b38dcb3e53e10065d2a740015cf3fd387e5ef47621c224a9b348fbb37 /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. Finishing: Test - tests {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30833&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0&l=4495 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25988) UnalignedCheckpointRescaleITCase failed with exit code 137
Yun Gao created FLINK-25988: --- Summary: UnalignedCheckpointRescaleITCase failed with exit code 137 Key: FLINK-25988 URL: https://issues.apache.org/jira/browse/FLINK-25988 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.15.0 Reporter: Yun Gao {code:java} Feb 07 10:35:02 Starting org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no scale multi_input from 7 to 7, buffersPerChannel = 0]. Feb 07 10:35:02 Starting org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no scale multi_input from 7 to 7, buffersPerChannel = 0]. Feb 07 10:35:02 Finished org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no scale multi_input from 7 to 7, buffersPerChannel = 0]. ##[error]Exit code 137 returned from process: file name '/bin/docker', arguments 'exec -i -u 1003 -w /home/agent04_azpcontainer 54763e695d61cf1a80b5d4d6c397eec13218a705cddf2d5d659cec21533e1f8a /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. Finishing: Test - finegrained_resource_management {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30829&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=12459 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25987) IllegalArgumentException thrown from FsStateChangelogWriter.truncate
Roman Khachatryan created FLINK-25987: - Summary: IllegalArgumentException thrown from FsStateChangelogWriter.truncate Key: FLINK-25987 URL: https://issues.apache.org/jira/browse/FLINK-25987 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 {code} java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.changelog.fs.FsStateChangelogWriter.truncate(FsStateChangelogWriter.java:278) at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.updateChangelogSnapshotState(ChangelogKeyedStateBackend.java:702) at org.apache.flink.state.changelog.PeriodicMaterializationManager.lambda$null$2(PeriodicMaterializationManager.java:163) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:750){code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25986) Add FLIP-190 new API methods to python
Francesco Guardiani created FLINK-25986: --- Summary: Add FLIP-190 new API methods to python Key: FLINK-25986 URL: https://issues.apache.org/jira/browse/FLINK-25986 Project: Flink Issue Type: Bug Components: API / Python, Table SQL / API Reporter: Francesco Guardiani -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25985) e2e test covering the main functionality of the JobResultStore
Matthias Pohl created FLINK-25985: - Summary: e2e test covering the main functionality of the JobResultStore Key: FLINK-25985 URL: https://issues.apache.org/jira/browse/FLINK-25985 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [VOTE] Release 1.13.6, release candidate #1
+1 (binding) - signatures OK - checksums OK - tag OK - all artifacts accounted for - PR looks good On 05/02/2022 21:06, Konstantin Knauf wrote: Hi everyone, Please review and vote on the release candidate #1 for the version 1.13.6, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) The complete staging area is available for your review, which includes: * JIRA release notes [1], * the official Apache source release and binary convenience releases to be deployed to dist.apache.org [2], which are signed with the key with fingerprint 8C3FB007FE60DEFA [3], * all artifacts to be deployed to the Maven Central Repository [4], * source code tag "release-1.13.6-rc1" [5], * website pull request listing the new release and adding announcement blog post [6]. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Konstantin [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351074 [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.6-rc1/ [3] https://dist.apache.org/repos/dist/release/flink/KEYS [4] https://repository.apache.org/content/repositories/orgapacheflink-1486/ [5] https://github.com/apache/flink/tree/release-1.13.6-rc1 [6] https://github.com/apache/flink-web/pull/505
[jira] [Created] (FLINK-25984) Add types to ConfigOptions and replace usages of the untyped methods defaultValue and noDefaultValue with the corresponding typed ones.
Marios Trivyzas created FLINK-25984: --- Summary: Add types to ConfigOptions and replace usages of the untyped methods defaultValue and noDefaultValue with the corresponding typed ones. Key: FLINK-25984 URL: https://issues.apache.org/jira/browse/FLINK-25984 Project: Flink Issue Type: Technical Debt Reporter: Marios Trivyzas Assignee: Marios Trivyzas -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25983) Add WatermarkStrategy#withWatermarkAlignment
Dawid Wysakowicz created FLINK-25983: Summary: Add WatermarkStrategy#withWatermarkAlignment Key: FLINK-25983 URL: https://issues.apache.org/jira/browse/FLINK-25983 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25982) Support idleness with watermark alignment
Piotr Nowojski created FLINK-25982: -- Summary: Support idleness with watermark alignment Key: FLINK-25982 URL: https://issues.apache.org/jira/browse/FLINK-25982 Project: Flink Issue Type: Sub-task Reporter: Piotr Nowojski -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25981) ZooKeeperMultipleComponentLeaderElectionDriverTest failed
Matthias Pohl created FLINK-25981: - Summary: ZooKeeperMultipleComponentLeaderElectionDriverTest failed Key: FLINK-25981 URL: https://issues.apache.org/jira/browse/FLINK-25981 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Matthias Pohl We experienced a [build failure|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30783&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=15997] in {{ZooKeeperMultipleComponentLeaderElectionDriverTest.testLeaderElectionWithMultipleDrivers}}. The test halted when waiting for the next leader in [ZooKeeperMultipleComponentLeaderElectionDriverTest:256|https://github.com/apache/flink/blob/e8742f7f5cac34852d0e621036e1614bbdfe8ec3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java#L256] {code} Feb 04 18:02:54 "main" #1 prio=5 os_prio=0 tid=0x7fab0800b800 nid=0xe07 waiting on condition [0x7fab12574000] Feb 04 18:02:54java.lang.Thread.State: WAITING (parking) Feb 04 18:02:54 at sun.misc.Unsafe.park(Native Method) Feb 04 18:02:54 - parking to wait for <0x8065c5c8> (a java.util.concurrent.CompletableFuture$Signaller) Feb 04 18:02:54 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) Feb 04 18:02:54 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) Feb 04 18:02:54 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) Feb 04 18:02:54 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) Feb 04 18:02:54 at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) Feb 04 18:02:54 at org.apache.flink.runtime.leaderelection.ZooKeeperMultipleComponentLeaderElectionDriverTest.testLeaderElectionWithMultipleDrivers(ZooKeeperMultipleComponentLeaderElectionDriverTest.java:256) [...] {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25980) remove unnecessary condition in IntervalJoinOperator
hongshu created FLINK-25980: --- Summary: remove unnecessary condition in IntervalJoinOperator Key: FLINK-25980 URL: https://issues.apache.org/jira/browse/FLINK-25980 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.14.3, 1.14.2, 1.13.5, 1.12.7, 1.11.6 Reporter: hongshu Fix For: 1.15.0 The condition 'currentWatermark != Long.MIN_VALUE' is covered by the subsequent condition 'timestamp < currentWatermark' in org.apache.flink.streaming.api.operators.co.IntervalJoinOperator#isLate: {code:java} private boolean isLate(long timestamp) { long currentWatermark = internalTimerService.currentWatermark(); return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark; } {code} If currentWatermark == Long.MIN_VALUE, then timestamp < currentWatermark also returns false, so the condition currentWatermark != Long.MIN_VALUE is redundant. We can use the following code directly: {code:java} private boolean isLate(long timestamp) { long currentWatermark = internalTimerService.currentWatermark(); return timestamp < currentWatermark; } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
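The equivalence claimed in the ticket can be checked mechanically: since Long.MIN_VALUE is the smallest possible long, no timestamp can compare smaller than it, so both variants agree in every case. A small standalone check (not Flink code; class and method names are illustrative):

```java
// Verifies that dropping the MIN_VALUE guard preserves behaviour: when
// currentWatermark == Long.MIN_VALUE, `timestamp < currentWatermark` is
// already false for every possible timestamp.
public class IsLateEquivalence {

    static boolean isLateOld(long timestamp, long watermark) {
        return watermark != Long.MIN_VALUE && timestamp < watermark;
    }

    static boolean isLateNew(long timestamp, long watermark) {
        return timestamp < watermark;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, Long.MIN_VALUE + 1, -1L, 0L, 42L, Long.MAX_VALUE};
        for (long wm : samples) {
            for (long ts : samples) {
                if (isLateOld(ts, wm) != isLateNew(ts, wm)) {
                    throw new AssertionError("mismatch at ts=" + ts + ", wm=" + wm);
                }
            }
        }
        System.out.println("equivalent");
    }
}
```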
RE: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
Danny Cranmer mentioned they are interested in standalone mode, and I am too, so I just wanted to say that if that development starts in parallel, I might be able to contribute a little. Regarding the CRD, I agree it would be nice to avoid as many "duplications" as possible if pod templates are to be used. In my PoC I even tried to make use of existing configuration options like kubernetes.container.image & pipeline.jars [1]. For CPU/Memory resources, the discussion in [2] might be relevant. [1] https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml [2] https://issues.apache.org/jira/browse/FLINK-24150 Regards, Alexis. -Original Message- From: K Fred Sent: Monday, 7 February 2022 09:36 To: dev@flink.apache.org Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator Hi Gyula! You are right. I think some common flink config options can be put in the CR, other expert settings continue to be overwritten by flink, and then the user can choose to customize the configuration. Best Wishes, Peng Yuan On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra wrote: > Hi Yangze! > > This is not set in stone at the moment but the way I think it should > work is that first class config options in the CR should always take > precedence over the Flink config. > > In general we should not introduce too many arbitrary config options > that duplicate the flink configs without good reasons but the ones we > introduce should overwrite flink configs. > > We should discuss and decide together what config options to keep in > the flink conf and what to bring on the CR level. Resource related > ones are difficult because on one hand they are integral to every > application, on the other hand there are many expert settings that we > should probably leave in the conf. > > Cheers, > Gyula > > > > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo wrote: > > > Thanks everyone for the great effort. The FLIP looks really good. 
> > > > I just want to make sure the configuration priority in the CR example. > > It seems the requests resources or "taskManager.taskSlots" will be > > transferred to Flink internal config, e.g. > > "taskmanager.memory.process.size" and > > "taskmanager.numberOfTaskSlots", and override the one in > > "flinkConfiguration". Am I understanding this correctly? > > > > Best, > > Yangze Guo > > > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song > > wrote: > > > > > > Sorry for the late reply. We were out due to the public holidays > > > in > > China. > > > > > > @Thomas, > > > > > > The intention is to support application management through > > > operator and > > CR, > > > > which means there won't be any 2 step submission process, which > > > > as > you > > > > allude to would defeat the purpose of this project. The CR > > > > example > > shows > > > > the application part. Please note that the bare cluster support > > > > is an > > > > *additional* feature for scenarios that require external job > > management. Is > > > > there anything on the FLIP page that creates a different impression? > > > > > > > > > > Sounds good to me. I don't remember what created the impression of > > > 2 > step > > > submission back then. I revisited the latest version of this FLIP > > > and > it > > > looks good to me. > > > > > > @Gyula, > > > > > > Versioning: > > > > Versioning will be independent from Flink and the operator will > depend > > on a > > > > fixed flink version (in every given operator version). > > > > This should be the exact same setup as with Stateful Functions ( > > > > https://github.com/apache/flink-statefun). So independent > > > > release > > cycle > > > > but > > > > still within the Flink umbrella. > > > > > > > > > > Does this mean if someone wants to upgrade Flink to a version that > > > is released after the operator version that is being used, he/she > > > would > need > > > to upgrade the operator version first? 
> > > I'm not questioning this, just trying to make sure I'm > > > understanding > this > > > correctly. > > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > > > > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra > wrote: > > > > > > > Thank you Alexis, > > > > > > > > Will definitely check this out. You are right, Kotlin makes it > > difficult to > > > > adopt pieces of this code directly but I think it will be good > > > > to get inspiration for the architecture and look at how > > > > particular problems > > have > > > > been solved. It will be a great help for us I am sure. > > > > > > > > Cheers, > > > > Gyula > > > > > > > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa < > > > > alexis.sarda-espin...@microfocus.com> wrote: > > > > > > > > > Hi everyone, > > > > > > > > > > just wanted to mention that my employer agreed to open source > > > > > the > > PoC I > > > > > developed: > > > > > https://github.com/MicroFocus/opsb-flink-k8s-operator > > > > > > > > > > I understand the concern for
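The precedence rule discussed in this thread (first-class CR options override keys in the free-form flinkConfiguration map) can be sketched as a simple map merge. The configuration keys below are real Flink options mentioned in the thread; the method signature and the CR field names it models are illustrative assumptions, not the operator's actual API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of CR config precedence: start from the user-supplied
// flinkConfiguration map, then let first-class CR fields overwrite it.
public class CrConfigMerge {

    static Map<String, String> effectiveConfig(Map<String, String> flinkConfiguration,
                                               Integer taskSlots,        // hypothetical CR field: taskManager.taskSlots
                                               String tmProcessSize) {   // hypothetical CR field: task manager memory
        Map<String, String> merged = new LinkedHashMap<>(flinkConfiguration);
        if (taskSlots != null) {
            merged.put("taskmanager.numberOfTaskSlots", String.valueOf(taskSlots));
        }
        if (tmProcessSize != null) {
            merged.put("taskmanager.memory.process.size", tmProcessSize);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("taskmanager.numberOfTaskSlots", "1"); // overridden by the CR field
        flinkConf.put("state.backend", "rocksdb");           // untouched expert setting
        Map<String, String> effective = effectiveConfig(flinkConf, 4, "4g");
        System.out.println(effective.get("taskmanager.numberOfTaskSlots")); // prints 4
        System.out.println(effective.get("state.backend"));                 // prints rocksdb
    }
}
```

This matches Yangze's reading of the CR example: the first-class resource fields are translated into the internal Flink options and take precedence over flinkConfiguration.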
[jira] [Created] (FLINK-25979) Suspicious Classloading error during close of KafkaEnumerator
Chesnay Schepler created FLINK-25979: Summary: Suspicious Classloading error during close of KafkaEnumerator Key: FLINK-25979 URL: https://issues.apache.org/jira/browse/FLINK-25979 Project: Flink Issue Type: Bug Components: Connectors / Common, Connectors / Kafka Affects Versions: 1.14.2 Reporter: Chesnay Schepler A user reported kafka logging a warning when the KafkaEnumerator was being closed. {code} 2022-02-04 15:16:30,801 WARN org.apache.kafka.common.utils.Utils [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?] 
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?] ... 6 more 2022-02-04 15:16:30,802 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed. {code} {code} KafkaSource source = KafkaSource.builder() .setBootstrapServers(brokers) .setGroupId(groupId) .setTopics(kafkaInputTopic) .setValueOnlyDeserializer(new SimpleStringSchema()) //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class)) .setStartingOffsets(OffsetsInitializer.earliest()) .setBounded(OffsetsInitializer.latest()) .build(); //withIdleness.duration() //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); DataStream ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); ds.print(); {code} This error is overall a bit weird; I don't think I've ever seen a class being unable to load one of its inner classes. Intuitively, I would think this is caused by the classloader being closed prematurely. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25978) DispatcherTest.testJobDataAreCleanedUpInCorrectOrderOn*Job can be removed
Matthias Pohl created FLINK-25978: - Summary: DispatcherTest.testJobDataAreCleanedUpInCorrectOrderOn*Job can be removed Key: FLINK-25978 URL: https://issues.apache.org/jira/browse/FLINK-25978 Project: Flink Issue Type: Sub-task Affects Versions: 1.15.0 Reporter: Matthias Pohl These test cases are covered in {{DispatcherResourceCleanupTest}} and {{DispatcherResourceCleanerFactoryTest}} verifying the correct order of events. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks
Zichen Liu created FLINK-25977: -- Summary: Close sink client and sink http client for KDS/KDF Sinks Key: FLINK-25977 URL: https://issues.apache.org/jira/browse/FLINK-25977 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Update: DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 to match the default thread count of the AWS SDK v2 HTTP client. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
Zichen Liu created FLINK-25976: -- Summary: Update the KDS and KDF Sink's defaults & update the docs Key: FLINK-25976 URL: https://issues.apache.org/jira/browse/FLINK-25976 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Bug: The Async Sink Base is flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks if the number of buffered elements is greater than the batch size and, if so, insists that the sink flushes immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests. This causes the current mailbox thread to yield and hence block. Notice that this can occur even if the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: we would like the element to be written to the buffer with no blocking. -- This message was sent by Atlassian Jira (v8.20.1#820001)
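The desired behaviour described in the ticket can be sketched as a small decision function: trigger a flush when a full batch is buffered, but only block the mailbox thread when the buffer itself is full, not merely because in-flight requests are at their maximum. This is an illustrative model, not the actual AsyncSinkWriter implementation; all names and the enum are assumptions.

```java
// Illustrative model of the non-blocking flush decision: blocking is reserved
// for a genuinely full buffer, so a near-empty buffer never stalls a write().
public class FlushDecision {

    enum Action { WRITE_AND_CONTINUE, FLUSH_NON_BLOCKING, BLOCK_UNTIL_IN_FLIGHT_DRAINS }

    static Action onWrite(int buffered, int batchSize, int maxBuffered,
                          int inFlight, int maxInFlight) {
        if (buffered >= maxBuffered) {
            // Buffer genuinely full: we must wait before accepting the element.
            return Action.BLOCK_UNTIL_IN_FLIGHT_DRAINS;
        }
        if (buffered >= batchSize && inFlight < maxInFlight) {
            // A batch is ready and we have request capacity: flush without blocking.
            return Action.FLUSH_NON_BLOCKING;
        }
        // Near-empty buffer, or in-flight requests saturated: just buffer the element.
        return Action.WRITE_AND_CONTINUE;
    }

    public static void main(String[] args) {
        // Batch ready, capacity available -> flush without blocking.
        System.out.println(onWrite(10, 10, 1000, 5, 50));
        // Batch ready but in-flight saturated, buffer near empty -> keep writing.
        System.out.println(onWrite(10, 10, 1000, 50, 50));
    }
}
```

The second case is exactly the scenario the ticket calls out: previously it would have blocked even though the buffer was nearly empty.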
[jira] [Created] (FLINK-25975) add doc for how to use AvroParquetRecordFormat
Jing Ge created FLINK-25975: --- Summary: add doc for how to use AvroParquetRecordFormat Key: FLINK-25975 URL: https://issues.apache.org/jira/browse/FLINK-25975 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Jing Ge https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/parquet/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25974) Make cancellation of jobs depend on the JobResultStore
Matthias Pohl created FLINK-25974: - Summary: Make cancellation of jobs depend on the JobResultStore Key: FLINK-25974 URL: https://issues.apache.org/jira/browse/FLINK-25974 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Matthias Pohl {{JobManagerRunner}} instances were cancellable as long as the instance was still registered in the {{Dispatcher.jobManagerRunnerRegistry}}. With the cleanup being done concurrently (i.e. not relying on the {{JobManagerRunnerRegistry}} to be cleaned up anymore), the cancellation of a job should only be possible as long as the job is not globally finished and before cleanup is triggered. We should verify whether a job is listed in the {{JobResultStore}} and only enable the user to cancel the job if that's not the case. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] Checkpointing (partially) failing jobs
Thanks for the clarification Yuan and Gen, I agree that the checkpointing of the sources needs to support the rescaling case, otherwise it does not work. Is there currently a source implementation where this wouldn't work? For Kafka it should work because we store the offset per assigned partition. For Kinesis it is probably the same. For the Filesource we store the set of unread input splits in the source coordinator and the state of the assigned splits in the sources. This should probably also work since new splits are only handed out to running tasks. Cheers, Till On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei wrote: > Hey Till, > > > Why rescaling is a problem for pipelined regions/independent execution > subgraphs: > > Take a simplified example : > job graph : source (2 instances) -> sink (2 instances) > execution graph: > source (1/2) -> sink (1/2) [pipelined region 1] > source (2/2) -> sink (2/2) [pipelined region 2] > > Let's assume checkpoints are still triggered globally, meaning different > pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2 > CP1). > > Now let's assume PR1 completes CP10 and PR2 completes CP8. > > Let's say we want to rescale to parallelism 3 due to increased input. > > - Notice that we can not simply rescale based on the latest completed > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output > externally. > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the > source's offset redistribution is implemented. >The answer is yes if we treat each input partition as independent from > each other, *but I am not sure whether we can make that assumption*. > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs. > > Best > -Yuan > > > > > > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann wrote: > > > Hi everyone, > > > > Yuan and Gen could you elaborate why rescaling is a problem if we say > that > > separate pipelined regions can take checkpoints independently? 
> > Conceptually, I somehow think that a pipelined region that is failed and > > cannot create a new checkpoint is more or less the same as a pipelined > > region that didn't get new input or a very very slow pipelined region > which > > couldn't read new records since the last checkpoint (assuming that the > > checkpoint coordinator can create a global checkpoint by combining > > individual checkpoints (e.g. taking the last completed checkpoint from > each > > pipelined region)). If this comparison is correct, then this would mean > > that we have rescaling problems under the latter two cases. > > > > Cheers, > > Till > > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo wrote: > > > > > Hi Gyula, > > > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss > > this > > > within two scopes. One is the job subgraph, the other is the execution > > > subgraph, which I suppose is the same as PipelineRegion. > > > > > > An idea is to individually checkpoint the PipelineRegions, for the > > > recovering in a single run. > > > > > > Flink has now supported PipelineRegion based failover, with a subset > of a > > > global checkpoint snapshot. The checkpoint barriers are spread within a > > > PipelineRegion, so the checkpointing of individual PipelineRegions is > > > actually independent. Since in a single run of a job, the > PipelineRegions > > > are fixed, we can individually checkpoint separated PipelineRegions, > > > despite what status the other PipelineRegions are, and use a snapshot > of > > a > > > failing region to recover, instead of the subset of a global snapshot. > > This > > > can support separated job subgraphs as well, since they will also be > > > separated into different PipelineRegions. I think this can fulfill your > > > needs. > > > > > > In fact the individual snapshots of all PipelineRegions can form a > global > > > snapshot, and the alignment of snapshots of individual regions is not > > > necessary. 
But rescaling this global snapshot can be potentially > > complex. I > > > think it's better to use the individual snapshots in a single run, and > > take > > > a global checkpoint/savepoint before restarting the job, rescaling it > or > > > not. > > > > > > A major issue of this plan is that it breaks the checkpoint mechanism > of > > > Flink. As far as I know, even in the approximate recovery, the snapshot > > > used to recover a single task is still a part of a global snapshot. To > > > implement the individual checkpointing of PipelineRegions, there may > need > > > to be a checkpoint coordinator for each PipelineRegion, and a new > global > > > checkpoint coordinator. When the scale goes up, there can be many > > > individual regions, which can be a big burden to the job manager. The > > > meaning of the checkpoint id will also be changed, which can affect > many > > > aspects. There can be lots of work and risks, and the risks still exist > > if > > > we only individually checkpoint separated job
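Yuan's CP10/CP8 example above can be made concrete with a small sketch: if pipelined regions complete checkpoints at different paces, the newest checkpoint id that every region has completed (the minimum of the per-region maxima) is the only id usable as a consistent global restore point, unless the "ahead" regions' sources can safely be rolled back and their CP8..CP10 output deduplicated externally. This is an illustrative model, not Flink code.

```java
// Sketch: with per-region checkpointing, a consistent global restore point is
// the latest checkpoint id that *every* region has completed.
public class ConsistentRestorePoint {

    static long latestCommonCheckpoint(long[] latestCompletedPerRegion) {
        long min = Long.MAX_VALUE;
        for (long cp : latestCompletedPerRegion) {
            min = Math.min(min, cp);
        }
        return min;
    }

    public static void main(String[] args) {
        // PR1 has completed CP10, PR2 only CP8 (the example in the thread).
        System.out.println(latestCommonCheckpoint(new long[]{10, 8})); // prints 8
        // Restoring PR1 from CP8 replays the CP8..CP10 interval, whose output
        // may already have been emitted externally -- the duplication Yuan
        // points out, which is why mismatched regions block a naive rescale.
    }
}
```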
[jira] [Created] (FLINK-25973) Rename ArchivedExecutionGraph.createFromInitializingJob into more generic createSparseArchivedExecutionGraph
Matthias Pohl created FLINK-25973: - Summary: Rename ArchivedExecutionGraph.createFromInitializingJob into more generic createSparseArchivedExecutionGraph Key: FLINK-25973 URL: https://issues.apache.org/jira/browse/FLINK-25973 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Matthias Pohl The use cases for this method changed. We should rename it into something that fits both usecases. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] Checkpointing (partially) failing jobs
Hey Till, > Why rescaling is a problem for pipelined regions/independent execution subgraphs: Take a simplified example : job graph : source (2 instances) -> sink (2 instances) execution graph: source (1/2) -> sink (1/2) [pipelined region 1] source (2/2) -> sink (2/2) [pipelined region 2] Let's assume checkpoints are still triggered globally, meaning different pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2 CP1). Now let's assume PR1 completes CP10 and PR2 completes CP8. Let's say we want to rescale to parallelism 3 due to increased input. - Notice that we can not simply rescale based on the latest completed checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output externally. - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the source's offset redistribution is implemented. The answer is yes if we treat each input partition as independent from each other, *but I am not sure whether we can make that assumption*. If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs. Best -Yuan On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann wrote: > Hi everyone, > > Yuan and Gen could you elaborate why rescaling is a problem if we say that > separate pipelined regions can take checkpoints independently? > Conceptually, I somehow think that a pipelined region that is failed and > cannot create a new checkpoint is more or less the same as a pipelined > region that didn't get new input or a very very slow pipelined region which > couldn't read new records since the last checkpoint (assuming that the > checkpoint coordinator can create a global checkpoint by combining > individual checkpoints (e.g. taking the last completed checkpoint from each > pipelined region)). If this comparison is correct, then this would mean > that we have rescaling problems under the latter two cases. > > Cheers, > Till > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo wrote: > > > Hi Gyula, > > > > Thanks for sharing the idea. 
As Yuan mentioned, I think we can discuss > this > > within two scopes. One is the job subgraph, the other is the execution > > subgraph, which I suppose is the same as PipelineRegion. > > > > An idea is to individually checkpoint the PipelineRegions, for the > > recovering in a single run. > > > > Flink has now supported PipelineRegion based failover, with a subset of a > > global checkpoint snapshot. The checkpoint barriers are spread within a > > PipelineRegion, so the checkpointing of individual PipelineRegions is > > actually independent. Since in a single run of a job, the PipelineRegions > > are fixed, we can individually checkpoint separated PipelineRegions, > > despite what status the other PipelineRegions are, and use a snapshot of > a > > failing region to recover, instead of the subset of a global snapshot. > This > > can support separated job subgraphs as well, since they will also be > > separated into different PipelineRegions. I think this can fulfill your > > needs. > > > > In fact the individual snapshots of all PipelineRegions can form a global > > snapshot, and the alignment of snapshots of individual regions is not > > necessary. But rescaling this global snapshot can be potentially > complex. I > > think it's better to use the individual snapshots in a single run, and > take > > a global checkpoint/savepoint before restarting the job, rescaling it or > > not. > > > > A major issue of this plan is that it breaks the checkpoint mechanism of > > Flink. As far as I know, even in the approximate recovery, the snapshot > > used to recover a single task is still a part of a global snapshot. To > > implement the individual checkpointing of PipelineRegions, there may need > > to be a checkpoint coordinator for each PipelineRegion, and a new global > > checkpoint coordinator. When the scale goes up, there can be many > > individual regions, which can be a big burden to the job manager. 
The > > meaning of the checkpoint id will also be changed, which can affect many > > aspects. There can be lots of work and risks, and the risks still exist > if > > we only individually checkpoint separated job subgraphs, since the > > mechanism is still broken. If that is what you need, maybe separating > them > > into different jobs is an easier and better choice, as Caizhi and Yuan > > mentioned. > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei wrote: > > > > > Hey Gyula, > > > > > > That's a very interesting idea. The discussion about the `Individual` > vs > > > `Global` checkpoint was raised before, but the main concern was from > two > > > aspects: > > > > > > - Non-deterministic replaying may lead to an inconsistent view of > > > checkpoint > > > - It is not easy to form a clear cut of past and future and hence no > > clear > > > cut of where the start point of the source should begin to replay from. > > > > > > Starting from independent subgraphs as you proposed may be a good > > starting > > > point. However, when we talk
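Yuan's second bullet at the top of this thread — taking CP10 from PR1 and CP8 from PR2 — can be sketched as follows. This is a minimal illustrative model, not Flink code: the `RegionSnapshot` type, the `compose` helper, and the partition names are all invented for the example. The point it shows is that composing snapshots taken at different checkpoint ids is only consistent when every input partition is owned by exactly one pipelined region.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch (not Flink code): composing per-region snapshots taken
 * at different checkpoint ids is safe only if each source partition's offset
 * is owned by exactly one pipelined region.
 */
public class ComposedSnapshotSketch {

    /** Snapshot of one pipelined region: its checkpoint id plus per-partition offsets. */
    record RegionSnapshot(int checkpointId, Map<String, Long> partitionOffsets) {}

    /** Compose a global source state from per-region snapshots with differing ids. */
    static Map<String, Long> compose(RegionSnapshot... regions) {
        Map<String, Long> global = new HashMap<>();
        for (RegionSnapshot r : regions) {
            for (Map.Entry<String, Long> e : r.partitionOffsets().entrySet()) {
                // If two regions claimed the same partition, their offsets could
                // disagree (CP10 vs CP8) and the composition would be inconsistent.
                if (global.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalStateException(
                            "partition " + e.getKey() + " is not independent");
                }
            }
        }
        return global;
    }

    public static void main(String[] args) {
        RegionSnapshot pr1 = new RegionSnapshot(10, Map.of("partition-0", 1000L));
        RegionSnapshot pr2 = new RegionSnapshot(8, Map.of("partition-1", 800L));
        // Disjoint partitions: composing CP10 with CP8 yields a consistent state.
        System.out.println(compose(pr1, pr2));
    }
}
```

If a rescale then redistributes partitions across regions, the per-partition offsets only remain meaningful under the independence assumption Yuan flags as uncertain.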
Re: [DISCUSS] Checkpointing (partially) failing jobs
Hi Till, I agree that a failing task is much like a very slow or deadlocked task as far as checkpointing is concerned. The main difference is whether a checkpoint of the region the task is in can be triggered. Triggering a checkpoint on a failing region makes no sense since the checkpoint would be discarded right away. But we can still compose a global checkpoint from the new snapshots of the other regions taken while the region is failing, plus the last successful snapshot of the failing region. This global snapshot is still valid and can be rescaled like a normal one, if the normal ones are possible to rescale. As far as I know, some snapshotting methods use or depend on ascending checkpoint ids. Checkpointing individually probably means counting checkpoint ids individually. Composing snapshots with different checkpoint ids may cause errors. I am also afraid that there might be issues with the shared states, though I can't figure out a case right now. On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann wrote: > Hi everyone, > > Yuan and Gen could you elaborate why rescaling is a problem if we say that > separate pipelined regions can take checkpoints independently? > Conceptually, I somehow think that a pipelined region that is failed and > cannot create a new checkpoint is more or less the same as a pipelined > region that didn't get new input or a very very slow pipelined region which > couldn't read new records since the last checkpoint (assuming that the > checkpoint coordinator can create a global checkpoint by combining > individual checkpoints (e.g. taking the last completed checkpoint from each > pipelined region)). If this comparison is correct, then this would mean > that we have rescaling problems under the latter two cases. > > Cheers, > Till > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo wrote: > > > Hi Gyula, > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss > this > > within two scopes. 
One is the job subgraph, the other is the execution > > subgraph, which I suppose is the same as PipelineRegion. > > > > An idea is to individually checkpoint the PipelineRegions, for the > > recovering in a single run. > > > > Flink has now supported PipelineRegion based failover, with a subset of a > > global checkpoint snapshot. The checkpoint barriers are spread within a > > PipelineRegion, so the checkpointing of individual PipelineRegions is > > actually independent. Since in a single run of a job, the PipelineRegions > > are fixed, we can individually checkpoint separated PipelineRegions, > > despite what status the other PipelineRegions are, and use a snapshot of > a > > failing region to recover, instead of the subset of a global snapshot. > This > > can support separated job subgraphs as well, since they will also be > > separated into different PipelineRegions. I think this can fulfill your > > needs. > > > > In fact the individual snapshots of all PipelineRegions can form a global > > snapshot, and the alignment of snapshots of individual regions is not > > necessary. But rescaling this global snapshot can be potentially > complex. I > > think it's better to use the individual snapshots in a single run, and > take > > a global checkpoint/savepoint before restarting the job, rescaling it or > > not. > > > > A major issue of this plan is that it breaks the checkpoint mechanism of > > Flink. As far as I know, even in the approximate recovery, the snapshot > > used to recover a single task is still a part of a global snapshot. To > > implement the individual checkpointing of PipelineRegions, there may need > > to be a checkpoint coordinator for each PipelineRegion, and a new global > > checkpoint coordinator. When the scale goes up, there can be many > > individual regions, which can be a big burden to the job manager. The > > meaning of the checkpoint id will also be changed, which can affect many > > aspects. 
There can be lots of work and risks, and the risks still exist > if > > we only individually checkpoint separated job subgraphs, since the > > mechanism is still broken. If that is what you need, maybe separating > them > > into different jobs is an easier and better choice, as Caizhi and Yuan > > mentioned. > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei wrote: > > > > > Hey Gyula, > > > > > > That's a very interesting idea. The discussion about the `Individual` > vs > > > `Global` checkpoint was raised before, but the main concern was from > two > > > aspects: > > > > > > - Non-deterministic replaying may lead to an inconsistent view of > > > checkpoint > > > - It is not easy to form a clear cut of past and future and hence no > > clear > > > cut of where the start point of the source should begin to replay from. > > > > > > Starting from independent subgraphs as you proposed may be a good > > starting > > > point. However, when we talk about subgraph, do we mention it as a job > > > subgraph (each vertex is one or more operators) or execution subgraph > > (each > > > vertex
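The coordinator layout Gen Luo describes above — a checkpoint coordinator per PipelineRegion plus a new global coordinator — might look roughly like the following sketch. All class names and region names here are invented for illustration; note how the "global checkpoint" becomes a map of per-region ids (e.g. PR1=10, PR2=8) rather than a single ascending id, which is exactly the change of meaning the thread worries about.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch (not Flink code): per-region checkpoint coordinators,
 * each with its own ascending id counter, plus a global view that remembers
 * the last completed checkpoint id of every region.
 */
public class PerRegionCheckpointingSketch {

    /** One coordinator per PipelineRegion, with region-local checkpoint ids. */
    static final class RegionCoordinator {
        private long nextCheckpointId = 1;
        long triggerCheckpoint() { return nextCheckpointId++; }
    }

    /** Global view: region name -> last completed checkpoint id. */
    static final class GlobalView {
        private final Map<String, Long> lastCompleted = new HashMap<>();

        void onCheckpointCompleted(String region, long checkpointId) {
            lastCompleted.put(region, checkpointId);
        }

        /** A "global snapshot" is now a vector of per-region ids, not one id. */
        Map<String, Long> composeGlobalSnapshot() {
            return Map.copyOf(lastCompleted);
        }
    }

    public static void main(String[] args) {
        RegionCoordinator pr1 = new RegionCoordinator();
        RegionCoordinator pr2 = new RegionCoordinator();
        GlobalView global = new GlobalView();
        for (int i = 0; i < 10; i++) global.onCheckpointCompleted("PR1", pr1.triggerCheckpoint());
        for (int i = 0; i < 8; i++) global.onCheckpointCompleted("PR2", pr2.triggerCheckpoint());
        System.out.println(global.composeGlobalSnapshot()); // e.g. PR1=10, PR2=8
    }
}
```

With many regions, each such coordinator adds bookkeeping on the job manager, which is the scalability burden mentioned above.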
Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator
+1 (binding) Best, Yangze Guo On Mon, Feb 7, 2022 at 5:04 PM K Fred wrote: > > +1 (non-binding) > > Best Regards > Peng Yuan > > On Mon, Feb 7, 2022 at 4:49 PM Danny Cranmer > wrote: > > > +1 (binding) > > > > Thanks for this effort! > > Danny Cranmer > > > > On Mon, Feb 7, 2022 at 7:59 AM Biao Geng wrote: > > > > > +1 (non-binding) > > > > > > Best, > > > Biao Geng > > > > > > Peter Huang 于2022年2月7日周一 14:31写道: > > > > > > > +1 (non-binding) > > > > > > > > > > > > Best Regards > > > > Peter Huang > > > > > > > > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang > > wrote: > > > > > > > > > +1 (binding) > > > > > > > > > > Best, > > > > > Yang > > > > > > > > > > Xintong Song 于2022年2月7日周一 10:25写道: > > > > > > > > > > > +1 (binding) > > > > > > > > > > > > Thank you~ > > > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi < > > > > balassi.mar...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > > > > > > > +1 (binding) > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo > > > > > > > wrote: > > > > > > > > > > > > > > > I am very excited to see this. > > > > > > > > > > > > > > > > Thanks for driving the effort > > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku < > > > > > > > > shqiprimbunj...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang < > > > > > > > chenyazhangche...@gmail.com > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > Thanks folks for leading this effort and making it happen > > so > > > > > fast! 
> > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Chenya > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra < > > > gyf...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Thomas! > > > > > > > > > > > > > > > > > > > > > > +1 (binding) from my side > > > > > > > > > > > > > > > > > > > > > > Happy to see this effort getting some traction! > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise < > > > t...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink > > > > > > Kubernetes > > > > > > > > > > > > Operator [1] which has been discussed in [2]. > > > > > > > > > > > > > > > > > > > > > > > > The vote will be open for at least 72 hours unless > > there > > > is > > > > > an > > > > > > > > > > > > objection or not enough votes. > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator > > > > > > > > > > > > [2] > > > > > > > > > > https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z > > > > > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > > > Thomas > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Israel Ekpo > > > > > > > > Lead Instructor, IzzyAcademy.com > > > > > > > > https://www.youtube.com/c/izzyacademy > > > > > > > > https://izzyacademy.com/ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator
+1 (non-binding) Best Regards Peng Yuan On Mon, Feb 7, 2022 at 4:49 PM Danny Cranmer wrote: > +1 (binding) > > Thanks for this effort! > Danny Cranmer > > On Mon, Feb 7, 2022 at 7:59 AM Biao Geng wrote: > > > +1 (non-binding) > > > > Best, > > Biao Geng > > > > Peter Huang 于2022年2月7日周一 14:31写道: > > > > > +1 (non-binding) > > > > > > > > > Best Regards > > > Peter Huang > > > > > > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang > wrote: > > > > > > > +1 (binding) > > > > > > > > Best, > > > > Yang > > > > > > > > Xintong Song 于2022年2月7日周一 10:25写道: > > > > > > > > > +1 (binding) > > > > > > > > > > Thank you~ > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi < > > > balassi.mar...@gmail.com > > > > > > > > > > wrote: > > > > > > > > > > > +1 (binding) > > > > > > > > > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo > > > > > wrote: > > > > > > > > > > > > > I am very excited to see this. > > > > > > > > > > > > > > Thanks for driving the effort > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku < > > > > > > > shqiprimbunj...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang < > > > > > > chenyazhangche...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > Thanks folks for leading this effort and making it happen > so > > > > fast! > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Chenya > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra < > > gyf...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Thomas! 
> > > > > > > > > > > > > > > > > > > > +1 (binding) from my side > > > > > > > > > > > > > > > > > > > > Happy to see this effort getting some traction! > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise < > > t...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink > > > > > Kubernetes > > > > > > > > > > > Operator [1] which has been discussed in [2]. > > > > > > > > > > > > > > > > > > > > > > The vote will be open for at least 72 hours unless > there > > is > > > > an > > > > > > > > > > > objection or not enough votes. > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator > > > > > > > > > > > [2] > > > > > > > > https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z > > > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > > Thomas > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > Israel Ekpo > > > > > > > Lead Instructor, IzzyAcademy.com > > > > > > > https://www.youtube.com/c/izzyacademy > > > > > > > https://izzyacademy.com/ > > > > > > > > > > > > > > > > > > > > > > > > > > > >
Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator
+1 (binding) Thanks for this effort! Danny Cranmer On Mon, Feb 7, 2022 at 7:59 AM Biao Geng wrote: > +1 (non-binding) > > Best, > Biao Geng > > Peter Huang 于2022年2月7日周一 14:31写道: > > > +1 (non-binding) > > > > > > Best Regards > > Peter Huang > > > > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang wrote: > > > > > +1 (binding) > > > > > > Best, > > > Yang > > > > > > Xintong Song 于2022年2月7日周一 10:25写道: > > > > > > > +1 (binding) > > > > > > > > Thank you~ > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi < > > balassi.mar...@gmail.com > > > > > > > > wrote: > > > > > > > > > +1 (binding) > > > > > > > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo > > > wrote: > > > > > > > > > > > I am very excited to see this. > > > > > > > > > > > > Thanks for driving the effort > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku < > > > > > > shqiprimbunj...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang < > > > > > chenyazhangche...@gmail.com > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > Thanks folks for leading this effort and making it happen so > > > fast! > > > > > > > > > > > > > > > > Best, > > > > > > > > Chenya > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra < > gyf...@apache.org> > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Thomas! > > > > > > > > > > > > > > > > > > +1 (binding) from my side > > > > > > > > > > > > > > > > > > Happy to see this effort getting some traction! 
> > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise < > t...@apache.org> > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink > > > > Kubernetes > > > > > > > > > > Operator [1] which has been discussed in [2]. > > > > > > > > > > > > > > > > > > > > The vote will be open for at least 72 hours unless there > is > > > an > > > > > > > > > > objection or not enough votes. > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator > > > > > > > > > > [2] > > > > > > https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z > > > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > > Thomas > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Israel Ekpo > > > > > > Lead Instructor, IzzyAcademy.com > > > > > > https://www.youtube.com/c/izzyacademy > > > > > > https://izzyacademy.com/ > > > > > > > > > > > > > > > > > > > > >
[jira] [Created] (FLINK-25972) org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks
Yun Gao created FLINK-25972: --- Summary: org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks Key: FLINK-25972 URL: https://issues.apache.org/jira/browse/FLINK-25972 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.15.0 Reporter: Yun Gao The reference chain is: JaninoRelMetadataProvider -> static field HANDLERS -> generated class GeneratedMetadataHandler_ColumnNullCount loaded by org.codehaus.janino.ByteArrayClassLoader. This cache seems to use strong references. -- This message was sent by Atlassian Jira (v8.20.1#820001)
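The leak pattern in this ticket can be sketched as follows. This is an illustrative reduction, not Calcite's actual code, and the class and field names are invented: a static map keyed strongly by generated classes pins those classes — and, through Class.getClassLoader(), the ByteArrayClassLoader that defined them — for as long as the map itself is reachable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;

/**
 * Sketch of the leak pattern (illustrative, not Calcite's code): a static
 * cache in a system-ClassLoader class holding generated classes strongly.
 */
public class HandlerCacheSketch {

    // Strong keys: neither the generated handler class nor its defining
    // ClassLoader can ever be garbage-collected while this map is reachable.
    static final Map<Class<?>, Object> STRONG_KEYED = new HashMap<>();

    // Weak keys: once a class (and hence its loader) is otherwise unreachable,
    // its entry becomes collectable. This only helps if the *values* do not
    // hold strong references back to the key class or its loader.
    static final Map<Class<?>, Object> WEAK_KEYED = new WeakHashMap<>();

    static void cache(Map<Class<?>, Object> cache, Class<?> generatedClass, Object handler) {
        cache.put(generatedClass, handler);
    }
}
```

A weak-keyed (or size-bounded, loader-scoped) cache is one common mitigation, assuming the cached values do not themselves retain the key's loader.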
[jira] [Created] (FLINK-25971) org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE might cause class leaks
Yun Gao created FLINK-25971: --- Summary: org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE might cause class leaks Key: FLINK-25971 URL: https://issues.apache.org/jira/browse/FLINK-25971 Project: Flink Issue Type: Sub-task Reporter: Yun Gao It needs a double check whether the chain JsonSerdeUtil#OBJECT_MAPPER_INSTANCE -> DefaultDeserializationContext _deserializationContext (which extends DeserializationContext) -> DeserializerCache _cache -> _cachedDeserializers / _incompleteDeserializers might hold user ClassLoaders. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
Hi Gyula! You are right. I think some common flink config options can be put in the CR, other expert settings continue to be overwritten by flink, and then the user can choose to customize the configuration. Best Wishes, Peng Yuan On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra wrote: > Hi Yangze! > > This is not set in stone at the moment but the way I think it should work > is that first class config options in the CR should always take precedence > over the Flink config. > > In general we should not introduce too many arbitrary config options that > duplicate the flink configs without good reasons but the ones we introduce > should overwrite flink configs. > > We should discuss and decide together what config options to keep in the > flink conf and what to bring on the CR level. Resource related ones are > difficult because on one hand they are integral to every application, on > the other hand there are many expert settings that we should probably leave > in the conf. > > Cheers, > Gyula > > > > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo wrote: > > > Thanks everyone for the great effort. The FLIP looks really good. > > > > I just want to make sure the configuration priority in the CR example. > > It seems the requests resources or "taskManager. taskSlots" will be > > transferred to Flink internal config, e.g. > > "taskmanager.memory.process.size" and "taskmanager.numberOfTaskSlots", > > and override the one in "flinkConfiguration". Am I understanding this > > correctly? > > > > Best, > > Yangze Guo > > > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song > > wrote: > > > > > > Sorry for the late reply. We were out due to the public holidays in > > China. > > > > > > @Thomas, > > > > > > The intention is to support application management through operator and > > CR, > > > > which means there won't be any 2 step submission process, which as > you > > > > allude to would defeat the purpose of this project. The CR example > > shows > > > > the application part. 
Please note that the bare cluster support is an > > > > *additional* feature for scenarios that require external job > > management. Is > > > > there anything on the FLIP page that creates a different impression? > > > > > > > > > > Sounds good to me. I don't remember what created the impression of 2 > step > > > submission back then. I revisited the latest version of this FLIP and > it > > > looks good to me. > > > > > > @Gyula, > > > > > > Versioning: > > > > Versioning will be independent from Flink and the operator will > depend > > on a > > > > fixed flink version (in every given operator version). > > > > This should be the exact same setup as with Stateful Functions ( > > > > https://github.com/apache/flink-statefun). So independent release > > cycle > > > > but > > > > still within the Flink umbrella. > > > > > > > > > > Does this mean if someone wants to upgrade Flink to a version that is > > > released after the operator version that is being used, he/she would > need > > > to upgrade the operator version first? > > > I'm not questioning this, just trying to make sure I'm understanding > this > > > correctly. > > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > > > > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra > wrote: > > > > > > > Thank you Alexis, > > > > > > > > Will definitely check this out. You are right, Kotlin makes it > > difficult to > > > > adopt pieces of this code directly but I think it will be good to get > > > > inspiration for the architecture and look at how particular problems > > have > > > > been solved. It will be a great help for us I am sure. 
> > > > > > > > Cheers, > > > > Gyula > > > > > > > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa < > > > > alexis.sarda-espin...@microfocus.com> wrote: > > > > > > > > > Hi everyone, > > > > > > > > > > just wanted to mention that my employer agreed to open source the > > PoC I > > > > > developed: https://github.com/MicroFocus/opsb-flink-k8s-operator > > > > > > > > > > I understand the concern for maintainability, so Gradle & Kotlin > > might > > > > not > > > > > be appealing to you, but at least it gives you another reference. > The > > > > Helm > > > > > resources in particular might be useful. > > > > > > > > > > There are bits and pieces there referring to Flink sessions, but > > those > > > > are > > > > > just placeholders, the functioning parts use application mode with > > native > > > > > integration. > > > > > > > > > > Regards, > > > > > Alexis. > > > > > > > > > > > > > > > From: Thomas Weise > > > > > Sent: Saturday, February 5, 2022 2:41 AM > > > > > To: dev > > > > > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes > Operator > > > > > > > > > > Hi, > > > > > > > > > > Thanks for the continued feedback and discussion. Looks like we are > > > > > ready to start a VOTE, I will initiate it shortly. > > > > > > > > > > In parallel it would be good to find the repository name. > > > > > > > > > > My
[jira] [Created] (FLINK-25970) SerializedThrowable should record type of the original throwable.
Xintong Song created FLINK-25970: Summary: SerializedThrowable should record type of the original throwable. Key: FLINK-25970 URL: https://issues.apache.org/jira/browse/FLINK-25970 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.14.3 Reporter: Xintong Song Currently, only the message and stack of the original throwable are preserved in {{SerializedThrowable}}, while the type of the original throwable is discarded. Sometimes it would be helpful if the message of {{SerializedThrowable}} also included the full class name of the original throwable. E.g., consider the following stack:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable
 at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_102]
 at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_102]
 at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_102]
 ...
{code}
It's not easy to understand what is wrong from this stack. The JDK does not provide a message for the original exception, so we have to look into the JDK source code to find out what's going on. Sometimes it's even more annoying having to look for the JDK source code of the exact same version in order to match the line numbers. It turns out the original exception was a {{ConcurrentModificationException}}. I think it would be much more straightforward if we had a stack like the following:
{code}
Caused by: org.apache.flink.util.SerializedThrowable: java.util.ConcurrentModificationException
 at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_102]
 at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_102]
 at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_102]
 ...
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
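The proposed improvement amounts to prefixing the wrapper's message with the original throwable's class name. The following is a minimal sketch of the idea, not Flink's actual SerializedThrowable implementation (the class name and accessor are invented for illustration):

```java
/**
 * Sketch (not Flink's actual class): a serialization-safe wrapper that
 * records the original throwable's type in its message, so the stack
 * reads "...: java.util.ConcurrentModificationException".
 */
public class SerializedThrowableSketch extends Exception {

    private final String originalClassName;

    public SerializedThrowableSketch(Throwable original) {
        // Prefix the message with the original class name; keep the
        // original message, if any, after it.
        super(original.getClass().getName()
                + (original.getMessage() == null ? "" : ": " + original.getMessage()));
        this.originalClassName = original.getClass().getName();
        // Preserve the original stack frames, as the real class does with
        // the serialized stack.
        setStackTrace(original.getStackTrace());
    }

    public String getOriginalClassName() {
        return originalClassName;
    }
}
```

With this, printing the wrapped ConcurrentModificationException from the ticket would show the desired "Caused by: ...: java.util.ConcurrentModificationException" line without consulting JDK sources.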
[jira] [Created] (FLINK-25969) org.apache.flink.table.runtime.generated.CompileUtils might cause class leaks
Yun Gao created FLINK-25969: --- Summary: org.apache.flink.table.runtime.generated.CompileUtils might cause class leaks Key: FLINK-25969 URL: https://issues.apache.org/jira/browse/FLINK-25969 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Affects Versions: 1.15.0 Reporter: Yun Gao CompileUtils has two static caches, namely COMPILED_CACHE and COMPILED_EXPRESSION_CACHE. COMPILED_CACHE has been checked and might cache user ClassLoaders with strong references, so it might need to be improved. COMPILED_EXPRESSION_CACHE still needs a double check. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25968) Possible class leaks in flink-table / sql modules
Yun Gao created FLINK-25968: --- Summary: Possible class leaks in flink-table / sql modules Key: FLINK-25968 URL: https://issues.apache.org/jira/browse/FLINK-25968 Project: Flink Issue Type: Bug Components: Table SQL / Planner, Table SQL / Runtime Affects Versions: 1.15.0 Reporter: Yun Gao This is the umbrella issue for possible class leaks in the flink-table / sql planner and runtime. Currently, for a Flink cluster, the Flink classes are loaded by the system ClassLoader, while each job has a separate user ClassLoader. In this case, if a class loaded by the system ClassLoader has static variables that reference classes loaded by user ClassLoaders, those user ClassLoaders can never be released, which causes a class leak. -- This message was sent by Atlassian Jira (v8.20.1#820001)
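One way to reason about the mechanism described in this umbrella issue is to ask, for any class a static cache wants to hold: was it loaded by the system ClassLoader (or one of its parents), or by a child loader such as a per-job user ClassLoader? The helper below is an invented diagnostic sketch, not Flink API, that answers exactly that question by walking the system loader's parent chain:

```java
/**
 * Sketch (invented helper, not Flink API): would caching this class in a
 * static, cluster-wide structure pin a per-job user ClassLoader?
 */
public class ClassLoaderLeakCheck {

    static boolean loadedByUserClassLoader(Class<?> clazz) {
        ClassLoader cl = clazz.getClassLoader();
        if (cl == null) {
            return false; // bootstrap classes (e.g. java.lang.String) are always safe
        }
        // Walk up from the system ClassLoader: if we meet the class's loader,
        // it is the system loader or one of its parents -- safe to cache.
        for (ClassLoader l = ClassLoader.getSystemClassLoader(); l != null; l = l.getParent()) {
            if (l == cl) {
                return false;
            }
        }
        // Otherwise the class came from a child loader (e.g. a per-job user
        // ClassLoader); a static strong reference to it pins that loader.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(loadedByUserClassLoader(String.class)); // false: bootstrap class
    }
}
```

The sub-tasks above (FLINK-25969/25971/25972) are all instances where the answer for the cached objects may be "yes".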
Re: [DISCUSS] Checkpointing (partially) failing jobs
Hi everyone, Yuan and Gen could you elaborate why rescaling is a problem if we say that separate pipelined regions can take checkpoints independently? Conceptually, I somehow think that a pipelined region that is failed and cannot create a new checkpoint is more or less the same as a pipelined region that didn't get new input or a very very slow pipelined region which couldn't read new records since the last checkpoint (assuming that the checkpoint coordinator can create a global checkpoint by combining individual checkpoints (e.g. taking the last completed checkpoint from each pipelined region)). If this comparison is correct, then this would mean that we have rescaling problems under the latter two cases. Cheers, Till On Mon, Feb 7, 2022 at 8:55 AM Gen Luo wrote: > Hi Gyula, > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this > within two scopes. One is the job subgraph, the other is the execution > subgraph, which I suppose is the same as PipelineRegion. > > An idea is to individually checkpoint the PipelineRegions, for the > recovering in a single run. > > Flink has now supported PipelineRegion based failover, with a subset of a > global checkpoint snapshot. The checkpoint barriers are spread within a > PipelineRegion, so the checkpointing of individual PipelineRegions is > actually independent. Since in a single run of a job, the PipelineRegions > are fixed, we can individually checkpoint separated PipelineRegions, > despite what status the other PipelineRegions are, and use a snapshot of a > failing region to recover, instead of the subset of a global snapshot. This > can support separated job subgraphs as well, since they will also be > separated into different PipelineRegions. I think this can fulfill your > needs. > > In fact the individual snapshots of all PipelineRegions can form a global > snapshot, and the alignment of snapshots of individual regions is not > necessary. 
But rescaling this global snapshot can be potentially complex. I > think it's better to use the individual snapshots in a single run, and take > a global checkpoint/savepoint before restarting the job, rescaling it or > not. > > A major issue of this plan is that it breaks the checkpoint mechanism of > Flink. As far as I know, even in the approximate recovery, the snapshot > used to recover a single task is still a part of a global snapshot. To > implement the individual checkpointing of PipelineRegions, there may need > to be a checkpoint coordinator for each PipelineRegion, and a new global > checkpoint coordinator. When the scale goes up, there can be many > individual regions, which can be a big burden to the job manager. The > meaning of the checkpoint id will also be changed, which can affect many > aspects. There can be lots of work and risks, and the risks still exist if > we only individually checkpoint separated job subgraphs, since the > mechanism is still broken. If that is what you need, maybe separating them > into different jobs is an easier and better choice, as Caizhi and Yuan > mentioned. > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei wrote: > > > Hey Gyula, > > > > That's a very interesting idea. The discussion about the `Individual` vs > > `Global` checkpoint was raised before, but the main concern was from two > > aspects: > > > > - Non-deterministic replaying may lead to an inconsistent view of > > checkpoint > > - It is not easy to form a clear cut of past and future and hence no > clear > > cut of where the start point of the source should begin to replay from. > > > > Starting from independent subgraphs as you proposed may be a good > starting > > point. However, when we talk about subgraph, do we mention it as a job > > subgraph (each vertex is one or more operators) or execution subgraph > (each > > vertex is a task instance)? > > > > If it is a job subgraph, then indeed, why not separate it into multiple > > jobs as Caizhi mentioned. 
> > If it is an execution subgraph, then it is difficult to handle rescaling > > due to inconsistent views of checkpoints between tasks of the same > > operator. > > > > `Individual/Subgraph Checkpointing` is definitely an interesting > direction > > to think about, and I'd love to hear more from you! > > > > Best, > > > > Yuan > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng > wrote: > > > > > Hi Gyula! > > > > > > Thanks for raising this discussion. I agree that this would be an > > > interesting feature, but I actually have some doubts about the > motivation > > > and use case. If there are multiple individual subgraphs in the same > job, > > > why not just distribute them across multiple jobs, so that each job contains > > > only one individual graph and can then fail without disturbing the > others? > > > > > > Gyula Fóra wrote on Mon, Feb 7, 2022 at 05:22: > > > > Hi all! > > > > > > > > At the moment checkpointing only works for healthy jobs with all > > running > > > > (or some finished) tasks. This sounds reasonabl
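To make the "combine per-region checkpoints" idea from the thread above concrete, here is a minimal, purely illustrative Java sketch: each pipelined region checkpoints independently, and a global view is formed by taking the last completed checkpoint of each region. None of these types exist in Flink; the class, record, and method names are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink API): form a global snapshot by picking
// the latest completed checkpoint per pipelined region.
public class RegionSnapshotCombiner {

    // A completed checkpoint of one pipelined region, identified by region id.
    record RegionCheckpoint(String regionId, long checkpointId) {}

    /** Pick the latest completed checkpoint of each region to form a global view. */
    static Map<String, RegionCheckpoint> combine(List<RegionCheckpoint> completed) {
        Map<String, RegionCheckpoint> global = new HashMap<>();
        for (RegionCheckpoint cp : completed) {
            // Keep whichever checkpoint of this region has the higher id.
            global.merge(cp.regionId(), cp,
                    (a, b) -> a.checkpointId() >= b.checkpointId() ? a : b);
        }
        return global;
    }

    public static void main(String[] args) {
        List<RegionCheckpoint> completed = List.of(
                new RegionCheckpoint("region-A", 12),
                new RegionCheckpoint("region-A", 13),
                new RegionCheckpoint("region-B", 9)); // region B checkpoints slower
        Map<String, RegionCheckpoint> global = combine(completed);
        // Global view: region-A at checkpoint 13, region-B at checkpoint 9.
        System.out.println(global);
    }
}
```

This also makes Till's question visible: the two regions sit at different checkpoint ids (13 vs. 9), which is exactly the "inconsistent view between tasks of the same operator" that Yuan points out becomes problematic when rescaling moves state across region boundaries.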
Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
Hi Yangze! This is not set in stone at the moment, but the way I think it should work is that first-class config options in the CR should always take precedence over the Flink config. In general we should not introduce too many arbitrary config options that duplicate the Flink configs without good reason, but the ones we do introduce should overwrite the Flink configs. We should discuss and decide together which config options to keep in the Flink conf and which to bring up to the CR level. Resource-related ones are difficult because on one hand they are integral to every application, while on the other hand there are many expert settings that we should probably leave in the conf. Cheers, Gyula On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo wrote: > Thanks everyone for the great effort. The FLIP looks really good. > > I just want to confirm the configuration priority in the CR example. > It seems the requested resources or "taskManager.taskSlots" will be > translated to Flink internal configs, e.g. > "taskmanager.memory.process.size" and "taskmanager.numberOfTaskSlots", > and override the ones in "flinkConfiguration". Am I understanding this > correctly? > > Best, > Yangze Guo > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song > wrote: > > > > Sorry for the late reply. We were out due to the public holidays in > China. > > > > @Thomas, > > > > The intention is to support application management through the operator and > CR, > > > which means there won't be any 2-step submission process, which as you > > > allude to would defeat the purpose of this project. The CR example > shows > > > the application part. Please note that the bare cluster support is an > > > *additional* feature for scenarios that require external job > management. Is > > > there anything on the FLIP page that creates a different impression? > > > > > > > Sounds good to me. I don't remember what created the impression of a 2-step > > submission back then. I revisited the latest version of this FLIP and it > > looks good to me. 
> > > > @Gyula, > > > > Versioning: > > > Versioning will be independent from Flink and the operator will depend > on a > > > fixed flink version (in every given operator version). > > > This should be the exact same setup as with Stateful Functions ( > > > https://github.com/apache/flink-statefun). So independent release > cycle > > > but > > > still within the Flink umbrella. > > > > > > > Does this mean if someone wants to upgrade Flink to a version that is > > released after the operator version that is being used, he/she would need > > to upgrade the operator version first? > > I'm not questioning this, just trying to make sure I'm understanding this > > correctly. > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra wrote: > > > > > Thank you Alexis, > > > > > > Will definitely check this out. You are right, Kotlin makes it > difficult to > > > adopt pieces of this code directly but I think it will be good to get > > > inspiration for the architecture and look at how particular problems > have > > > been solved. It will be a great help for us I am sure. > > > > > > Cheers, > > > Gyula > > > > > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa < > > > alexis.sarda-espin...@microfocus.com> wrote: > > > > > > > Hi everyone, > > > > > > > > just wanted to mention that my employer agreed to open source the > PoC I > > > > developed: https://github.com/MicroFocus/opsb-flink-k8s-operator > > > > > > > > I understand the concern for maintainability, so Gradle & Kotlin > might > > > not > > > > be appealing to you, but at least it gives you another reference. The > > > Helm > > > > resources in particular might be useful. > > > > > > > > There are bits and pieces there referring to Flink sessions, but > those > > > are > > > > just placeholders, the functioning parts use application mode with > native > > > > integration. > > > > > > > > Regards, > > > > Alexis. 
> > > > > > > > > > > > From: Thomas Weise > > > > Sent: Saturday, February 5, 2022 2:41 AM > > > > To: dev > > > > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator > > > > > > > > Hi, > > > > > > > > Thanks for the continued feedback and discussion. Looks like we are > > > > ready to start a VOTE, I will initiate it shortly. > > > > > > > > In parallel it would be good to find the repository name. > > > > > > > > My suggestion would be: flink-kubernetes-operator > > > > > > > > I thought "flink-operator" could be a bit misleading since the term > > > > operator already has a meaning in Flink. > > > > > > > > I also considered "flink-k8s-operator" but that would be almost > > > > identical to existing operator implementations and could lead to > > > > confusion in the future. > > > > > > > > Thoughts? > > > > > > > > Thanks, > > > > Thomas > > > > > > > > > > > > > > > > On Fri, Feb 4, 2022 at 5:15 AM Gyula Fóra > wrote: > > > > > > > > > > Hi Danny, > > > > > > > > > > So f
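The precedence rule discussed earlier in this thread (first-class CR fields override the free-form `flinkConfiguration` map) can be sketched as a simple merge. The two target keys, `taskmanager.numberOfTaskSlots` and `taskmanager.memory.process.size`, are the ones named in Yangze's question; the method and class names here are hypothetical, not the operator's actual schema.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the discussed precedence rule: first-class fields
// from the custom resource (CR) are translated to Flink config keys and
// override entries in the user-supplied flinkConfiguration map.
public class CrConfigMerger {

    static Map<String, String> effectiveConfig(
            Map<String, String> flinkConfiguration,
            Integer taskSlots,            // first-class CR field (nullable)
            String taskManagerMemory) {   // first-class CR field (nullable)
        // Start from the free-form config, then let CR fields win.
        Map<String, String> effective = new HashMap<>(flinkConfiguration);
        if (taskSlots != null) {
            effective.put("taskmanager.numberOfTaskSlots", String.valueOf(taskSlots));
        }
        if (taskManagerMemory != null) {
            effective.put("taskmanager.memory.process.size", taskManagerMemory);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = Map.of(
                "taskmanager.numberOfTaskSlots", "2",  // overridden by the CR
                "state.backend", "rocksdb");           // untouched, kept as-is
        Map<String, String> effective = effectiveConfig(flinkConf, 4, "4g");
        System.out.println(effective);
    }
}
```

Keys not shadowed by a first-class CR field pass through unchanged, which matches Gyula's point that expert settings can stay in the Flink conf while the few first-class options always win.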
Re: [VOTE] FLIP-211: Kerberos delegation token framework
+1 (binding) On 21/01/2022 15:57, Gabor Somogyi wrote: Hi devs, I would like to start the vote for FLIP-211 [1], which was discussed and reached a consensus in the discussion thread [2]. The vote will be open for at least 72h, unless there is an objection or not enough votes. BR, G [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework [2] https://lists.apache.org/thread/cvwknd5fhohj0wfv8mfwn70jwpjvxrjj