[jira] [Created] (FLINK-34016) Janino compile failed when watermark with column by udf

2024-01-07 Thread Jude Zhu (Jira)
Jude Zhu created FLINK-34016:


 Summary: Janino compile failed when watermark with column by udf
 Key: FLINK-34016
 URL: https://issues.apache.org/jira/browse/FLINK-34016
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.0, 1.15.0
Reporter: Jude Zhu


Submitting the following Flink SQL via sql-client.sh throws an exception:
{code:java}
Caused by: java.lang.RuntimeException: Could not instantiate generated class 
'WatermarkGenerator$0'
    at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
    at 
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
    at 
org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
    at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
    at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
    ... 16 more
Caused by: 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
    at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
    ... 18 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
    at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
    at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
    ... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 123: 
Line 29, Column 123: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:7007)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6886)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
    ...
{code}
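The SQL statement itself is truncated from the report above. A minimal hypothetical reproduction (connector, table, column, and function names are placeholders, not taken from the report) would declare the event-time column through a UDF and define the watermark on it:

```sql
-- Hypothetical reproduction; all identifiers are placeholders.
CREATE FUNCTION parse_ts AS 'com.example.udf.ParseTimestamp';

CREATE TABLE events (
  id     BIGINT,
  ts_raw STRING,
  -- computed column produced by the UDF; the watermark references it
  ts     AS parse_ts(ts_raw),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```

The generated `WatermarkGenerator$0` then has to reference the UDF's class, which is apparently where Janino's resolution of the fully-qualified `org...` name fails.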
Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Rui Fan
Thanks to Yangze for driving this proposal!

Overall this looks good to me! The proposal is useful for
performance when a job doesn't need failover.

I have some minor questions:

1. How does it work with FLIP-383[1]?

This FLIP introduces high-availability.enable-job-recovery,
and FLIP-383 introduces execution.batch.job-recovery.enabled.

IIUC, when high-availability.enable-job-recovery is false, the job
cannot recover even if execution.batch.job-recovery.enabled = true,
right?

If so, could we validate these parameters and log a warning? Or
directly disable execution.batch.job-recovery.enabled when
high-availability.enable-job-recovery = false?

2. Could we rename it to high-availability.job-recovery.enabled to unify
the naming?

WDYT?

[1] https://cwiki.apache.org/confluence/x/QwqZE

Best,
Rui

On Mon, Jan 8, 2024 at 2:04 PM Yangze Guo  wrote:

> Thanks for your comment, Yong.
>
> Here are my thoughts on the splitting of HighAvailabilityServices:
> Firstly, I would treat this separation as a result of technical debt
> and a side effect of the FLIP. In order to achieve a cleaner interface
> hierarchy for High Availability before Flink 2.0, the design decision
> should not be limited to OLAP scenarios.
> I agree that the current HAServices can be divided based on either the
> actual target (cluster & job) or the type of functionality (leader
> election & persistence). From a conceptual perspective, I do not see
> one approach being better than the other. However, I have chosen the
> current separation for a clear separation of concerns. After FLIP-285,
> each process has a dedicated LeaderElectionService responsible for
> leader election of all the components within it. This
> LeaderElectionService has its own lifecycle management. If we were to
> split the HAServices into 'ClusterHighAvailabilityService' and
> 'JobHighAvailabilityService', we would need to couple the lifecycle
> management of these two interfaces, as they both rely on the
> LeaderElectionService and other relevant classes. This coupling and
> implicit design assumption will increase the complexity and testing
> difficulty of the system. WDYT?
>
> Best,
> Yangze Guo
>
> On Mon, Jan 8, 2024 at 12:08 PM Yong Fang  wrote:
> >
> > Thanks Yangze for starting this discussion. I have one comment: why do we
> > need to abstract two services as `LeaderServices` and
> > `PersistenceServices`?
> >
> > From the content, the purpose of this FLIP is to make job failover more
> > lightweight, so it would be more appropriate to abstract two services as
> > `ClusterHighAvailabilityService` and `JobHighAvailabilityService` instead
> > of `LeaderServices` and `PersistenceServices` based on leader and store.
> In
> > this way, we can create a `JobHighAvailabilityService` that has a leader
> > service and store for the job that meets the requirements based on the
> > configuration in the zk/k8s high availability service.
> >
> > WDYT?
> >
> > Best,
> > Fang Yong
> >
> > On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng 
> wrote:
> >
> > > Thanks Yangze for restarting this discussion.
> > >
> > > +1 for the overall idea. By splitting the HighAvailabilityServices into
> > > LeaderServices and PersistenceServices, we may support configuring
> > > different storage behind them in the future.
> > >
> > > We did run into real problems in production where too much job
> metadata was
> > > being stored on ZK, causing system instability.
> > >
> > >
> > > Yangze Guo  于2023年12月29日周五 10:21写道:
> > >
> > > > Thanks for the response, Zhanghao.
> > > >
> > > > PersistenceServices sounds good to me.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
> > > >  wrote:
> > > > >
> > > > > Thanks for driving this effort, Yangze! The proposal overall LGTM.
> > > > Other than the throughput enhancement in the OLAP scenario, the separation
> of
> > > > leader election/discovery services and the metadata persistence
> services
> > > > will also make the HA impl clearer and easier to maintain. Just a
> minor
> > > > comment on naming: would it be better to rename PersistentServices to
> > > > PersistenceServices, as usually we put a noun before Services?
> > > > >
> > > > > Best,
> > > > > Zhanghao Chen
> > > > > 
> > > > > From: Yangze Guo 
> > > > > Sent: Tuesday, December 19, 2023 17:33
> > > > > To: dev 
> > > > > Subject: [DISCUSS] FLIP-403: High Availability Services for OLAP
> > > > Scenarios
> > > > >
> > > > > Hi, there,
> > > > >
> > > > > We would like to start a discussion thread on "FLIP-403: High
> > > > > Availability Services for OLAP Scenarios"[1].
> > > > >
> > > > > Currently, Flink's high availability service consists of two
> > > > > mechanisms: leader election/retrieval services for JobManager and
> > > > > persistent services for job metadata. However, these mechanisms are
> > > > > set up in an "all or nothing" manner. In OLAP scenarios, we
> typically
> > > > > only require 

Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios

2024-01-07 Thread Rui Fan
Having only one strategy is fine with me.

When the multiplier is set to 1, the exponential-delay will become
fixed-delay.
So fixed-delay may not be needed.
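The degenerate case is easy to see with a small sketch (illustrative only, not Flink's actual implementation; function name and parameters are invented for this example):

```python
def retry_delays(initial, multiplier, max_delay, attempts):
    """Delay sequence (in seconds) of a hypothetical exponential-backoff
    retry strategy, capped at max_delay."""
    delays, d = [], initial
    for _ in range(attempts):
        delays.append(min(d, max_delay))
        d *= multiplier
    return delays

# With multiplier > 1 the delays grow exponentially up to the cap:
print(retry_delays(1.0, 2.0, 8.0, 5))  # [1.0, 2.0, 4.0, 8.0, 8.0]
# With multiplier == 1 the strategy degenerates to fixed-delay:
print(retry_delays(1.0, 1.0, 8.0, 5))  # [1.0, 1.0, 1.0, 1.0, 1.0]
```

So a fixed-delay strategy is just the multiplier == 1 configuration of the exponential one.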

Best,
Rui

On Mon, Jan 8, 2024 at 2:17 PM Yong Fang  wrote:

> I agree with @Rui that the current configuration for Flink Client is a
> little complex. Can we just provide one strategy with less configuration
> items for all scenarios?
>
> Best,
> Fang Yong
>
> On Mon, Jan 8, 2024 at 11:19 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Thanks xiangyu for driving this proposal! And sorry for the
> > late reply.
> >
> > Overall looks good to me, I only have some minor questions:
> >
> > 1. Do we need to introduce 3 collect strategies in the first version?
> >
> > A large, comprehensive set of configuration options brings extra
> > learning and usage costs to users. I tend to favor providing
> > out-of-the-box parameters, and 2 collect strategies may be enough
> > for users.
> >
> > IIUC, there is no big difference between exponential-delay and
> > incremental-delay, especially with the default parameters provided.
> > I wonder whether we could provide a multiplier for the exponential-delay
> > strategy and remove the incremental-delay strategy?
> >
> > Of course, if you think multiplier option is not needed based on
> > your production experience, it's totally fine for me. Simple is better.
> >
> > 2. Which strategy do you think is best in mass production?
> >
> > I'm working on FLIP-364[1], it's related to Flink failover restart
> > strategy. IIUC, when one cluster only has a few flink jobs,
> > fixed-delay is fine. It guarantees minimal latency without too
> > much stress. But if one cluster has too many jobs, fixed-delay
> > may not be stable.
> >
> > Do you think exponential-delay is better than fixed delay in this
> > scenario? And which strategy is used in your production for now?
> > Would you mind sharing it?
> >
> > Looking forward to your opinion~
> >
> > Best,
> > Rui
> >
> > On Sat, Jan 6, 2024 at 5:54 PM xiangyu feng 
> wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the comments.
> > >
> > > If there is no further comment, we will open the voting thread next
> week.
> > >
> > > Regards,
> > > Xiangyu
> > >
> > > Zhanghao Chen  于2024年1月3日周三 16:46写道:
> > >
> > > > Thanks for driving this effort on improving the interactive use
> > > experience
> > > > of Flink. The proposal overall looks good to me.
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: xiangyu feng 
> > > > Sent: Tuesday, December 26, 2023 16:51
> > > > To: dev@flink.apache.org 
> > > > Subject: [Discuss] FLIP-407: Improve Flink Client performance in
> > > > interactive scenarios
> > > >
> > > > Hi devs,
> > > >
> > > > I'm opening this thread to discuss FLIP-407: Improve Flink Client
> > > > performance in interactive scenarios. The POC test results and design
> > doc
> > > > can be found at: FLIP-407
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-407%3A+Improve+Flink+Client+performance+when+interacting+with+dedicated+Flink+Session+Clusters
> > > > >
> > > > .
> > > >
> > > > Currently, Flink Client is mainly designed for one-time interaction
> > with
> > > > the Flink Cluster. All the resources (HTTP connections, threads, HA
> > > > services) and instances (ClusterDescriptor, ClusterClient, RestClient)
> > are
> > > > created and recycled for each interaction. This works well when users
> > do
> > > > not need to interact frequently with Flink Cluster and also saves
> > > resource
> > > > usage since resources are recycled immediately after each usage.
> > > >
> > > > However, in OLAP or StreamingWarehouse scenarios, users might submit
> > > > interactive jobs to a dedicated Flink Session Cluster very often. In
> > this
> > > case, we find that short queries that can finish in less than 1s in
> > > the Flink Cluster still have E2E latency greater than 2s. Hence, we
> > > > propose this FLIP to improve the Flink Client performance in this
> > > scenario.
> > > > This could also improve the user experience when using session debug
> > > mode.
> > > >
> > > > The major change in this FLIP is that there will be a newly introduced
> > > option
> > > > *'execution.interactive-client'*. When this option is enabled, Flink
> > > > Client will reuse all the necessary resources to improve interactive
> > > > performance, including: HA Services, HTTP connections, threads and
> all
> > > > kinds of instances related to a long-running Flink Cluster. The
> default
> > > > value of this option will be false, then Flink Client will behave as
> > > > before.
> > > >
> > > > Also, this FLIP proposed a configurable RetryStrategy when fetching
> > > results
> > > > from client-side to Flink Cluster. In interactive scenarios, this can
> > > save
> > > > more than 15% of TM CPU usage without performance degradation.
> > > >
> > > > Looking forward to your feedback, thanks.
> > > >
> > > > Best regards,
> > > > Xiangyu

Re: FLIP-413: Enable unaligned checkpoints by default

2024-01-07 Thread Rui Fan
Thanks to Piotr for driving this proposal!

Enabling unaligned checkpoints with an aligned checkpoint timeout
is fine for me, but I'm not sure whether an aligned checkpoint
timeout of 5s is too aggressive. If unaligned checkpoints are enabled
by default for all jobs, I recommend that the aligned checkpoint
timeout be at least 30s.

If 30s is too big for some Flink jobs, users can turn it down
themselves.

To David, Ken and Zhanghao:

Unaligned checkpoints indeed have some limitations compared to aligned
checkpoints, but if we set the aligned checkpoint timeout to 30s or 60s,
then whenever a checkpoint can complete within 30s or 60s, the job still
uses the aligned checkpoint (introducing no extra cost).
When a checkpoint cannot complete within the aligned checkpoint timeout,
it is switched to an unaligned checkpoint, which can complete even when
backpressure is severe.

In brief: when backpressure is low, enabling them costs nothing;
when backpressure is high, enabling them brings some benefits.

So I think there isn't much risk when the aligned checkpoint timeout
is set to 30s or above. WDYT?
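The combination Rui describes can be sketched in flink-conf.yaml as follows (option names as found in the Flink 1.18 documentation; verify them against your release):

```yaml
# Enable unaligned checkpoints, but let each checkpoint attempt
# barrier alignment for 30 s before switching to unaligned mode.
# (30 s is Rui's suggestion; the FLIP proposes 5 s as the default.)
execution.checkpointing.unaligned: true
execution.checkpointing.aligned-checkpoint-timeout: 30 s
```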

Best,
Rui

On Mon, Jan 8, 2024 at 12:57 PM Zhanghao Chen 
wrote:

> Hi Piotr,
>
> As a platform administrator who runs thousands of Flink jobs, I'd be against the
> idea to enable unaligned cp by default for our jobs. It may help a
> significant portion of the users, but the subtle issues around unaligned CP
> for a few jobs will probably raise a lot more on-calls and incidents. From
> my point of view, we'd better not enable it by default before removing all
> the limitations listed in
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/#limitations
> .
>
> Best,
> Zhanghao Chen
> 
> From: Piotr Nowojski 
> Sent: Friday, January 5, 2024 21:41
> To: dev 
> Subject: FLIP-413: Enable unaligned checkpoints by default
>
> Hi!
>
> I would like to propose enabling unaligned checkpoints by default and
> simultaneously increasing the aligned checkpoint timeout from 0ms to 5s. I
> think this change is the right one for the majority of Flink users.
>
> For more rationale please take a look into the short FLIP-413 [1].
>
> What do you all think?
>
> Best,
> Piotrek
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
>


[jira] [Created] (FLINK-34015) execution.savepoint.ignore-unclaimed-state is invalid when passing this parameter by dynamic properties

2024-01-07 Thread Renxiang Zhou (Jira)
Renxiang Zhou created FLINK-34015:
-

 Summary: execution.savepoint.ignore-unclaimed-state is invalid 
when passing this parameter by dynamic properties
 Key: FLINK-34015
 URL: https://issues.apache.org/jira/browse/FLINK-34015
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.17.0
Reporter: Renxiang Zhou
 Attachments: image-2024-01-08-14-22-09-758.png, 
image-2024-01-08-14-24-30-665.png, image-2024-01-08-14-29-04-347.png

We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D
option to submit the job, but unfortunately we found the value was still false
in the JobManager log.

Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when
submitting the job.
!image-2024-01-08-14-22-09-758.png|width=1012,height=222!

Pic 2: The value is still false in the JobManager log.

!image-2024-01-08-14-24-30-665.png|width=651,height=51!

 

Besides, the parameter `execution.savepoint-restore-mode` has the same problem
when we pass it via the -D option.
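For reference, passing the options as dynamic properties looks roughly like this (a sketch; the savepoint path and jar name are placeholders, and the exact flags should be checked against the Flink CLI docs for your version):

```
flink run \
  -Dexecution.savepoint.ignore-unclaimed-state=true \
  -Dexecution.savepoint-restore-mode=CLAIM \
  -s hdfs:///savepoints/savepoint-ab12cd \
  my-job.jar
```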

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios

2024-01-07 Thread Yong Fang
I agree with @Rui that the current configuration for Flink Client is a
little complex. Can we just provide one strategy with less configuration
items for all scenarios?

Best,
Fang Yong

On Mon, Jan 8, 2024 at 11:19 AM Rui Fan <1996fan...@gmail.com> wrote:

> Thanks xiangyu for driving this proposal! And sorry for the
> late reply.
>
> Overall looks good to me, I only have some minor questions:
>
> 1. Do we need to introduce 3 collect strategies in the first version?
>
> A large, comprehensive set of configuration options brings extra
> learning and usage costs to users. I tend to favor providing
> out-of-the-box parameters, and 2 collect strategies may be enough
> for users.
>
> IIUC, there is no big difference between exponential-delay and
> incremental-delay, especially with the default parameters provided.
> I wonder whether we could provide a multiplier for the exponential-delay
> strategy and remove the incremental-delay strategy?
>
> Of course, if you think multiplier option is not needed based on
> your production experience, it's totally fine for me. Simple is better.
>
> 2. Which strategy do you think is best in mass production?
>
> I'm working on FLIP-364[1], it's related to Flink failover restart
> strategy. IIUC, when one cluster only has a few flink jobs,
> fixed-delay is fine. It guarantees minimal latency without too
> much stress. But if one cluster has too many jobs, fixed-delay
> may not be stable.
>
> Do you think exponential-delay is better than fixed delay in this
> scenario? And which strategy is used in your production for now?
> Would you mind sharing it?
>
> Looking forward to your opinion~
>
> Best,
> Rui
>
> On Sat, Jan 6, 2024 at 5:54 PM xiangyu feng  wrote:
>
> > Hi all,
> >
> > Thanks for the comments.
> >
> > If there is no further comment, we will open the voting thread next week.
> >
> > Regards,
> > Xiangyu
> >
> > Zhanghao Chen  于2024年1月3日周三 16:46写道:
> >
> > > Thanks for driving this effort on improving the interactive use
> > experience
> > > of Flink. The proposal overall looks good to me.
> > >
> > > Best,
> > > Zhanghao Chen
> > > 
> > > From: xiangyu feng 
> > > Sent: Tuesday, December 26, 2023 16:51
> > > To: dev@flink.apache.org 
> > > Subject: [Discuss] FLIP-407: Improve Flink Client performance in
> > > interactive scenarios
> > >
> > > Hi devs,
> > >
> > > I'm opening this thread to discuss FLIP-407: Improve Flink Client
> > > performance in interactive scenarios. The POC test results and design
> doc
> > > can be found at: FLIP-407
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-407%3A+Improve+Flink+Client+performance+when+interacting+with+dedicated+Flink+Session+Clusters
> > > >
> > > .
> > >
> > > Currently, Flink Client is mainly designed for one-time interaction
> with
> > > the Flink Cluster. All the resources (HTTP connections, threads, HA
> > > services) and instances (ClusterDescriptor, ClusterClient, RestClient)
> are
> > > created and recycled for each interaction. This works well when users
> do
> > > not need to interact frequently with Flink Cluster and also saves
> > resource
> > > usage since resources are recycled immediately after each usage.
> > >
> > > However, in OLAP or StreamingWarehouse scenarios, users might submit
> > > interactive jobs to a dedicated Flink Session Cluster very often. In
> this
> > > case, we find that short queries that can finish in less than 1s in
> > > the Flink Cluster still have E2E latency greater than 2s. Hence, we
> > > propose this FLIP to improve the Flink Client performance in this
> > scenario.
> > > This could also improve the user experience when using session debug
> > mode.
> > >
> > > The major change in this FLIP is that there will be a newly introduced
> > option
> > > *'execution.interactive-client'*. When this option is enabled, Flink
> > > Client will reuse all the necessary resources to improve interactive
> > > performance, including: HA Services, HTTP connections, threads and all
> > > kinds of instances related to a long-running Flink Cluster. The default
> > > value of this option will be false, then Flink Client will behave as
> > > before.
> > >
> > > Also, this FLIP proposed a configurable RetryStrategy when fetching
> > results
> > > from client-side to Flink Cluster. In interactive scenarios, this can
> > save
> > > more than 15% of TM CPU usage without performance degradation.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > > Best regards,
> > > Xiangyu
> > >
> >
>


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Yangze Guo
Thanks for your comment, Yong.

Here are my thoughts on the splitting of HighAvailabilityServices:
Firstly, I would treat this separation as a result of technical debt
and a side effect of the FLIP. In order to achieve a cleaner interface
hierarchy for High Availability before Flink 2.0, the design decision
should not be limited to OLAP scenarios.
I agree that the current HAServices can be divided based on either the
actual target (cluster & job) or the type of functionality (leader
election & persistence). From a conceptual perspective, I do not see
one approach being better than the other. However, I have chosen the
current separation for a clear separation of concerns. After FLIP-285,
each process has a dedicated LeaderElectionService responsible for
leader election of all the components within it. This
LeaderElectionService has its own lifecycle management. If we were to
split the HAServices into 'ClusterHighAvailabilityService' and
'JobHighAvailabilityService', we would need to couple the lifecycle
management of these two interfaces, as they both rely on the
LeaderElectionService and other relevant classes. This coupling and
implicit design assumption will increase the complexity and testing
difficulty of the system. WDYT?

Best,
Yangze Guo

On Mon, Jan 8, 2024 at 12:08 PM Yong Fang  wrote:
>
> Thanks Yangze for starting this discussion. I have one comment: why do we
> need to abstract two services as `LeaderServices` and
> `PersistenceServices`?
>
> From the content, the purpose of this FLIP is to make job failover more
> lightweight, so it would be more appropriate to abstract two services as
> `ClusterHighAvailabilityService` and `JobHighAvailabilityService` instead
> of `LeaderServices` and `PersistenceServices` based on leader and store. In
> this way, we can create a `JobHighAvailabilityService` that has a leader
> service and store for the job that meets the requirements based on the
> configuration in the zk/k8s high availability service.
>
> WDYT?
>
> Best,
> Fang Yong
>
> On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng  wrote:
>
> > Thanks Yangze for restarting this discussion.
> >
> > +1 for the overall idea. By splitting the HighAvailabilityServices into
> > LeaderServices and PersistenceServices, we may support configuring
> > different storage behind them in the future.
> >
> > We did run into real problems in production where too much job metadata was
> > being stored on ZK, causing system instability.
> >
> >
> > Yangze Guo  于2023年12月29日周五 10:21写道:
> >
> > > Thanks for the response, Zhanghao.
> > >
> > > PersistenceServices sounds good to me.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
> > >  wrote:
> > > >
> > > > Thanks for driving this effort, Yangze! The proposal overall LGTM.
> > > Other than the throughput enhancement in the OLAP scenario, the separation of
> > > leader election/discovery services and the metadata persistence services
> > > will also make the HA impl clearer and easier to maintain. Just a minor
> > > comment on naming: would it be better to rename PersistentServices to
> > > PersistenceServices, as usually we put a noun before Services?
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: Yangze Guo 
> > > > Sent: Tuesday, December 19, 2023 17:33
> > > > To: dev 
> > > > Subject: [DISCUSS] FLIP-403: High Availability Services for OLAP
> > > Scenarios
> > > >
> > > > Hi, there,
> > > >
> > > > We would like to start a discussion thread on "FLIP-403: High
> > > > Availability Services for OLAP Scenarios"[1].
> > > >
> > > > Currently, Flink's high availability service consists of two
> > > > mechanisms: leader election/retrieval services for JobManager and
> > > > persistent services for job metadata. However, these mechanisms are
> > > > set up in an "all or nothing" manner. In OLAP scenarios, we typically
> > > > only require leader election/retrieval services for JobManager
> > > > components since jobs usually do not have a restart strategy.
> > > > Additionally, the persistence of job states can negatively impact the
> > > > cluster's throughput, especially for short query jobs.
> > > >
> > > > To address these issues, this FLIP proposes splitting the
> > > > HighAvailabilityServices into LeaderServices and PersistentServices,
> > > > and enable users to independently configure the high availability
> > > > strategies specifically related to jobs.
> > > >
> > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > forward to your feedback.
> > > >
> > > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
> > > >
> > > > Best,
> > > > Yangze Guo
> > >
> >


Re: [VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Hang Ruan
+1(non-binding)

Best,
Hang

Rui Fan <1996fan...@gmail.com> 于2024年1月8日周一 13:04写道:

> +1(binding)
>
> Best,
> Rui
>
> On Mon, Jan 8, 2024 at 1:00 PM Xuannan Su  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback about the FLIP-405: Migrate string
> > configuration key to ConfigOption [1] [2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (excluding weekends, until Jan 11, 12:00AM GMT) unless there is an
> > objection or an insufficient number of votes.
> >
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
> > [2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0
> >
> >
> > Best,
> > Xuannan
> >
>


Re:Re:Re: Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2024-01-07 Thread Xuyang
Hi, all.
Considering what Martijn suggested, since 2.0 will be a big breaking release,
we can directly remove the old group window agg operator in 2.0 without
rewriting it to the new window agg operator.
At the same time, every subtask in this FLIP to align the new and old window
agg operators is almost covered by the existing JIRAs marked as features, shown
below. Therefore this FLIP will be abandoned. Thanks to everyone who
participated in the discussion.
1. Support Session Window TVF: https://issues.apache.org/jira/browse/FLINK-24024
2. Support Consuming CDC: https://issues.apache.org/jira/browse/FLINK-20281
3. Support HOP window size with a non-integer step length: a new feature JIRA
will be opened for it. FLIP-145[1] only restricts the relationship between size
and step for the CUMULATE window and places no restriction on the HOP window,
yet currently such a HOP query throws an exception directly, so this is
unfinished work.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-SessionWindows
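For context, the rewrite discussed here maps the legacy group window syntax onto the window TVF syntax from FLIP-145; a sketch with placeholder table and column names:

```sql
-- Legacy group window aggregation (the syntax to be removed in 2.0):
SELECT TUMBLE_START(ts, INTERVAL '10' MINUTE) AS window_start, COUNT(*) AS cnt
FROM orders
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);

-- Equivalent window TVF aggregation (FLIP-145 syntax):
SELECT window_start, COUNT(*) AS cnt
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```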




--

Best!
Xuyang





At 2024-01-02 09:37:15, "Xuyang"  wrote:
>Hi, Martijn.
>Thank you for your reminder :)
>
>
>My idea is that in the current 1.x versions, we can automatically convert the
>agg operator in the old syntax into the agg operator in the new syntax. In
>version 2.0, as a breaking change, the old syntax will be deleted directly at
>the code level.
>>That would imply that we will never be able to remove the old SQL
>>language from the code base, since we would always rewrite that old
>>language to the new implementation under the hood.
>I'm a little curious why the old syntax can't be removed in the code in 2.0. 
>If you have any better ideas, let’s discuss it together.
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>At 2023-12-27 23:20:06, "Martijn Visser"  wrote:
>>Hi Xuyang,
>>
>>It's currently the holiday season in Europe so do expect some slow responses.
>>
>>> The key reason the original FLIP is called "Deprecate the Legacy Group
>>> Window Aggregation" is that we also plan to automatically rewrite the group
>>> window agg corresponding to the old syntax into the window agg corresponding
>>> to the new window TVF syntax (and will provide a fallback option from a
>>> compatibility perspective). Whether the window agg corresponding to the new
>>> syntax is actively used by users or produced by automatic rewriting, both
>>> rely on aligning the functionality between the window agg and the legacy
>>> group window agg.
>>
>>That would imply that we will never be able to remove the old SQL
>>language from the code base, since we would always rewrite that old
>>language to the new implementation under the hood. I don't think
>>that's necessarily a good idea, especially given that Flink 2.0 is
>>coming next year and we could make a clean break there.
>>
>>Best regards,
>>
>>Martijn
>>
>>On Thu, Dec 21, 2023 at 12:44 PM Xuyang  wrote:
>>>
>>> Hi, Timo. Sorry to bother you. There's something I really need to hear your 
>>> thoughts on.
>>>
>>>
>>>
>>>
>>> When I'm trying to split this flip, having reviewed this discussion and the 
>>> FLIP document again, I realized that there is still a key issue that hasn't 
>>> been clarified. The key reason the original FLIP is called "Deprecate the 
>>> Legacy Group Window Aggregation" is that we also plan to automatically 
>>> rewrite the group window agg corresponding to the old syntax into the window 
>>> agg corresponding to the new window TVF syntax (a fallback option will be 
>>> provided for compatibility). Whether the window agg of the new 
>>> syntax is used actively by users or produced by the automatic rewrite, both 
>>> rely on the alignment of the functionality between the window agg and the 
>>> legacy group window agg.
>>>
>>>
>>>
>>>
>>> To explain in detail, the original flip has the following two core parts.
>>>
>>>
>>>
>>>
>>> 1. Automatically rewrite the legacy group window agg into the new window 
>>> agg during plan optimization. (Corresponding to Section 5 in the Proposed 
>>> Changes of the original FLIP)
>>>
>>>
>>>
>>>
>>> 2. The alignment subtasks that the rewriting work depends on, involve 
>>> aligning the features of the two operators. (No one had objections to this 
>>> part of the work, and some of them are WIP) (Corresponding to Section 1-4 
>>> in the Proposed Changes of the original FLIP)
>>>
>>>
>>>
>>>
>>> Currently, there are two ways to deal with this flip.
>>>
>>>
>>>
>>>
>>> 1. According to your previous suggestion, split the subtasks of the two 
>>> alignment features of supporting cdc stream and supporting HOP window size 
>>> with non-integer step length into independent flips. Moreover, an 
>>> additional Flip should be added to describe the work of automatic plan 
>>> rewriting. In the discussion email, associate these three flips together. 
>>> I'm not sure whether doing this would be a bit trivial, because the 

Re: [VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Rui Fan
+1(binding)

Best,
Rui

On Mon, Jan 8, 2024 at 1:00 PM Xuannan Su  wrote:

> Hi everyone,
>
> Thanks for all the feedback about the FLIP-405: Migrate string
> configuration key to ConfigOption [1] [2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (excluding weekends, until Jan 11, 12:00 AM GMT) unless there is an
> objection or an insufficient number of votes.
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
> [2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0
>
>
> Best,
> Xuannan
>


[VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Xuannan Su
Hi everyone,

Thanks for all the feedback about the FLIP-405: Migrate string
configuration key to ConfigOption [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours (excluding weekends, until Jan 11, 12:00 AM GMT) unless there is an
objection or an insufficient number of votes.



[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
[2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0


Best,
Xuannan


Re: FLIP-413: Enable unaligned checkpoints by default

2024-01-07 Thread Zhanghao Chen
Hi Piotr,

As a platform administrator who runs thousands of Flink jobs, I'd be against 
the idea of enabling unaligned checkpoints by default for our jobs. It may 
help a significant portion of users, but the subtle issues around unaligned 
checkpoints in a few jobs will probably raise a lot more on-calls and 
incidents. From my point of view, we'd better not enable it by default before 
removing all the limitations listed in
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/#limitations.

Best,
Zhanghao Chen

From: Piotr Nowojski 
Sent: Friday, January 5, 2024 21:41
To: dev 
Subject: FLIP-413: Enable unaligned checkpoints by default

Hi!

I would like to propose enabling unaligned checkpoints by default and
simultaneously increasing the aligned checkpoint timeout from 0ms to 5s. I
think this change is the right one for the majority of Flink users.

For more rationale please take a look into the short FLIP-413 [1].

What do you all think?

Best,
Piotrek

https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
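In configuration terms, the proposed defaults would amount to something like the following fragment (a sketch assuming the Flink 1.18 option names `execution.checkpointing.unaligned` and `execution.checkpointing.aligned-checkpoint-timeout`; the FLIP's final keys and defaults may differ):

```yaml
# Sketch of the proposed defaults, flink-conf.yaml style.
execution.checkpointing.unaligned: true
# Start aligned; fall back to unaligned only after barriers have been stuck
# in alignment for 5 s:
execution.checkpointing.aligned-checkpoint-timeout: 5 s
```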


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Yong Fang
Thanks Yangze for starting this discussion. I have one comment: why do we
need to abstract two services as `LeaderServices` and
`PersistenceServices`?

From the content, the purpose of this FLIP is to make job failover more
lightweight, so it would be more appropriate to abstract two services as
`ClusterHighAvailabilityService` and `JobHighAvailabilityService` instead
of `LeaderServices` and `PersistenceServices` based on leader and store. In
this way, we can create a `JobHighAvailabilityService` that has a leader
service and store for the job that meets the requirements based on the
configuration in the zk/k8s high availability service.
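Naming aside, the split under discussion can be sketched with two small interfaces (a minimal illustration with hypothetical names, not Flink's actual classes):

```java
// Hypothetical sketch of splitting an all-or-nothing HA service into two
// independently configurable parts. Names are illustrative only.
interface LeaderServices {
    String getJobManagerLeaderAddress();
}

interface PersistenceServices {
    void storeJobMetadata(String jobId, byte[] metadata);
}

// A cluster in OLAP mode can then combine real leader election with a no-op
// metadata store, which is what short-lived query jobs want.
final class OlapHighAvailabilityServices implements LeaderServices, PersistenceServices {
    private final LeaderServices leaderServices;

    OlapHighAvailabilityServices(LeaderServices leaderServices) {
        this.leaderServices = leaderServices;
    }

    @Override
    public String getJobManagerLeaderAddress() {
        return leaderServices.getJobManagerLeaderAddress();
    }

    @Override
    public void storeJobMetadata(String jobId, byte[] metadata) {
        // Intentionally a no-op: job metadata persistence is disabled to
        // avoid the throughput cost for short queries.
    }
}
```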

WDYT?

Best,
Fang Yong

On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng  wrote:

> Thanks Yangze for restart this discussion.
>
> +1 for the overall idea. By splitting the HighAvailabilityServices into
> LeaderServices and PersistenceServices, we may support configuring
> different storage behind them in the future.
>
> We did run into real problems in production where too much job metadata was
> being stored on ZK, causing system instability.
>
>
> Yangze Guo  于2023年12月29日周五 10:21写道:
>
> > Thanks for the response, Zhanghao.
> >
> > PersistenceServices sounds good to me.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
> >  wrote:
> > >
> > > Thanks for driving this effort, Yangze! The proposal overall LGTM.
> Other
> > from the throughput enhancement in the OLAP scenario, the separation of
> > leader election/discovery services and the metadata persistence services
> > will also make the HA impl clearer and easier to maintain. Just a minor
> > comment on naming: would it better to rename PersistentServices to
> > PersistenceServices, as usually we put a noun before Services?
> > >
> > > Best,
> > > Zhanghao Chen
> > > 
> > > From: Yangze Guo 
> > > Sent: Tuesday, December 19, 2023 17:33
> > > To: dev 
> > > Subject: [DISCUSS] FLIP-403: High Availability Services for OLAP
> > Scenarios
> > >
> > > Hi, there,
> > >
> > > We would like to start a discussion thread on "FLIP-403: High
> > > Availability Services for OLAP Scenarios"[1].
> > >
> > > Currently, Flink's high availability service consists of two
> > > mechanisms: leader election/retrieval services for JobManager and
> > > persistent services for job metadata. However, these mechanisms are
> > > set up in an "all or nothing" manner. In OLAP scenarios, we typically
> > > only require leader election/retrieval services for JobManager
> > > components since jobs usually do not have a restart strategy.
> > > Additionally, the persistence of job states can negatively impact the
> > > cluster's throughput, especially for short query jobs.
> > >
> > > To address these issues, this FLIP proposes splitting the
> > > HighAvailabilityServices into LeaderServices and PersistentServices,
> > > and enable users to independently configure the high availability
> > > strategies specifically related to jobs.
> > >
> > > Please find more details in the FLIP wiki document [1]. Looking
> > > forward to your feedback.
> > >
> > > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
> > >
> > > Best,
> > > Yangze Guo
> >
>


[jira] [Created] (FLINK-34014) Jdbc connector can avoid send empty insert to database when there's no buffer data

2024-01-07 Thread luoyuxia (Jira)
luoyuxia created FLINK-34014:


 Summary: Jdbc connector can avoid send empty insert to database 
when there's no buffer data
 Key: FLINK-34014
 URL: https://issues.apache.org/jira/browse/FLINK-34014
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: luoyuxia


In the JDBC connector, we have a background thread that flushes buffered data to 
the database, but when there is no data in the buffer, we can skip the flush 
entirely.

We can avoid it in the method JdbcOutputFormat#attemptFlush, or in a
JdbcBatchStatementExecutor like TableBufferedStatementExecutor, which can avoid 
calling {{statementExecutor.executeBatch()}} when the buffer is empty.
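The guard being proposed is essentially a one-line early return; a minimal dependency-free sketch (class and counter are illustrative, not the actual connector code):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a buffered statement executor that skips the
// round-trip to the database when nothing has been buffered.
final class BufferedStatementExecutor {
    private final List<String> buffer = new ArrayList<>();
    int flushCount = 0; // counts actual "executeBatch" calls, for illustration

    void addToBatch(String record) {
        buffer.add(record);
    }

    void executeBatch() {
        if (buffer.isEmpty()) {
            // Nothing buffered: avoid sending an empty batch to the database.
            return;
        }
        flushCount++; // stands in for statementExecutor.executeBatch()
        buffer.clear();
    }
}
```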



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-398: Improve Serialization Configuration And Usage In Flink

2024-01-07 Thread Yong Fang
Hi Ken,

I think the main reason is that currently Kryo is the only generic
serializer in Flink. I'm looking forward to your FLIP of Fury, and we can
continue to discuss this issue there.

If there are no other questions, I will close the voting for this FLIP.
Thank you again.

Best,
Fang Yong

On Sat, Jan 6, 2024 at 2:27 AM Ken Krugler 
wrote:

> Hi Fang Yong,
>
> Thanks for the response, and I understand the desire to limit the impact
> of this FLIP.
>
> I guess I should spend the time to start a new FLIP on switching to Fury,
> which could include cleaning up method names.
>
> In the context of “facilitate user understanding”, one aspect of this
> cleanup is the current ExecutionConfig.enable/disable/hasGenericTypes()
> methods.
>
> These are inconsistent with the current xxxKryo() methods, and cause
> confusion whenever I’m teaching a Flink course :)
>
> Regards,
>
> — Ken
>
>
>
>
> On Jan 4, 2024, at 6:40 PM, Yong Fang  wrote:
>
> Hi Ken,
>
> Sorry for the late reply. After discussing with @Xintong, we think it is
> better to keep the method names in the FLIP mainly for the following
> reasons:
>
> 1. This FLIP is mainly to support the configurable serializer while
> keeping consistent with Flink at the semantic layer. Keeping the existing
> naming rules can facilitate user understanding.
>
> 2. In the future, if Flink can choose Fury as the generic serializer, we
> can update the corresponding methods in that FLIP after the discussion of
> Fury is completed. This will be a minor modification, and we can avoid
> over-design in the current FLIP.
>
> Thanks for your feedback!
>
> Best,
> Fang Yong
>
> On Fri, Dec 29, 2023 at 12:38 PM Ken Krugler 
> wrote:
>
>> Hi Xintong,
>>
>> I agree that decoupling from Kryo is a bigger topic, well beyond the
>> scope of this FLIP.
>>
>> The reason I’d brought up Fury is that this increases my confidence that
>> Flink will want to decouple from Kryo sooner rather than later.
>>
>> So I feel it would be worth investing in a (minor) name change now, to
>> improve that migration path in the future. Thus my suggestion for avoiding
>> the explicit use of Kryo in method names.
>>
>> Regards,
>>
>> — Ken
>>
>>
>>
>>
>> > On Dec 17, 2023, at 7:16 PM, Xintong Song 
>> wrote:
>> >
>> > Hi Ken,
>> >
>> > I think the main purpose of this FLIP is to change how users interact
>> with
>> > the knobs for customizing the serialization behaviors, from requiring
>> code
>> > changes to working with pure configurations. Redesigning the knobs
>> (i.e.,
>> > names, semantics, etc.), on the other hand, is not the purpose of this
>> > FLIP. Preserving the existing names and semantics should also help
>> minimize
>> > the migration cost for existing users. Therefore, I'm in favor of not
>> > changing them.
>> >
>> > Concerning decoupling from Kryo, and introducing other serialization
>> > frameworks like Fury, I think that's a bigger topic that is worth
>> further
>> > discussion. At the moment, I'm not aware of any community consensus on
>> > doing so. And even if in the future we decide to do so, the changes
>> needed
>> > should be the same w/ or w/o this FLIP. So I'd suggest not to block this
>> > FLIP on these issues.
>> >
>> > WDYT?
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> >
>> > On Fri, Dec 15, 2023 at 1:40 AM Ken Krugler <
>> kkrugler_li...@transpac.com>
>> > wrote:
>> >
>> >> Hi Yong,
>> >>
>> >> Looks good, thanks for creating this.
>> >>
>> >> One comment - related to my recent email about Fury, I would love to
>> see
>> >> the v2 serialization decoupled from Kryo.
>> >>
>> >> As part of that, instead of using xxxKryo in methods, call them
>> xxxGeneric.
>> >>
>> >> A more extreme change would be to totally rely on Fury (so no more POJO
>> >> serializer). Fury is faster than the POJO serializer in my tests, but
>> this
>> >> would be a much bigger change.
>> >>
>> >> Though it could dramatically simplify the Flink serialization support.
>> >>
>> >> — Ken
>> >>
>> >> PS - a separate issue is how to migrate state from Kryo to something
>> like
>> >> Fury, which supports schema evolution. I think this might be possible,
>> by
>> >> having a smarter deserializer that identifies state as being created by
>> >> Kryo, and using (shaded) Kryo to deserialize, while still writing as
>> Fury.
>> >>
>> >>> On Dec 6, 2023, at 6:35 PM, Yong Fang  wrote:
>> >>>
>> >>> Hi devs,
>> >>>
>> >>> I'd like to start a discussion about FLIP-398: Improve Serialization
>> >>> Configuration And Usage In Flink [1].
>> >>>
>> >>> Currently, users can register custom data types and serializers in
>> Flink
>> >>> jobs through various methods, including registration in code,
>> >>> configuration, and annotations. These lead to difficulties in
>> upgrading
>> >>> Flink jobs and priority issues.
>> >>>
>> >>> In flink-2.0 we would like to manage job data types and serializers
>> >> through
>> >>> configurations. This FLIP will introduce a unified option for data
>> type
>> >> and
>> >>> serializer and users 

[DISCUSS] FLIP-414: Support Retry Mechanism in RocksDBStateDataTransfer

2024-01-07 Thread xiangyu feng
Hi devs,

I'm opening this thread to discuss FLIP-414: Support Retry Mechanism in
RocksDBStateDataTransfer[1].

Currently, there is no retry mechanism for downloading and uploading
RocksDB state files. Any jittering of remote filesystem might lead to a
checkpoint failure. By supporting a retry mechanism in
`RocksDBStateDataTransfer`, we can significantly reduce the failure rate of
checkpoints during the asynchronous phase.

To make this retry mechanism configurable, we have introduced two options
in this FLIP: `state.backend.rocksdb.checkpoint.transfer.retry.times` and `
state.backend.rocksdb.checkpoint.transfer.retry.interval`. The default
behavior remains to be no retry will be performed in order to be consistent
with the original behavior.
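The mechanics behind the two options can be sketched as a plain retry loop (a simplified model for illustration, not the actual `RocksDBStateDataTransfer` code):

```java
import java.util.function.Supplier;

// Simplified model of the proposed retry: 'retryTimes' plays the role of
// state.backend.rocksdb.checkpoint.transfer.retry.times and 'retryIntervalMs'
// of ...retry.interval. retryTimes = 1 reproduces today's no-retry behavior.
final class RetryingTransfer {
    static <T> T runWithRetry(Supplier<T> transfer, int retryTimes, long retryIntervalMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            try {
                return transfer.get();
            } catch (RuntimeException e) {
                last = e; // e.g. transient jitter of the remote filesystem
                if (attempt < retryTimes) {
                    try {
                        Thread.sleep(retryIntervalMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                }
            }
        }
        // All attempts exhausted: the state transfer (and checkpoint) fails.
        throw last;
    }
}
```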

Looking forward to your feedback, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-414%3A+Support+Retry+Mechanism+in+RocksDBStateDataTransfer

Best regards,
Xiangyu Feng


Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios

2024-01-07 Thread Rui Fan
Thanks xiangyu for driving this proposal! And sorry for the
late reply.

Overall looks good to me, I only have some minor questions:

1. Do we need to introduce 3 collect strategies in the first version?

Large and comprehensive configuration items will bring
additional learning costs and usage costs to users. I tend to
provide users with out-of-the-box parameters and 2 collect
strategies may be enough for users.

IIUC, there is no big difference between exponential-delay and
incremental-delay, especially the default parameters provided.
I wonder whether we could provide a multiplier for the exponential-delay
strategy and remove the incremental-delay strategy?

Of course, if you think multiplier option is not needed based on
your production experience, it's totally fine for me. Simple is better.
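The difference between the strategies can be made concrete with the delay formulas (a sketch with illustrative parameter names, not the FLIP's actual option keys):

```java
// Illustrative delay computations for the collect strategies under
// discussion. 'attempt' is zero-based; all values are in milliseconds.
final class CollectDelays {
    // fixed-delay: the same wait every time.
    static long fixedDelay(long delayMs, int attempt) {
        return delayMs;
    }

    // incremental-delay: grows by a constant increment, capped at maxMs.
    static long incrementalDelay(long initialMs, long incrementMs, long maxMs, int attempt) {
        return Math.min(initialMs + incrementMs * attempt, maxMs);
    }

    // exponential-delay: grows by a multiplier, capped at maxMs. With a
    // multiplier close to 1 the curve stays close to incremental-delay,
    // which is why one strategy plus a multiplier option may be enough.
    static long exponentialDelay(long initialMs, double multiplier, long maxMs, int attempt) {
        return Math.min((long) (initialMs * Math.pow(multiplier, attempt)), maxMs);
    }
}
```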

2. Which strategy do you think is best in mass production?

I'm working on FLIP-364[1], it's related to Flink failover restart
strategy. IIUC, when one cluster only has a few flink jobs,
fixed-delay is fine. It guarantees minimal latency without too
much stress. But if one cluster has too many jobs, fixed-delay
may not be stable.

Do you think exponential-delay is better than fixed delay in this
scenario? And which strategy is used in your production for now?
Would you mind sharing it?

Looking forwarding to your opinion~

Best,
Rui

On Sat, Jan 6, 2024 at 5:54 PM xiangyu feng  wrote:

> Hi all,
>
> Thanks for the comments.
>
> If there is no further comment, we will open the voting thread next week.
>
> Regards,
> Xiangyu
>
> Zhanghao Chen  于2024年1月3日周三 16:46写道:
>
> > Thanks for driving this effort on improving the interactive use
> experience
> > of Flink. The proposal overall looks good to me.
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: xiangyu feng 
> > Sent: Tuesday, December 26, 2023 16:51
> > To: dev@flink.apache.org 
> > Subject: [Discuss] FLIP-407: Improve Flink Client performance in
> > interactive scenarios
> >
> > Hi devs,
> >
> > I'm opening this thread to discuss FLIP-407: Improve Flink Client
> > performance in interactive scenarios. The POC test results and design doc
> > can be found at: FLIP-407
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-407%3A+Improve+Flink+Client+performance+when+interacting+with+dedicated+Flink+Session+Clusters
> > >
> > .
> >
> > Currently, Flink Client is mainly designed for one time interaction with
> > the Flink Cluster. All the resources(http connections, threads, ha
> > services) and instances(ClusterDescriptor, ClusterClient, RestClient) are
> > created and recycled for each interaction. This works well when users do
> > not need to interact frequently with Flink Cluster and also saves
> resource
> > usage since resources are recycled immediately after each usage.
> >
> > However, in OLAP or StreamingWarehouse scenarios, users might submit
> > interactive jobs to a dedicated Flink Session Cluster very often. In this
> > case, we find that short queries that finish in less than 1s in the
> > Flink Cluster still have E2E latency greater than 2s. Hence, we
> > propose this FLIP to improve the Flink Client performance in this
> scenario.
> > This could also improve the user experience when using session debug
> mode.
> >
> > The major change in this FLIP is that there will be a new introduced
> option
> > *'execution.interactive-client'*. When this option is enabled, Flink
> > Client will reuse all the necessary resources to improve interactive
> > performance, including: HA Services, HTTP connections, threads and all
> > kinds of instances related to a long-running Flink Cluster. The default
> > value of this option will be false, then Flink Client will behave as
> > before.
> >
> > Also, this FLIP proposed a configurable RetryStrategy when fetching
> results
> > from client-side to Flink Cluster. In interactive scenarios, this can
> save
> > more than 15% of TM CPU usage without performance degradation.
> >
> > Looking forward to your feedback, thanks.
> >
> > Best regards,
> > Xiangyu
> >
>


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-07 Thread Yun Tang
Hi Zakelly,

Thanks for driving this topic. I have two concerns here:

  1.  We should not describe the configuration in terms of its implementation 
for the 'execution.checkpointing.local-copy.*' options: for the hashmap 
state-backend it would write two streams, and for the RocksDB state-backend it 
would use hard links for backup. Thus, I think 
'execution.checkpointing.local-backup.*' looks better.
  2.  What does 'execution.checkpointing.data-inline-threshold' mean? It 
seems not so easy to understand.

Best
Yun Tang

From: Piotr Nowojski 
Sent: Thursday, January 4, 2024 22:37
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery 
Configuration

Hi,

Thanks for trying to clean this up! I don't have strong opinions on the
topics discussed here, so generally speaking +1 from my side!

Best,
Piotrek

On Wed, Jan 3, 2024 at 04:16, Rui Fan <1996fan...@gmail.com> wrote:

> Thanks for the feedback!
>
> Using the `execution.checkpointing.incremental.enabled`,
> and enabling it by default sounds good to me.
>
> Best,
> Rui
>
> On Wed, Jan 3, 2024 at 11:10 AM Zakelly Lan  wrote:
>
> > Hi Rui,
> >
> > Thanks for your comments!
> >
> > IMO, given that the state backend can be plugably loaded (as you can
> > specify a state backend factory), I prefer not providing state backend
> > specified options in the framework.
> >
> > Secondly, the incremental checkpoint is actually a sharing file strategy
> > across checkpoints, which means the state backend *could* reuse files
> from
> > previous cp but not *must* do so. When the state backend could not reuse
> > the files, it is reasonable to fallback to a full checkpoint.
> >
> > Thus, I suggest we make it `execution.checkpointing.incremental` and
> enable
> > it by default. For those state backends not supporting this, they perform
> > full checkpoints and print a warning to inform users. Users do not need
> to
> > pay special attention to different options to control this across
> different
> > state backends. This is more user-friendly in my opinion. WDYT?
> >
> > On Tue, Jan 2, 2024 at 10:49 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Zakelly,
> > >
> > > I'm not sure whether we could add the state backend type in the
> > > new key name of state.backend.incremental. It means we use
> > > `execution.checkpointing.rocksdb-incremental` or
> > > `execution.checkpointing.rocksdb-incremental.enabled`.
> > >
> > > So far, state.backend.incremental only works for rocksdb state backend.
> > > And this feature or optimization is very valuable and huge for large
> > > state flink jobs. I believe it's enabled for most production flink jobs
> > > with large rocksdb state.
> > >
> > > If this option isn't generic for all state backend types, I guess we
> > > can enable `execution.checkpointing.rocksdb-incremental.enabled`
> > > by default in Flink 2.0.
> > >
> > > But if it works for all state backends, it's hard to enable it by
> > default.
> > > Enabling great and valuable features or improvements is useful
> > > for users, especially a lot of new flink users. Out-of-the-box options
> > > are good for users.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Rui
> > >
> > > On Fri, Dec 29, 2023 at 1:45 PM Zakelly Lan 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks all for your comments!
> > > >
> > > > As many of you have questions about the names for boolean options, I
> > > > suggest we make a naming rule for them. For now I could think of
> three
> > > > options:
> > > >
> > > > Option 1: Use enumeration options if possible. But this may cause
> some
> > > name
> > > > collisions or confusion as we discussed and we should unify the
> > statement
> > > > everywhere.
> > > > Option 2: Use boolean options and add 'enabled' as the suffix.
> > > > Option 3: Use boolean options and ONLY add 'enabled' when there are
> > more
> > > > detailed configurations under the same prefix, to prevent one name
> from
> > > > serving as a prefix to another.
> > > >
> > > > I am slightly inclined to Option 3, since it is more in line with
> > current
> > > > practice and friendly for existing users. Also It reduces the length
> of
> > > > configuration names as much as possible. I really want to hear your
> > > > opinions.
> > > >
> > > >
> > > > @Xuannan
> > > >
> > > > I agree with your comments 1 and 3.
> > > >
> > > > For 2, If we decide to change the name, maybe
> > > > `execution.checkpointing.parallel-cleaner` is better? And as for
> > whether
> > > to
> > > > add 'enabled' I suggest we discuss the rule above. WDYT?
> > > > Thanks!
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Fri, Dec 29, 2023 at 12:02 PM Xuannan Su 
> > > wrote:
> > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > Thanks for driving this! The organization of the configuration
> option
> > > > > in the FLIP looks much cleaner and easier to understand. +1 to the
> > > > > FLIP.
> > > > >
> > > > > Just some questions from 

Re: [DISCUSS] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Xuannan Su
Hi all,

Thanks for the discussion. I think all the comments and questions have
been addressed. I will open the voting thread today.

Best,
Xuannan
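The migration the FLIP proposes — replacing raw string keys with typed options — can be sketched without Flink's actual classes as follows (a minimal model; Flink's real `ConfigOption`/`Configuration` are much richer):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Minimal model of a typed ConfigOption replacing a bare string key.
final class ConfigOption<T> {
    final String key;
    final T defaultValue;

    ConfigOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

final class Config {
    private final Map<String, Object> values = new HashMap<>();

    <T> Config set(ConfigOption<T> option, T value) {
        values.put(option.key, value);
        return this;
    }

    // The single typed getter the discussion converges on, replacing the
    // per-type getXxx(ConfigOption) variants.
    @SuppressWarnings("unchecked")
    <T> T get(ConfigOption<T> option) {
        return (T) values.getOrDefault(option.key, option.defaultValue);
    }

    <T> Optional<T> getOptional(ConfigOption<T> option) {
        return values.containsKey(option.key)
                ? Optional.of(get(option))
                : Optional.empty();
    }
}
```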


On Tue, Jan 2, 2024 at 11:59 AM Xuannan Su  wrote:
>
> Hi all,
>
> Thank you for all your comments! The FLIP has been updated
> accordingly. Please let me know if you have any further questions or
> comments.
>
> Also, note that many people are on Christmas break, so we will keep
> the discussion open for another week.
>
> Best,
> Xuannan
>
> On Wed, Dec 27, 2023 at 5:20 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > After some investigation, it turns out those options of input/output
> > > format are only publicly exposed in the DataSet docs[2], which is
> > > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> > > looks fine to me.
> >
> > Thanks Xuannan for the detailed investigation, if so, deprecate them
> > and removing them in Flink 2.0 looks good to me.
> >
> > > I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> > > 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> > > instead of 'minicluster.number-taskmanager'.
> >
> > Thanks Hang for the good suggestion! 'minicluster.number-of-taskmanagers'
> > sounds good to me, it's similar to taskmanager.numberOfTaskSlots.
> >
> > Best,
> > Rui
> >
> > On Wed, Dec 27, 2023 at 1:56 PM Hang Ruan  wrote:
> >>
> >> Hi, Rui Fan.
> >>
> >> Thanks for this FLIP.
> >>
> >> I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> >> 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> >> instead of 'minicluster.number-taskmanager'.
> >>
> >> Best,
> >> Hang
> >>
> >> Xuannan Su  于2023年12月27日周三 12:40写道:
> >>
> >> > Hi Xintong and Rui,
> >> >
> >> > Thanks for the quick feedback and the suggestions.
> >> >
> >> > > 1. I think the default value for `TASK_MANAGER_LOG_PATH_KEY` should be
> >> > "no
> >> > > default".
> >> >
> >> > I have considered both ways of describing the default value. However,
> >> > I found out that some of the configurations, such as `web.tmpdir`, put
> >> > `System.getProperty()` in the default value [1]. Some are putting the
> >> > description in the default value column[2]. So I just picked the first
> >> > one. I am fine with either way, so long as they are consistent. WDYT?
> >> >
> >> > > 3. Simply saying "getting / setting value with string key is 
> >> > > discouraged"
> >> > > in JavaDoc of get/setString is IMHO a bit confusing. People may have 
> >> > > the
> >> > > question why would we keep the discouraged interfaces at all. I would
> >> > > suggest the following:
> >> > > ```
> >> > > We encourage users and developers to always use ConfigOption for 
> >> > > getting
> >> > /
> >> > > setting the configurations if possible, for its rich description, type,
> >> > > default-value and other supports. The string-key-based getter / setter
> >> > > should only be used when ConfigOption is not applicable, e.g., the key 
> >> > > is
> >> > > programmatically generated in runtime.
> >> > > ```
> >> >
> >> > The suggested comment looks good to me. Thanks for the suggestion. I
> >> > will update the comment in the FLIP.
> >> >
> >> > > 2. So I wonder if we can simply mark them as deprecated and remove in
> >> > 2.0.
> >> >
> >> > After some investigation, it turns out those options of input/output
> >> > format are only publicly exposed in the DataSet docs[2], which is
> >> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> >> > looks fine to me.
> >> >
> >> >
> >> > @Rui
> >> >
> >> > > Configuration has a `public  T get(ConfigOption option)` method.
> >> > > Could we remove all `Xxx getXxx(ConfigOption configOption)` 
> >> > > methods?
> >> >
> >> > +1 Only keep the get(ConfigOption option),
> >> > getOptional(ConfigOption option), and set(ConfigOption option, T
> >> > value).
> >> >
> >> > Best,
> >> > Xuannan
> >> >
> >> > [1]
> >> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#web-tmpdir
> >> > [2]
> >> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-container-image-ref
> >> > [3]
> >> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/overview/#data-sources
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Dec 26, 2023 at 8:47 PM Xintong Song 
> >> > wrote:
> >> > >
> >> > > >
> >> > > > Configuration has a `public  T get(ConfigOption option)` 
> >> > > > method.
> >> > > > Could we remove all `Xxx getXxx(ConfigOption configOption)`
> >> > methods?
> >> > >
> >> > >
> >> > >
> >> > > Note: all `public void setXxx(ConfigOption key, Xxx value)` 
> >> > > methods
> >> > > > can be replaced with `public  Configuration set(ConfigOption
> >> > option,
> >> > > > T value)` as well.
> >> > >
> >> > >
> >> > > +1
> >> > >
> >> > >
> >> > > Best,
> >> > >
> >> > > Xintong
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Dec 26, 2023 at 8:44 PM Xintong Song 
> >> > wrote:
> >> > >
> >> > > > These features don't have 

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-07 Thread Xuannan Su
Hi Liu,

The voting thread has been open for a long time. We may want to start
a new voting thread. WDYT?

Best,
Xuannan

On Sat, Jan 6, 2024 at 1:51 AM Lu Niu  wrote:
>
> Thank you Dong and Xuannan!
>
> Yes. We can take on this task. Any help during bootstrapping would be greatly 
> appreciated! I realize there is already a voting thread "[VOTE] FLIP-329: Add 
> operator attribute to specify support for object-reuse". What else do we need?
>
> Best
> Lu
>
> On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su  wrote:
>>
>> Hi Lu,
>>
>> I believe this feature is very useful. However, I currently lack the
>> capacity to work on it in the near future. I think it would be great
>> if you could take on the task. I am willing to offer assistance if
>> there are any questions about the FLIP, or to review the PR if needed.
>>
>> Please let me know if you are interested in taking over this task. And
>> also think that we should start the voting thread if no future
>> comments on this FLIP.
>>
>> Best,
>> Xuannan
>>
>>
>>
>> On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
>> >
>> > Hi Lu,
>> >
>> > I am not actively working on Flink and this JIRA recently. If Xuannan does 
>> > not plan to work on this anytime soon, I personally think it will be great 
>> > if you can help work on this FLIP. Maybe we can start the voting thread if 
>> > there is no further comment on this FLIP.
>> >
>> > Xuannan, what do you think?
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
>> >>
>> >> Hi,
>> >>
>> >> Is this still under active development? I notice 
>> >> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as 
>> >> deprioritized. If this is the case, would it be acceptable for us to take 
>> >> on the task?
>> >>
>> >> Best
>> >> Lu
>> >>
>> >>
>> >>
>> >> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler  
>> >> wrote:
>> >>>
>> >>> Hi Dong,
>> >>>
>> >>> Sorry for not seeing this initially. I did have one question about the 
>> >>> description of the issue in the FLIP:
>> >>>
>> >>> > However, in cases where the upstream and downstream operators do not 
>> >>> > store or access references to the input or output records, this 
>> >>> > deep-copy overhead becomes unnecessary
>> >>>
>> >>> I was interested in getting clarification as to what you meant by “or 
>> >>> access references…”, to see if it covered this situation:
>> >>>
>> >>> StreamX —forward--> operator1
>> >>> StreamX —forward--> operator2
>> >>>
>> >>> If operator1 modifies the record, and object re-use is enabled, then 
>> >>> operator2 will see the modified version, right?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> — Ken
>> >>>
>> >>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
>> >>> >
>> >>> > Hi all,
>> >>> >
>> >>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> >>> > add operator attribute to allow operator to specify support for
>> >>> > object-reuse [1].
>> >>> >
>> >>> > Currently, the default configuration for pipeline.object-reuse is set
>> >>> > to false to avoid data corruption, which can result in suboptimal
>> >>> > performance. We propose adding APIs that operators can utilize to
>> >>> > inform the Flink runtime whether it is safe to reuse the emitted
>> >>> > records. This enhancement would enable Flink to maximize its
>> >>> > performance using the default configuration.
>> >>> >
>> >>> > Please refer to the FLIP document for more details about the proposed
>> >>> > design and implementation. We welcome any feedback and opinions on
>> >>> > this proposal.
>> >>> >
>> >>> > Best regards,
>> >>> >
>> >>> > Dong and Xuannan
>> >>> >
>> >>> > [1] 
>> >>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>> >>>
>> >>> --
>> >>> Ken Krugler
>> >>> http://www.scaleunlimited.com
>> >>> Custom big data solutions
>> >>> Flink & Pinot
>> >>>
>> >>>
>> >>>
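To make the mutation hazard discussed in this thread concrete, here is a minimal Java sketch (no Flink dependencies; the `Record` class and the two "operators" are hypothetical stand-ins) showing how a shared instance lets operator2 observe operator1's modification, while a per-consumer deep copy isolates them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ObjectReuseHazard {
    // Hypothetical mutable record, standing in for a POJO flowing between operators.
    static class Record {
        long value;
        Record(long value) { this.value = value; }
        Record copy() { return new Record(value); }
    }

    // Emits one record to two "forwarded" consumers, mimicking
    // StreamX -> operator1 and StreamX -> operator2.
    static List<Long> run(boolean objectReuse) {
        List<Long> seenByOperator2 = new ArrayList<>();
        Consumer<Record> operator1 = r -> r.value += 100;        // mutates its input
        Consumer<Record> operator2 = r -> seenByOperator2.add(r.value);

        Record emitted = new Record(1L);
        if (objectReuse) {
            operator1.accept(emitted);         // same instance shared by both consumers
            operator2.accept(emitted);         // observes operator1's mutation
        } else {
            operator1.accept(emitted.copy());  // deep copy per consumer
            operator2.accept(emitted.copy());  // isolated from operator1's mutation
        }
        return seenByOperator2;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // operator2 sees the mutated value
        System.out.println(run(false));  // operator2 sees the original value
    }
}
```

With pipeline.object-reuse enabled, Flink skips the per-consumer deep copy shown in the else branch, which is exactly why operator2 in Ken's example can see operator1's modified record.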


[jira] [Created] (FLINK-34013) ProfilingServiceTest.testRollingDeletion is unstable on AZP

2024-01-07 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34013:
---

 Summary: ProfilingServiceTest.testRollingDeletion is unstable on 
AZP
 Key: FLINK-34013
 URL: https://issues.apache.org/jira/browse/FLINK-34013
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56073&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7c1d86e3-35bd-5fd5-3b7c-30c126a78702&l=8258
 fails as 
{noformat}
Jan 06 02:09:28 org.opentest4j.AssertionFailedError: expected: <2> but was: <3>
Jan 06 02:09:28 at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
Jan 06 02:09:28 at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
Jan 06 02:09:28 at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
Jan 06 02:09:28 at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
Jan 06 02:09:28 at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
Jan 06 02:09:28 at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
Jan 06 02:09:28 at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:167)
Jan 06 02:09:28 at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
Jan 06 02:09:28 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 06 02:09:28 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Jan 06 02:09:28 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Jan 06 02:09:28 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Jan 06 02:09:28 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Jan 06 02:09:28 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34012) Flink python fails with can't read file '/__w/2/s/flink-python/dev/.conda/lib/python3.10/site-packages//google

2024-01-07 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34012:
---

 Summary: Flink python fails with  can't read file 
'/__w/2/s/flink-python/dev/.conda/lib/python3.10/site-packages//google
 Key: FLINK-34012
 URL: https://issues.apache.org/jira/browse/FLINK-34012
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Sergey Nuyanzin
 Fix For: 1.19.0


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56073&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=20755
{noformat}
Jan 06 03:02:43 Installing collected packages: types-pytz, 
types-python-dateutil, types-protobuf
Jan 06 03:02:43 Successfully installed types-protobuf-4.24.0.20240106 
types-python-dateutil-2.8.19.20240106 types-pytz-2023.3.1.1
Jan 06 03:02:44 mypy: can't read file 
'/__w/2/s/flink-python/dev/.conda/lib/python3.10/site-packages//google': No 
such file or directory
Jan 06 03:02:44 Installing missing stub packages:
Jan 06 03:02:44 /__w/2/s/flink-python/dev/.conda/bin/python -m pip install 
types-protobuf types-python-dateutil types-pytz

{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34011) EmbeddedThreadDependencyTests.test_add_python_file fails with This function should not be called!

2024-01-07 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34011:
---

 Summary: EmbeddedThreadDependencyTests.test_add_python_file fails 
with This function should not be called!
 Key: FLINK-34011
 URL: https://issues.apache.org/jira/browse/FLINK-34011
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.2
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55966&view=logs&j=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3&t=85189c57-d8a0-5c9c-b61d-fc05cfac62cf&l=27052
fails as 
{noformat}
Jan 02 03:27:17 E   Caused by: pemja.core.PythonException: 
: This function should not be called!
Jan 02 03:27:17 E   at 
/__w/1/s/flink-python/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
Jan 02 03:27:17 E   at 
/__w/1/s/flink-python/pyflink/fn_execution/table/operations.process_element(operations.py:102)
Jan 02 03:27:17 E   at .(:1)
Jan 02 03:27:17 E   at 
/__w/1/s/flink-python/pyflink/table/tests/test_dependency.plus_two(test_dependency.py:52)
Jan 02 03:27:17 E   at 
/tmp/python-dist-98ae39ca-f087-4122-8e67-f358a13f57c7/python-files/blob_p-bedc014754a86159784fd7cebff77ab534802643-43ac971c5ca669babbfe4ee1d5a1ad0e/test_dependency_manage_lib.add_two(test_dependency_manage_lib.py:2)
Jan 02 03:27:17 E   at 
pemja.core.PythonInterpreter.invokeMethodOneArgLong(Native Method)
Jan 02 03:27:17 E   at 
pemja.core.PythonInterpreter.invokeMethodOneArg(PythonInterpreter.java:222)
Jan 02 03:27:17 E   at 
pemja.core.PythonInterpreter.invokeMethod(PythonInterpreter.java:116)
Jan 02 03:27:17 E   at 
org.apache.flink.table.runtime.operators.python.scalar.EmbeddedPythonScalarFunctionOperator.processElement(EmbeddedPythonScalarFunctionOperator.java:170)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:99)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
Jan 02 03:27:17 E   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34010) KafkaTableITCase.testPerPartitionWatermarkWithIdleSource is unstable on AZP

2024-01-07 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34010:
---

 Summary: KafkaTableITCase.testPerPartitionWatermarkWithIdleSource 
is unstable on AZP
 Key: FLINK-34010
 URL: https://issues.apache.org/jira/browse/FLINK-34010
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.3
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55965&view=logs&j=9c5a5fe6-2f39-545e-1630-feb3d8d0a1ba&t=99b23320-1d05-5741-d63f-9e78473da39e&l=32434
fails as 
{noformat}
Jan 02 05:24:56 [ERROR] Tests run: 39, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 97.948 s <<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableITCase
Jan 02 05:24:56 [ERROR] 
KafkaTableITCase.testPerPartitionWatermarkWithIdleSource  Time elapsed: 5.494 s 
 <<< ERROR!
Jan 02 05:24:56 java.util.concurrent.TimeoutException: Can not get the expected 
result.
Jan 02 05:24:56 at 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
Jan 02 05:24:56 at 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
Jan 02 05:24:56 at 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults(KafkaTableTestUtils.java:82)
Jan 02 05:24:56 at 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableITCase.testPerPartitionWatermarkWithIdleSource(KafkaTableITCase.java:951)
Jan 02 05:24:56 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jan 02 05:24:56 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jan 02 05:24:56 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-413: Enable unaligned checkpoints by default

2024-01-07 Thread Ken Krugler
In line with what David said, after having to explain the (often subtle) issues 
around unaligned checkpoints and upgrades while teaching Flink, I would also be 
concerned about enabling it by default.

Would it be better to provide more automatic detection of situations where 
unaligned checkpoints helped, along with appropriate warnings?

— Ken

PS - and I hope I’m not banging on a lonely drum, but Fury supports schema 
evolution and is faster than the POJO serializer…so if we switched to that, we 
could in theory support evolution of checkpoints that contain on-the-wire 
records.
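For concreteness, the defaults Piotr proposes correspond roughly to the following configuration (keys as of Flink 1.18; a sketch based on my reading of the FLIP, so double-check against the docs for your version):

```
# flink-conf.yaml (key names are an assumption; verify for your Flink version)
execution.checkpointing.unaligned: true
# checkpoints start aligned and switch to unaligned only after 5 s of alignment
execution.checkpointing.aligned-checkpoint-timeout: 5s
```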

> On Jan 7, 2024, at 9:52 AM, David Anderson  wrote:
> 
> Piotr, I think the situation is more nuanced than what you've described.
> 
> One concern I have is that unaligned checkpoints are somewhat less flexible
> in terms of which operational tasks can be safely performed with them --
> i.e., if you look at the table in the docs [1], aligned checkpoints support
> arbitrary job upgrades and flink minor version upgrades, and unaligned
> checkpoints do not.
> 
> The change you propose makes the situation here more delicate, because for
> most users, most of their checkpoints will actually be aligned checkpoints
> (since their checkpoints will typically not contain any on-the-wire state),
> and so these unsupported operations would actually work -- but they could
> fail. So if a user is in the habit of doing job upgrades with checkpoints,
> is unaware of the danger posed by the change you propose, and continues
> to do these operations afterwards, their upgrades will probably continue to
> work -- until someday they may mysteriously fail.
> 
> On a separate point, in the sentence below it seems to me it would be
> clearer to say that in the unlikely scenario you've described, the change
> would "significantly increase checkpoint sizes" -- assuming I understand
> things correctly.
> 
>> For those users [the] change to the unaligned checkpoints will
> significantly increase state size, without any benefits.
> 
> It seems to me that the worst case would be situations where this
> increase in checkpoint size causes checkpoint failures because the
> available throughput to the checkpoint storage is insufficient to handle
> the increase in size, resulting in timeouts where it was (perhaps just
> barely) okay before.
> 
> Admittedly, this is perhaps a contrived scenario, but it is possible.
> 
> I haven't made up my mind about this proposal. Overall I'm unhappy about
> the level of complexity we've created, and am trying to figure out if this
> proposal makes things better or worse overall. At the moment I'm guessing
> it makes things better for a significant minority of users, and worse for a
> smaller minority.
> 
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations
> 
> David
> 
> On Fri, Jan 5, 2024 at 5:42 AM Piotr Nowojski  wrote:
> 
>> Oops, fixing the topic.
>> 
>> Hi!
>>> 
>>> I would like to propose enabling unaligned checkpoints by default and
>>> simultaneously increasing the aligned checkpoint timeout from 0ms to
>>> 5s. I think this change is the right one for the majority of Flink
>>> users.
>>> 
>>> For more rationale please take a look at the short FLIP-413 [1].
>>> 
>>> What do you all think?
>>> 
>>> Best,
>>> Piotrek
>>> 
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
>>> 
>> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot





Re: [DISCUSS] FLIP-413: Enable unaligned checkpoints by default

2024-01-07 Thread David Anderson
Piotr, I think the situation is more nuanced than what you've described.

One concern I have is that unaligned checkpoints are somewhat less flexible
in terms of which operational tasks can be safely performed with them --
i.e., if you look at the table in the docs [1], aligned checkpoints support
arbitrary job upgrades and flink minor version upgrades, and unaligned
checkpoints do not.

The change you propose makes the situation here more delicate, because for
most users, most of their checkpoints will actually be aligned checkpoints
(since their checkpoints will typically not contain any on-the-wire state),
and so these unsupported operations would actually work -- but they could
fail. So if a user is in the habit of doing job upgrades with checkpoints,
is unaware of the danger posed by the change you propose, and continues
to do these operations afterwards, their upgrades will probably continue to
work -- until someday they may mysteriously fail.

On a separate point, in the sentence below it seems to me it would be
clearer to say that in the unlikely scenario you've described, the change
would "significantly increase checkpoint sizes" -- assuming I understand
things correctly.

> For those users [the] change to the unaligned checkpoints will
significantly increase state size, without any benefits.

It seems to me that the worst case would be situations where this
increase in checkpoint size causes checkpoint failures because the
available throughput to the checkpoint storage is insufficient to handle
the increase in size, resulting in timeouts where it was (perhaps just
barely) okay before.

Admittedly, this is perhaps a contrived scenario, but it is possible.

I haven't made up my mind about this proposal. Overall I'm unhappy about
the level of complexity we've created, and am trying to figure out if this
proposal makes things better or worse overall. At the moment I'm guessing
it makes things better for a significant minority of users, and worse for a
smaller minority.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations

David

On Fri, Jan 5, 2024 at 5:42 AM Piotr Nowojski  wrote:

> Oops, fixing the topic.
>
> Hi!
> >
> > I would like to propose enabling unaligned checkpoints by default and
> > simultaneously increasing the aligned checkpoint timeout from 0ms to
> > 5s. I think this change is the right one for the majority of Flink
> > users.
> >
> > For more rationale please take a look at the short FLIP-413 [1].
> >
> > What do you all think?
> >
> > Best,
> > Piotrek
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
> >
>


Re: [VOTE] FLIP-387: Support named parameters for functions and call procedures

2024-01-07 Thread Feng Jin
Hi Alexey

Thank you for the reminder, the link has been updated.

Best,
Feng Jin

On Sat, Jan 6, 2024 at 12:55 AM Alexey Leonov-Vendrovskiy <
vendrov...@gmail.com> wrote:

> Thanks for starting the vote!
> Do you mind adding a link from the FLIP to this thread?
>
> Thanks,
> Alexey
>
> On Thu, Jan 4, 2024 at 6:48 PM Feng Jin  wrote:
>
> > Hi everyone
> >
> > Thanks for all the feedback about the FLIP-387: Support named parameters
> > for functions and call procedures [1] [2] .
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (excluding weekends, until Jan 10, 12:00 AM GMT) unless there is an
> > objection or an insufficient number of votes.
> >
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures
> > [2] https://lists.apache.org/thread/bto7mpjvcx7d7k86owb00dwrm65jx8cn
> >
> >
> > Best,
> > Feng Jin
> >
>


[jira] [Created] (FLINK-34009) Apache flink: Checkpoint restoration issue on Application Mode of deployment

2024-01-07 Thread Vijay (Jira)
Vijay created FLINK-34009:
-

 Summary: Apache flink: Checkpoint restoration issue on Application 
Mode of deployment
 Key: FLINK-34009
 URL: https://issues.apache.org/jira/browse/FLINK-34009
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.18.0
 Environment: Flink version: 1.18

Zookeeper version: 3.7.2

Env: Custom flink docker image (with embedded application class) deployed over 
kubernetes (v1.26.11).
Reporter: Vijay


Hi Team,

Good Day. Wish you all a happy new year 2024.

We are using Flink 1.18 on our cluster. The JobManager has been deployed in
"application mode" and HA is disabled (high-availability.type: NONE); under this
configuration we are able to start multiple jobs (using env.executeAsync()) from
a single application.

Note: We have also setup checkpoint on a s3 instance with 
RETAIN_ON_CANCELLATION mode (plus other required settings).

Let's say we start two jobs of the same application (e.g. jobidxxx1,
jobidxxx2) and they are currently running on the k8s environment. If we have to
perform a Flink minor upgrade (or upgrade our application with minor changes),
we stop the JobManager and TaskManager instances and perform the necessary
upgrade, then start both the JobManager and TaskManager instances again. On
startup we expect the jobs to be restored from the last checkpoint, but the
restoration does not happen on JobManager startup. Please let us know whether
this is a bug or the general behavior of Flink under application mode of
deployment.

Additional information: If we enable HA (using Zookeeper) in application mode,
we are able to start only one job (i.e., per-job behavior). When we perform a
Flink minor upgrade (or an upgrade of our application with minor changes),
checkpoint restoration works properly across the JobManager & TaskManager
restart process.

It seems checkpoint restoration and HA are inter-related, but why doesn't
checkpoint restoration work when HA is disabled?
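For what it's worth: without HA enabled, a freshly started JobManager has no
record of the last completed checkpoint (that pointer lives in the HA store), so
a retained checkpoint is typically restored by passing its path explicitly on
restart. A hypothetical sketch, where the bucket, job id, and checkpoint number
are all placeholders:

```
# restart the application from a retained checkpoint (all paths are placeholders)
./bin/flink run-application \
    -t kubernetes-application \
    -Dexecution.savepoint.path=s3://<bucket>/checkpoints/<job-id>/chk-42 \
    <other options and application arguments>
```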

 

Please let us know if anyone has experienced similar issues or if you have any
suggestions; they will be highly appreciated. Thanks in advance for your
assistance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


RE: [VOTE] Release flink-connector-jdbc, release candidate #1

2024-01-07 Thread David Radley
Hi,
I am working on FLINK-33365 and making good progress; thanks Sergey for your
fabulous feedback. A lot of the query cases are now working with the latest fix,
but not all. I think it is pragmatic to revert the lookup join predicate
pushdown support so we can release a functional JDBC connector. I can then
work on fixing the remaining FLINK-33365 query cases, which should not take too
long; I am out until Thursday this week, so I will be looking at it then.
   Kind regards, David.


From: Martijn Visser 
Date: Friday, 5 January 2024 at 14:24
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [VOTE] Release flink-connector-jdbc, release candidate 
#1
Hi,

Hmmm, it would have been good to mark the Jira ticket as a Blocker
then for the JDBC connector. Since it's marked as Critical, it doesn't
appear. It has also been open for multiple months, so it doesn't
really feel like a Blocker. I'm +0 with including this fix, but then
we should either get that in quickly or revert FLINK-16024, especially
since this bug ticket has been open for multiple months. Right now, it
means that we don't have a working JDBC connector for Flink 1.17 and
Flink 1.18. That shouldn't be OK.

Thanks,

Martijn

On Fri, Jan 5, 2024 at 2:31 PM Sergey Nuyanzin  wrote:
>
> Thanks for driving this
>
> the thing which makes me think about -1 (not sure yet, and that's why I'm
> asking here) is that FLINK-33365 [1] is
> mentioned as a blocker for the JDBC connector release at [2],
> since the reason for it is FLINK-16024 [3], as was also explained in
> the comments for [1].
>
> So should we wait for a fix of [1] or revert [3] for 3.1.x and continue
> releasing 3.1.2?
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-33365
> [2] https://lists.apache.org/thread/sdkm5qshqozow9sljz6c0qjft6kg9cwc
>
> [3] https://issues.apache.org/jira/browse/FLINK-16024
>
> On Fri, Jan 5, 2024 at 2:19 PM Martijn Visser 
> wrote:
>
> > Hi everyone,
> > Please review and vote on the release candidate #1 for the version
> > 3.1.2, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > This version is compatible with Flink 1.16.x, 1.17.x and 1.18.x.
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release to be deployed to dist.apache.org
> > [2], which are signed with the key with fingerprint
> > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag v3.1.2-rc1 [5],
> > * website pull request listing the new release [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Release Manager
> >
> > [1]
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354088
> > [2]
> > https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.2-rc1
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1691/
> > [5] https://github.com/apache/flink-connector-jdbc/releases/tag/v3.1.2-rc1
> > [6] https://github.com/apache/flink-web/pull/707
> >
>
>
> --
> Best regards,
> Sergey

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU