Re: [VOTE] FLIP-444: Native file copy support

2024-06-26 Thread Stefan Richter

+1 (binding)

Best,
Stefan



> On 26. Jun 2024, at 16:14, Hong  wrote:
> 
> +1 (binding)
> 
> Hong
> 
>> On 26 Jun 2024, at 12:27, Keith Lee wrote:
>> 
>> +1 (non binding)
>> 
>> Best regards
>> Keith Lee
>> 
>> 
>>> On Wed, Jun 26, 2024 at 9:48 AM Zakelly Lan wrote:
>>> +1 (binding)
>>> Best,
>>> Zakelly
 On Wed, Jun 26, 2024 at 3:54 PM Yuepeng Pan wrote:
 +1 (non-binding)
 Best regards,
 Yuepeng Pan
 At 2024-06-26 15:27:17, "Piotr Nowojski" wrote:
> Thanks for pointing this out Zakelly. After the discussion on the dev
> mailing list, I have updated the `PathsCopyingFileSystem` to merge its
> functionalities with `DuplicatingFileSystem`, but I've just forgotten to
> mention that it will be removed/replaced with `PathsCopyingFileSystem`.
> Vote can be resumed.
> Best,
> Piotrek
> On Tue, 25 Jun 2024 at 18:57 Piotr Nowojski wrote:
>> Oops, I must have forgotten to update the FLIP as we discussed. I will
 fix
>> it tomorrow and the vote period will be extended.
>> Best,
>> Piotrek
>> On Tue, 25 Jun 2024 at 13:56 Zakelly Lan wrote:
>>> Hi Piotrek,
>>> I don't see any statement about removing or renaming the
>>> `DuplicatingFileSystem` in the FLIP, shall we do that as mentioned in
 the
>>> discussion thread?
>>> Best,
>>> Zakelly
>>> On Tue, Jun 25, 2024 at 4:58 PM Piotr Nowojski wrote:
 Hi all,
 I would like to start a vote for the FLIP-444 [1]. The discussion
>>> thread is
 here [2].
 The vote will be open for at least 72 hours.
 Best,
 Piotrek
 [1] https://cwiki.apache.org/confluence/x/rAn9EQ
 [2]
>>> https://lists.apache.org/thread/lkwmyjt2bnmvgx4qpp82rldwmtd4516c



Re: [VOTE] FLIP-443: Interruptible watermark processing

2024-05-27 Thread Stefan Richter

+1 (binding)



> On 24. May 2024, at 09:59, Martijn Visser  wrote:
> 
> +1 (binding)
> 
> On Fri, May 24, 2024 at 7:31 AM weijie guo wrote:
> 
>> +1(binding)
>> 
>> Thanks for driving this!
>> 
>> Best regards,
>> 
>> Weijie
>> 
>> 
>> Rui Fan <1996fan...@gmail.com> wrote on Fri, May 24, 2024 at 13:03:
>> 
>>> +1(binding)
>>> 
>>> Best,
>>> Rui
>>> 
>>> On Fri, May 24, 2024 at 12:01 PM Yanfei Lei  wrote:
>>> 
 Thanks for driving this!
 
 +1 (binding)
 
 Best,
 Yanfei
 
 Zakelly Lan wrote on Fri, May 24, 2024 at 10:13:
 
> 
> +1 (binding)
> 
> Best,
> Zakelly
> 
> On Thu, May 23, 2024 at 8:21 PM Piotr Nowojski wrote:
> 
>> Hi all,
>> 
>> After reaching what looks like a consensus in the discussion thread
 [1], I
>> would like to put FLIP-443 [2] to the vote.
>> 
>> The vote will be open for at least 72 hours unless there is an
 objection or
>> insufficient votes.
>> 
>> [1]
>> https://lists.apache.org/thread/flxm7rphvfgqdn2gq2z0bb7kl007olpz
>> [2] https://cwiki.apache.org/confluence/x/qgn9EQ
>> 
>> Best,
>> Piotrek



Re: [VOTE] FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding

2024-05-08 Thread Stefan Richter
Hi Alan,

Thanks for this proposal, the ability to exclude functions from constant 
folding makes sense to me.

+1 (binding)

Best,
Stefan

> On 8. May 2024, at 02:01, Alan Sheinberg  
> wrote:
> 
> Hi everyone,
> 
> I'd like to start a vote on FLIP-452 [1]. It covers adding a new method
> FunctionDefinition.supportsConstantFolding() as part of the Flink Table/SQL
> API to allow skipping invocation of functions while constant-folding. It
> has been discussed in this thread [2].
> 
> I would like to start a vote.  The vote will be open for at least 72 hours
> unless there is an objection or insufficient votes.
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-452%3A+Allow+Skipping+Invocation+of+Function+Calls+While+Constant-folding
> 
> [2] https://lists.apache.org/thread/ko5ndv5kr87nm011psll2hzzd0nn3ztz
> 
> Thanks,
> Alan
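
As a concrete illustration of the proposed API, a minimal sketch of a UDF that opts out of constant folding; the class and its body are hypothetical, only the method name comes from the FLIP:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF whose invocation should not happen at plan time,
// e.g. because it performs a remote call the planner cannot see.
public class RemoteLookupFunction extends ScalarFunction {

    public String eval(String key) {
        // Imagine an RPC call here; folding it at plan time would be wrong.
        return "value-for-" + key;
    }

    // Method proposed by FLIP-452: returning false excludes this function
    // from constant folding during planning.
    @Override
    public boolean supportsConstantFolding() {
        return false;
    }
}
{code}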



Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-06 Thread Stefan Richter
+1 (binding)

Thanks for updating to a new version!

Best,
Stefan


> On 6. May 2024, at 08:36, Hangxiang Yu  wrote:
> 
> +1(binding)
> 
> On Mon, May 6, 2024 at 12:25 PM Yuan Mei wrote:
> 
>> +1(binding)
>> 
>> Best
>> Yuan
>> 
>> On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com> wrote:
>> 
>>> +1 (binding)
>>> 
>>> Best,
>>> Rui
>>> 
>>> On Mon, May 6, 2024 at 11:01 AM Yanfei Lei  wrote:
>>> 
 +1 (binding)
 
 Best,
 Yanfei
 
 Zakelly Lan wrote on Mon, May 6, 2024 at 11:00:
> 
> +1 (binding)
> 
> Thanks for driving this!
> 
> 
> Best,
> Zakelly
> 
> On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:
> 
>> Hi everyone,
>> 
>> Thanks for all the feedback, I'd like to start a vote on the
>>> FLIP-447:
>> Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread
>> is
 here
>> [2].
>> 
>> The vote will be open for at least 72 hours unless there is an
 objection or
>> insufficient votes.
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
>> [2]
>> https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
>> 
>> --
>> Best,
>> Yue
>> 
 
>>> 
>> 
> 
> 
> -- 
> Best,
> Hangxiang.



[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-35217:
---
Fix Version/s: 1.18.2
   1.19.1

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>    Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O

Re: [DISCUSS] FLIP-444: Native file copy support

2024-04-30 Thread Stefan Richter
Hi,

Thanks for the proposal, I think improving download speed in this way is a 
great idea. Hope we can have similar improvements for other clouds as well.

Best,
Stefan


> On 30. Apr 2024, at 15:15, Piotr Nowojski  wrote:
> 
> Hi all!
> 
> I would like to put under discussion:
> 
> FLIP-444: Native file copy support
> https://cwiki.apache.org/confluence/x/rAn9EQ
> 
> This proposal aims to speed up Flink recovery times by speeding up state
> download times. However, in the future the same mechanism could also be
> used to speed up state uploading (checkpointing/savepointing).
> 
> I'm curious to hear your thoughts.
> 
> Best,
> Piotrek
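
For readers who have not opened the FLIP, a rough sketch of the kind of capability interface it describes: a file system that can delegate whole-file copies to a native tool instead of streaming bytes through Flink. The names follow the FLIP discussion; the exact signatures here are illustrative only:

{code:java}
import java.io.IOException;
import java.util.List;
import org.apache.flink.core.fs.Path;

// Illustrative sketch, not the final API from the FLIP.
public interface PathsCopyingFileSystem {

    // One source-to-destination copy.
    interface CopyRequest {
        Path getSource();
        Path getDestination();
    }

    // Whether the given paths can be copied natively.
    boolean canCopyPaths(List<CopyRequest> requests) throws IOException;

    // Copy all requested paths, ideally in one batched native invocation,
    // so state download no longer goes byte-by-byte through Flink streams.
    void copyFiles(List<CopyRequest> requests) throws IOException;
}
{code}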



Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-30 Thread Stefan Richter

Thanks for the improvement proposal, I’m +1 for the change!

Best,
Stefan



> On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> 
> Thanks for the proposal, I definitely see the need for this improvement, +1.
> 
> Regards,
> Roman
> 
> 
> On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> 
>> Hi Yanfei,
>> 
>> Thanks for the feedback!
>> 
>>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
>>> processes a watermark, the watermark will be sent to downstream, if
>>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
>>> is the watermark sent downstream?
>> 
>> The watermark would be output by an operator only once all relevant
>> timers have fired. In other words, if the firing of timers is interrupted,
>> a continuation mail is created to continue firing those interrupted
>> timers. The watermark will be emitted downstream at the end of that
>> continuation mail.
>> 
>>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
>>> executed in mailbox. Is processing-timer allowed to be interrupted?
>> 
>> Yes, firing of processing-time and event-time timers shares the same code
>> and both will support interruptions in the same way. Actually I've renamed
>> the FLIP from
>> 
>>> Interruptible watermarks processing
>> 
>> to:
>> 
>>> Interruptible timers firing
>> 
>> to make this more clear.
>> 
>> Best,
>> Piotrek
>> 
>> On Tue, 30 Apr 2024 at 06:08 Yanfei Lei wrote:
>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for this proposal. It looks like it will shorten the checkpoint
>>> duration, especially in the case of back pressure. +1 for it!  I'd
>>> like to ask some questions to understand your thoughts more precisely.
>>> 
>>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
>>> processes a watermark, the watermark will be sent to downstream, if
>>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
>>> is the watermark sent downstream?
>>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
>>> executed in mailbox. Is processing-timer allowed to be interrupted?
>>> 
>>> Best regards,
>>> Yanfei
>>> 
>>> Piotr Nowojski wrote on Mon, Apr 29, 2024 at 21:57:
>>> 
 
 Hi all,
 
 I would like to start a discussion on FLIP-443: Interruptible watermark
 processing.
 
 https://cwiki.apache.org/confluence/x/qgn9EQ
 
 This proposal tries to make Flink's subtask thread more responsive when
 processing watermarks/firing timers, and make those operations
 interruptible/break them apart into smaller steps. At the same time,
>> the
 proposed solution could be potentially adopted in other places in the
>>> code
 base as well, to solve similar problems with other flatMap-like
>> operators
 (non windowed joins, aggregations, CepOperator, ...).
 
 I'm looking forward to your thoughts.
 
 Best,
 Piotrek
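
To make the mechanics discussed above more tangible, a simplified sketch of interruptible timer firing with a continuation mail; all names are hypothetical and this is not the actual Flink mailbox API:

{code:java}
import java.util.Iterator;
import java.util.function.LongConsumer;

// Sketch: fire timers until an interruption is requested, then re-enqueue a
// continuation that resumes from the same iterator. The watermark is only
// emitted once every timer relevant for it has fired.
final class InterruptibleTimerFiring {

    interface Mailbox {
        void submit(Runnable mail);
    }

    private final Mailbox mailbox;
    private volatile boolean interruptionRequested;

    InterruptibleTimerFiring(Mailbox mailbox) {
        this.mailbox = mailbox;
    }

    void requestInterruption() {
        interruptionRequested = true;
    }

    void fireTimers(Iterator<Runnable> timers, long watermark, LongConsumer emitWatermark) {
        while (timers.hasNext()) {
            timers.next().run();
            if (interruptionRequested && timers.hasNext()) {
                interruptionRequested = false;
                // Continuation mail: the remaining timers fire later, letting
                // e.g. a checkpoint barrier be processed in between.
                mailbox.submit(() -> fireTimers(timers, watermark, emitWatermark));
                return;
            }
        }
        // All timers up to the watermark have fired; now it is safe to emit it.
        emitWatermark.accept(watermark);
    }
}
{code}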



[jira] [Closed] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-35217.
--
Resolution: Fixed

Merged to master in 
[{{80af4d5}}|https://github.com/apache/flink/commit/80af4d502318348ba15a8f75a2a622ce9dbdc968]
  

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>    Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c

[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-35217:
---
Fix Version/s: 1.20.0

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>    Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 516

[jira] [Assigned] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-25 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-35217:
--

Assignee: Stefan Richter

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>    Assignee: Stefan Richter
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
>

[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-25 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840787#comment-17840787
 ] 

Stefan Richter commented on FLINK-35217:


I think you are right: close will only guarantee a flush, i.e. passing all data 
to the OS, but it does not force the OS to write the data to disk.
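
For illustration, a minimal sketch (not the actual Flink code) of how the metadata write could be made durable with standard Java NIO before the previous checkpoint is deleted:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class DurableMetadataWrite {

    // Write the in-progress metadata, fsync it, then rename it into place.
    // Only after this is it safe to unlink the previous checkpoint.
    static void writeAndPublish(Path inProgress, byte[] bytes) throws IOException {
        try (FileChannel channel = FileChannel.open(
                inProgress, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(bytes));
            // force(true) issues the fsync that a plain close() does not guarantee.
            channel.force(true);
        }
        Files.move(inProgress, inProgress.resolveSibling("_metadata"),
                StandardCopyOption.ATOMIC_MOVE);
        // Full durability would additionally require an fsync on the parent
        // directory, so that the rename itself survives a crash.
    }
}
{code}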

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 1

[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-23 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840115#comment-17840115
 ] 

Stefan Richter commented on FLINK-35217:


Hi, the code is calling close on the output stream, which usually implies that 
it's flushed and synced. I'm wondering if this is an OS- or Java-version-specific 
problem?

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Priority: Critical
>
> While running Flink on a system with an unstable power supply, checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_

[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Attachment: (was: image-2024-03-15-10-30-50-902.png)

> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>    Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png
>
>
> KafkaWriter is keeping objects in a Deque of closeables 
> ({{producerCloseables}}) that are never removed, so they can never be GC’ed.
> From heap dump: !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
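
For readers unfamiliar with the pattern, a minimal sketch of the leak and one possible fix; the class and field names are simplified stand-ins for the actual KafkaWriter internals:

{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

final class ProducerRegistrySketch {

    // Simplified stand-in for the writer's producerCloseables deque.
    private final Deque<Closeable> producerCloseables = new ArrayDeque<>();

    void registerProducer(Closeable producer) {
        // The bug pattern: producers are only ever added...
        producerCloseables.add(producer);
    }

    void closeProducer(Closeable producer) throws IOException {
        producer.close();
        // ...so a fix must also drop the reference; otherwise the deque keeps
        // every producer strongly reachable and none of them can be GC'ed.
        producerCloseables.remove(producer);
    }
}
{code}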


[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Description: 
KafkaWriter is keeping objects in a Deque of closeables 
({{producerCloseables}}) that are never removed, so they can never be GC’ed.

From heap dump: !image-2024-03-15-10-30-08-280.png!

  was:
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?), but never remove them, so they can never be GC’ed.

From heap dump:

!image-2024-03-15-10-30-50-902.png!
!image-2024-03-15-10-30-08-280.png!


> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png, 
> image-2024-03-15-10-30-50-902.png
>
>
> KafkaWriter is keeping objects in a Deque of closeables 
> ({{producerCloseables}}) that are never removed, so they can never be GC’ed.
> From heap dump: !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34693:
---
Description: 
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?), but never remove them, so they can never be GC’ed.

From heap dump:

!image-2024-03-15-10-30-50-902.png!
!image-2024-03-15-10-30-08-280.png!

  was:
KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?), but never remove them, so they can never be GC’ed.

From heap dump:

!image-2024-03-15-10-30-50-902.png!
!image-2024-03-15-10-30-08-280.png!
!image-2024-03-15-10-28-48-591.png!


> Memory leak in KafkaWriter
> --
>
> Key: FLINK-34693
> URL: https://issues.apache.org/jira/browse/FLINK-34693
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, 1.19.0, 1.18.1
>Reporter: Stefan Richter
>Priority: Blocker
> Attachments: image-2024-03-15-10-30-08-280.png, 
> image-2024-03-15-10-30-50-902.png
>
>
> KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
> closeables ({{producerCloseables}}). We are only adding instances to the 
> queue (for each txn?), but never remove them, so they can never be GC’ed.
> From heap dump:
> !image-2024-03-15-10-30-50-902.png!
> !image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34693:
--

 Summary: Memory leak in KafkaWriter
 Key: FLINK-34693
 URL: https://issues.apache.org/jira/browse/FLINK-34693
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.19.0, 1.18.0
Reporter: Stefan Richter
 Attachments: image-2024-03-15-10-30-08-280.png, 
image-2024-03-15-10-30-50-902.png

KafkaWriter is keeping instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We are only adding instances to the 
queue (for each txn?), but never remove them, so they can never be GC’ed.

From heap dump:

!image-2024-03-15-10-30-50-902.png!
!image-2024-03-15-10-30-08-280.png!
!image-2024-03-15-10-28-48-591.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Closed] (FLINK-34579) Introduce metric for time since last completed checkpoint

2024-03-05 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-34579.
--
Resolution: Won't Do

> Introduce metric for time since last completed checkpoint
> -
>
> Key: FLINK-34579
> URL: https://issues.apache.org/jira/browse/FLINK-34579
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This metric will help us to identify jobs with checkpointing problems without 
> requiring a checkpoint to complete or fail first before the problem 
> surfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34579) Introduce metric for time since last completed checkpoint

2024-03-05 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34579:
--

 Summary: Introduce metric for time since last completed checkpoint
 Key: FLINK-34579
 URL: https://issues.apache.org/jira/browse/FLINK-34579
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.20.0


This metric will help us to identify jobs with checkpointing problems without 
requiring a checkpoint to complete or fail first before the problem 
surfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
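
As a sketch of what such a metric could look like, using Flink's standard Gauge interface (class and metric names here are illustrative, not the implemented ones):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

final class CheckpointAgeMetricSketch {

    private volatile long lastCompletedCheckpointTimestamp = System.currentTimeMillis();

    void register(MetricGroup group) {
        // The value grows continuously while no checkpoint completes, so an
        // alert can fire before any checkpoint actually fails or times out.
        group.gauge(
                "timeSinceLastCompletedCheckpoint",
                (Gauge<Long>) () -> System.currentTimeMillis() - lastCompletedCheckpointTimestamp);
    }

    void onCheckpointCompleted(long completionTimestamp) {
        lastCompletedCheckpointTimestamp = completionTimestamp;
    }
}
{code}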




[jira] [Closed] (FLINK-34546) Emit span with failure labels on failure

2024-03-01 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-34546.
--
Resolution: Fixed

> Emit span with failure labels on failure
> 
>
> Key: FLINK-34546
> URL: https://issues.apache.org/jira/browse/FLINK-34546
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> To improve observability, we should emit a span for each failure that 
> contains details about the failure classification.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34546) Emit span with failure labels on failure

2024-02-29 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34546:
--

 Summary: Emit span with failure labels on failure
 Key: FLINK-34546
 URL: https://issues.apache.org/jira/browse/FLINK-34546
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.20.0


To improve observability, we should emit a span for each failure that contains 
details about the failure classification.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (FLINK-33555) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:

2024-02-16 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817916#comment-17817916
 ] 

Stefan Richter commented on FLINK-33555:


[~mapohl] I did some work related to local recovery (essentially using 
available local state in rescaling), but that doesn't affect allocation or 
non-rescaling scenarios.

> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:
> ---
>
> Key: FLINK-33555
> URL: https://issues.apache.org/jira/browse/FLINK-33555
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
> Attachments: FLINK-33555.log
>
>
> https://github.com/XComp/flink/actions/runs/6868936761/job/18680977238#step:12:13492
> {code}
> Error: 21:44:15 21:44:15.144 [ERROR]   
> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:119
>  [The task was deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) but 
> it should have been deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) for local recovery., The task 
> was deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) but it should 
> have been deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) for 
> local recovery., The task was deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) but it should have been 
> deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) for local 
> recovery.] ==> expected: <true> but was: <false>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-02-07 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815274#comment-17815274
 ] 

Stefan Richter commented on FLINK-33962:


Hi [~Zhanghao Chen]! The proposed change in this Jira makes sense to me, and I 
think that using the same idea as outlined in the FLIP should also work for 
this case. Off the top of my head, I don't see a problem with reviving the 
previous mechanism for compatibility.

> Chaining-agnostic OperatorID generation for improved state compatibility on 
> parallelism change
> --
>
> Key: FLINK-33962
> URL: https://issues.apache.org/jira/browse/FLINK-33962
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Priority: Major
>
> *Background*
> Flink restores opeartor state from snapshots based on matching the 
> operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID 
> generation when no user-set uid exists. The generated OperatorID is 
> deterministic with respect to:
>  * node-local properties (the traverse ID in the BFS for the stream graph)
>  * chained output nodes
>  * input nodes hashes
> *Problem*
> The chaining behavior will affect state compatibility, as the generation of 
> the OperatorID of an Op is dependent on its chained output nodes. For 
> example, a simple source->sink DAG with source and sink chained together is 
> state imcompatible with an otherwise identical DAG with source and sink 
> unchained (either because the parallelisms of the two ops are changed to be 
> unequal or chaining is disabled). This greatly limits the flexibility to 
> perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior 
> of operators, which effectively just removes L227-235 of 
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
>  at master · apache/flink 
> (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>  
> This will not hurt the determinism of the ID generation across job 
> submission as long as the stream graph topology doesn't change, and since new 
> versions of Flink have already adopted pure operator-level state recovery, 
> this will not break state recovery across job submission as long as both 
> submissions use the same hasher.
> This will, however, break cross-version state compatibility. So we can 
> introduce a new option to enable using HasherV3 in v1.19 and consider making 
> it the default hasher in v2.0.
> Looking forward to suggestions on this.
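
As context for the workaround that exists today: until a chaining-agnostic hasher is available, the usual way to make state robust against chaining changes is to pin operator IDs explicitly with the DataStream API's uid() method, e.g.:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitUidsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .uid("source")            // pins the OperatorID for state matching
                .map(String::toUpperCase)
                .uid("upper-case-map")    // unaffected by chaining or parallelism changes
                .print();
        env.execute("explicit-uids");
    }
}
{code}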



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-02-05 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814312#comment-17814312
 ] 

Stefan Richter edited comment on FLINK-34050 at 2/5/24 10:41 AM:
-

Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles async before 
the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint and doing it outside of the 
critical path for restoring. Wdyt?


was (Author: srichter):
Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles in the async 
part of the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint and doing it outside of the 
critical path for restoring or processing. Wdyt?
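
A rough sketch of that idea; the scheduling is illustrative, while deleteFilesInRanges is the RocksDB Java API mentioned in the issue:

{code:java}
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

final class AsyncDeleteFilesSketch {

    // Run deleteFilesInRanges off the restore path, so the space amplification
    // is reclaimed before the next checkpoint without slowing down rescaling.
    static void scheduleAsync(
            ExecutorService executor,
            RocksDB db,
            List<ColumnFamilyHandle> handles,
            List<byte[]> ranges) {
        executor.execute(() -> {
            for (ColumnFamilyHandle handle : handles) {
                try {
                    db.deleteFilesInRanges(handle, ranges, false);
                } catch (RocksDBException e) {
                    // Best-effort cleanup: compaction will eventually reclaim
                    // the space even if this call fails.
                }
            }
        });
    }
}
{code}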

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up rocksdb rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with current DB keyGroupRange, so new data written into rocksdb after 
> rescaling almost never do LSM compaction with the deleted data (belonging to 
> other keyGroupRange.)
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> Rocksdb.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-02-05 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814312#comment-17814312
 ] 

Stefan Richter commented on FLINK-34050:


Just one idea: since the current proposal is making the rescaling times worse, 
it can have a significant drawback. How about we call deleteFiles in the async 
part of the next checkpoint after a rescaling, thus making sure that the space 
amplification never makes it into the checkpoint and doing it outside of the 
critical path for restoring or processing. Wdyt?

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up rocksdb rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with current DB keyGroupRange, so new data written into rocksdb after 
> rescaling almost never do LSM compaction with the deleted data (belonging to 
> other keyGroupRange.)
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To slove this problem, I think maybe we can invoke 
> Rocksdb.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34200) AutoRescalingITCase#testCheckpointRescalingInKeyedState fails

2024-01-26 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811194#comment-17811194
 ] 

Stefan Richter commented on FLINK-34200:


[~fanrui] I think you misunderstood my comment. I believe the problem is with 
the test code/test utils, not in Flink.

> AutoRescalingITCase#testCheckpointRescalingInKeyedState fails
> -
>
> Key: FLINK-34200
> URL: https://issues.apache.org/jira/browse/FLINK-34200
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56601=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=8200]
> {code:java}
> Jan 19 02:31:53 02:31:53.954 [ERROR] Tests run: 32, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1050 s <<< FAILURE! -- in 
> org.apache.flink.test.checkpointing.AutoRescalingITCase
> Jan 19 02:31:53 02:31:53.954 [ERROR] 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingInKeyedState[backend
>  = rocksdb, buffersPerChannel = 2] -- Time elapsed: 59.10 s <<< FAILURE!
> Jan 19 02:31:53 java.lang.AssertionError: expected:<[(0,8000), (0,32000), 
> (0,48000), (0,72000), (1,78000), (1,3), (1,54000), (0,2000), (0,1), 
> (0,5), (0,66000), (0,74000), (0,82000), (1,8), (1,0), (1,16000), 
> (1,24000), (1,4), (1,56000), (1,64000), (0,12000), (0,28000), (0,52000), 
> (0,6), (0,68000), (0,76000), (1,18000), (1,26000), (1,34000), (1,42000), 
> (1,58000), (0,6000), (0,14000), (0,22000), (0,38000), (0,46000), (0,62000), 
> (0,7), (1,4000), (1,2), (1,36000), (1,44000)]> but was:<[(0,8000), 
> (0,32000), (0,48000), (0,72000), (1,78000), (1,3), (1,54000), (0,2000), 
> (0,1), (0,5), (0,66000), (0,74000), (0,82000), (1,8), (1,0), 
> (1,16000), (1,24000), (1,4), (1,56000), (1,64000), (0,12000), (0,28000), 
> (0,52000), (0,6), (0,68000), (0,76000), (0,1000), (0,25000), (0,33000), 
> (0,41000), (1,18000), (1,26000), (1,34000), (1,42000), (1,58000), (0,6000), 
> (0,14000), (0,22000), (0,38000), (0,46000), (0,62000), (0,7), (1,4000), 
> (1,2), (1,36000), (1,44000)]>
> Jan 19 02:31:53   at org.junit.Assert.fail(Assert.java:89)
> Jan 19 02:31:53   at org.junit.Assert.failNotEquals(Assert.java:835)
> Jan 19 02:31:53   at org.junit.Assert.assertEquals(Assert.java:120)
> Jan 19 02:31:53   at org.junit.Assert.assertEquals(Assert.java:146)
> Jan 19 02:31:53   at 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingKeyedState(AutoRescalingITCase.java:296)
> Jan 19 02:31:53   at 
> org.apache.flink.test.checkpointing.AutoRescalingITCase.testCheckpointRescalingInKeyedState(AutoRescalingITCase.java:196)
> Jan 19 02:31:53   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 19 02:31:53   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2024-01-24 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter resolved FLINK-32410.

Fix Version/s: 1.18.0
   (was: 1.19.0)
   Resolution: Done

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>    Reporter: Stefan Richter
>        Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading, because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern of allocating a hash-based collection with the size of 
> the expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size and 
> replace the direct constructor calls with those.
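
A minimal sketch of such a helper, assuming the JDK's default load factor of 
0.75 (the class name is illustrative; Guava's {{Maps.newHashMapWithExpectedSize}} 
does essentially the same computation):

{code:java}
import java.util.HashMap;
import java.util.Map;

final class HashingUtil {

    /** Returns a HashMap that can hold expectedSize entries without rehashing. */
    static <K, V> Map<K, V> newHashMapWithExpectedSize(int expectedSize) {
        // Invert the load factor: the table needs at least expectedSize / 0.75
        // slots so that expectedSize puts never cross the resize threshold.
        return new HashMap<>((int) Math.ceil(expectedSize / 0.75), 0.75f);
    }
}
{code}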



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2024-01-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810420#comment-17810420
 ] 

Stefan Richter commented on FLINK-32410:


Yes, it's already done.

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>    Reporter: Stefan Richter
>        Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading, because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern of allocating a hash-based collection with the size of 
> the expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size and 
> replace the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2024-01-24 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter resolved FLINK-33696.

Resolution: Done

merged in 7db2ecad

> FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
> 
>
> Key: FLINK-33696
> URL: https://issues.apache.org/jira/browse/FLINK-33696
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  is adding TraceReporter interface. However with 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  alone, Log4jTraceReporter would be the only available implementation of 
> TraceReporter interface, which is not very helpful.
> In this FLIP I’m proposing to contribute both MetricExporter and 
> TraceReporter implementations using OpenTelemetry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34218) AutoRescalingITCase#testCheckpointRescalingInKeyedState fails

2024-01-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810313#comment-17810313
 ] 

Stefan Richter commented on FLINK-34218:


Hi, I also cannot reproduce the problem locally, but I'm rather confident that 
it's a test problem.

> AutoRescalingITCase#testCheckpointRescalingInKeyedState fails
> -
>
> Key: FLINK-34218
> URL: https://issues.apache.org/jira/browse/FLINK-34218
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Rui Fan
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56740=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=ae4f8708-9994-57d3-c2d7-b892156e7812



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34199) Add tracing for durations of rescaling/restoring RocksDB incremental checkpoints from downloaded and local state

2024-01-23 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-34199:
---
Summary: Add tracing for durations of rescaling/restoring RocksDB 
incremental checkpoints from downloaded and local state  (was: Add tracing for 
durations of rescaling/restoring from local state)

> Add tracing for durations of rescaling/restoring RocksDB incremental 
> checkpoints from downloaded and local state
> 
>
> Key: FLINK-34199
> URL: https://issues.apache.org/jira/browse/FLINK-34199
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>    Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Adds tracing for durations of rescaling/restoring from local state. This 
> enables more fine-grained monitoring of restore operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34199) Add tracing for durations of rescaling/restoring from local state

2024-01-22 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34199:
--

 Summary: Add tracing for durations of rescaling/restoring from 
local state
 Key: FLINK-34199
 URL: https://issues.apache.org/jira/browse/FLINK-34199
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.19.0
Reporter: Stefan Richter
Assignee: Stefan Richter


Adds tracing for durations of rescaling/restoring from local state. This 
enables more fine-grained monitoring of restore operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34134) Add tracing for restored state size and locations

2024-01-17 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34134:
--

 Summary: Add tracing for restored state size and locations
 Key: FLINK-34134
 URL: https://issues.apache.org/jira/browse/FLINK-34134
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Stefan Richter
Assignee: Stefan Richter


We can add tracing during the restore that reports the state size that was 
restored by location(s). This is particularly interesting for a mixed recovery 
with some local and some remote state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-11-30 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791536#comment-17791536
 ] 

Stefan Richter commented on FLINK-32444:


[~pnowojski] if there really is an issue with heap backend, then we also need 
to be careful about what type of caching we can build for RocksDB in the future.

> Enable object reuse for Flink SQL jobs by default
> -
>
> Key: FLINK-32444
> URL: https://issues.apache.org/jira/browse/FLINK-32444
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, object reuse is not enabled by default for Flink Streaming Jobs, 
> but is enabled by default for Flink Batch jobs. That is not consistent for 
> stream-batch unification. Besides, SQL operators are safe to enable object 
> reuse and this is a great performance improvement for SQL jobs. 
> We should also be careful with the Table-DataStream conversion case 
> (StreamTableEnvironment) which is not safe to enable object reuse by default. 
> Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-29 Thread Stefan Richter
+1 (binding)

Best,
Stefan


> On 23. Nov 2023, at 09:49, Roman Khachatryan  wrote:
> 
> +1 (binding)
> 
> Regards,
> Roman
> 
> 
> On Wed, Nov 22, 2023 at 12:55 PM Rui Fan <1996fan...@gmail.com 
> > wrote:
> 
>> +1(binding)
>> 
>> Thanks for driving this  proposal!
>> 
>> Best,
>> Rui
>> 
>> On Wed, Nov 22, 2023 at 7:44 PM Piotr Nowojski 
>> wrote:
>> 
>>> Hi All,
>>> 
>>> I'd like to start a vote on the FLIP-386: Support adding custom metrics
>> in
>>> Recovery Spans [1]. The discussion thread is here [2].
>>> 
>>> The vote will be open for at least 72 hours unless there is an objection
>> or
>>> not enough votes.
>>> 
>>> [1] 
>>> https://cwiki.apache.org/confluence/x/VAuZE
>>> [2] 
>>> https://lists.apache.org/thread/zt4ykyhv6cco83j9hjngn52b1oprj1tv



Re: [VOTE] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-29 Thread Stefan Richter
+1 (binding)

Best,
Stefan


> On 22. Nov 2023, at 12:34, Jing Ge  wrote:
> 
> +1(binding)
> Thanks!
> 
> Best Regards,
> Jing
> 
> On Wed, Nov 22, 2023 at 11:21 AM Roman Khachatryan  > wrote:
> 
>> +1 (binding)
>> 
>> Regards,
>> Roman
>> 
>> On Wed, Nov 22, 2023, 7:30 AM Hangxiang Yu  wrote:
>> 
>>> +1(binding)
>>> 
>>> On Wed, Nov 22, 2023 at 10:29 AM Rui Fan <1996fan...@gmail.com> wrote:
>>> 
 +1(binding)
 
 Best,
 Rui
 
 On Wed, Nov 22, 2023 at 1:20 AM Piotr Nowojski 
 wrote:
 
> Hi All,
> 
> I'd like to start a vote on the FLIP-385: Add
>>> OpenTelemetryTraceReporter
> and OpenTelemetryMetricReporter [1]. The discussion thread is here
>> [2].
> 
> The vote will be open for at least 72 hours unless there is an
>>> objection
 or
> not enough votes.
> 
> [1] 
> https://cwiki.apache.org/confluence/x/UAuZE
> [2] 
> https://lists.apache.org/thread/1rqp8czz8wnplpzgn8m4qmzvf14lyx0k
> 
> 
> Best,
> Piotrek
> 
 
>>> 
>>> 
>>> --
>>> Best,
>>> Hangxiang.



Re: [VOTE] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-29 Thread Stefan Richter
+1 (binding)

Best,
Stefan


> On 22. Nov 2023, at 11:20, Roman Khachatryan  wrote:
> 
> +1 (binding)
> 
> Regards,
> Roman
> 
> On Wed, Nov 22, 2023, 7:08 AM Zakelly Lan  > wrote:
> 
>> +1(non-binding)
>> 
>> Best,
>> Zakelly
>> 
>> On Wed, Nov 22, 2023 at 3:04 PM Hangxiang Yu  wrote:
>> 
>>> +1 (binding)
>>> Thanks for driving this again!
>>> 
>>> On Wed, Nov 22, 2023 at 10:30 AM Rui Fan <1996fan...@gmail.com> wrote:
>>> 
 +1(binding)
 
 Best,
 Rui
 
 On Wed, Nov 22, 2023 at 6:43 AM Jing Ge 
 wrote:
 
> +1(binding) Thanks!
> 
> Best regards,
> Jing
> 
> On Tue, Nov 21, 2023 at 6:17 PM Piotr Nowojski >> 
> wrote:
> 
>> Hi All,
>> 
>> I'd like to start a vote on the FLIP-384: Introduce TraceReporter
>> and
 use
>> it to create checkpointing and recovery traces [1]. The discussion
 thread
>> is here [2].
>> 
>> The vote will be open for at least 72 hours unless there is an
 objection
> or
>> not enough votes.
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/x/TguZE
>> [2]
>> https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4
>> 
>> 
>> Best,
>> Piotrek
>> 
> 
 
>>> 
>>> 
>>> --
>>> Best,
>>> Hangxiang.



[jira] [Closed] (FLINK-33341) Use available local keyed state for rescaling

2023-10-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-33341.
--
Resolution: Fixed

merged in a4ad86f 

> Use available local keyed state for rescaling
> -
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33341) Use available local keyed state for rescaling

2023-10-26 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-33341:
---
Summary: Use available local keyed state for rescaling  (was: Use available 
local state for rescaling)

> Use available local keyed state for rescaling
> -
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779129#comment-17779129
 ] 

Stefan Richter commented on FLINK-33341:


FYI, here is a link to the development branch: 
https://github.com/apache/flink/compare/master...StefanRRichter:flink:srichter-local-rescaling-FLINK-33341

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779124#comment-17779124
 ] 

Stefan Richter commented on FLINK-33341:


[~Yanfei Lei], yes, only the previous local state is available to be used in 
rescaling, so we might still need to download additional state from remote. But 
oftentimes we don't need to download everything from remote; in particular, if 
we scale out, we will often find the complete state locally on some machines and 
just need to drop some key-groups. And for scale-in, we should at least find 
one piece of the state locally. There is no good reason not to 
opportunistically use local state also in rescaling scenarios. No change to the 
scheduler will be needed.
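
To illustrate, a minimal sketch of the overlap check using Flink's 
{{KeyGroupRange}} (the wrapper class and method names are just for 
illustration):

{code:java}
import org.apache.flink.runtime.state.KeyGroupRange;

final class LocalRescalingSketch {

    /** A local state handle is worth reusing if its key-group range
     *  overlaps the range assigned to the restored subtask. */
    static boolean isReusable(KeyGroupRange local, KeyGroupRange assigned) {
        return local.getIntersection(assigned).getNumberOfKeyGroups() > 0;
    }

    public static void main(String[] args) {
        // Scale-out from 1 to 2: the old subtask covered [0, 127]; the new
        // subtask 0 only owns [0, 63] and can clip its local state copy.
        System.out.println(
                isReusable(KeyGroupRange.of(0, 127), KeyGroupRange.of(0, 63)));
    }
}
{code}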

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-23 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778677#comment-17778677
 ] 

Stefan Richter commented on FLINK-33341:


Exactly, we support local state for recovery - but not for rescaling, yet.

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33341) Use available local state for rescaling

2023-10-23 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33341:
--

 Summary: Use available local state for rescaling
 Key: FLINK-33341
 URL: https://issues.apache.org/jira/browse/FLINK-33341
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter


Local state is currently only used for recovery. However, it would make sense 
to also use available local state in rescaling scenarios to reduce the amount 
of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33246) Add RescalingIT case that uses checkpoints and resource requests

2023-10-11 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33246:
--

 Summary: Add RescalingIT case that uses checkpoints and resource 
requests
 Key: FLINK-33246
 URL: https://issues.apache.org/jira/browse/FLINK-33246
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Stefan Richter
Assignee: Stefan Richter


RescalingITCase currently uses savepoints and cancel/restart for rescaling. We 
should add a test that also tests rescaling from checkpoints under changing 
resource requirements, i.e. without cancelation of the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33077) Minimize the risk of hard back-pressure with buffer debloating enabled

2023-09-12 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-33077:
--

 Summary: Minimize the risk of hard back-pressure with buffer 
debloating enabled
 Key: FLINK-33077
 URL: https://issues.apache.org/jira/browse/FLINK-33077
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.18.0


{*}Problem{*}:
Buffer debloating sets the buffer size to {{256}} bytes because of back-pressure.
Such small buffers might not be enough to emit the processing results of a 
single record. The task thread would then request new buffers, and often block.
That results in significant checkpoint delays (up to minutes instead of 
seconds).

Adding more overdraft buffers helps, but the effect depends on the job's DoP.
Raising {{taskmanager.memory.min-segment-size}} from {{256}} also helps, but 
depends on the multiplication factor of the operator (see the config sketch 
below).

{*}Solution{*}:
 * Ignore Buffer Debloater hints and extend the buffer if possible - when this 
prevents emitting an output record fully AND this is the last available buffer.
 * Prevent the subsequent flush of the buffer so that more output records can 
be emitted (flatMap-like and join operators).
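
For reference, a sketch of the two interim mitigations expressed as 
configuration (a minimal sketch; the keys are the ones mentioned above, and the 
values are illustrative, not tuned recommendations):

{code:java}
import org.apache.flink.configuration.Configuration;

final class DebloatingMitigation {

    /** Builds a configuration applying the two workarounds mentioned above. */
    static Configuration withMitigations() {
        Configuration config = new Configuration();
        // Raise the lower bound the buffer debloater may shrink segments to.
        config.setString("taskmanager.memory.min-segment-size", "1 kb");
        // Allow more overdraft buffers per gate (illustrative value).
        config.setString(
                "taskmanager.network.memory.max-overdraft-buffers-per-gate", "10");
        return config;
    }
}
{code}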



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-357: Deprecate Iteration API of DataStream

2023-09-06 Thread Stefan Richter

+1

Thanks,
Stefan


> On 5. Sep 2023, at 10:45, David Morávek  wrote:
> 
> +1 since there is an alternative, more complete implementation available
> 
> Best,
> D.
> 
> On Sat, Sep 2, 2023 at 12:07 AM David Anderson  > wrote:
> 
>> +1
>> 
>> Keeping the legacy implementation in place is confusing and encourages
>> adoption of something that really shouldn't be used.
>> 
>> Thanks for driving this,
>> David
>> 
>> On Fri, Sep 1, 2023 at 8:45 AM Jing Ge  wrote:
>>> 
>>> Hi Wencong,
>>> 
>>> Thanks for your clarification! +1
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> On Fri, Sep 1, 2023 at 12:36 PM Wencong Liu 
>> wrote:
>>> 
 Hi Jing,
 
 
 Thanks for your reply!
 
 
> Or the "independent module extraction" mentioned in the FLIP does
>> mean an
 independent module in Flink?
 
 
 Yes. If there are submodules in the Flink repository that need the iteration
 (currently there are none),
 we could consider extracting them to a new submodule of Flink.
 
 
> users will have to add one more dependency of Flink ML. If iteration
>> is
 the
 only feature they need, it will look a little bit weird.
 
 
 If users only need to execute iteration jobs, they can simply remove
>> the
 Flink
 dependency and add the necessary dependencies related to Flink ML.
 However,
 they can still utilize the DataStream API as it is also a dependency of
 Flink ML.
 
 
 Keeping an iteration submodule in the Flink repository and making Flink ML
 depend on it is also another solution. But the current implementation of
 Iteration in DataStream should definitely be removed due to its incompleteness.
 
 
 The placement of the Iteration API in the repository is a topic that
>> has
 multiple
 potential solutions. WDYT?
 
 
 Best,
 Wencong
 
 
 
 
 
 
 
 
 
 
 
 At 2023-09-01 17:59:34, "Jing Ge"  wrote:
> Hi Wencong,
> 
> Thanks for the proposal!
> 
> "The Iteration API in DataStream is planned be deprecated in Flink
>> 1.19
 and
> then finally removed in Flink 2.0. For the users that rely on the
 Iteration
> API in DataStream, they will have to migrate to Flink ML."
> - Does it make sense to migrate the iteration module into Flink
>> directly?
> Or the "independent module extraction" mentioned in the FLIP does
>> mean an
> independent module in Flink? Since the iteration will be removed in
>> Flink,
> users will have to add one more dependency of Flink ML. If iteration
>> is
 the
> only feature they need, it will look a little bit weird.
> 
> 
> Best regards,
> Jing
> 
> On Fri, Sep 1, 2023 at 11:05 AM weijie guo >> 
> wrote:
> 
>> Thanks, +1 for this.
>> 
>> Best regards,
>> 
>> Weijie
>> 
>> 
>> Yangze Guo  于2023年9月1日周五 14:29写道:
>> 
>>> +1
>>> 
>>> Thanks for driving this.
>>> 
>>> Best,
>>> Yangze Guo
>>> 
>>> On Fri, Sep 1, 2023 at 2:00 PM Xintong Song <
>> tonysong...@gmail.com>
>> wrote:
 
 +1
 
 Best,
 
 Xintong
 
 
 
 On Fri, Sep 1, 2023 at 1:11 PM Dong Lin 
 wrote:
 
> Thanks Wencong for initiating the discussion.
> 
> +1 for the proposal.
> 
> On Fri, Sep 1, 2023 at 12:00 PM Wencong Liu <
>> liuwencle...@163.com
> 
>>> wrote:
> 
>> Hi devs,
>> 
>> I would like to start a discussion on FLIP-357: Deprecate
 Iteration
>>> API
> of
>> DataStream [1].
>> 
>> Currently, the Iteration API of DataStream is incomplete. For instance, it
>> lacks support for iteration in sync mode and exactly-once semantics.
>> Additionally, it does not offer the ability to set iteration termination
>> conditions. As a result, it's hard for developers to build an iteration
>> pipeline with DataStream in practical applications such as machine
>> learning.
>> 
>> FLIP-176: Unified Iteration to Support Algorithms [2] has introduced a
>> unified iteration library in the Flink ML repository. This library
>> addresses all the issues present in the Iteration API of DataStream and
>> could provide a solution for all the iteration use-cases. However,
>> maintaining two separate implementations of iteration in both the Flink
>> repository and the Flink ML repository would introduce unnecessary
>> complexity and make it difficult to maintain the Iteration API.
>> 
>> As such I propose 

[jira] [Updated] (FLINK-32782) Release Testing: Disable WAL in RocksDBWriteBatchWrapper by default

2023-08-29 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32782:
---
Description: 
Covered by nightly tests, for example 

- run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true" "skip_check_exceptions"
- run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks 
true true" "skip_check_exceptions"
- run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks 
true true" "skip_check_exceptions"

> Release Testing:  Disable WAL in RocksDBWriteBatchWrapper by default
> 
>
> Key: FLINK-32782
> URL: https://issues.apache.org/jira/browse/FLINK-32782
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> Covered by nightly tests, for example 
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32783) Release Testing: Improve parallel download of RocksDB incremental state

2023-08-29 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32783:
---
Description: 
This feature is automatically used whenever we download state during a restart 
from a RocksDB incremental checkpoint. This should be tested with and without 
task-local recovery.

Will be covered by the nightly tests:

* run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
parallelism change) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks 
true true" "skip_check_exceptions"
* run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks 
true true" "skip_check_exceptions"
* run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks 
true true" "skip_check_exceptions"

  was:This feature is automatically used whenever we download state during a 
restart from a RocksDB incremental checkpoint. This should be tested with and 
without task-local recovery.


> Release Testing: Improve parallel download of RocksDB incremental state
> ---
>
> Key: FLINK-32783
> URL: https://issues.apache.org/jira/browse/FLINK-32783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> This feature is automatically used whenever we download state during a 
> restart from a RocksDB incremental checkpoint. This should be tested with and 
> without task-local recovery.
> Will be covered by the nightly tests:
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> up) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> down) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32783) Release Testing: Improve parallel download of RocksDB incremental state

2023-08-23 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32783:
---
Description: This feature is automatically used whenever we download state 
during a restart from a RocksDB incremental checkpoint. This should be tested 
with and without task-local recovery.

> Release Testing: Improve parallel download of RocksDB incremental state
> ---
>
> Key: FLINK-32783
> URL: https://issues.apache.org/jira/browse/FLINK-32783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> This feature is automatically used whenever we download state during a 
> restart from a RocksDB incremental checkpoint. This should be tested with and 
> without task-local recovery.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable

2023-08-01 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32681.
--
Resolution: Fixed

merged into master d11cc32

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>    Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable

2023-08-01 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749599#comment-17749599
 ] 

Stefan Richter commented on FLINK-32681:


[~Feifan Wang] Thanks for the offer, but I already worked on a fix yesterday. 
Please feel free to review it later.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>    Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable

2023-07-31 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749247#comment-17749247
 ] 

Stefan Richter commented on FLINK-32681:


I'm running on a Mac, but with a sleep I already managed to reproduce it. I'll 
take a look at what's going on.

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32681) RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable

2023-07-31 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749203#comment-17749203
 ] 

Stefan Richter commented on FLINK-32681:


[~mapohl] I was trying to reproduce this locally without success; could it be 
an infra-related problem? The code is calling 
`FileUtils::deleteDirectoryQuietly` to clean up files, and if something goes 
wrong during the deletion, the test could still find the directories and fail. 
I could also just test whether delete was called, and not whether the files 
were actually deleted, to abstract away from such infra problems, wdyt?
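
Something like the following test double is what I mean (purely illustrative; 
not the actual test utils):

{code:java}
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Records which directories cleanup was requested for, so the test can
 *  assert on the deletion intent instead of on infra-dependent file state. */
final class RecordingCleanup {

    private final Set<Path> requested = ConcurrentHashMap.newKeySet();

    void deleteDirectoryQuietly(Path directory) {
        // Only record the call; the real FileUtils would delete here.
        requested.add(directory);
    }

    boolean deletionRequestedFor(Path directory) {
        return requested.contains(directory);
    }
}
{code}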

> RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure unstable
> 
>
> Key: FLINK-32681
> URL: https://issues.apache.org/jira/browse/FLINK-32681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51712=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef
> Failed 3 times in yesterdays nightly run.
> {code}
> Jul 26 01:12:46 01:12:46.889 [ERROR] 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure
>   Time elapsed: 0.044 s  <<< FAILURE!
> Jul 26 01:12:46 java.lang.AssertionError
> Jul 26 01:12:46   at org.junit.Assert.fail(Assert.java:87)
> Jul 26 01:12:46   at org.junit.Assert.assertTrue(Assert.java:42)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:65)
> Jul 26 01:12:46   at org.junit.Assert.assertFalse(Assert.java:75)
> Jul 26 01:12:46   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloaderTest.testMultiThreadCleanupOnFailure(RocksDBStateDownloaderTest.java:151)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS][2.0] FLIP-349: Move RocksDB statebackend classes to o.a.f.state.rocksdb package

2023-07-25 Thread Stefan Richter

+1



> On 24. Jul 2023, at 12:25, Chesnay Schepler  wrote:
> 
> To properly reflect the state of the RocksDB state backend, I propose to move 
> all classes in the state-backend-rocksdb module to the 
> o.a.f.state.rocksdb package.
> 
> 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-349%3A+Move+RocksDB+statebackend+classes+to+o.a.f.state.rocksdb+package
>  



Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-20 Thread Stefan Richter
+1 (binding)

Thanks,
Stefan


> On 20. Jul 2023, at 05:24, Zhu Zhu  wrote:
> 
> +1 (binding)
> 
> Thanks,
> Zhu
> 
> yuxia mailto:luoyu...@alumni.sjtu.edu.cn>> 
> 于2023年7月20日周四 09:23写道:
>> 
>> +1(binding)
>> 
>> Best regards,
>> Yuxia
>> 
>> - 原始邮件 -
>> 发件人: "Guowei Ma" 
>> 收件人: "dev" 
>> 发送时间: 星期三, 2023年 7 月 19日 下午 1:54:52
>> 主题: Re: [VOTE] FLIP-309: Support using larger checkpointing interval when 
>> source is processing backlog
>> 
>> +1(binding)
>> Best,
>> Guowei
>> 
>> 
>> On Wed, Jul 19, 2023 at 11:18 AM Hang Ruan  wrote:
>> 
>>> +1 (non-binding)
>>> 
>>> Thanks for driving.
>>> 
>>> Best,
>>> Hang
>>> 
>>> Leonard Xu  于2023年7月19日周三 10:42写道:
>>> 
 Thanks Dong for the continuous work.
 
 +1(binding)
 
 Best,
 Leonard
 
> On Jul 18, 2023, at 10:16 PM, Jingsong Li 
 wrote:
> 
> +1 binding
> 
> Thanks Dong for continuous driving.
> 
> Best,
> Jingsong
> 
> On Tue, Jul 18, 2023 at 10:04 PM Jark Wu  wrote:
>> 
>> +1 (binding)
>> 
>> Best,
>> Jark
>> 
>> On Tue, 18 Jul 2023 at 20:30, Piotr Nowojski 
 wrote:
>> 
>>> +1 (binding)
>>> 
>>> Piotrek
>>> 
>>> wt., 18 lip 2023 o 08:51 Jing Ge 
 napisał(a):
>>> 
 +1(binding)
 
 Best regards,
 Jing
 
 On Tue, Jul 18, 2023 at 8:31 AM Rui Fan <1996fan...@gmail.com>
>>> wrote:
 
> +1(binding)
> 
> Best,
> Rui Fan
> 
> 
> On Tue, Jul 18, 2023 at 12:04 PM Dong Lin 
 wrote:
> 
>> Hi all,
>> 
>> We would like to start the vote for FLIP-309: Support using larger
>> checkpointing interval when source is processing backlog [1]. This FLIP
>> was discussed in this thread [2].
>> 
>> The vote will be open until at least July 21st (at least 72 hours),
>> following the consensus voting process.
>> 
>> Cheers,
>> Yunfeng and Dong
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>> [2]
>> https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37



Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-10 Thread Stefan Richter
Hi,

After reading through the discussion, I think the FLIP should provide 
additional details and explanations about the exact semantics of the end-to-end 
latency configuration and how it interacts with all other configurations around 
latency, such as checkpointing. In this context, I have a few more questions:

1. If the idea of the configuration is to enable operators to apply some 
batching as optimization within the bounds of the configured latency, how would 
we use that information to globally optimize the operators in a job? Today, no 
operator knows about the assumed latency of its downstream operators. So if the 
goal is to provide a target latency for the whole pipeline, how do we plan to 
split the “budget” among all operators and how can operators locally decide how 
much latency is ok to introduce? Or did I misunderstand and the configuration 
is per operator and adds up for the whole pipeline? How do window operators fit 
into this model, in particular if the window duration is longer than the 
configured end-to-end latency? Would they just forward and ignore flush events? 
But arguably this would lead far away from any end-to-end time guarantees.

2. How does this idea interact with checkpointing. I know this question was 
asked before and the answer was that this should be independent, but I don’t 
see how it can be independent for exactly-once where we should only be able to 
produce outputs on checkpoints. Would end-to-end latency config be rather 
useless if the checkpoint interval is greater than the end-to-end latency 
config? Or are you planning to adjust the checkpointing interval w.r.t. the 
latency config?

3. Why do we need record attributes AND flush events? Couldn't the flush events 
be tagged with either backlog = true or false?

4. What happens with events that were submitted under backlog = false and 
caught in an aggregation when the regime changes to backlog = true, or would we 
require a flush before such a change? The point of my question is to ask 
whether any events could become “trapped” when an operator moves from streaming 
to buffering mode.

5. Small nitpick on your definition of latency in the FLIP: not all events 
contribute to output that reaches a sink; for example, a flatMap could simply 
filter out some events. Meaning, there is at least a big loophole in the 
current definition of end-to-end latency as the maximum of this time, because 
it would be infinite for such events.

Thanks,
Stefan


> On 7. Jul 2023, at 09:49, Jing Ge  wrote:
> 
> Hi Dong,
> 
> Thanks for your clarification.
> 
> 
>> Actually, I think it could make sense to toggle isBacklog between true and
>> false while the job is running.
>> 
> 
> If isBacklog is toggled too often back and forth (e.g. by unexpected
> mistake, unstable system, etc.), a large amount of RecordAttributes might be
> triggered, which will lead to performance issues. This should not be the
> right way to use RecordAttributes, right? Devs and users should be aware of
> it and know how to monitor, maintain, and fix issues.
> 
> Your reply contains valuable information. It might make sense to add them
> into the FLIP:
> 
> 1. It is up to the operator to decide when to emit RecordAttributes. But
> devs and users should be aware that the number of RecordAttributes should
> not be too high to cause performance issues.
> 2. Although users can decide how to configure them, the end-to-end latency
> should be (commonly?) configured lower than the checkpoint interval.
> 3. The three ways you mentioned for how to derive isBacklog.
> 
> WDYT?
> 
> Best regards,
> Jing
> 
> 
> On Fri, Jul 7, 2023 at 3:13 AM Dong Lin  > wrote:
> 
>> Hi Jing,
>> 
>> Thanks for the comments. Please see my reply inline.
>> 
>> On Fri, Jul 7, 2023 at 5:40 AM Jing Ge  wrote:
>> 
>>> Hi,
>>> 
>>> Thank you all for the inspired discussion. Really appreciate it!
>>> 
>>> @Dong I'd like to ask some (stupid) questions to make sure I understand
>>> your thoughts correctly.
>>> 
>>> 1. It will make no sense to send the same type of RecordAttributes right?
>>> e.g.  if one RecordAttributes(isBacklog=true) has been sent, a new
>>> RecordAttributes will be only sent when isBacklog is changed to be false,
>>> and vice versa. In this way, the number of RecordAttributes will be very
>>> limited.
>>> 
>> 
>> Yes, you are right. Actually, this is what we plan to do when we update
>> operators to emit RecordAttributes via `Output#emitRecordAttributes()`.
>> 
>> Note that the FLIP does not specify the frequency of how operators should
>> invoke `Output#emitRecordAttributes()`. It is up to the operator
>> to decide when to emit RecordAttributes.
>> 
>> 
>>> 2. Since source readers can invoke Output#emitRecordAttributes to emit
>>> RecordAttributes(isBacklog=true/false), it might be weird to send
>>> RecordAttributes with different isBacklog back and forth too often. Devs
>>> and users should pay attention to it. Something is wrong when 

Re: [VOTE] FLIP-321: introduce an API deprecation process

2023-07-03 Thread Stefan Richter
+1 (binding)


> On 3. Jul 2023, at 10:08, Martijn Visser  wrote:
> 
> +1 (binding)
> 
> 
> 
> On Mon, Jul 3, 2023 at 10:03 AM Xintong Song  > wrote:
> 
>> +1 (binding)
>> 
>> Best,
>> 
>> Xintong
>> 
>> 
>> 
>> On Sat, Jul 1, 2023 at 11:26 PM Dong Lin  wrote:
>> 
>>> Thanks for the FLIP.
>>> 
>>> +1 (binding)
>>> 
>>> On Fri, Jun 30, 2023 at 5:39 PM Becket Qin  wrote:
>>> 
 Hi folks,
 
 I'd like to start the VOTE for FLIP-321[1] which proposes to introduce
>> an
 API deprecation process to Flink. The discussion thread for the FLIP
>> can
>>> be
 found here[2].
 
 The vote will be open until at least July 4, following the consensus
>>> voting
 process.
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
 [2] 
 https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9



[jira] [Updated] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32410:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Allocate hash-based collections with sufficient capacity for expected size
> --
>
> Key: FLINK-32410
> URL: https://issues.apache.org/jira/browse/FLINK-32410
> Project: Flink
>  Issue Type: Improvement
>    Reporter: Stefan Richter
>        Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The JDK API to create hash-based collections for a certain capacity is 
> arguably misleading, because it doesn't size the collections to "hold a 
> specific number of items" like you'd expect it would. Instead it sizes them to 
> hold load-factor% of the specified number.
> For the common pattern of allocating a hash-based collection with the size of 
> the expected elements to avoid rehashes, this means that a rehash is essentially 
> guaranteed.
> We should introduce helper methods (similar to Guava's 
> `Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size and 
> replace the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32345:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Improve parallel download of RocksDB incremental state
> --
>
> Key: FLINK-32345
> URL: https://issues.apache.org/jira/browse/FLINK-32345
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> {{RocksDBStateDownloader}} is used to download the files for incremental 
> checkpoints in parallel. However, the parallelism is currently restricted to 
> a single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
> (shared, private) within the handle at a time.
> We should support parallelization across multiple state types and across 
> multiple state handles. In particular, this can improve our download times 
> for scale-in.
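
A rough sketch of the proposed direction (allFilesOf and downloadFile are
illustrative helpers, not existing Flink methods):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Fan out every (state handle, file) pair into one shared thread pool
    // instead of parallelizing within a single handle/state type at a time.
    static void downloadAll(List<IncrementalRemoteKeyedStateHandle> handles,
                            int numberOfTransferThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTransferThreads);
        try {
            List<Future<?>> downloads = new ArrayList<>();
            for (IncrementalRemoteKeyedStateHandle handle : handles) {
                // covers both shared and private state files of the handle
                for (StreamStateHandle file : allFilesOf(handle)) {
                    downloads.add(pool.submit(() -> downloadFile(file)));
                }
            }
            for (Future<?> download : downloads) {
                download.get(); // surfaces failures; real code also cleans up
            }
        } finally {
            pool.shutdown();
        }
    }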



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32326.
--
Resolution: Fixed

> Disable WAL in RocksDBWriteBatchWrapper by default
> --
>
> Key: FLINK-32326
> URL: https://issues.apache.org/jira/browse/FLINK-32326
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should disable WAL by default in RocksDBWriteBatchWrapper for the case 
> that no WriteOptions object is provided. This is the case in all restore 
> operations and can impact performance.
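
For reference, disabling the WAL with the RocksDB Java API looks roughly like
this (db, columnFamilyHandle, keyBytes and valueBytes are assumed to exist):

    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    // During restore, durability comes from the checkpoint files themselves,
    // so the RocksDB write-ahead log is pure overhead there.
    try (WriteOptions options = new WriteOptions().setDisableWAL(true);
         WriteBatch batch = new WriteBatch()) {
        batch.put(columnFamilyHandle, keyBytes, valueBytes);
        db.write(options, batch);
    }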



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32345.
--
Resolution: Fixed

> Improve parallel download of RocksDB incremental state
> --
>
> Key: FLINK-32345
> URL: https://issues.apache.org/jira/browse/FLINK-32345
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> {{RocksDBStateDownloader}} is used to download the files for incremental 
> checkpoints in parallel. However, the parallelism is currently restricted to 
> a single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
> (shared, private) within the handle at a time.
> We should support parallelization across multiple state types and across 
> multiple state handles. In particular, this can improve our download times 
> for scale-in.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32326:
---
Fix Version/s: 1.18.0
   (was: 1.19.0)

> Disable WAL in RocksDBWriteBatchWrapper by default
> --
>
> Key: FLINK-32326
> URL: https://issues.apache.org/jira/browse/FLINK-32326
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should disable WAL by default in RocksDBWriteBatchWrapper for the case 
> that no WriteOptions object is provided. This is the case in all restore 
> operations and can impact performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32347:
---
Fix Version/s: 1.18.0

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. As a result, errors 
> from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.
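
In pseudo-Java, the proposed reordering looks roughly as follows (method
signatures simplified; the real CheckpointFailureManager calls take more
arguments than shown here):

    try {
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                completedCheckpoint, checkpointsCleaner, postCleanupAction);
    } catch (Exception storeFailure) {
        // count store failures like any other checkpoint failure, so that
        // 'execution.checkpointing.tolerable-failed-checkpoints' applies
        failureManager.handleCheckpointException(storeFailure); // simplified
        throw storeFailure;
    }
    // only report success once the checkpoint is durably in the store
    failureManager.handleCheckpointSuccess(completedCheckpoint.getCheckpointID());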



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32347.
--
Resolution: Fixed

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. As a result, errors 
> from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-32441.
--
Resolution: Fixed

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737642#comment-17737642
 ] 

Stefan Richter commented on FLINK-32441:


Fixed in master 0c787f5.

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30859 ]


Stefan Richter deleted comment on FLINK-30859:


was (Author: srichter):
[~tzulitai] I think you forgot to remove some code here: 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L30C2-L30C2
 and it causes compilation errors.

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737569#comment-17737569
 ] 

Stefan Richter commented on FLINK-30859:


[~tzulitai] I think you forgot to remove some code here: 
https://github.com/apache/flink/blob/149a5e34c1ed8d8943c901a98c65c70693915811/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L30C2-L30C2
 and it causes compilation errors.

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32441) DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with timeout on AZP

2023-06-27 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-32441:
--

Assignee: Stefan Richter

> DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore fails with 
> timeout on AZP
> --
>
> Key: FLINK-32441
> URL: https://issues.apache.org/jira/browse/FLINK-32441
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Tests
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50461=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9274
> fails with timeout on 
> {{DefaultSchedulerTest#testTriggerCheckpointAndCompletedAfterStore}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32437) Determine and set correct maxParallelism for operator chains

2023-06-26 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-32437:
---
Fix Version/s: 2.0.0
   (was: 1.19.0)

> Determine and set correct maxParallelism for operator chains
> 
>
> Key: FLINK-32437
> URL: https://issues.apache.org/jira/browse/FLINK-32437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>        Reporter: Stefan Richter
>    Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Current code in {{StreamingJobGraphGenerator}} does not properly determine 
> and set the correct maxParallelism of operator chains. We should set the 
> maxParallelism of the chain as the minimum of all the maxParallelism values 
> among operators in the chain.
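
The fix boils down to a min-reduction over the chained operators, roughly
(a sketch against the StreamNode accessors, not the exact patch):

    // A chain must not advertise a higher maxParallelism than any of its
    // members, so take the minimum over all explicitly configured values.
    int chainMaxParallelism = Integer.MAX_VALUE;
    for (StreamNode node : chainedNodes) {
        if (node.getMaxParallelism() > 0) { // only explicitly set values
            chainMaxParallelism = Math.min(chainMaxParallelism, node.getMaxParallelism());
        }
    }
    if (chainMaxParallelism != Integer.MAX_VALUE) {
        jobVertex.setMaxParallelism(chainMaxParallelism);
    }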



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32437) Determine and set correct maxParallelism for operator chains

2023-06-26 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32437:
--

 Summary: Determine and set correct maxParallelism for operator 
chains
 Key: FLINK-32437
 URL: https://issues.apache.org/jira/browse/FLINK-32437
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


Current code in {{StreamingJobGraphGenerator}} does not properly determine and 
set the correct maxParallelism of operator chains. We should set the 
maxParallelism of the chain as the minimum of all the maxParallelism values 
among operators in the chain.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32410:
--

 Summary: Allocate hash-based collections with sufficient capacity 
for expected size
 Key: FLINK-32410
 URL: https://issues.apache.org/jira/browse/FLINK-32410
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


The JDK API to create hash-based collections for a certain capacity is arguably 
misleading because it doesn't size the collections to "hold a specific number 
of items" like you'd expect it would. Instead, it sizes them to hold 
load-factor% of the specified number.

For the common pattern of allocating a hash-based collection with the number of 
expected elements to avoid rehashes, this means that a rehash is essentially 
guaranteed.

We should introduce helper methods (similar to Guava's 
`Maps.newHashMapWithExpectedSize(int)`) for allocations with an expected size 
and replace the direct constructor calls with those.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-20 Thread Stefan Richter
Hi Xintong,

Thanks for the summary, most of the points that you agreed upon also make sense 
to me.

>   2. Dropping deprecated Public APIs in minor releases, or demoting APIs
>   from Public to PublicEvolving / Experimental / Retired in any version bump,
>   are not good practices.

I hope we can go beyond calling it “not a good practice" and reach 
consensus that we will not demote or remove public APIs in minor releases
and (technically) enforce this rule for all contributions. I think it’s a 
reasonable expectation for stability from a project as mature as Flink.

> I'm personally in favor of option 1. As the migration-period rule is newly
> proposed, I think it's fair to make exceptions for cases where we already
> missed the best chance for planning the deprecation. Moreover, I do believe
> having a quick major version bump does confuse users. Yes, we can explain
> to users that bumping from 2.x to 3.0 does not cost anything other than the
> removal of a deprecated API. But having to explain this itself is an
> indicator that it might be confusing for users.
> 
> 
> Becket, on the other hand, prefers option 2. From my understanding, his
> major point is that a quick major version bump causes barely any actual
> loss for users, while in option 1 not providing the migration period or
> providing a shorter one is an actual loss for users. (@Becket, please correct
> me if I'm mistaken.)
> 

I would be open to both options and maybe postpone this decision until we have 
the complete roadmap for Flink 2.0. We would have a better overview of the 
technical consequences, e.g. how hard it would be to offer and maintain both 
APIs side by side, how well we get both APIs separated, and whether or not we 
will be able to use the full potential of our breaking changes before the 
DataStream API is completely removed. One example for the last point I can 
think of would be moving from lazy to eager state declaration. If we believe 
that it is reasonable to support all planned changes while maintaining both 
APIs, I'm leaning towards option 2.

Best,
Stefan


> On 20. Jun 2023, at 14:43, Xintong Song  wrote:
> 
> Becket and I had an offline voice call earlier today. We have reached
> consensus on some of the arguments, but still hold different opinions on
> some others. I'd like to post the outcome here for transparency.
> 
> We both agree that:
> 
>   1. Providing a migration period would be beneficial for our users, and
>   we should do that
>   2. Dropping deprecated Public APIs in minor releases, or demoting APIs
>   from Public to PublicEvolving / Experimental / Retired in any version bump,
>   are not good practices.
>   3. Ideally, with this FLIP, developers should be more careful and plan
>   API changes ahead. That means:
>  1. Be more careful with designing APIs and promoting them to Public,
>  so that they won't be changed / removed very soon, and also the
>  maintenance overhead for keeping them after deprecation should be
>  affordable. (I believe this also aligns with John's opinion.)
>  2. Plan the deprecation / removal of old APIs that we don't want to
>  carry to the next major version earlier, so that they'll have the
>  2-minor-release migration period before the next major version bump.
>   4. A practical situation is that, we are not ready for deprecating the
>   DataStream API, nor afford to carry it for another couple of years.
>  1. We cannot deprecate it now, because the new ProcessFunction API
>  (the planned replacement for DataStream API) is not yet ready. We haven't
>  planned the ProcessFunction API earlier because the original
> plan was to do
>  in-place changes directly on the DataStream API in release 2.0, until the
>  smooth migration period is proposed. Before this, the only guarantees we
>  provide for Public APIs is that they'll stay compatible until the next
>  major version, which led to the understanding the in-place changes on
>  Public APIs should be fine.
>  2. We also cannot afford carrying the DataStream API for another
>  couple of years. One of the reasons that we want to refactor / replace
>  DataStream API is that it exposes and depends on too many
> Flink's internal
>  runtime implementations, which has already limited / blocked plans for
>  Flink's internal improvements for many times. Also due to the wide and
>  transitive dependencies on the internals, maintaining two sets of lower
>  APIs itself is expensive and would be better to keep it short. This is
>  admittedly a historical design flaw which should be avoided in future.
> 
> What we haven't reached consensus is about how to deal with the current
> situation around the DataStream API. There are two options.
> 
> 
>   1. We say this FLIP is enforced starting release 2.0. For current 1.x
>   APIs, we provide a migration period with best effort, while allowing
>   exceptions for immediate removal in 2.0. That 

[jira] [Commented] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-06-19 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17734183#comment-17734183
 ] 

Stefan Richter commented on FLINK-31238:


[~mayuehappy] Thanks for working on this topic! I saw that your PR against 
RocksDB was merged a few days ago. With your code in RocksDB, are you planning 
to continue with this JIRA and is there any planned release?

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png, 
> image-2023-03-29-15-25-21-868.png, screenshot-1.png
>
>
> (The detailed design is in this document
> [https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI|https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI])
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing. 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb cliped
>  * Open the Compaction process of the main DB
> !screenshot-1.png|width=923,height=243!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means uses the recovery method of g
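
The restore sequence described above, in rough pseudo-Java (clipDb and
ingestDb stand for the proposed FRocksDB additions and are not part of stock
RocksDB; the open helpers are made up for illustration):

    RocksDB mainDb = openFromStateHandle(stateHandles.get(0));
    mainDb.pauseBackgroundWork();                // pause compaction
    clipDb(mainDb, targetKeyGroupRange);         // drop out-of-range data
    for (KeyedStateHandle handle : stateHandles.subList(1, stateHandles.size())) {
        RocksDB tmpDb = openFromStateHandle(handle);
        clipDb(tmpDb, targetKeyGroupRange);
        ingestDb(mainDb, tmpDb);                 // hard-links ssts, hence fast
        tmpDb.close();
    }
    mainDb.continueBackgroundWork();             // resume compaction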

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter

Hi Lijie,

I think you understood me correctly. But I would not consider this a true 
cyclic dependency in the dataflow, because I would not suggest sending the 
filter through an edge in the job graph from join to scan. I’d rather bypass 
the stream graph for the exchange and bring the filter to the scan. For 
example, the join could report the filter after the build phase, e.g. to the 
JM or a predefined DFS folder. When the probe scan is scheduled, the JM would 
provide the filter information to it, or the scan would look in DFS for any 
filter it can use as part of initialization. I’m not suggesting to do it 
exactly in those ways, but just to show what I mean by "bypassing the 
dataflow".

Anyways, I’m fine with excluding this optimization from the current FLIP if you 
believe it would be hard to implement in Flink.

Best,
Stefan


> On 19. Jun 2023, at 14:07, Lijie Wang  wrote:
> 
> Hi Stefan,
> 
> If I understand correctly(I hope so), the hash join operator needs to send
> the bloom filter to probe scan, and probe scan also needs to send the
> filtered data to the hash join operator. This means there will be a cycle
> in the data flow, it will be hard for current Flink to schedule this kind
> of graph. I admit we can find a way to do this, but that's probably a
> bit outside the scope of this FLIP.  So let's do these complex
> optimizations later, WDYT?
> 
> Best,
> Lijie
> 
> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, 19 Jun 2023 at 18:15:
> 
>> Hi Lijie,
>> 
>> Exactly, my proposal was to build the bloom filter in the hash operator. I
>> don’t know about all the details about the implementation of Flink’s join
>> operator, but I’d assume that even if the join is a two input operator it
>> gets scheduled for 2 different pipelines. First the build phase with the
>> scan from the dimension table and after that’s completed the probe phase
>> with the scan of the fact table. I’m not proposing the use the bloom filter
>> only in the join operator, but rather send the bloom filter to the probe
>> scan before starting the probe. I assume this would require some form of
>> side channel to transport the filter and coordination to tell the sources
>> that such a filter is available. I cannot answer how hard those would be to
>> implement, but the idea doesn’t seem impossible to me.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> On 19. Jun 2023, at 11:56, Lijie Wang  wrote:
>>> 
>>> Hi Stefan,
>>> 
>>> Now I know what you mean about point 1. But currently it is unfeasible
>> for
>>> Flink, because the building of the hash table is inside the hash join
>>> operator. The hash join operator has two inputs, it will first process
>> the
>>> data of the build-input to build a hash table, and then use the hash
>> table
>>> to process the data of the probe-input. If we want to use the built hash
>>> table to deduplicate data for bloom filter, we must put the bloom filter
>>> inside the hash join operator.  However, in this way, the data reaching
>> the
>>> join operator cannot be reduced (the shuffle/network overhead cannot be
>>> reduced), which is not what we expected.
>>> 
>>> Regarding the filter type, I agree with you, more types of filters can
>>> get further
>>> optimization,  and it is in our future plan (We described it in the
>> section
>>> Future+Improvements#More+underlying+implementations).
>>> 
>>> Best,
>>> Lijie
>>> 
>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, 19 Jun 2023 at 15:58:
>>> 
>>>> 
>>>> Hi Lijie,
>>>> 
>>>> thanks for your response, I agree with what you said about points 2 and
>> 3.
>>>> Let me explain a bit more about point 1. This would not apply to all
>> types
>>>> of joins and my suggestion is also *not* to build a hash table only for
>> the
>>>> purpose to build the bloom filter.
>>>> I was thinking about the scenario of a hash join, where you would build
>>>> the hash table as part of the join algorithm anyways and then use the
>>>> keyset of that hash table to 1) have better insights on about NDV and
>> 2) be
>>>> able to construct the bloom filter without duplicates and therefore
>> faster.
>>>> So the preconditions where I would use this is if you are building a
>> hash
>>>> table as part of the

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter
Hi Lijie,

Exactly, my proposal was to build the bloom filter in the hash operator. I 
don’t know all the details of the implementation of Flink’s join operator, but 
I’d assume that even if the join is a two-input operator, it gets scheduled as 
two different pipelines: first the build phase with the scan from the 
dimension table, and after that’s completed, the probe phase with the scan of 
the fact table. I’m not proposing to use the bloom filter only in the join 
operator, but rather to send the bloom filter to the probe scan before 
starting the probe. I assume this would require some form of side channel to 
transport the filter and coordination to tell the sources that such a filter 
is available. I cannot answer how hard those would be to implement, but the 
idea doesn’t seem impossible to me.

Best,
Stefan


> On 19. Jun 2023, at 11:56, Lijie Wang  wrote:
> 
> Hi Stefan,
> 
> Now I know what you mean about point 1. But currently it is unfeasible for
> Flink, because the building of the hash table is inside the hash join
> operator. The hash join operator has two inputs, it will first process the
> data of the build-input to build a hash table, and then use the hash table
> to process the data of the probe-input. If we want to use the built hash
> table to deduplicate data for bloom filter, we must put the bloom filter
> inside the hash join operator.  However, in this way, the data reaching the
> join operator cannot be reduced (the shuffle/network overhead cannot be
> reduced), which is not what we expected.
> 
> Regarding the filter type, I agree with you, more types of filters can
> get further
> optimization,  and it is in our future plan (We described it in the section
> Future+Improvements#More+underlying+implementations).
> 
> Best,
> Lijie
> 
> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, 19 Jun 2023 at 15:58:
> 
>> 
>> Hi Lijie,
>> 
>> thanks for your response, I agree with what you said about points 2 and 3.
>> Let me explain a bit more about point 1. This would not apply to all types
>> of joins and my suggestion is also *not* to build a hash table only for the
>> purpose to build the bloom filter.
>> I was thinking about the scenario of a hash join, where you would build
>> the hash table as part of the join algorithm anyways and then use the
>> keyset of that hash table to 1) have better insights on about NDV and 2) be
>> able to construct the bloom filter without duplicates and therefore faster.
>> So the preconditions where I would use this is if you are building a hash
>> table as part of the join and you know you are not building for a key
>> column (because there would be no duplicates to eliminate). Then your bloom
>> filter construction could benefit already from the deduplication work that
>> was done for building the hash table.
>> 
>> I also wanted to point out that besides bloom filter and IN filter you
>> could also think of other types of filter that can become interesting for
>> certain distributions and meta data. For example, if you have min/max
>> information about columns and partitions you could have a bit vector
>> represent equilibrium-sized ranges of the key space between min and max and
>> have the bits represent what part of the range is present and push that
>> information down to the scan.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> On 19. Jun 2023, at 08:26, Lijie Wang  wrote:
>>> 
>>> Hi Stefan,
>>> 
>>> Thanks for your feedback. Let me briefly summarize the optimization
>> points
>>> you mentioned above (Please correct me if I'm wrong):
>>> 
>>> 1. Build an extra hash table for deduplication before building the bloom
>>> filter.
>>> 2. Use the two-phase approach to build the bloom filter(first local, then
>>> OR-combine).
>>> 3. Use blocked bloom filters to improve the cache efficiency.
>>> 
>>> For the above 3 points, I have the following questions or opinions:
>>> 
>>> For point 1, it seems that building a hash table also requires traversing
>>> all build side data, and the overhead seems to be the same as building a
>>> bloom filter directly? In addition, the hash table will take up more
>> space
>>> when the amount of data is large, which is why we choose to use bloom
>>> filter instead of hash table.
>>> 
>>> For point 2, I think it's a good idea to use the two-phase approach to
>>> build the bloom filter. But rather than directly broadcasting the local
>>> bloom filter to the probe side, I prefer to introduce a global node for
>> the
>>> OR-combine(like 

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter

Hi Lijie,

thanks for your response, I agree with what you said about points 2 and 3. Let 
me explain a bit more about point 1. This would not apply to all types of joins 
and my suggestion is also *not* to build a hash table only for the purpose of 
building the bloom filter.
I was thinking about the scenario of a hash join, where you would build the 
hash table as part of the join algorithm anyway and then use the keyset of 
that hash table to 1) get better insight into the NDV and 2) be able to 
construct the bloom filter without duplicates and therefore faster. So the 
precondition for using this is that you are building a hash table as 
part of the join and you know you are not building for a key column (because 
there would be no duplicates to eliminate). Then your bloom filter construction 
could benefit already from the deduplication work that was done for building 
the hash table.

I also wanted to point out that besides bloom filter and IN filter you could 
also think of other types of filter that can become interesting for certain 
distributions and meta data. For example, if you have min/max information about 
columns and partitions, you could have a bit vector represent equal-sized 
ranges of the key space between min and max, with the bits indicating which 
parts of the range are present, and push that information down to the scan.
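
A toy sketch of that range-filter idea (purely illustrative, not an existing
Flink class):

    import java.util.BitSet;

    // Split [min, max] into k equal-width buckets, set a bit for every bucket
    // that contains at least one build-side key, and let the scan skip values
    // whose bucket bit is 0.
    final class RangeBitFilter {
        private final long min;
        private final long max;
        private final int k;
        private final BitSet buckets;

        RangeBitFilter(long min, long max, int k) {
            this.min = min;
            this.max = max;
            this.k = k;
            this.buckets = new BitSet(k);
        }

        void add(long key) {
            buckets.set(bucketOf(key));
        }

        boolean mightContain(long key) {
            return key >= min && key <= max && buckets.get(bucketOf(key));
        }

        private int bucketOf(long key) {
            // proportional mapping into [0, k); double math avoids overflow
            return (int) (((double) (key - min) / ((double) (max - min) + 1.0)) * k);
        }
    }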

Best,
Stefan 


> On 19. Jun 2023, at 08:26, Lijie Wang  wrote:
> 
> Hi Stefan,
> 
> Thanks for your feedback. Let me briefly summarize the optimization points
> you mentioned above (Please correct me if I'm wrong):
> 
> 1. Build an extra hash table for deduplication before building the bloom
> filter.
> 2. Use the two-phase approach to build the bloom filter(first local, then
> OR-combine).
> 3. Use blocked bloom filters to improve the cache efficiency.
> 
> For the above 3 points, I have the following questions or opinions:
> 
> For point 1, it seems that building a hash table also requires traversing
> all build side data, and the overhead seems to be the same as building a
> bloom filter directly? In addition, the hash table will take up more space
> when the amount of data is large, which is why we choose to use bloom
> filter instead of hash table.
> 
> For point 2, I think it's a good idea to use the two-phase approach to
> build the bloom filter. But rather than directly broadcasting the local
> bloom filter to the probe side, I prefer to introduce a global node for the
> OR-combine(like two-phase-agg[1]), then broadcast the combined bloom filter
> to the probe side. The latter can reduce the amount of data transferred by
> the network. I will change the FLIP like this.
> 
> For point 3, I think it's a nice optimization, but I prefer to put it to
> the future improvements. There is already an implementation of bloom filter
> in flink, we can simply reuse it. Introducing a new bloom filter
> implementation introduces some complexity  (we need to implement it, test
> it, etc), and is not the focus of this FLIP.
> 
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> 
> Best,
> Lijie
> 
> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, 16 Jun 2023 at 16:45:
> 
>> Hi,
>> 
>> Thanks for the proposal of this feature! I have a question about the
>> filter build and a some suggestions for potential improvements. First, I
>> wonder why you suggest to run the filter builder as separate operator with
>> parallelism 1. I’d suggest to integrate the filter distributed build with
>> the hash table build phase as follows:
>> 
>> 1. Build the hash table completely in each subtask.
>> 2. The keyset of the hash table is giving us a precise NDV count for every
>> subtask.
>> 3. Build a filter from the subtask hash table. For low cardinality tables,
>> I’d go with the suggested optimization of IN-filter.
>> 4. Each build subtask transfers the local bloom filter to all probe
>> operators.
>> 5. On the probe operator we can either probe against the individual
>> filters, or we OR-combine all subtask filters into aggregated bloom filter.
>> 
>> I’m suggesting this because building inserting into a (larger) bloom
>> filter can be costly, especially once the filter exceeds cache sizes and is
>> therefor better parallelized. First inserting into the hash table also
>> deduplicates the keys and we avoid inserting records twice into the bloom
>> filter. If we want to improve cache efficiency for the build of larger
>> filters, we could structure them as blocked bloom filters, where the filter
>> is separated into blocks and all bits

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-16 Thread Stefan Richter
Hi,

Thanks for the proposal of this feature! I have a question about the filter 
build and some suggestions for potential improvements. First, I wonder why you 
suggest running the filter builder as a separate operator with parallelism 1. 
I’d suggest integrating the distributed filter build with the hash table build 
phase as follows:

1. Build the hash table completely in each subtask.
2. The keyset of the hash table gives us a precise NDV count for every 
subtask.
3. Build a filter from the subtask hash table. For low cardinality tables, I’d 
go with the suggested optimization of IN-filter.
4. Each build subtask transfers the local bloom filter to all probe operators.
5. On the probe operator we can either probe against the individual filters, or 
we OR-combine all subtask filters into aggregated bloom filter.

I’m suggesting this because building and inserting into a (larger) bloom filter 
can be costly, especially once the filter exceeds cache sizes, and is therefore 
better parallelized. Inserting into the hash table first also deduplicates the 
keys, so we avoid inserting records twice into the bloom filter. If we want to 
improve cache efficiency for the build of larger filters, we could structure 
them as blocked bloom filters, where the filter is separated into blocks and 
all bits of one key go only into one block. That allows us to apply 
software-managed buffering to first group keys that go into the same partition 
(ideally fitting into cache) and then bulk-load partitions once we have 
collected enough keys for one round of loading.
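
To illustrate the blocked layout and the OR-combine from step 5, a compact
sketch (illustrative only; block size, probe count and hash mixing are
arbitrary choices):

    // All bits of one key land in a single 512-bit (cache-line sized) block,
    // so an insert or lookup touches at most one cache line of the filter.
    final class BlockedBloomFilter {
        private static final int BLOCK_BITS = 512;
        private static final int LONGS_PER_BLOCK = BLOCK_BITS / 64;
        private final long[] bits;
        private final int numBlocks;

        BlockedBloomFilter(int numBlocks) {
            this.numBlocks = numBlocks;
            this.bits = new long[numBlocks * LONGS_PER_BLOCK];
        }

        void add(long hash) {
            int base = block(hash) * LONGS_PER_BLOCK;
            for (int i = 1; i <= 3; i++) { // 3 probes derived from one hash
                int bit = probe(hash, i);
                bits[base + (bit >>> 6)] |= 1L << (bit & 63);
            }
        }

        boolean mightContain(long hash) {
            int base = block(hash) * LONGS_PER_BLOCK;
            for (int i = 1; i <= 3; i++) {
                int bit = probe(hash, i);
                if ((bits[base + (bit >>> 6)] & (1L << (bit & 63))) == 0) {
                    return false;
                }
            }
            return true;
        }

        // OR-combine of per-subtask filters (step 5); valid because all
        // subtasks use the same filter size and hash functions.
        void merge(BlockedBloomFilter other) {
            for (int i = 0; i < bits.length; i++) {
                bits[i] |= other.bits[i];
            }
        }

        private int block(long hash) {
            return (int) Math.floorMod(hash, (long) numBlocks);
        }

        private int probe(long hash, int i) {
            return (int) Math.floorMod(hash * (i * 0x9E3779B97F4A7C15L), (long) BLOCK_BITS);
        }
    }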

Best,
Stefan 


Stefan Richter
Principal Engineer II, Confluent



> On 15. Jun 2023, at 13:35, Lijie Wang  wrote:
> 
> Hi,  Benchao and Aitozi,
> 
> Thanks for your feedback about this FLIP.
> 
> @Benchao
> 
>>> I think it would be reasonable to also support "pipeline shuffle" if
> possible.
> As I said above, runtime filter can work well with all shuffle mode,
> including pipeline shuffle.
> 
>>> if the RuntimeFIlterBuilder could be done quickly than RuntimeFilter
> operator, it can still filter out additional data afterwards.
> I think the main purpose of runtime filter is to reduce the shuffle data
> and the data arriving at join. Although eagerly running the large
> table side can process datas in advance, most of the data may be
> irrelevant, causing huge shuffle overhead and slowing the join. In
> addition, if the join is a hash-join, the probe side of the hash-join also
> needs to wait for its build side to complete, so the large table side is
> likely to be back-pressed.
> In addition, I don't tend to add too many configuration options in the
> first version, which may make it more difficult to use (users need to
> understand a lot of internal implementation details). Maybe it could be a
> future improvement (if it's worthwhile)?
> 
> 
> @Aitozi
> 
>>> IMO, In the current implementation two source table operators will be
> executed simultaneously.
> The example in FLIP uses blocking shuffle(I will add this point to FLIP).
> The runtime filter is generally chained with the large table side to reduce
> the shuffle data (as shown in Figure 2 of FLIP). The job vertices should be
> scheduled in topological order, so the large table side can only be
> scheduled after the RuntimeFilterBuilder finishes.
> 
>>> Are there some tests to show the default value of
> table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
> value.
> It's not tested yet, but it will be done before merge the code. The current
> value refers to systems such as spark and hive. Before code merging, we
> will test on TPC-DS 10 T to find an optimal set of values. If you have
> relevant experience on it, welcome to give some suggestions.
> 
>>> What's the representation of the runtime filter node in planner ?
> As shown in Figure 1 of FLIP, we intend to add two new physical nodes,
> RuntimeFilterBuilder and RuntimeFilter.
> 
> Best,
> Lijie
> 
> Aitozi <gjying1...@gmail.com> wrote on Thu, 15 Jun 2023 at 15:52:
> 
>> Hi Lijie,
>> 
>>Nice to see this valuable feature. After reading the FLIP I have some
>> questions below:
>> 
>>> Schedule the TableSource(dim) first.
>> 
>> How does it know to schedule the TableSource(dim) first ? IMO, In the
>> current implementation two source table operators will be executed
>> simultaneously.
>> 
>>> If the data volume on the probe side is too small, the overhead of
>> building runtime filter is not worth it.
>> 
>> Are there s

[jira] [Commented] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-15 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733059#comment-17733059
 ] 

Stefan Richter commented on FLINK-32347:


Hey, I've already opened a PR. The issue was still unassigned, so I thought I 
can still work on it.

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. As a result, errors 
> from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-15 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-32347:
--

Assignee: Stefan Richter

> Exceptions from the CompletedCheckpointStore are not registered by the 
> CheckpointFailureManager 
> 
>
> Key: FLINK-32347
> URL: https://issues.apache.org/jira/browse/FLINK-32347
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.3, 1.16.2, 1.17.1
>Reporter: Tigran Manasyan
>Assignee: Stefan Richter
>Priority: Major
>
> Currently if an error occurs while saving a completed checkpoint in the 
> {_}CompletedCheckpointStore{_}, _CheckpointCoordinator_ doesn't call 
> _CheckpointFailureManager_ to handle the error. As a result, errors 
> from _CompletedCheckpointStore_ don't increase the failed 
> checkpoints count and 
> _'execution.checkpointing.tolerable-failed-checkpoints'_ option does not 
> limit the number of errors of this kind in any way.
> Possible solution may be to move the notification of 
> _CheckpointFailureManager_ about successful checkpoint after storing 
> completed checkpoint in the _CompletedCheckpointStore_ and providing the 
> exception to the _CheckpointFailureManager_ in the 
> {_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32345:
--

 Summary: Improve parallel download of RocksDB incremental state
 Key: FLINK-32345
 URL: https://issues.apache.org/jira/browse/FLINK-32345
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


{{RocksDBStateDownloader}} is used to download the files for incremental 
checkpoints in parallel. However, the parallelism is currently restricted to a 
single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
(shared, private) within the handle at a time.
We should support parallelization across multiple state types and across 
multiple state handles. In particular, this can improve our download times for 
scale-in.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-13 Thread Stefan Richter
Hi Becket,



> *1. What is the scope of public interfaces?*
> We actually already have a definition of public interfaces in the FLIP[1]
> wiki, it is basically anything that is sensible to the users, including
> packages, classes, method signature and behavior (blocking v.s.
> non-blocking, for example), metrics, configurations, CLI tools and
> arguments, and so on.

I agree with the scope that is defined by the annotation and think all 
observable behavior and also semantics should be covered by the policy. We 
should also review that all classes/methods/etc that are reachable through the 
API are annotated and define a sensible default in case something was missed. 

> 
> *2. What API changes can happen across different versions?*
> FLIP-196 answers the first question by defining in which versions
> programming APIs (methods and classes annotated with "Public",
> "PublicEvolving" or "Experimental") can have breaking changes. This allows
> us to get rid of deprecated APIs in patch / minor / major versions, while
> providing a clear expectation for our users.

I think the table contradicts your first email regarding the Public API:

> @Public API: can only change between major releases.

Vs

> Public: At least 2 minor releases.


I assume the entry in the table is a mistake?

> 
> *3. When can breaking changes happen?*
> This FLIP tries to answer the second question, i.e. how users can adapt to
> breaking changes. We should avoid sudden API changes between two versions,
> and always leave time for the users to have a planned migration away from
> breaking changes. This basically means we will always take two steps when
> making breaking changes:
> 1. Introduce the new API (if needed), and mark the old API as deprecated.
> 2. After some time, which are the migration periods defined in this FLIP,
> remove the old API.
> 
> And in this FLIP, we would like to define the exact length of the migration
> period. It is going to be a trade-off between the maintenance cost of
> deprecated APIs and the time users have to migrate away from those
> deprecated APIs. In this FLIP we want to define the minimum lengths of the
> migration periods.
> 
> *4. How do users upgrade to Flink versions with breaking changes?*
> With the answers to the above three questions, the user upgrade path should
> be simple and clear:
> 1. upgrade to a Flink version that contains both the deprecated and new API.
> 2. have a planned migration to move to the new API.
> 3. upgrade to later Flink versions in which the code of the deprecated API
> is removed.


This is where things get interesting when we want to make Flink fit for running 
as a service. So far, our release and compatibility model is very 
developer-centric (easy for development to move forward quickly) and not 
user-friendly. 
In SaaS, there is not necessarily a user available to upgrade a job when the 
service undergoes an upgrade. And the expectation of a user would be that a 
service can update without breaking their continuously running jobs. We should 
answer the question of what level of stability we want to provide our users in 
the future and elevate the Flink technology to a level that allows creating a 
service with continuous delivery across versions. Other software, like e.g. the 
Linux OS gives way stronger stability guarantees across versions, and I wonder 
if the discussion should not also explore how we can reach a comparable level 
of stability for Flink.

> 
> So, it looks like our story for API stability and compatibility would be
> complete with this FLIP.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> On Tue, Jun 13, 2023 at 12:30 AM Stefan Richter
> <srich...@confluent.io.invalid> wrote:
> 
>> Hi,
>> 
>> Thanks a lot for bringing up this topic and for the initial proposal. As
>> more and more people are looking into running Flink as a continuous service
>> this discussion is becoming very relevant.
>> 
>> What I would like to see is a clearer definition for what we understand by
>> stability and compatibility. Our current policy only talks about being able
>> to “compile” and “run” with a different version. As far as I can see, there
>> is no guarantee about the stability of observable behavior. I believe it’s
>> important for the community to include this important aspect in the
>> guarantees that we give as our policy.
>> 
>> For all changes that we do to the stable parts of the API, we should also
>> consider how easy or difficult different types of changes would be for
>> running Flink as a service with continuous delivery. For example,
>> introducing a new interface to evolve the methods would make it easier to
>> write adapter code than changing method signatures in-place on the existing
>> interface. Those concerns should be considered in our process for evolving
>> interfaces.

[jira] [Created] (FLINK-32326) Disable WAL in RocksDBWriteBatchWrapper by default

2023-06-13 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32326:
--

 Summary: Disable WAL in RocksDBWriteBatchWrapper by default
 Key: FLINK-32326
 URL: https://issues.apache.org/jira/browse/FLINK-32326
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


We should disable the WAL by default in RocksDBWriteBatchWrapper for the case that 
no WriteOptions is provided. This is the case in all restore operations and can 
impact performance.
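
For illustration, a minimal sketch of the intended default with the RocksDB Java 
API; the wrapper class below is an invented stand-in for illustration, not 
Flink's actual RocksDBWriteBatchWrapper:

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Invented stand-in to show the default; not Flink's actual wrapper class.
public final class WriteBatchWrapperSketch implements AutoCloseable {

    private final RocksDB db;
    private final WriteBatch batch = new WriteBatch();
    // Fallback used when the caller provides no WriteOptions: Flink's
    // durability comes from checkpoints, so the WAL only costs extra I/O.
    private final WriteOptions defaultOptions = new WriteOptions().setDisableWAL(true);

    public WriteBatchWrapperSketch(RocksDB db) {
        this.db = db;
    }

    public void put(byte[] key, byte[] value) throws RocksDBException {
        batch.put(key, value);
    }

    public void flush() throws RocksDBException {
        // Apply the accumulated batch with the WAL-less default options.
        db.write(defaultOptions, batch);
        batch.clear();
    }

    @Override
    public void close() {
        batch.close();
        defaultOptions.close();
    }
}

Disabling the WAL is safe in this context because restores never rely on 
RocksDB's recovery log.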



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-12 Thread Stefan Richter
Hi,

Thanks a lot for bringing up this topic and for the initial proposal. As more 
and more people are looking into running Flink as a continuous service, this 
discussion is becoming very relevant.

What I would like to see is a clearer definition for what we understand by 
stability and compatibility. Our current policy only talks about being able to 
“compile” and “run” with a different version. As far as I can see, there is no 
guarantee about the stability of observable behavior. I believe it’s important 
for the community to include this important aspect in the guarantees that we 
give as our policy.

For all changes that we do to the stable parts of the API, we should also 
consider how easy or difficult different types of changes would be for running 
Flink as a service with continuous delivery. For example, introducing a new 
interface to evolve the methods would make it easier to write adapter code than 
changing method signatures in-place on the existing interface. Those concerns 
should be considered in our process for evolving interfaces.
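
To make the adapter argument concrete, here is a minimal hypothetical sketch; 
SourceV1, SourceV2 and their methods are invented names, not Flink APIs:

import java.nio.charset.StandardCharsets;

// Hypothetical old and new API surfaces, invented for illustration.
interface SourceV1 {
    String read();
}

interface SourceV2 {
    byte[] readBytes();
}

// Because SourceV1 still exists alongside SourceV2, a thin adapter can make
// existing SourceV1 implementations usable wherever SourceV2 is expected.
final class SourceV1ToV2Adapter implements SourceV2 {

    private final SourceV1 delegate;

    SourceV1ToV2Adapter(SourceV1 delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] readBytes() {
        // Bridge the old contract to the new one.
        return delegate.read().getBytes(StandardCharsets.UTF_8);
    }
}

With an in-place signature change, the old interface is gone and there is no 
such seam for a service operator to adapt around.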

Best,
Stefan






> On 11. Jun 2023, at 14:30, Becket Qin  wrote:
> 
> Hi folks,
> 
> As one of the release 2.0 efforts, the release managers were discussing our
> API lifecycle policies. There have been FLIP-196[1] and FLIP-197[2] that
> are relevant to this topic. These two FLIPs defined the stability guarantee
> of the programming APIs with various different stability annotations, and
> the promotion process. A recap of the conclusion is following:
> 
> Stability:
> @Internal API: can change between major/minor/patch releases.
> @Experimental API: can change between major/minor/patch releases.
> @PublicEvolving API: can change between major/minor releases.
> @Public API: can only change between major releases.
> 
> Promotion:
> An @Experimental API should be promoted to @PublicEvolving after two
> releases, and a @PublicEvolving API should be promoted to @Public API after
> two releases, unless there is a documented reason not to do so.
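
For readers who have not seen these annotations applied, a minimal usage sketch 
follows; ExampleApi and its methods are invented names, while the annotations 
are the real ones from Flink's flink-annotations module:

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

// ExampleApi is invented for illustration; only the annotations are real.
@Public
public interface ExampleApi {

    // Covered by the @Public guarantee of the enclosing interface:
    // may only change between major releases.
    void establishedMethod();

    // Newer addition that is still allowed to change between
    // major/minor releases.
    @PublicEvolving
    void newerMethod();
}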
> 
> One thing not mentioned in these two FLIPs is the API deprecation process,
> which is in fact critical and fundamental to how the stability guarantee is
> provided in practice, because the stability is all about removing existing
> APIs. For example, if we want to change a method "ResultFoo foo(ArgumentFoo
> arg)" to "ResultBar bar(ArgumentBar arg)", there will be two ways to do
> this:
> 
> 1. Mark method "foo" as deprecated and add the new method "bar". At some
> point later, remove the method "foo".
> 2. Simply change the API in place, that basically means removing method foo
> and adding method bar at the same time.
> 
> In the first option, users are given a period with stability guarantee to
> migrate from "foo" to "bar". For the second option, this migration period
> is effectively 0. A zero migration period is problematic because end users
> may need a feature/bug fix from a new version, but cannot upgrade right
> away due to some backwards incompatible changes, even though these changes
> perfectly comply with the API stability guarantees defined above. So the
> migration period is critical to the API stability guarantees for the end
> users.
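
A minimal sketch of option 1, reusing the placeholder names from the example 
above (all types here are invented placeholders):

// Placeholder types from the example above, defined to keep the sketch
// self-contained.
final class ArgumentFoo {}
final class ResultFoo {}
final class ArgumentBar {}
final class ResultBar {}

public interface SomeApi {

    /**
     * @deprecated Use {@link #bar(ArgumentBar)} instead. The method is kept
     * for the migration period so users can plan their move to the new API.
     */
    @Deprecated
    ResultFoo foo(ArgumentFoo arg);

    // Replacement method, introduced in the same release that deprecates foo.
    ResultBar bar(ArgumentBar arg);
}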
> 
> The migration period is essentially how long a deprecated API is kept in the
> source code before it can be removed. So with this FLIP, I'd like to kick off the
> discussion about our deprecation process.
> 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
> 
> Comments are welcome!
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process



[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-16 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723051#comment-17723051 ]

Stefan Richter commented on FLINK-31963:


Yes, PR is currently in review here: https://github.com/apache/flink/pull/22584

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.16.1, 1.15.4, 1.18.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>    Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-12 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722120#comment-17722120 ]

Stefan Richter commented on FLINK-31963:


Seems that this is similar to the problem described in FLINK-27031.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>    Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-11 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721788#comment-17721788 ]

Stefan Richter commented on FLINK-31963:


I have a local reproducer as well as a fix, will open a PR once I have written 
the tests.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>    Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-10 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721387#comment-17721387 ]

Stefan Richter commented on FLINK-31963:


[~masteryhx] Did your job also make use of side-outputs? Just fishing among 
things that are potentially "unusual" about the jobs.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-08 Thread Stefan Richter (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720596#comment-17720596 ]

Stefan Richter commented on FLINK-31963:


Hi, just to clarify: when you say a checkpoint that fails once will always fail, 
does this only apply to restores with rescaling, or can you also not recover 
from the checkpoint when the parallelism remains unchanged? If it only happens 
with rescaling, can you at least recover for some parallelism values, or for 
none at all?

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

