[RESULT][VOTE] FLIP-211: Kerberos delegation token framework

2022-02-07 Thread Gabor Somogyi
Hi devs,

FLIP-211 [1] has been accepted.
There were 3 binding votes and 2 non-binding in favor.
None against.

Votes are in the order of arrival:

Binding:
Gyula Fora
Marton Balassi
Chesnay Schepler

Non-binding:
Junfan Zhang
David Moravek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework

BR,
G


[jira] [Created] (FLINK-25998) Flink akka runs into NoClassDefFoundError on shutdown

2022-02-07 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-25998:
--

 Summary: Flink akka runs into NoClassDefFoundError on shutdown
 Key: FLINK-25998
 URL: https://issues.apache.org/jira/browse/FLINK-25998
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Robert Metzger


When trying to start a standalone jobmanager on an unavailable port, I see the 
following unexpected exception:

{code}
2022-02-08 08:07:18,299 INFO  akka.remote.Remoting  
   [] - Starting remoting
2022-02-08 08:07:18,357 ERROR akka.remote.transport.netty.NettyTransport
   [] - failed to bind to /0.0.0.0:6123, shutting down Netty transport
2022-02-08 08:07:18,373 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
StandaloneApplicationClusterEntryPoint down with application status FAILED. 
Diagnostics java.net.BindException: Could not start actor system on any port in 
port range 6123
at 
org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.startRemoteActorSystem(AkkaBootstrapTools.java:133)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:358)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:327)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:247)
at 
org.apache.flink.runtime.rpc.RpcUtils.createRemoteRpcService(RpcUtils.java:191)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:334)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:253)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:203)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:684)
at 
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
.
2022-02-08 08:07:18,377 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2022-02-08 08:07:18,377 ERROR org.apache.flink.util.FatalExitExceptionHandler   
   [] - FATAL: Thread 'flink-akka.remote.default-remote-dispatcher-6' 
produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: 
akka/actor/dungeon/FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
at 
akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException(FaultHandling.scala:334)
 ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at 
akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException$(FaultHandling.scala:334)
 ~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at 
akka.actor.ActorCell.handleNonFatalOrInterruptedException(ActorCell.scala:411) 
~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at akka.actor.ActorCell.invoke(ActorCell.scala:551) 
~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
~[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_ce724655-52fe-4b3a-8cdc-b79ab446e34d.jar:1.15-SNAPSHOT]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_312]
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
[?:1.8.0_312]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
[?:1.8.0_312]
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
[?:1.8.0_312]
Caused by: java.lang.ClassNotFoundException: 
akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:387) 
~[?:1.8.0_312]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_312]
at 
org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149)
 ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at 
org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader

Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Yangze Guo
Thanks @Thomas and @Gyula.
+1 to only introduce necessary and reasonable shorthand proxy parameters.

Best,
Yangze Guo

On Tue, Feb 8, 2022 at 12:47 PM Thomas Weise  wrote:
>
> @Yangze thanks for bringing up the configuration priority. This is
> quite important indeed and should be mentioned in the FLIP.
>
> I agree with the sentiment that whenever possible we should use the
> native configuration directly (either Flink native settings or k8s pod
> template), rather than introducing proxy parameters in the CRD. That
> certainly applies to taskManager.taskSlots which can be specified
> directly under flinkConfiguration.
>
> Thanks @Alexis for the pointers!
>
> Regarding memory: I'm leaning towards starting from total memory at
> the k8s resource level and let Flink derive components by default. For
> many users that would be a more intuitive approach than specifying the
> components. So container memory -> taskmanager.memory.process.size ->
>  [1]
>
> With that approach we could also extract the resource spec from the
> pod template. That said, setting memory is pretty much always necessary,
> while defining a pod template is not. Having the
> shorthand proxy parameter may be a good compromise.
>
> Cheers,
> Thomas
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/
>
> On Mon, Feb 7, 2022 at 4:33 AM Alexis Sarda-Espinosa
>  wrote:
> >
> > Danny Cranmer mentioned they are interested in standalone mode, and I am 
> > too, so I just wanted to say that if that development starts in parallel, I 
> > might be able to contribute a little.
> >
> > Regarding the CRD, I agree it would be nice to avoid as many "duplications" 
> > as possible if pod templates are to be used. In my PoC I even tried to make 
> > use of existing configuration options like kubernetes.container.image & 
> > pipeline.jars [1]. For CPU/Memory resources, the discussion in [2] might be 
> > relevant.
> >
> > [1] 
> > https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml
> > [2] https://issues.apache.org/jira/browse/FLINK-24150
> >
> > Regards,
> > Alexis.
> >
> > -Original Message-
> > From: K Fred 
> > Sent: Montag, 7. Februar 2022 09:36
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
> >
> > Hi Gyula!
> >
> > You are right. I think some common flink config options can be put in the 
> > CR, other expert settings continue to be overwritten by flink, and then the 
> > user can choose to customize the configuration.
> >
> > Best Wishes,
> > Peng Yuan
> >
> > On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra  wrote:
> >
> > > Hi Yangze!
> > >
> > > This is not set in stone at the moment but the way I think it should
> > > work is that first class config options in the CR should always take
> > > precedence over the Flink config.
> > >
> > > In general we should not introduce too many arbitrary config options
> > > that duplicate the flink configs without good reasons but the ones we
> > > introduce should overwrite flink configs.
> > >
> > > We should discuss and decide together what config options to keep in
> > > the flink conf and what to bring on the CR level. Resource related
> > > ones are difficult because on one hand they are integral to every
> > > application, on the other hand there are many expert settings that we
> > > should probably leave in the conf.
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo  wrote:
> > >
> > > > Thanks everyone for the great effort. The FLIP looks really good.
> > > >
> > > > I just want to make sure the configuration priority in the CR example.
> > > > It seems the requested resources or "taskManager.taskSlots" will be
> > > > transferred to Flink internal config, e.g.
> > > > "taskmanager.memory.process.size" and
> > > > "taskmanager.numberOfTaskSlots", and override the one in
> > > > "flinkConfiguration". Am I understanding this correctly?
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song 
> > > > wrote:
> > > > >
> > > > > Sorry for the late reply. We were out due to the public holidays
> > > > > in
> > > > China.
> > > > >
> > > > > @Thomas,
> > > > >
> > > > > The intention is to support application management through
> > > > > operator and
> > > > CR,
> > > > > > which means there won't be any 2 step submission process, which
> > > > > > as
> > > you
> > > > > > allude to would defeat the purpose of this project. The CR
> > > > > > example
> > > > shows
> > > > > > the application part. Please note that the bare cluster support
> > > > > > is an
> > > > > > *additional* feature for scenarios that require external job
> > > > management. Is
> > > > > > there anything on the FLIP page that creates a different impression?
> > > > > >
> > > > >
> > > > > Sounds good to me. I don't remember what created the impression of
> > > > > 2
> > > step
> > 

[jira] [Created] (FLINK-25997) FsStateChangelogWriter cannot work well when there is no change appended

2022-02-07 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-25997:


 Summary: FsStateChangelogWriter cannot work well when there is no 
change appended
 Key: FLINK-25997
 URL: https://issues.apache.org/jira/browse/FLINK-25997
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Hangxiang Yu


If there is no change appended, we should reuse the old sqn, but currently we 
always use sqn.next().
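
A minimal sketch of the intended selection logic (names here are illustrative, not 
the actual FsStateChangelogWriter internals):

{code:java}
/** Illustrative sketch only; not the real FsStateChangelogWriter. */
final class SqnSelection {

    /**
     * The sequence number a checkpoint should reference: reuse the old sqn when
     * no change was appended since the last checkpoint, otherwise advance.
     */
    static long sqnForCheckpoint(long lastAppendedSqn, int appendedChangeCount) {
        return appendedChangeCount == 0 ? lastAppendedSqn : lastAppendedSqn + 1;
    }
}
{code}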



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Thomas Weise
@Yangze thanks for bringing up the configuration priority. This is
quite important indeed and should be mentioned in the FLIP.

I agree with the sentiment that whenever possible we should use the
native configuration directly (either Flink native settings or k8s pod
template), rather than introducing proxy parameters in the CRD. That
certainly applies to taskManager.taskSlots which can be specified
directly under flinkConfiguration.

Thanks @Alexis for the pointers!

Regarding memory: I'm leaning towards starting from total memory at
the k8s resource level and let Flink derive components by default. For
many users that would be a more intuitive approach than specifying the
components. So container memory -> taskmanager.memory.process.size ->
 [1]

With that approach we could also extract the resource spec from the
pod template. That said, setting memory is pretty much always necessary,
while defining a pod template is not. Having the
shorthand proxy parameter may be a good compromise.

Cheers,
Thomas

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/

On Mon, Feb 7, 2022 at 4:33 AM Alexis Sarda-Espinosa
 wrote:
>
> Danny Cranmer mentioned they are interested in standalone mode, and I am too, 
> so I just wanted to say that if that development starts in parallel, I might 
> be able to contribute a little.
>
> Regarding the CRD, I agree it would be nice to avoid as many "duplications" 
> as possible if pod templates are to be used. In my PoC I even tried to make 
> use of existing configuration options like kubernetes.container.image & 
> pipeline.jars [1]. For CPU/Memory resources, the discussion in [2] might be 
> relevant.
>
> [1] 
> https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml
> [2] https://issues.apache.org/jira/browse/FLINK-24150
>
> Regards,
> Alexis.
>
> -Original Message-
> From: K Fred 
> Sent: Montag, 7. Februar 2022 09:36
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
>
> Hi Gyula!
>
> You are right. I think some common flink config options can be put in the CR, 
> other expert settings continue to be overwritten by flink, and then the user 
> can choose to customize the configuration.
>
> Best Wishes,
> Peng Yuan
>
> On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra  wrote:
>
> > Hi Yangze!
> >
> > This is not set in stone at the moment but the way I think it should
> > work is that first class config options in the CR should always take
> > precedence over the Flink config.
> >
> > In general we should not introduce too many arbitrary config options
> > that duplicate the flink configs without good reasons but the ones we
> > introduce should overwrite flink configs.
> >
> > We should discuss and decide together what config options to keep in
> > the flink conf and what to bring on the CR level. Resource related
> > ones are difficult because on one hand they are integral to every
> > application, on the other hand there are many expert settings that we
> > should probably leave in the conf.
> >
> > Cheers,
> > Gyula
> >
> >
> >
> > On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo  wrote:
> >
> > > Thanks everyone for the great effort. The FLIP looks really good.
> > >
> > > I just want to make sure the configuration priority in the CR example.
> > > It seems the requested resources or "taskManager.taskSlots" will be
> > > transferred to Flink internal config, e.g.
> > > "taskmanager.memory.process.size" and
> > > "taskmanager.numberOfTaskSlots", and override the one in
> > > "flinkConfiguration". Am I understanding this correctly?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song 
> > > wrote:
> > > >
> > > > Sorry for the late reply. We were out due to the public holidays
> > > > in
> > > China.
> > > >
> > > > @Thomas,
> > > >
> > > > The intention is to support application management through
> > > > operator and
> > > CR,
> > > > > which means there won't be any 2 step submission process, which
> > > > > as
> > you
> > > > > allude to would defeat the purpose of this project. The CR
> > > > > example
> > > shows
> > > > > the application part. Please note that the bare cluster support
> > > > > is an
> > > > > *additional* feature for scenarios that require external job
> > > management. Is
> > > > > there anything on the FLIP page that creates a different impression?
> > > > >
> > > >
> > > > Sounds good to me. I don't remember what created the impression of
> > > > 2
> > step
> > > > submission back then. I revisited the latest version of this FLIP
> > > > and
> > it
> > > > looks good to me.
> > > >
> > > > @Gyula,
> > > >
> > > > Versioning:
> > > > > Versioning will be independent from Flink and the operator will
> > depend
> > > on a
> > > > > fixed flink version (in every given operator version).
> > > > > This should be the exact same setup as with Stateful Functions (
> > > > > https://git

[jira] [Created] (FLINK-25996) Introduce job property isDynamicGraph to ExecutionConfig

2022-02-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25996:
---

 Summary: Introduce job property isDynamicGraph to ExecutionConfig
 Key: FLINK-25996
 URL: https://issues.apache.org/jira/browse/FLINK-25996
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.15.0


To enable FLINK-25995 and FLINK-25046 only in the dynamic graph scenario, we need a 
property ExecutionConfig#isDynamicGraph. In the first step, the property will 
be decided automatically: true iff the config {{jobmanager.scheduler}} is 
{{AdaptiveBatch}}.
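
A rough sketch of how the property could be derived in the first step (the accessor 
and wiring are assumptions, not the final design):

{code:java}
/** Illustrative sketch only; the real flag would live on ExecutionConfig. */
final class DynamicGraphFlag {

    /** True iff the configured jobmanager.scheduler value selects the adaptive batch scheduler. */
    static boolean isDynamicGraph(String schedulerConfigValue) {
        return "AdaptiveBatch".equalsIgnoreCase(schedulerConfigValue);
    }
}
{code}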



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25995) Make implicit assumption of SQL local keyBy/groupBy explicit

2022-02-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25995:
---

 Summary: Make implicit assumption of SQL local keyBy/groupBy 
explicit
 Key: FLINK-25995
 URL: https://issues.apache.org/jira/browse/FLINK-25995
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Zhu Zhu
 Fix For: 1.15.0


If there are multiple consecutive identical groupBy (i.e. keyBy) operations, the SQL 
planner will change all but the first one to use a forward partitioner, so that these 
operators can be chained to reduce unnecessary shuffles.
However, sometimes the local keyBy operators are not chained (e.g. with multiple 
inputs), and this kind of forward partitioner will turn into a forward job 
edge. These forward edges still carry the local keyBy assumption, so they 
cannot be changed into rescale/rebalance edges; otherwise the results could be 
incorrect. This prevents the adaptive batch scheduler from determining the 
parallelism of downstream job vertices of other forward edges (see FLINK-25046).
To solve this, I propose to introduce a new {{ForwardForRescalePartitioner}}. 
When the SQL planner optimizes the case of multiple consecutive identical groupBys, 
it should use the proposed partitioner, so that the runtime framework can 
further decide whether the partitioner can be changed to rescale or not.
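
A rough sketch of the idea (the class name comes from the proposal; the shape of the 
type and how it relates to the existing forward partitioner are assumptions):

{code:java}
/**
 * Sketch only: a "forward" partitioner that additionally signals that the
 * runtime may relax the edge to rescale. The adaptive batch scheduler would
 * check for this type before deciding how to treat a non-chained forward edge.
 */
final class ForwardForRescalePartitionerSketch {

    /** Marker queried by the runtime; execution still forwards records when chaining succeeds. */
    boolean mayFallBackToRescale() {
        return true;
    }
}
{code}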



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Gen Luo
Hi,

I'm thinking about Yuan's case. Let's assume that the case is running in
current Flink:
1. CP8 finishes
2. For some reason, PR2 stops consuming records from the source (but is not
stuck), and PR1 continues consuming new records.
3. CP9 and CP10 finish
4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
final status with that in Yuan's case before CP11 starts.

I would say that in this case, the status of the job can be the same as in
Yuan's case, and the snapshot (including source states) taken at CP10
should be the same as the composed global snapshot in Yuan's case, which
combines CP10 of PR1 and CP8 of PR2. This should be true if neither failed
checkpointing nor uncommitted consuming has side effects, both of which
can break the exactly-once semantics when replaying. So I think there
should be no difference between rescaling the combined global snapshot and
the globally taken one, i.e. if the input partitions are not independent,
we are probably not able to rescale the source state in current Flink
either.

And @Thomas, I do agree that the operational burden is
significantly reduced, but I'm a little afraid that checkpointing the
subgraphs individually may bring back most of the runtime overhead.
Maybe we can find a better way to implement this.

On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:

> Hi,
>
> Thanks for opening this discussion! The proposed enhancement would be
> interesting for use cases in our infrastructure as well.
>
> There are scenarios where it makes sense to have multiple disconnected
> subgraphs in a single job because it can significantly reduce the
> operational burden as well as the runtime overhead. Since we allow
> subgraphs to recover independently, why not allow them to make progress
> independently as well? That would imply that checkpointing must succeed
> for non-affected subgraphs, as certain behavior is tied to checkpoint
> completion, like Kafka offset commits, file output, etc.
>
> As for source offset redistribution, offset/position needs to be tied
> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> source framework, it would be hard to implement a source with correct
> behavior that does not track the position along with the split.
>
> In Yuan's example, is there a reason why CP8 could not be promoted to
> CP10 by the coordinator for PR2 once it receives the notification that
> CP10 did not complete? It appears that should be possible since in its
> effect it should be no different than no data processed between CP8
> and CP10?
>
> Thanks,
> Thomas
>
> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann  wrote:
> >
> > Thanks for the clarification Yuan and Gen,
> >
> > I agree that the checkpointing of the sources needs to support the
> > rescaling case, otherwise it does not work. Is there currently a source
> > implementation where this wouldn't work? For Kafka it should work because
> > we store the offset per assigned partition. For Kinesis it is probably
> the
> > same. For the Filesource we store the set of unread input splits in the
> > source coordinator and the state of the assigned splits in the sources.
> > This should probably also work since new splits are only handed out to
> > running tasks.
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei  wrote:
> >
> > > Hey Till,
> > >
> > > > Why rescaling is a problem for pipelined regions/independent
> execution
> > > subgraphs:
> > >
> > > Take a simplified example :
> > > job graph : source  (2 instances) -> sink (2 instances)
> > > execution graph:
> > > source (1/2)  -> sink (1/2)   [pipelined region 1]
> > > source (2/2)  -> sink (2/2)   [pipelined region 2]
> > >
> > > Let's assume checkpoints are still triggered globally, meaning
> different
> > > pipelined regions share the global checkpoint id (PR1 CP1 matches with
> PR2
> > > CP1).
> > >
> > > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >
> > > Let's say we want to rescale to parallelism 3 due to increased input.
> > >
> > > - Notice that we can not simply rescale based on the latest completed
> > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > > externally.
> > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> how the
> > > source's offset redistribution is implemented.
> > >The answer is yes if we treat each input partition as independent
> from
> > > each other, *but I am not sure whether we can make that assumption*.
> > >
> > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with
> CPs.
> > >
> > > Best
> > > -Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann 
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > > that
> > > > separate pipelined regions can take checkpoints indepen

[jira] [Created] (FLINK-25994) Implement FileStoreExpire

2022-02-07 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-25994:
---

 Summary: Implement FileStoreExpire
 Key: FLINK-25994
 URL: https://issues.apache.org/jira/browse/FLINK-25994
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Caizhi Weng
 Fix For: 1.15.0


Currently FileStoreExpire does not have an implementation. We need an 
implementation to clean up old snapshots and related files.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25993) Option to disable Kryo.setRegistrationRequired(false)

2022-02-07 Thread Shane Bishop (Jira)
Shane Bishop created FLINK-25993:


 Summary: Option to disable Kryo.setRegistrationRequired(false)
 Key: FLINK-25993
 URL: https://issues.apache.org/jira/browse/FLINK-25993
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.14.3
Reporter: Shane Bishop


I would like to request a mechanism that a Flink library user could use to 
optionally disable Kryo.setRegistrationRequired(false).

The motivation is that Kryo.setRegistrationRequired(true) was made the safe 
default in [this 
commit|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00]
 (specifically the change was [this 
line|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130]
 in the commit). This default is overridden in the 1.14.3 Flink release (see 
[KryoSerializer.java|https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492]
 and 
[FlinkScalaKryoInstantiator.scala|https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46]).

Reference to thread in mailing list: 
https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk
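
For context, a minimal sketch of the behavior being requested, shown with plain Kryo 
rather than Flink's KryoSerializer:

{code:java}
import com.esotericsoftware.kryo.Kryo;

public class StrictKryoExample {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // The opt-in behavior this ticket asks for: unregistered classes fail
        // fast instead of being serialized by fully-qualified class name.
        kryo.setRegistrationRequired(true);
        kryo.register(java.util.ArrayList.class);
        // Serializing an unregistered type would now throw an
        // IllegalArgumentException ("Class is not registered: ...").
    }
}
{code}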



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Thomas Weise
Hi,

Thanks for opening this discussion! The proposed enhancement would be
interesting for use cases in our infrastructure as well.

There are scenarios where it makes sense to have multiple disconnected
subgraphs in a single job because it can significantly reduce the
operational burden as well as the runtime overhead. Since we allow
subgraphs to recover independently, why not allow them to make progress
independently as well? That would imply that checkpointing must succeed
for non-affected subgraphs, as certain behavior is tied to checkpoint
completion, like Kafka offset commits, file output, etc.

As for source offset redistribution, offset/position needs to be tied
to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
source framework, it would be hard to implement a source with correct
behavior that does not track the position along with the split.

In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? It appears that should be possible since in its
effect it should be no different than no data processed between CP8
and CP10?

Thanks,
Thomas

On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann  wrote:
>
> Thanks for the clarification Yuan and Gen,
>
> I agree that the checkpointing of the sources needs to support the
> rescaling case, otherwise it does not work. Is there currently a source
> implementation where this wouldn't work? For Kafka it should work because
> we store the offset per assigned partition. For Kinesis it is probably the
> same. For the Filesource we store the set of unread input splits in the
> source coordinator and the state of the assigned splits in the sources.
> This should probably also work since new splits are only handed out to
> running tasks.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei  wrote:
>
> > Hey Till,
> >
> > > Why rescaling is a problem for pipelined regions/independent execution
> > subgraphs:
> >
> > Take a simplified example :
> > job graph : source  (2 instances) -> sink (2 instances)
> > execution graph:
> > source (1/2)  -> sink (1/2)   [pipelined region 1]
> > source (2/2)  -> sink (2/2)   [pipelined region 2]
> >
> > Let's assume checkpoints are still triggered globally, meaning different
> > pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
> > CP1).
> >
> > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> >
> > Let's say we want to rescale to parallelism 3 due to increased input.
> >
> > - Notice that we can not simply rescale based on the latest completed
> > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > externally.
> > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
> > source's offset redistribution is implemented.
> >The answer is yes if we treat each input partition as independent from
> > each other, *but I am not sure whether we can make that assumption*.
> >
> > If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
> >
> > Best
> > -Yuan
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann  wrote:
> >
> > > Hi everyone,
> > >
> > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > that
> > > separate pipelined regions can take checkpoints independently?
> > > Conceptually, I somehow think that a pipelined region that is failed and
> > > cannot create a new checkpoint is more or less the same as a pipelined
> > > region that didn't get new input or a very very slow pipelined region
> > which
> > > couldn't read new records since the last checkpoint (assuming that the
> > > checkpoint coordinator can create a global checkpoint by combining
> > > individual checkpoints (e.g. taking the last completed checkpoint from
> > each
> > > pipelined region)). If this comparison is correct, then this would mean
> > > that we have rescaling problems under the latter two cases.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
> > >
> > > > Hi Gyula,
> > > >
> > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> > > this
> > > > within two scopes. One is the job subgraph, the other is the execution
> > > > subgraph, which I suppose is the same as PipelineRegion.
> > > >
> > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > recovering in a single run.
> > > >
> > > > Flink has now supported PipelineRegion based failover, with a subset
> > of a
> > > > global checkpoint snapshot. The checkpoint barriers are spread within a
> > > > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > > > actually independent. Since in a single run of a job, the
> > PipelineRegions
> > > > are fixed, we can individually checkpoint separated PipelineRegions,
> > > > despite what status the other Pi

[jira] [Created] (FLINK-25992) JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure

2022-02-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25992:
-

 Summary: 
JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership 
fails on azure
 Key: FLINK-25992
 URL: https://issues.apache.org/jira/browse/FLINK-25992
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9154

{code}
19:41:35,515 [flink-akka.actor.default-dispatcher-9] WARN  
org.apache.flink.runtime.taskmanager.Task[] - jobVertex 
(1/1)#0 (7efdea21f5f95490e02117063ce8a314) switched from RUNNING to FAILED with 
failure cause: java.lang.RuntimeException: Error while notify checkpoint ABORT.
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1457)
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpointAborted(Task.java:1407)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.abortCheckpoint(TaskExecutor.java:1021)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.UnsupportedOperationException: notifyCheckpointAbortAsync 
not supported by 
org.apache.flink.runtime.dispatcher.JobDispatcherITCase$AtLeastOneCheckpointInvokable
at 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable.notifyCheckpointAbortAsync(AbstractInvokable.java:205)
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1430)
... 31 more

{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25991) Allow custom metadata to be committed alongside the offsets

2022-02-07 Thread Frederic Hemery (Jira)
Frederic Hemery created FLINK-25991:
---

 Summary: Allow custom metadata to be committed alongside the 
offsets
 Key: FLINK-25991
 URL: https://issues.apache.org/jira/browse/FLINK-25991
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Frederic Hemery


A Kafka Consumer can commit metadata alongside its offsets. This can be used 
for different purposes but, in our case, we use the metadata to provide 
consistent lag (in seconds, not offsets) reporting across our stack.
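
For reference, a small sketch of what committing metadata alongside offsets looks 
like with the plain Kafka consumer API (the request here is to expose something 
equivalent through the Flink Kafka connector; the metadata payload below is just an 
example):

{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitWithMetadata {

    static void commit(KafkaConsumer<?, ?> consumer, TopicPartition tp, long nextOffset) {
        // Arbitrary per-partition metadata string committed together with the offset.
        String metadata = "{\"commitTimestampMs\":" + System.currentTimeMillis() + "}";
        consumer.commitSync(Map.of(tp, new OffsetAndMetadata(nextOffset, metadata)));
    }
}
{code}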



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25990) Expose uid generator for DataStream/Transformation providers

2022-02-07 Thread Timo Walther (Jira)
Timo Walther created FLINK-25990:


 Summary: Expose uid generator for DataStream/Transformation 
providers
 Key: FLINK-25990
 URL: https://issues.apache.org/jira/browse/FLINK-25990
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


The following providers can create more than one transformation and are thus 
kind of a black box for the planner:

- TransformationScanProvider
- DataStreamScanProvider
- DataStreamSinkProvider
- TransformationSinkProvider

We should add a context to `TransformationScanProvider#createTransformation` 
and `DataStreamScanProvider#produceDataStream`.
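
A rough sketch of what such a context could expose (name and signature are 
assumptions, not the final API):

{code:java}
/** Sketch only: passed into the providers so the planner can hand out uids. */
interface ProviderContext {

    /**
     * Returns a uid for a transformation created inside the provider, derived
     * from a planner-controlled prefix plus the given local name, so that
     * multi-transformation providers are no longer a black box for uid assignment.
     */
    String generateUid(String localName);
}
{code}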



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25989) EventTimeWindowCheckpointingITCase failed with exit code 137

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25989:
---

 Summary: EventTimeWindowCheckpointingITCase failed with exit code 
137
 Key: FLINK-25989
 URL: https://issues.apache.org/jira/browse/FLINK-25989
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.5
Reporter: Yun Gao



{code:java}
Feb 07 10:35:00 Starting 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase#testSlidingTimeWindow[statebackend
 type =ROCKSDB_FULLY_ASYNC].
##[error]Exit code 137 returned from process: file name '/bin/docker', 
arguments 'exec -i -u 1006  -w /home/agent07_azpcontainer 
cc88843b38dcb3e53e10065d2a740015cf3fd387e5ef47621c224a9b348fbb37 
/__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
Finishing: Test - tests
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30833&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0&l=4495



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25988) UnalignedCheckpointRescaleITCase failed with exit code 137

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25988:
---

 Summary: UnalignedCheckpointRescaleITCase failed with exit code 137
 Key: FLINK-25988
 URL: https://issues.apache.org/jira/browse/FLINK-25988
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Yun Gao



{code:java}
Feb 07 10:35:02 Starting 
org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no
 scale multi_input from 7 to 7, buffersPerChannel = 0].
Feb 07 10:35:02 Starting 
org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no
 scale multi_input from 7 to 7, buffersPerChannel = 0].
Feb 07 10:35:02 Finished 
org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleITCase#shouldRescaleUnalignedCheckpoint[no
 scale multi_input from 7 to 7, buffersPerChannel = 0].
##[error]Exit code 137 returned from process: file name '/bin/docker', 
arguments 'exec -i -u 1003  -w /home/agent04_azpcontainer 
54763e695d61cf1a80b5d4d6c397eec13218a705cddf2d5d659cec21533e1f8a 
/__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
Finishing: Test - finegrained_resource_management
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30829&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=12459




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25987) IllegalArgumentException thrown from FsStateChangelogWriter.truncate

2022-02-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25987:
-

 Summary: IllegalArgumentException thrown from 
FsStateChangelogWriter.truncate
 Key: FLINK-25987
 URL: https://issues.apache.org/jira/browse/FLINK-25987
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


{code}
java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.truncate(FsStateChangelogWriter.java:278)
at 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.updateChangelogSnapshotState(ChangelogKeyedStateBackend.java:702)
at 
org.apache.flink.state.changelog.PeriodicMaterializationManager.lambda$null$2(PeriodicMaterializationManager.java:163)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750){code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25986) Add FLIP-190 new API methods to python

2022-02-07 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25986:
---

 Summary: Add FLIP-190 new API methods to python
 Key: FLINK-25986
 URL: https://issues.apache.org/jira/browse/FLINK-25986
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Table SQL / API
Reporter: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25985) e2e test covering the main functionality of the JobResultStore

2022-02-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25985:
-

 Summary: e2e test covering the main functionality of the 
JobResultStore
 Key: FLINK-25985
 URL: https://issues.apache.org/jira/browse/FLINK-25985
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Release 1.13.6, release candidate #1

2022-02-07 Thread Chesnay Schepler

+1 (binding)

- signatures OK
- checksums OK
- tag OK
- all artifacts accounted for
- PR looks good

On 05/02/2022 21:06, Konstantin Knauf wrote:

Hi everyone,

Please review and vote on the release candidate #1 for the version 1.13.6,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

  The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 8C3FB007FE60 DEFA [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.13.6-rc1" [5],
* website pull request listing the new release and adding announcement blog
post [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Konstantin

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351074
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.6-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1486/
[5] https://github.com/apache/flink/tree/release-1.13.6-rc1
[6] https://github.com/apache/flink-web/pull/505





[jira] [Created] (FLINK-25984) Add types to ConfigOptions and replace usages of the untyped methods defaultValue and noDefaultValue with the corresponding typed ones.

2022-02-07 Thread Marios Trivyzas (Jira)
Marios Trivyzas created FLINK-25984:
---

 Summary: Add types to ConfigOptions and replace usages of the 
untyped methods defaultValue and noDefaultValue with the corresponding typed 
ones.
 Key: FLINK-25984
 URL: https://issues.apache.org/jira/browse/FLINK-25984
 Project: Flink
  Issue Type: Technical Debt
Reporter: Marios Trivyzas
Assignee: Marios Trivyzas
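
A small example of the migration this ticket describes (the option name and value 
below are placeholders):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

class TypedOptionExample {
    // Untyped builder call to be replaced:
    //   ConfigOptions.key("my.option").defaultValue("v");
    // Typed replacement:
    static final ConfigOption<String> MY_OPTION =
            ConfigOptions.key("my.option").stringType().defaultValue("v");
}
{code}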






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25983) Add WatermarkStrategy#withWatermarkAlignment

2022-02-07 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-25983:


 Summary: Add WatermarkStrategy#withWatermarkAlignment
 Key: FLINK-25983
 URL: https://issues.apache.org/jira/browse/FLINK-25983
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25982) Support idleness with watermark alignment

2022-02-07 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-25982:
--

 Summary: Support idleness with watermark alignment
 Key: FLINK-25982
 URL: https://issues.apache.org/jira/browse/FLINK-25982
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25981) ZooKeeperMultipleComponentLeaderElectionDriverTest failed

2022-02-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25981:
-

 Summary: ZooKeeperMultipleComponentLeaderElectionDriverTest failed
 Key: FLINK-25981
 URL: https://issues.apache.org/jira/browse/FLINK-25981
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl


We experienced a [build 
failure|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30783&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=15997]
 in 
{{ZooKeeperMultipleComponentLeaderElectionDriverTest.testLeaderElectionWithMultipleDrivers}}.
 The test halted when waiting for the next leader in 
[ZooKeeperMultipleComponentLeaderElectionDriverTest:256|https://github.com/apache/flink/blob/e8742f7f5cac34852d0e621036e1614bbdfe8ec3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java#L256]
{code}
Feb 04 18:02:54 "main" #1 prio=5 os_prio=0 tid=0x7fab0800b800 nid=0xe07 
waiting on condition [0x7fab12574000]
Feb 04 18:02:54java.lang.Thread.State: WAITING (parking)
Feb 04 18:02:54 at sun.misc.Unsafe.park(Native Method)
Feb 04 18:02:54 - parking to wait for  <0x8065c5c8> (a 
java.util.concurrent.CompletableFuture$Signaller)
Feb 04 18:02:54 at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Feb 04 18:02:54 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
Feb 04 18:02:54 at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
Feb 04 18:02:54 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
Feb 04 18:02:54 at 
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
Feb 04 18:02:54 at 
org.apache.flink.runtime.leaderelection.ZooKeeperMultipleComponentLeaderElectionDriverTest.testLeaderElectionWithMultipleDrivers(ZooKeeperMultipleComponentLeaderElectionDriverTest.java:256)
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25980) remove unnecessary condition in IntervalJoinOperator

2022-02-07 Thread hongshu (Jira)
hongshu created FLINK-25980:
---

 Summary: remove unnecessary condition in IntervalJoinOperator
 Key: FLINK-25980
 URL: https://issues.apache.org/jira/browse/FLINK-25980
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.14.3, 1.14.2, 1.13.5, 1.12.7, 1.11.6
Reporter: hongshu
 Fix For: 1.15.0


Condition 'currentWatermark != Long.MIN_VALUE' is covered by the subsequent 
condition 'timestamp < currentWatermark'

org.apache.flink.streaming.api.operators.co.IntervalJoinOperator#isLate

 
{code:java}
private boolean isLate(long timestamp) {
long currentWatermark = internalTimerService.currentWatermark();
return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
} {code}
If currentWatermark == Long.MIN_VALUE, then timestamp < currentWatermark also 
evaluates to false, so the condition currentWatermark != Long.MIN_VALUE is unnecessary.

We can use the following code directly
{code:java}
private boolean isLate(long timestamp) {
long currentWatermark = internalTimerService.currentWatermark();
return timestamp < currentWatermark;
} {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


RE: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Alexis Sarda-Espinosa
Danny Cranmer mentioned they are interested in standalone mode, and I am too, 
so I just wanted to say that if that development starts in parallel, I might be 
able to contribute a little.

Regarding the CRD, I agree it would be nice to avoid as many "duplications" as 
possible if pod templates are to be used. In my PoC I even tried to make use of 
existing configuration options like kubernetes.container.image & pipeline.jars 
[1]. For CPU/Memory resources, the discussion in [2] might be relevant.

[1] 
https://github.com/MicroFocus/opsb-flink-k8s-operator/blob/main/kubernetes/sample_batch_job.yaml
[2] https://issues.apache.org/jira/browse/FLINK-24150

Regards,
Alexis.

-Original Message-
From: K Fred  
Sent: Montag, 7. Februar 2022 09:36
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

Hi Gyula!

You are right. I think some common flink config options can be put in the CR, 
other expert settings continue to be overwritten by flink, and then the user 
can choose to customize the configuration.

Best Wishes,
Peng Yuan

On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra  wrote:

> Hi Yangze!
>
> This is not set in stone at the moment but the way I think it should 
> work is that first class config options in the CR should always take 
> precedence over the Flink config.
>
> In general we should not introduce too many arbitrary config options 
> that duplicate the flink configs without good reasons but the ones we 
> introduce should overwrite flink configs.
>
> We should discuss and decide together what config options to keep in 
> the flink conf and what to bring on the CR level. Resource related 
> ones are difficult because on one hand they are integral to every 
> application, on the other hand there are many expert settings that we 
> should probably leave in the conf.
>
> Cheers,
> Gyula
>
>
>
> On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo  wrote:
>
> > Thanks everyone for the great effort. The FLIP looks really good.
> >
> > I just want to make sure the configuration priority in the CR example.
> > It seems the requested resources or "taskManager.taskSlots" will be
> > transferred to Flink internal config, e.g.
> > "taskmanager.memory.process.size" and 
> > "taskmanager.numberOfTaskSlots", and override the one in 
> > "flinkConfiguration". Am I understanding this correctly?
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song 
> > wrote:
> > >
> > > Sorry for the late reply. We were out due to the public holidays 
> > > in
> > China.
> > >
> > > @Thomas,
> > >
> > > The intention is to support application management through 
> > > operator and
> > CR,
> > > > which means there won't be any 2 step submission process, which 
> > > > as
> you
> > > > allude to would defeat the purpose of this project. The CR 
> > > > example
> > shows
> > > > the application part. Please note that the bare cluster support 
> > > > is an
> > > > *additional* feature for scenarios that require external job
> > management. Is
> > > > there anything on the FLIP page that creates a different impression?
> > > >
> > >
> > > Sounds good to me. I don't remember what created the impression of 
> > > 2
> step
> > > submission back then. I revisited the latest version of this FLIP 
> > > and
> it
> > > looks good to me.
> > >
> > > @Gyula,
> > >
> > > Versioning:
> > > > Versioning will be independent from Flink and the operator will
> depend
> > on a
> > > > fixed flink version (in every given operator version).
> > > > This should be the exact same setup as with Stateful Functions ( 
> > > > https://github.com/apache/flink-statefun). So independent 
> > > > release
> > cycle
> > > > but
> > > > still within the Flink umbrella.
> > > >
> > >
> > > Does this mean if someone wants to upgrade Flink to a version that 
> > > is released after the operator version that is being used, he/she 
> > > would
> need
> > > to upgrade the operator version first?
> > > I'm not questioning this, just trying to make sure I'm 
> > > understanding
> this
> > > correctly.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra 
> wrote:
> > >
> > > > Thank you Alexis,
> > > >
> > > > Will definitely check this out. You are right, Kotlin makes it
> > difficult to
> > > > adopt pieces of this code directly but I think it will be good 
> > > > to get inspiration for the architecture and look at how 
> > > > particular problems
> > have
> > > > been solved. It will be a great help for us I am sure.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa < 
> > > > alexis.sarda-espin...@microfocus.com> wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > just wanted to mention that my employer agreed to open source 
> > > > > the
> > PoC I
> > > > > developed: 
> > > > > https://github.com/MicroFocus/opsb-flink-k8s-operator
> > > > >
> > > > > I understand the concern for

[jira] [Created] (FLINK-25979) Suspicious Classloading error during close of KafkaEnumerator

2022-02-07 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-25979:


 Summary: Suspicious Classloading error during close of 
KafkaEnumerator
 Key: FLINK-25979
 URL: https://issues.apache.org/jira/browse/FLINK-25979
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common, Connectors / Kafka
Affects Versions: 1.14.2
Reporter: Chesnay Schepler


A user reported Kafka logging a warning when the KafkaEnumerator was being 
closed.

{code}
2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils   
   [] - Failed to close KafkaClient with type 
org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: 
org/apache/kafka/common/network/Selector$CloseMode
at org.apache.kafka.common.network.Selector.close(Selector.java:806) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:365) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) 
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
 
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.network.Selector$CloseMode
at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
... 6 more
2022-02-04 15:16:30,802 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.
{code}

{code}
KafkaSource source = KafkaSource
.builder()
.setBootstrapServers(brokers)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setValueOnlyDeserializer(new SimpleStringSchema())
//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();

//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream ds = env.fromSource(source, 
WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

ds.print();
{code}

This error is overall a bit weird; I don't think I've ever seen a class being 
unable to load one of its inner classes. Intuitively I would think this is 
caused by the classloader being closed prematurely.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25978) DispatcherTest.testJobDataAreCleanedUpInCorrectOrderOn*Job can be removed

2022-02-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25978:
-

 Summary: 
DispatcherTest.testJobDataAreCleanedUpInCorrectOrderOn*Job can be removed
 Key: FLINK-25978
 URL: https://issues.apache.org/jira/browse/FLINK-25978
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.15.0
Reporter: Matthias Pohl


These test cases are covered in {{DispatcherResourceCleanupTest}} and 
{{DispatcherResourceCleanerFactoryTest}} verifying the correct order of events.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25977:
--

 Summary: Close sink client and sink http client for KDS/KDF Sinks
 Key: FLINK-25977
 URL: https://issues.apache.org/jira/browse/FLINK-25977
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Update:

DEFAULT_MAX_IN_FLIGHT_REQUESTS=50

to match the default number of threads in the AWS SDK v2 HTTP client.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25976:
--

 Summary: Update the KDS and KDF Sink's defaults & update the docs
 Key: FLINK-25976
 URL: https://issues.apache.org/jira/browse/FLINK-25976
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

The Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than the batch size and, if so, insists that the 
sink flushes immediately, even if the number of inFlightRequests is greater 
than the maximum allowed number of inFlightRequests. This yields the current 
mailbox thread and hence blocks.

Notice that this can occur even if the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable, since we would like the element to be 
written to the buffer and no blocking to occur.
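
Below is a minimal sketch of the intended non-blocking write path; the names 
(FlushDecisionSketch, maxBatchSize, maxInFlightRequests) are illustrative and do 
not reflect the actual AsyncSinkWriter API.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: shows the flush decision described above, not the real AsyncSinkWriter.
final class FlushDecisionSketch<RequestEntryT> {

    private final Deque<RequestEntryT> bufferedRequests = new ArrayDeque<>();
    private final int maxBatchSize = 10;
    private final int maxInFlightRequests = 50;
    private int inFlightRequestsCount;

    void write(RequestEntryT entry) {
        bufferedRequests.add(entry);
        // Flush only when a full batch is buffered AND another request may still
        // be sent; otherwise accept the element without yielding the mailbox thread.
        if (bufferedRequests.size() >= maxBatchSize
                && inFlightRequestsCount < maxInFlightRequests) {
            flush();
        }
    }

    private void flush() {
        // Hand the batch to the async client (omitted here) and track it as in flight.
        inFlightRequestsCount++;
        bufferedRequests.clear();
    }
}
{code}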



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25975) add doc for how to use AvroParquetRecordFormat

2022-02-07 Thread Jing Ge (Jira)
Jing Ge created FLINK-25975:
---

 Summary: add doc for how to use AvroParquetRecordFormat
 Key: FLINK-25975
 URL: https://issues.apache.org/jira/browse/FLINK-25975
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Jing Ge


https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/parquet/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25974) Make cancellation of jobs depend on the JobResultStore

2022-02-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25974:
-

 Summary: Make cancellation of jobs depend on the JobResultStore
 Key: FLINK-25974
 URL: https://issues.apache.org/jira/browse/FLINK-25974
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl


{{JobManagerRunner}} instances were cancellable as long as the instance was 
still registered in the {{Dispatcher.jobManagerRunnerRegistry}}. With the 
cleanup being done concurrently (i.e. not relying on the 
{{JobManagerRunnerRegistry}} to be cleaned up anymore), the cancellation of a 
job should only be possible as long as the job is not globally finished and 
before cleanup is triggered.

We should verify whether a job is listed in the {{JobResultStore}} and only 
enable the user to cancel the job if that's not the case.
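
A purely illustrative sketch of that guard is shown below; the names are 
hypothetical and do not correspond to the actual Dispatcher or JobResultStore API.

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed check, not the actual Dispatcher code.
final class CancellationGuardSketch {

    // Stands in for "the job has an entry in the JobResultStore".
    private final Set<String> globallyTerminatedJobs = ConcurrentHashMap.newKeySet();

    void markGloballyTerminated(String jobId) {
        globallyTerminatedJobs.add(jobId);
    }

    // Cancellation is only allowed while the job has no JobResultStore entry yet.
    boolean tryCancel(String jobId) {
        if (globallyTerminatedJobs.contains(jobId)) {
            // The job already finished globally; cleanup may already be running.
            return false;
        }
        // ... delegate to the JobManagerRunner's cancellation here ...
        return true;
    }
}
{code}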



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Till Rohrmann
Thanks for the clarification Yuan and Gen,

I agree that the checkpointing of the sources needs to support the
rescaling case, otherwise it does not work. Is there currently a source
implementation where this wouldn't work? For Kafka it should work because
we store the offset per assigned partition. For Kinesis it is probably the
same. For the Filesource we store the set of unread input splits in the
source coordinator and the state of the assigned splits in the sources.
This should probably also work since new splits are only handed out to
running tasks.
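
As a toy illustration of why per-partition offsets compose across checkpoints
taken at different times (not Flink code, just a sketch with partition->offset
maps standing in for the real source state):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionOffsetMergeSketch {
    // Each region contributes the (partition -> offset) entries it owned at its
    // last completed checkpoint; partitions are disjoint across regions, so the
    // merged view is consistent even if the checkpoint ids differ (CP10 vs CP8).
    static Map<Integer, Long> merge(List<Map<Integer, Long>> perRegionOffsets) {
        Map<Integer, Long> global = new HashMap<>();
        for (Map<Integer, Long> regionState : perRegionOffsets) {
            global.putAll(regionState);
        }
        // Rescaling then just reassigns these (partition -> offset) entries
        // to the new subtasks.
        return global;
    }
}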

Cheers,
Till

On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei  wrote:

> Hey Till,
>
> > Why rescaling is a problem for pipelined regions/independent execution
> subgraphs:
>
> Take a simplified example :
> job graph : source  (2 instances) -> sink (2 instances)
> execution graph:
> source (1/2)  -> sink (1/2)   [pipelined region 1]
> source (2/2)  -> sink (2/2)   [pipelined region 2]
>
> Let's assume checkpoints are still triggered globally, meaning different
> pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
> CP1).
>
> Now let's assume PR1 completes CP10 and PR2 completes CP8.
>
> Let's say we want to rescale to parallelism 3 due to increased input.
>
> - Notice that we can not simply rescale based on the latest completed
> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> externally.
> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
> source's offset redistribution is implemented.
>The answer is yes if we treat each input partition as independent from
> each other, *but I am not sure whether we can make that assumption*.
>
> If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
>
> Best
> -Yuan
>
>
>
>
>
>
>
> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann  wrote:
>
> > Hi everyone,
> >
> > Yuan and Gen could you elaborate why rescaling is a problem if we say
> that
> > separate pipelined regions can take checkpoints independently?
> > Conceptually, I somehow think that a pipelined region that is failed and
> > cannot create a new checkpoint is more or less the same as a pipelined
> > region that didn't get new input or a very very slow pipelined region
> which
> > couldn't read new records since the last checkpoint (assuming that the
> > checkpoint coordinator can create a global checkpoint by combining
> > individual checkpoints (e.g. taking the last completed checkpoint from
> each
> > pipelined region)). If this comparison is correct, then this would mean
> > that we have rescaling problems under the latter two cases.
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
> >
> > > Hi Gyula,
> > >
> > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> > this
> > > within two scopes. One is the job subgraph, the other is the execution
> > > subgraph, which I suppose is the same as PipelineRegion.
> > >
> > > An idea is to individually checkpoint the PipelineRegions, for the
> > > recovering in a single run.
> > >
> > > Flink has now supported PipelineRegion based failover, with a subset
> of a
> > > global checkpoint snapshot. The checkpoint barriers are spread within a
> > > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > > actually independent. Since in a single run of a job, the
> PipelineRegions
> > > are fixed, we can individually checkpoint separated PipelineRegions,
> > > despite what status the other PipelineRegions are, and use a snapshot
> of
> > a
> > > failing region to recover, instead of the subset of a global snapshot.
> > This
> > > can support separated job subgraphs as well, since they will also be
> > > separated into different PipelineRegions. I think this can fulfill your
> > > needs.
> > >
> > > In fact the individual snapshots of all PipelineRegions can form a
> global
> > > snapshot, and the alignment of snapshots of individual regions is not
> > > necessary. But rescaling this global snapshot can be potentially
> > complex. I
> > > think it's better to use the individual snapshots in a single run, and
> > take
> > > a global checkpoint/savepoint before restarting the job, rescaling it
> or
> > > not.
> > >
> > > A major issue of this plan is that it breaks the checkpoint mechanism
> of
> > > Flink. As far as I know, even in the approximate recovery, the snapshot
> > > used to recover a single task is still a part of a global snapshot. To
> > > implement the individual checkpointing of PipelineRegions, there may
> need
> > > to be a checkpoint coordinator for each PipelineRegion, and a new
> global
> > > checkpoint coordinator. When the scale goes up, there can be many
> > > individual regions, which can be a big burden to the job manager. The
> > > meaning of the checkpoint id will also be changed, which can affect
> many
> > > aspects. There can be lots of work and risks, and the risks still exist
> > if
> > > we only individually checkpoint separated job

[jira] [Created] (FLINK-25973) Rename ArchivedExecutionGraph.createFromInitializingJob into more generic createSparseArchivedExecutionGraph

2022-02-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25973:
-

 Summary: Rename ArchivedExecutionGraph.createFromInitializingJob 
into more generic createSparseArchivedExecutionGraph
 Key: FLINK-25973
 URL: https://issues.apache.org/jira/browse/FLINK-25973
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl


The use cases for this method have changed. We should rename it to something 
that fits both use cases.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Yuan Mei
Hey Till,

> Why rescaling is a problem for pipelined regions/independent execution
subgraphs:

Take a simplified example :
job graph : source  (2 instances) -> sink (2 instances)
execution graph:
source (1/2)  -> sink (1/2)   [pipelined region 1]
source (2/2)  -> sink (2/2)   [pipelined region 2]

Let's assume checkpoints are still triggered globally, meaning different
pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
CP1).

Now let's assume PR1 completes CP10 and PR2 completes CP8.

Let's say we want to rescale to parallelism 3 due to increased input.

- Notice that we can not simply rescale based on the latest completed
checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
externally.
- Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
source's offset redistribution is implemented.
   The answer is yes if we treat each input partition as independent from
each other, *but I am not sure whether we can make that assumption*.

If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.

Best
-Yuan







On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Yuan and Gen could you elaborate why rescaling is a problem if we say that
> separate pipelined regions can take checkpoints independently?
> Conceptually, I somehow think that a pipelined region that is failed and
> cannot create a new checkpoint is more or less the same as a pipelined
> region that didn't get new input or a very very slow pipelined region which
> couldn't read new records since the last checkpoint (assuming that the
> checkpoint coordinator can create a global checkpoint by combining
> individual checkpoints (e.g. taking the last completed checkpoint from each
> pipelined region)). If this comparison is correct, then this would mean
> that we have rescaling problems under the latter two cases.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
>
> > Hi Gyula,
> >
> > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> this
> > within two scopes. One is the job subgraph, the other is the execution
> > subgraph, which I suppose is the same as PipelineRegion.
> >
> > An idea is to individually checkpoint the PipelineRegions, for the
> > recovering in a single run.
> >
> > Flink has now supported PipelineRegion based failover, with a subset of a
> > global checkpoint snapshot. The checkpoint barriers are spread within a
> > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > actually independent. Since in a single run of a job, the PipelineRegions
> > are fixed, we can individually checkpoint separated PipelineRegions,
> > despite what status the other PipelineRegions are, and use a snapshot of
> a
> > failing region to recover, instead of the subset of a global snapshot.
> This
> > can support separated job subgraphs as well, since they will also be
> > separated into different PipelineRegions. I think this can fulfill your
> > needs.
> >
> > In fact the individual snapshots of all PipelineRegions can form a global
> > snapshot, and the alignment of snapshots of individual regions is not
> > necessary. But rescaling this global snapshot can be potentially
> complex. I
> > think it's better to use the individual snapshots in a single run, and
> take
> > a global checkpoint/savepoint before restarting the job, rescaling it or
> > not.
> >
> > A major issue of this plan is that it breaks the checkpoint mechanism of
> > Flink. As far as I know, even in the approximate recovery, the snapshot
> > used to recover a single task is still a part of a global snapshot. To
> > implement the individual checkpointing of PipelineRegions, there may need
> > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > checkpoint coordinator. When the scale goes up, there can be many
> > individual regions, which can be a big burden to the job manager. The
> > meaning of the checkpoint id will also be changed, which can affect many
> > aspects. There can be lots of work and risks, and the risks still exist
> if
> > we only individually checkpoint separated job subgraphs, since the
> > mechanism is still broken. If that is what you need, maybe separating
> them
> > into different jobs is an easier and better choice, as Caizhi and Yuan
> > mentioned.
> >
> > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei  wrote:
> >
> > > Hey Gyula,
> > >
> > > That's a very interesting idea. The discussion about the `Individual`
> vs
> > > `Global` checkpoint was raised before, but the main concern was from
> two
> > > aspects:
> > >
> > > - Non-deterministic replaying may lead to an inconsistent view of
> > > checkpoint
> > > - It is not easy to form a clear cut of past and future and hence no
> > clear
> > > cut of where the start point of the source should begin to replay from.
> > >
> > > Starting from independent subgraphs as you proposed may be a good
> > starting
> > > point. However, when we talk

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Gen Luo
Hi Till,

I agree that a failing task is much like a very slow or deadlock task to
the checkpointing. The main difference is whether a checkpoint of the
region the task is in can be triggered. Triggering a checkpoint on a failing
region makes no sense since the checkpoint should be discarded right away.
But we can still compose a global checkpoint with the new snapshots of
other regions taken when the region is failing, and the former successful
snapshot of this failing region. This global snapshot is still valid and
can be rescaled like a normal one, if the normal ones are possible to
rescale.

As far as I know some snapshotting methods are using or depending on the
ascending checkpoint id. Checkpointing individually probably means to count
the checkpoint id individually. Composing snapshots of different checkpoint
ids may cause errors.

I am also afraid that there might be issues with the shared states, though
I can't figure out a case right now.

On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Yuan and Gen could you elaborate why rescaling is a problem if we say that
> separate pipelined regions can take checkpoints independently?
> Conceptually, I somehow think that a pipelined region that is failed and
> cannot create a new checkpoint is more or less the same as a pipelined
> region that didn't get new input or a very very slow pipelined region which
> couldn't read new records since the last checkpoint (assuming that the
> checkpoint coordinator can create a global checkpoint by combining
> individual checkpoints (e.g. taking the last completed checkpoint from each
> pipelined region)). If this comparison is correct, then this would mean
> that we have rescaling problems under the latter two cases.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
>
> > Hi Gyula,
> >
> > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> this
> > within two scopes. One is the job subgraph, the other is the execution
> > subgraph, which I suppose is the same as PipelineRegion.
> >
> > An idea is to individually checkpoint the PipelineRegions, for the
> > recovering in a single run.
> >
> > Flink has now supported PipelineRegion based failover, with a subset of a
> > global checkpoint snapshot. The checkpoint barriers are spread within a
> > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > actually independent. Since in a single run of a job, the PipelineRegions
> > are fixed, we can individually checkpoint separated PipelineRegions,
> > despite what status the other PipelineRegions are, and use a snapshot of
> a
> > failing region to recover, instead of the subset of a global snapshot.
> This
> > can support separated job subgraphs as well, since they will also be
> > separated into different PipelineRegions. I think this can fulfill your
> > needs.
> >
> > In fact the individual snapshots of all PipelineRegions can form a global
> > snapshot, and the alignment of snapshots of individual regions is not
> > necessary. But rescaling this global snapshot can be potentially
> complex. I
> > think it's better to use the individual snapshots in a single run, and
> take
> > a global checkpoint/savepoint before restarting the job, rescaling it or
> > not.
> >
> > A major issue of this plan is that it breaks the checkpoint mechanism of
> > Flink. As far as I know, even in the approximate recovery, the snapshot
> > used to recover a single task is still a part of a global snapshot. To
> > implement the individual checkpointing of PipelineRegions, there may need
> > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > checkpoint coordinator. When the scale goes up, there can be many
> > individual regions, which can be a big burden to the job manager. The
> > meaning of the checkpoint id will also be changed, which can affect many
> > aspects. There can be lots of work and risks, and the risks still exist
> if
> > we only individually checkpoint separated job subgraphs, since the
> > mechanism is still broken. If that is what you need, maybe separating
> them
> > into different jobs is an easier and better choice, as Caizhi and Yuan
> > mentioned.
> >
> > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei  wrote:
> >
> > > Hey Gyula,
> > >
> > > That's a very interesting idea. The discussion about the `Individual`
> vs
> > > `Global` checkpoint was raised before, but the main concern was from
> two
> > > aspects:
> > >
> > > - Non-deterministic replaying may lead to an inconsistent view of
> > > checkpoint
> > > - It is not easy to form a clear cut of past and future and hence no
> > clear
> > > cut of where the start point of the source should begin to replay from.
> > >
> > > Starting from independent subgraphs as you proposed may be a good
> > starting
> > > point. However, when we talk about subgraph, do we mention it as a job
> > > subgraph (each vertex is one or more operators) or execution subgraph
> > (each
> > > vertex

Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Yangze Guo
+1 (binding)

Best,
Yangze Guo

On Mon, Feb 7, 2022 at 5:04 PM K Fred  wrote:
>
> +1 (non-binding)
>
> Best Regards
> Peng Yuan
>
> On Mon, Feb 7, 2022 at 4:49 PM Danny Cranmer 
> wrote:
>
> > +1 (binding)
> >
> > Thanks for this effort!
> > Danny Cranmer
> >
> > On Mon, Feb 7, 2022 at 7:59 AM Biao Geng  wrote:
> >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Biao Geng
> > >
> > > Peter Huang  于2022年2月7日周一 14:31写道:
> > >
> > > > +1 (non-binding)
> > > >
> > > >
> > > > Best Regards
> > > > Peter Huang
> > > >
> > > > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > Xintong Song  于2022年2月7日周一 10:25写道:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi <
> > > > balassi.mar...@gmail.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (binding)
> > > > > > >
> > > > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo  > >
> > > > > wrote:
> > > > > > >
> > > > > > > > I am very excited to see this.
> > > > > > > >
> > > > > > > > Thanks for driving the effort
> > > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku <
> > > > > > > > shqiprimbunj...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 (non-binding)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang <
> > > > > > > chenyazhangche...@gmail.com
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1 (non-binding)
> > > > > > > > > >
> > > > > > > > > > Thanks folks for leading this effort and making it happen
> > so
> > > > > fast!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Chenya
> > > > > > > > > >
> > > > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra <
> > > gyf...@apache.org>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas!
> > > > > > > > > > >
> > > > > > > > > > > +1 (binding) from my side
> > > > > > > > > > >
> > > > > > > > > > > Happy to see this effort getting some traction!
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise <
> > > t...@apache.org>
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink
> > > > > > Kubernetes
> > > > > > > > > > > > Operator [1] which has been discussed in [2].
> > > > > > > > > > > >
> > > > > > > > > > > > The vote will be open for at least 72 hours unless
> > there
> > > is
> > > > > an
> > > > > > > > > > > > objection or not enough votes.
> > > > > > > > > > > >
> > > > > > > > > > > > [1]
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator
> > > > > > > > > > > > [2]
> > > > > > > >
> > https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks!
> > > > > > > > > > > > Thomas
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > --
> > > > > > > > Israel Ekpo
> > > > > > > > Lead Instructor, IzzyAcademy.com
> > > > > > > > https://www.youtube.com/c/izzyacademy
> > > > > > > > https://izzyacademy.com/
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >


Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread K Fred
+1 (non-binding)

Best Regards
Peng Yuan

On Mon, Feb 7, 2022 at 4:49 PM Danny Cranmer 
wrote:

> +1 (binding)
>
> Thanks for this effort!
> Danny Cranmer
>
> On Mon, Feb 7, 2022 at 7:59 AM Biao Geng  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Biao Geng
> >
> > Peter Huang  于2022年2月7日周一 14:31写道:
> >
> > > +1 (non-binding)
> > >
> > >
> > > Best Regards
> > > Peter Huang
> > >
> > > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang 
> wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > Xintong Song  于2022年2月7日周一 10:25写道:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi <
> > > balassi.mar...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo  >
> > > > wrote:
> > > > > >
> > > > > > > I am very excited to see this.
> > > > > > >
> > > > > > > Thanks for driving the effort
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku <
> > > > > > > shqiprimbunj...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang <
> > > > > > chenyazhangche...@gmail.com
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 (non-binding)
> > > > > > > > >
> > > > > > > > > Thanks folks for leading this effort and making it happen
> so
> > > > fast!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Chenya
> > > > > > > > >
> > > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra <
> > gyf...@apache.org>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas!
> > > > > > > > > >
> > > > > > > > > > +1 (binding) from my side
> > > > > > > > > >
> > > > > > > > > > Happy to see this effort getting some traction!
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Gyula
> > > > > > > > > >
> > > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise <
> > t...@apache.org>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink
> > > > > Kubernetes
> > > > > > > > > > > Operator [1] which has been discussed in [2].
> > > > > > > > > > >
> > > > > > > > > > > The vote will be open for at least 72 hours unless
> there
> > is
> > > > an
> > > > > > > > > > > objection or not enough votes.
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator
> > > > > > > > > > > [2]
> > > > > > >
> https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z
> > > > > > > > > > >
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Thomas
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > --
> > > > > > > Israel Ekpo
> > > > > > > Lead Instructor, IzzyAcademy.com
> > > > > > > https://www.youtube.com/c/izzyacademy
> > > > > > > https://izzyacademy.com/
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Danny Cranmer
+1 (binding)

Thanks for this effort!
Danny Cranmer

On Mon, Feb 7, 2022 at 7:59 AM Biao Geng  wrote:

> +1 (non-binding)
>
> Best,
> Biao Geng
>
> Peter Huang  于2022年2月7日周一 14:31写道:
>
> > +1 (non-binding)
> >
> >
> > Best Regards
> > Peter Huang
> >
> > On Sun, Feb 6, 2022 at 7:35 PM Yang Wang  wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > > Yang
> > >
> > > Xintong Song  于2022年2月7日周一 10:25写道:
> > >
> > > > +1 (binding)
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Mon, Feb 7, 2022 at 12:52 AM Márton Balassi <
> > balassi.mar...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Sat, Feb 5, 2022 at 5:35 PM Israel Ekpo 
> > > wrote:
> > > > >
> > > > > > I am very excited to see this.
> > > > > >
> > > > > > Thanks for driving the effort
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > >
> > > > > > On Sat, Feb 5, 2022 at 10:53 AM Shqiprim Bunjaku <
> > > > > > shqiprimbunj...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Feb 5, 2022 at 4:39 PM Chenya Zhang <
> > > > > chenyazhangche...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Thanks folks for leading this effort and making it happen so
> > > fast!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Chenya
> > > > > > > >
> > > > > > > > On Sat, Feb 5, 2022 at 12:02 AM Gyula Fóra <
> gyf...@apache.org>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Thomas!
> > > > > > > > >
> > > > > > > > > +1 (binding) from my side
> > > > > > > > >
> > > > > > > > > Happy to see this effort getting some traction!
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Sat, Feb 5, 2022 at 3:00 AM Thomas Weise <
> t...@apache.org>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > I'd like to start a vote on FLIP-212: Introduce Flink
> > > > Kubernetes
> > > > > > > > > > Operator [1] which has been discussed in [2].
> > > > > > > > > >
> > > > > > > > > > The vote will be open for at least 72 hours unless there
> is
> > > an
> > > > > > > > > > objection or not enough votes.
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator
> > > > > > > > > > [2]
> > > > > > https://lists.apache.org/thread/1z78t6rf70h45v7fbd2m93rm2y1bvh0z
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > Thomas
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > --
> > > > > > Israel Ekpo
> > > > > > Lead Instructor, IzzyAcademy.com
> > > > > > https://www.youtube.com/c/izzyacademy
> > > > > > https://izzyacademy.com/
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-25972) org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause class leaks

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25972:
---

 Summary: 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider#HANDLERS might cause 
class leaks
 Key: FLINK-25972
 URL: https://issues.apache.org/jira/browse/FLINK-25972
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Yun Gao


The reference chain is JaninoRelMetadataProvider -> static field HANDLERS -> 
generated class GeneratedMetadataHandler_ColumnNullCount loaded by 
org.codehaus.janino.ByteArrayClassLoader. This cache seems to use strong 
references.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25971) org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE might cause class leaks

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25971:
---

 Summary: 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil#OBJECT_MAPPER_INSTANCE
 might cause class leaks
 Key: FLINK-25971
 URL: https://issues.apache.org/jira/browse/FLINK-25971
 Project: Flink
  Issue Type: Sub-task
Reporter: Yun Gao


It might need a double check whether the reference chain

JsonSerdeUtil#OBJECT_MAPPER_INSTANCE -> DefaultDeserializationContext 
_deserializationContext (extends DeserializationContext) -> DeserializerCache 
_cache -> _cachedDeserializers / _incompleteDeserializers

holds user ClassLoaders.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread K Fred
Hi Gyula!

You are right. I think some common Flink config options can be put in the CR, 
other expert settings can continue to be taken from the Flink configuration, and 
then the user can choose to customize the configuration.

Best Wishes,
Peng Yuan

On Mon, Feb 7, 2022 at 4:16 PM Gyula Fóra  wrote:

> Hi Yangze!
>
> This is not set in stone at the moment but the way I think it should work
> is that first class config options in the CR should always take precedence
> over the Flink config.
>
> In general we should not introduce too many arbitrary config options that
> duplicate the flink configs without good reasons but the ones we introduce
> should overwrite flink configs.
>
> We should discuss and decide together what config options to keep in the
> flink conf and what to bring on the CR level. Resource related ones are
> difficult because on one hand they are integral to every application, on
> the other hand there are many expert settings that we should probably leave
> in the conf.
>
> Cheers,
> Gyula
>
>
>
> On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo  wrote:
>
> > Thanks everyone for the great effort. The FLIP looks really good.
> >
> > I just want to make sure the configuration priority in the CR example.
> > It seems the requests resources or "taskManager. taskSlots" will be
> > transferred to Flink internal config, e.g.
> > "taskmanager.memory.process.size" and "taskmanager.numberOfTaskSlots",
> > and override the one in "flinkConfiguration". Am I understanding this
> > correctly?
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Feb 7, 2022 at 10:22 AM Xintong Song 
> > wrote:
> > >
> > > Sorry for the late reply. We were out due to the public holidays in
> > China.
> > >
> > > @Thomas,
> > >
> > > The intention is to support application management through operator and
> > CR,
> > > > which means there won't be any 2 step submission process, which as
> you
> > > > allude to would defeat the purpose of this project. The CR example
> > shows
> > > > the application part. Please note that the bare cluster support is an
> > > > *additional* feature for scenarios that require external job
> > management. Is
> > > > there anything on the FLIP page that creates a different impression?
> > > >
> > >
> > > Sounds good to me. I don't remember what created the impression of 2
> step
> > > submission back then. I revisited the latest version of this FLIP and
> it
> > > looks good to me.
> > >
> > > @Gyula,
> > >
> > > Versioning:
> > > > Versioning will be independent from Flink and the operator will
> depend
> > on a
> > > > fixed flink version (in every given operator version).
> > > > This should be the exact same setup as with Stateful Functions (
> > > > https://github.com/apache/flink-statefun). So independent release
> > cycle
> > > > but
> > > > still within the Flink umbrella.
> > > >
> > >
> > > Does this mean if someone wants to upgrade Flink to a version that is
> > > released after the operator version that is being used, he/she would
> need
> > > to upgrade the operator version first?
> > > I'm not questioning this, just trying to make sure I'm understanding
> this
> > > correctly.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra 
> wrote:
> > >
> > > > Thank you Alexis,
> > > >
> > > > Will definitely check this out. You are right, Kotlin makes it
> > difficult to
> > > > adopt pieces of this code directly but I think it will be good to get
> > > > inspiration for the architecture and look at how particular problems
> > have
> > > > been solved. It will be a great help for us I am sure.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa <
> > > > alexis.sarda-espin...@microfocus.com> wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > just wanted to mention that my employer agreed to open source the
> > PoC I
> > > > > developed: https://github.com/MicroFocus/opsb-flink-k8s-operator
> > > > >
> > > > > I understand the concern for maintainability, so Gradle & Kotlin
> > might
> > > > not
> > > > > be appealing to you, but at least it gives you another reference.
> The
> > > > Helm
> > > > > resources in particular might be useful.
> > > > >
> > > > > There are bits and pieces there referring to Flink sessions, but
> > those
> > > > are
> > > > > just placeholders, the functioning parts use application mode with
> > native
> > > > > integration.
> > > > >
> > > > > Regards,
> > > > > Alexis.
> > > > >
> > > > > 
> > > > > From: Thomas Weise 
> > > > > Sent: Saturday, February 5, 2022 2:41 AM
> > > > > To: dev 
> > > > > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes
> Operator
> > > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for the continued feedback and discussion. Looks like we are
> > > > > ready to start a VOTE, I will initiate it shortly.
> > > > >
> > > > > In parallel it would be good to find the repository name.
> > > > >
> > > > > My 

[jira] [Created] (FLINK-25970) SerializedThrowable should record type of the original throwable.

2022-02-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-25970:


 Summary: SerializedThrowable should record type of the original 
throwable.
 Key: FLINK-25970
 URL: https://issues.apache.org/jira/browse/FLINK-25970
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.14.3
Reporter: Xintong Song


Currently, only the message and stack of the original throwable are preserved in 
{{{}SerializedThrowable{}}}, while the type of the original throwable is 
discarded.

Sometimes, it would be helpful if the message of {{SerializedThrowable}} could 
also include the full class name of the original throwable.

E.g., in the following stack.
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_102]
at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_102]
at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_102]
...
{code}
It's not that easy to understand what is wrong from this stack. The JDK does not 
provide a message for the original exception. We have to look into the JDK 
source code to find out what's going on. Sometimes it's even more annoying 
having to look up the JDK source code of exactly the same version in order to 
match the line numbers.

It turns out the original exception was a {{ConcurrentModificationException}}. I 
think it would be much more straightforward if we could have a stack like the 
following.
{code}
Caused by: org.apache.flink.util.SerializedThrowable: 
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_102]
at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_102]
at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_102]
...
{code}
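
A minimal sketch of how the original class name could be carried in the message 
is shown below; this is an illustrative wrapper only, the real 
{{SerializedThrowable}} does more than this.

{code:java}
// Illustrative only, not the actual org.apache.flink.util.SerializedThrowable.
public final class SerializedThrowableSketch extends Exception {

    private final String originalClassName;

    public SerializedThrowableSketch(Throwable original) {
        // Prepend the original type so it survives even when getMessage() is null,
        // e.g. "java.util.ConcurrentModificationException".
        super(original.getMessage() == null
                ? original.getClass().getName()
                : original.getClass().getName() + ": " + original.getMessage());
        this.originalClassName = original.getClass().getName();
        setStackTrace(original.getStackTrace());
    }

    public String getOriginalClassName() {
        return originalClassName;
    }
}
{code}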



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25969) org.apache.flink.table.runtime.generated.CompileUtils might cause class leaks

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25969:
---

 Summary: org.apache.flink.table.runtime.generated.CompileUtils 
might cause class leaks
 Key: FLINK-25969
 URL: https://issues.apache.org/jira/browse/FLINK-25969
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.15.0
Reporter: Yun Gao


CompileUtils has two static caches, namely COMPILED_CACHE and 
COMPILED_EXPRESSION_CACHE. COMPILED_CACHE has been checked and might cache the 
user ClassLoaders with strong references, thus it might need to be improved. 
COMPILED_EXPRESSION_CACHE would need a double check.
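
One possible mitigation, sketched below with hypothetical names (this is not the 
actual fix): bound the cache and drop entries for a job's ClassLoader once the 
job is released.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative mitigation sketch, not the actual CompileUtils change.
final class BoundedCompileCacheSketch {

    private static final int MAX_ENTRIES = 100;

    // Access-ordered LRU map; eviction bounds how long any user ClassLoader stays pinned.
    // A real implementation would also need proper synchronization around all accesses.
    private static final Map<String, Class<?>> CACHE =
            new LinkedHashMap<String, Class<?>>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Class<?>> eldest) {
                    return size() > MAX_ENTRIES;
                }
            };

    // Explicitly drop everything defined by a released job's user ClassLoader.
    static synchronized void releaseClassLoader(ClassLoader userLoader) {
        CACHE.values().removeIf(clazz -> clazz.getClassLoader() == userLoader);
    }
}
{code}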



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25968) Possible class leaks in flink-table / sql modules

2022-02-07 Thread Yun Gao (Jira)
Yun Gao created FLINK-25968:
---

 Summary: Possible class leaks in flink-table / sql modules
 Key: FLINK-25968
 URL: https://issues.apache.org/jira/browse/FLINK-25968
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.15.0
Reporter: Yun Gao


This is the umbrella issue for possible class leaks in the flink-table / sql 
planner and runtime modules.

Currently, for a Flink cluster, the Flink classes are loaded by the system 
ClassLoader while each job has its own separate user ClassLoader. In this case, 
if some class loaded by the system ClassLoader has static variables that 
reference classes loaded by the user ClassLoaders, the user ClassLoaders cannot 
be released, which might cause class leaks.
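
For illustration, the leak pattern looks roughly like the following sketch (not 
actual Flink code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative leak pattern: a static cache owned by the system ClassLoader pins
// classes (and hence the user ClassLoaders that defined them) across job lifecycles.
final class StaticCacheLeakSketch {

    // Strong references: every cached Class<?> keeps its defining ClassLoader reachable.
    private static final Map<String, Class<?>> CACHE = new ConcurrentHashMap<>();

    static Class<?> getOrLoad(String className, ClassLoader userClassLoader) {
        return CACHE.computeIfAbsent(className, name -> {
            try {
                // The loaded class references userClassLoader, which can then never be
                // garbage collected as long as this static map holds the entry.
                return Class.forName(name, false, userClassLoader);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        });
    }
}
{code}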



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Till Rohrmann
Hi everyone,

Yuan and Gen could you elaborate why rescaling is a problem if we say that
separate pipelined regions can take checkpoints independently?
Conceptually, I somehow think that a pipelined region that is failed and
cannot create a new checkpoint is more or less the same as a pipelined
region that didn't get new input or a very very slow pipelined region which
couldn't read new records since the last checkpoint (assuming that the
checkpoint coordinator can create a global checkpoint by combining
individual checkpoints (e.g. taking the last completed checkpoint from each
pipelined region)). If this comparison is correct, then this would mean
that we have rescaling problems under the latter two cases.

Cheers,
Till

On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:

> Hi Gyula,
>
> Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
> within two scopes. One is the job subgraph, the other is the execution
> subgraph, which I suppose is the same as PipelineRegion.
>
> An idea is to individually checkpoint the PipelineRegions, for the
> recovering in a single run.
>
> Flink has now supported PipelineRegion based failover, with a subset of a
> global checkpoint snapshot. The checkpoint barriers are spread within a
> PipelineRegion, so the checkpointing of individual PipelineRegions is
> actually independent. Since in a single run of a job, the PipelineRegions
> are fixed, we can individually checkpoint separated PipelineRegions,
> despite what status the other PipelineRegions are, and use a snapshot of a
> failing region to recover, instead of the subset of a global snapshot. This
> can support separated job subgraphs as well, since they will also be
> separated into different PipelineRegions. I think this can fulfill your
> needs.
>
> In fact the individual snapshots of all PipelineRegions can form a global
> snapshot, and the alignment of snapshots of individual regions is not
> necessary. But rescaling this global snapshot can be potentially complex. I
> think it's better to use the individual snapshots in a single run, and take
> a global checkpoint/savepoint before restarting the job, rescaling it or
> not.
>
> A major issue of this plan is that it breaks the checkpoint mechanism of
> Flink. As far as I know, even in the approximate recovery, the snapshot
> used to recover a single task is still a part of a global snapshot. To
> implement the individual checkpointing of PipelineRegions, there may need
> to be a checkpoint coordinator for each PipelineRegion, and a new global
> checkpoint coordinator. When the scale goes up, there can be many
> individual regions, which can be a big burden to the job manager. The
> meaning of the checkpoint id will also be changed, which can affect many
> aspects. There can be lots of work and risks, and the risks still exist if
> we only individually checkpoint separated job subgraphs, since the
> mechanism is still broken. If that is what you need, maybe separating them
> into different jobs is an easier and better choice, as Caizhi and Yuan
> mentioned.
>
> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei  wrote:
>
> > Hey Gyula,
> >
> > That's a very interesting idea. The discussion about the `Individual` vs
> > `Global` checkpoint was raised before, but the main concern was from two
> > aspects:
> >
> > - Non-deterministic replaying may lead to an inconsistent view of
> > checkpoint
> > - It is not easy to form a clear cut of past and future and hence no
> clear
> > cut of where the start point of the source should begin to replay from.
> >
> > Starting from independent subgraphs as you proposed may be a good
> starting
> > point. However, when we talk about subgraph, do we mention it as a job
> > subgraph (each vertex is one or more operators) or execution subgraph
> (each
> > vertex is a task instance)?
> >
> > If it is a job subgraph, then indeed, why not separate it into multiple
> > jobs as Caizhi mentioned.
> > If it is an execution subgraph, then it is difficult to handle rescaling
> > due to inconsistent views of checkpoints between tasks of the same
> > operator.
> >
> > `Individual/Subgraph Checkpointing` is definitely an interesting
> direction
> > to think of, and I'd love to hear more from you!
> >
> > Best,
> >
> > Yuan
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng 
> wrote:
> >
> > > Hi Gyula!
> > >
> > > Thanks for raising this discussion. I agree that this will be an
> > > interesting feature but I actually have some doubts about the
> motivation
> > > and use case. If there are multiple individual subgraphs in the same
> job,
> > > why not just distribute them to multiple jobs so that each job contains
> > > only one individual graph and can now fail without disturbing the
> others?
> > >
> > >
> > > Gyula Fóra  于2022年2月7日周一 05:22写道:
> > >
> > > > Hi all!
> > > >
> > > > At the moment checkpointing only works for healthy jobs with all
> > running
> > > > (or some finished) tasks. This sounds reasonabl

Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-07 Thread Gyula Fóra
Hi Yangze!

This is not set in stone at the moment but the way I think it should work
is that first class config options in the CR should always take precedence
over the Flink config.

In general we should not introduce too many arbitrary config options that
duplicate the flink configs without good reasons but the ones we introduce
should overwrite flink configs.

We should discuss and decide together what config options to keep in the
flink conf and what to bring on the CR level. Resource related ones are
difficult because on one hand they are integral to every application, on
the other hand there are many expert settings that we should probably leave
in the conf.
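
As a rough illustration of that precedence rule (hypothetical helper, not
operator code), the effective configuration could be built like this:

import java.util.HashMap;
import java.util.Map;

final class EffectiveConfigSketch {
    // First-class CR options, already mapped to Flink keys (e.g. taskManager.taskSlots
    // -> taskmanager.numberOfTaskSlots), overwrite entries from flinkConfiguration.
    static Map<String, String> effectiveConfig(
            Map<String, String> flinkConfiguration, Map<String, String> crFirstClassOptions) {
        Map<String, String> effective = new HashMap<>(flinkConfiguration);
        effective.putAll(crFirstClassOptions);
        return effective;
    }
}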

Cheers,
Gyula



On Mon, Feb 7, 2022 at 8:28 AM Yangze Guo  wrote:

> Thanks everyone for the great effort. The FLIP looks really good.
>
> I just want to make sure the configuration priority in the CR example.
> It seems the requests resources or "taskManager. taskSlots" will be
> transferred to Flink internal config, e.g.
> "taskmanager.memory.process.size" and "taskmanager.numberOfTaskSlots",
> and override the one in "flinkConfiguration". Am I understanding this
> correctly?
>
> Best,
> Yangze Guo
>
> On Mon, Feb 7, 2022 at 10:22 AM Xintong Song 
> wrote:
> >
> > Sorry for the late reply. We were out due to the public holidays in
> China.
> >
> > @Thomas,
> >
> > The intention is to support application management through operator and
> CR,
> > > which means there won't be any 2 step submission process, which as you
> > > allude to would defeat the purpose of this project. The CR example
> shows
> > > the application part. Please note that the bare cluster support is an
> > > *additional* feature for scenarios that require external job
> management. Is
> > > there anything on the FLIP page that creates a different impression?
> > >
> >
> > Sounds good to me. I don't remember what created the impression of 2 step
> > submission back then. I revisited the latest version of this FLIP and it
> > looks good to me.
> >
> > @Gyula,
> >
> > Versioning:
> > > Versioning will be independent from Flink and the operator will depend
> on a
> > > fixed flink version (in every given operator version).
> > > This should be the exact same setup as with Stateful Functions (
> > > https://github.com/apache/flink-statefun). So independent release
> cycle
> > > but
> > > still within the Flink umbrella.
> > >
> >
> > Does this mean if someone wants to upgrade Flink to a version that is
> > released after the operator version that is being used, he/she would need
> > to upgrade the operator version first?
> > I'm not questioning this, just trying to make sure I'm understanding this
> > correctly.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Feb 7, 2022 at 3:14 AM Gyula Fóra  wrote:
> >
> > > Thank you Alexis,
> > >
> > > Will definitely check this out. You are right, Kotlin makes it
> difficult to
> > > adopt pieces of this code directly but I think it will be good to get
> > > inspiration for the architecture and look at how particular problems
> have
> > > been solved. It will be a great help for us I am sure.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Sat, Feb 5, 2022 at 12:28 PM Alexis Sarda-Espinosa <
> > > alexis.sarda-espin...@microfocus.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > just wanted to mention that my employer agreed to open source the
> PoC I
> > > > developed: https://github.com/MicroFocus/opsb-flink-k8s-operator
> > > >
> > > > I understand the concern for maintainability, so Gradle & Kotlin
> might
> > > not
> > > > be appealing to you, but at least it gives you another reference. The
> > > Helm
> > > > resources in particular might be useful.
> > > >
> > > > There are bits and pieces there referring to Flink sessions, but
> those
> > > are
> > > > just placeholders, the functioning parts use application mode with
> native
> > > > integration.
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > 
> > > > From: Thomas Weise 
> > > > Sent: Saturday, February 5, 2022 2:41 AM
> > > > To: dev 
> > > > Subject: Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator
> > > >
> > > > Hi,
> > > >
> > > > Thanks for the continued feedback and discussion. Looks like we are
> > > > ready to start a VOTE, I will initiate it shortly.
> > > >
> > > > In parallel it would be good to find the repository name.
> > > >
> > > > My suggestion would be: flink-kubernetes-operator
> > > >
> > > > I thought "flink-operator" could be a bit misleading since the term
> > > > operator already has a meaning in Flink.
> > > >
> > > > I also considered "flink-k8s-operator" but that would be almost
> > > > identical to existing operator implementations and could lead to
> > > > confusion in the future.
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > >
> > > > On Fri, Feb 4, 2022 at 5:15 AM Gyula Fóra 
> wrote:
> > > > >
> > > > > Hi Danny,
> > > > >
> > > > > So f

Re: [VOTE] FLIP-211: Kerberos delegation token framework

2022-02-07 Thread Chesnay Schepler

+1 (binding)

On 21/01/2022 15:57, Gabor Somogyi wrote:

Hi devs,

I would like to start the vote for FLIP-211 [1], which was discussed and
reached a consensus in the discussion thread [2].

The vote will be open for at least 72h, unless there is an objection or not
enough votes.

BR,
G

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework

[2] https://lists.apache.org/thread/cvwknd5fhohj0wfv8mfwn70jwpjvxrjj