[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input

2022-10-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616283#comment-17616283
 ] 

Piotr Nowojski commented on FLINK-18647:


+1 for moving the discussion to the dev mailing list. 

From [~dkapoor1] we already know that there is a need for switching the 
WindowOperator between firing immediately and waiting for timers to fire 
naturally. I'm pretty sure this is a valid request, with more potential use 
cases. It hasn't been reported much so far, because not that many Flink 
streaming users run their streaming jobs with bounded inputs.

> How to handle processing time timers with bounded input
> ---
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size footprint of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, TimerAction>}}, with the lack of an entry meaning some default value.
> 2. 
> Another way to solve this problem might be to let the operator code decide 
> what to do with a given timer: either ask the operator what should happen 
> with a given timer (a), let the operator iterate over and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of (a), (b) and (c) would require breaking API changes, 
> state changes or additional overheads. Just the logic of what to do with the 
> timer would have to be “hardcoded” in the operator’s code (which btw might 
> even have the additional benefit of being easier to change in case of some 
> bugs, like a timer registered with a wrong/incorrect {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b) 
> or (c) should be exposed to UDFs. 
> 3. 
> Maybe we need a combination of both? Pre-existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.?
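
For illustration, a minimal sketch of option 2b under today's operator APIs might look as follows; the operator and its {{pendingTimers}} bookkeeping are purely illustrative, while {{BoundedOneInput}}, {{ProcessingTimeService}} and {{registerTimer}} are existing Flink API:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative operator for option 2b: the operator itself decides what
// happens to its pending processing time timers at end of input.
public class CancelTimersOnEndOfInputOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

    // Illustrative bookkeeping; a real operator would track its timers differently.
    private final List<ScheduledFuture<?>> pendingTimers = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<Long> element) {
        long fireAt = getProcessingTimeService().getCurrentProcessingTime() + 60_000;
        pendingTimers.add(
                getProcessingTimeService()
                        .registerTimer(fireAt, timestamp -> output.collect(new StreamRecord<>(timestamp))));
    }

    @Override
    public void endInput() {
        // Option (b): on end of input, iterate over and cancel the pending timers.
        for (ScheduledFuture<?> timer : pendingTimers) {
            timer.cancel(false);
        }
        pendingTimers.clear();
    }
}
{code}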



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29584) Upgrade java 11 version on the microbenchmark worker

2022-10-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17615773#comment-17615773
 ] 

Piotr Nowojski commented on FLINK-29584:


I think it might, but the issue we are seeing here is probably related 
to a quite ancient early-access build of OpenJDK. I doubt it would affect many 
production setups.

> Upgrade java 11 version on the microbenchmark worker
> 
>
> Key: FLINK-29584
> URL: https://issues.apache.org/jira/browse/FLINK-29584
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks
>Affects Versions: 1.17.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.17.0
>
>
> It looks like the older JDK 11 builds have problems with JIT in the 
> microbenchmarks, as for example visible 
> [here|http://codespeed.dak8s.net:8000/timeline/?ben=globalWindow=2]. 
> Locally I was able to reproduce this problem and the issue goes away after 
> upgrading to a newer JDK 11 build.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29584) Upgrade java 11 version on the microbenchmark worker

2022-10-11 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-29584:
--

 Summary: Upgrade java 11 version on the microbenchmark worker
 Key: FLINK-29584
 URL: https://issues.apache.org/jira/browse/FLINK-29584
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Affects Versions: 1.17.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.17.0


It looks like the older JDK 11 builds have problems with JIT in the 
microbenchmarks, as for example visible 
[here|http://codespeed.dak8s.net:8000/timeline/?ben=globalWindow=2]. 
Locally I was able to reproduce this problem and the issue goes away after 
upgrading to a newer JDK 11 build.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28474) ChannelStateWriteResult may not fail after checkpoint abort

2022-10-07 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-28474.
--
Fix Version/s: (was: 1.15.3)
   (was: 1.14.7)
   (was: 1.16.1)
 Assignee: fanrui
   Resolution: Fixed

Merged to master as a6119121164^..a6119121164

Thanks for the fix [~fanrui].

I think there is no need to backport it to the previous releases, as this bug 
doesn't cause any symptoms without FLINK-26803.

> ChannelStateWriteResult may not fail after checkpoint abort
> ---
>
> Key: FLINK-28474
> URL: https://issues.apache.org/jira/browse/FLINK-28474
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.5, 1.15.1
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-07-09-22-21-24-417.png
>
>
> After a checkpoint abort, the ChannelStateWriteResult should fail.
> But if _channelStateWriter.start(id, checkpointOptions);_ is executed after 
> the checkpoint abort, the ChannelStateWriteResult will not fail.
>  
> h2. Cause Analysis:
> When a checkpoint is aborted, channelStateWriter.start(id, checkpointOptions); may 
> not have been executed yet. These checkpointIds will be stored in the 
> abortedCheckpointIds of SubtaskCheckpointCoordinatorImpl, and when 
> checkpointState is called, it will check whether the checkpointId should be 
> aborted.
> _ChannelStateWriter.abort(checkpointId, exception, true) should also be 
> executed here._
> The unit test can reproduce this bug.
> !image-2022-07-09-22-21-24-417.png|width=803,height=307!
>  
> Note: channelStateWriter.abort is only called in notifyCheckpointAborted; it 
> doesn't account for a channelStateWriter.start after notifyCheckpointAborted.
> JIRA: FLINK-17869
> commit: 
> https://github.com/apache/flink/pull/12478/commits/22c99845ef4f863f1753d17b109fd2faecc8201e
>  
> The bug will affect the new feature FLINK-26803, because the channel state 
> file can be closed only after the checkpoints of all tasks sharing the file 
> are complete or aborted. So when the checkpoint of some tasks fails, if abort 
> is not called, the file cannot be closed, all tasks sharing the file 
> cannot execute inputChannelStateHandles.completeExceptionally(e); and 
> resultSubpartitionStateHandles.completeExceptionally(e);, and 
> AsyncCheckpointRunnable will wait forever.
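
For readers following the cause analysis above, here is a heavily simplified sketch of the race and the fix idea. The {{ChannelStateWriter#start}}/{{#abort}} methods are real; the surrounding class and its field handling are illustrative only, and the real SubtaskCheckpointCoordinatorImpl is far more involved (thread-safety, cleanup of old entries, etc.):

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

class AbortBeforeStartSketch {
    private final Set<Long> abortedCheckpointIds = new HashSet<>();
    private final ChannelStateWriter channelStateWriter;

    AbortBeforeStartSketch(ChannelStateWriter channelStateWriter) {
        this.channelStateWriter = channelStateWriter;
    }

    void notifyCheckpointAborted(long checkpointId, Throwable cause) {
        // Remember the abort, in case start() has not been executed yet.
        abortedCheckpointIds.add(checkpointId);
        channelStateWriter.abort(checkpointId, cause, true);
    }

    void checkpointState(long checkpointId, CheckpointOptions options) {
        if (abortedCheckpointIds.remove(checkpointId)) {
            // The fix described above: also fail the ChannelStateWriteResult
            // when start() arrives only after the checkpoint was aborted.
            channelStateWriter.abort(
                    checkpointId,
                    new CancellationException("Checkpoint " + checkpointId + " aborted before start"),
                    true);
            return;
        }
        channelStateWriter.start(checkpointId, options);
    }
}
{code}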



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-16908) FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while initializing transactional state in 60000ms.

2022-09-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-16908:
---
Fix Version/s: (was: 1.17.0)

> FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while 
> initializing transactional state in 60000ms.
> ---
>
> Key: FLINK-16908
> URL: https://issues.apache.org/jira/browse/FLINK-16908
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.0, 1.12.0, 1.13.2, 1.14.3
>Reporter: Piotr Nowojski
>Assignee: Fabian Paul
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-assigned, test-stability
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6889=logs=c5f0071e-1851-543e-9a45-9ac140befc32=f66652e3-384e-5b25-be29-abfea69ea8da
> {noformat}
> [ERROR] 
> testScaleUpAfterScalingDown(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>   Time elapsed: 64.353 s  <<< ERROR!
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> initializing transactional state in 60000ms.
> {noformat}
> After this initial error many other tests (I think all following unit tests) 
> failed with errors like:
> {noformat}
> [ERROR] 
> testFailAndRecoverSameCheckpointTwice(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>   Time elapsed: 7.895 s  <<< FAILURE!
> java.lang.AssertionError: Detected producer leak. Thread name: 
> kafka-producer-network-thread | producer-196
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:675)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testFailAndRecoverSameCheckpointTwice(FlinkKafkaProducerITCase.java:311)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-21307) Revisit activation model of FlinkSecurityManager

2022-09-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-21307:
---
Fix Version/s: (was: 1.17.0)

> Revisit activation model of FlinkSecurityManager
> 
>
> Key: FLINK-21307
> URL: https://issues.apache.org/jira/browse/FLINK-21307
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.13.0
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major
>
> In FLINK-15156, we introduced a feature that allows users to log or 
> completely disable calls to System.exit(). This feature is enabled for 
> certain threads / code sections intended to execute user-code.
> The activation of the security manager (for monitoring user calls to 
> System.exit()) is currently not well-defined, and is only implemented on a 
> best-effort basis.
> This ticket is to revisit the activation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2022-09-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-15550:
---
Fix Version/s: (was: 1.17.0)

> testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
> 
>
> Key: FLINK-15550
> URL: https://issues.apache.org/jira/browse/FLINK-15550
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3
>Reporter: Yun Tang
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Instance: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at 
> org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
> {code}
> {code:java}
> expected: but was:
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26568) BlockingShuffleITCase.testDeletePartitionFileOfBoundedBlockingShuffle timing out on Azure

2022-09-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26568:
---
Fix Version/s: (was: 1.17.0)

> BlockingShuffleITCase.testDeletePartitionFileOfBoundedBlockingShuffle timing 
> out on Azure
> -
>
> Key: FLINK-26568
> URL: https://issues.apache.org/jira/browse/FLINK-26568
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Runtime / Task
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
> Attachments: image-2022-03-22-18-19-53-171.png
>
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=845=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=9a028d19-6c4b-5a4e-d378-03fca149d0b1=12865]
>  timed out due to the test 
> {{BlockingShuffleITCase.testDeletePartitionFileOfBoundedBlockingShuffle}} not 
> finishing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-23970) Split off the behaviour for finished StreamTask(s)

2022-09-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-23970:
---
Fix Version/s: (was: 1.17.0)

> Split off the behaviour for finished StreamTask(s)
> --
>
> Key: FLINK-23970
> URL: https://issues.apache.org/jira/browse/FLINK-23970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
>
> We will double-check how we could better abstract the behavior of 
> tasks marked as finished on recovery; the target is to make the behaviors 
> as centralized as we can (e.g. by introducing a specialized 
> _FinishedStreamTask_).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-18647) How to handle processing time timers with bounded input

2022-09-29 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17611059#comment-17611059
 ] 

Piotr Nowojski edited comment on FLINK-18647 at 9/29/22 2:20 PM:
-

{quote}
I tend to think we should directly head to the final solution without the shortcut one. What do you think about that?
{quote}
+1 for this. I don't think the final solution would require much more work than 
the intermediate solution, especially since we would have to make sure that any 
intermediate solution doesn't conflict with the final one.

{quote}
However, I think it might not be easy to let users specify the 
actions when building graphs. For one thing, operators like the window operator 
seem to always need to fire the timers, thus it might be better specified directly by 
the operator itself.
{quote}
I'm not sure if I understand your concern [~gaoyunhaii]? I have a feeling that 
providing something like 
{{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour}}
 might be a good solution here. We could specify that:
* each operator might have a different default behaviour
* some operators might override/ignore/reject such changes, for all/some timers 
- e.g. a hypothetical {{WindowOperatorWithTTLTimers}} registering two 
different types of timers could honour the setting for firing results, but 
would always drop the TTL timers
Furthermore, {{WindowOperator}} users might actually be interested in any of 
the three settings for the processing time windows - depending on the business 
logic, it might be most appropriate to either fire immediately, drop the 
timers, or wait for the timers to fire naturally.
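
For illustration, usage of such a hypothetical per-operator setting could look like the sketch below; {{setEndOfInputTimerBehaviour}} and the {{EndOfInputTimerBehaviour}} enum do not exist, while the windowing calls are existing Flink API ({{Event}}, {{Result}} and {{MyAggregateFunction}} are placeholders):

{code:java}
// Hypothetical API sketch; setEndOfInputTimerBehaviour and
// EndOfInputTimerBehaviour are not part of Flink.
DataStream<Event> events = env.fromSource(source, watermarkStrategy, "events");

SingleOutputStreamOperator<Result> results =
        events.keyBy(Event::getKey)
              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
              .aggregate(new MyAggregateFunction())
              // fire the trailing window immediately when the bounded input ends
              .setEndOfInputTimerBehaviour(EndOfInputTimerBehaviour.TRIGGER_ON_END_OF_INPUT);
{code}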



was (Author: pnowojski):
{quote}
I tend to think we should directly head to the final solution without the shortcut one. What do you think about that?
{quote}
+1 for this.

{quote}
However, I think it might not be easy to let users specify the 
actions when building graphs. For one thing, operators like the window operator 
seem to always need to fire the timers, thus it might be better specified directly by 
the operator itself.
{quote}
I'm not sure if I understand your concern [~gaoyunhaii]? I have a feeling that 
providing something like 
{{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour}}
 might be a good solution here. We could specify that:
* each operator might have a different default behaviour
* some operators might override/ignore/reject such changes, for all/some timers 
- e.g. a hypothetical {{WindowOperatorWithTTLTimers}} registering two 
different types of timers could honour the setting for firing results, but 
would always drop the TTL timers
Furthermore, {{WindowOperator}} users might actually be interested in any of 
the three settings for the processing time windows - depending on the business 
logic, it might be most appropriate to either fire immediately, drop the 
timers, or wait for the timers to fire naturally.


> How to handle processing time timers with bounded input
> ---
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size footprint of the timers.
> Extra overhead could have been 

[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input

2022-09-29 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17611059#comment-17611059
 ] 

Piotr Nowojski commented on FLINK-18647:


{quote}
I tend to think we should directly head to the final solution without the shortcut one. What do you think about that?
{quote}
+1 for this.

{quote}
However, I think it might not be easy to let users specify the 
actions when building graphs. For one thing, operators like the window operator 
seem to always need to fire the timers, thus it might be better specified directly by 
the operator itself.
{quote}
I'm not sure if I understand your concern [~gaoyunhaii]? I have a feeling that 
providing something like 
{{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour}}
 might be a good solution here. We could specify that:
* each operator might have a different default behaviour
* some operators might override/ignore/reject such changes, for all/some timers 
- e.g. a hypothetical {{WindowOperatorWithTTLTimers}} registering two 
different types of timers could honour the setting for firing results, but 
would always drop the TTL timers
Furthermore, {{WindowOperator}} users might actually be interested in any of 
the three settings for the processing time windows - depending on the business 
logic, it might be most appropriate to either fire immediately, drop the 
timers, or wait for the timers to fire naturally.


> How to handle processing time timers with bounded input
> ---
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size footprint of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, TimerAction>}}, with the lack of an entry meaning some default value.
> 2. 
> Another way to solve this problem might be to let the operator code decide 
> what to do with a given timer: either ask the operator what should happen 
> with a given timer (a), let the operator iterate over and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of (a), (b) and (c) would require breaking API changes, 
> state changes or additional overheads. Just the logic of what to do with the 
> timer would have to be “hardcoded” in the operator’s code (which btw might 
> even have the additional benefit of being easier to change in case of some 
> bugs, like a timer registered with a wrong/incorrect {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b) 
> or (c) should be exposed to UDFs. 
> 3. 
> Maybe we need a combination of both? Pre-existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29406) Expose Finish Method For TableFunction

2022-09-26 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-29406:
--

Assignee: lincoln lee

> Expose Finish Method For TableFunction
> --
>
> Key: FLINK-29406
> URL: https://issues.apache.org/jira/browse/FLINK-29406
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.14.5, 1.16.0, 1.15.2
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> FLIP-260: Expose Finish Method For TableFunction
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input

2022-09-21 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17607948#comment-17607948
 ] 

Piotr Nowojski commented on FLINK-18647:


[~dkapoor1], is there some other ticket for the
{quote}
Processing Time CEP is broken in minicluster
{quote}
that you are referring to?

Apart from that, I would be afraid that even changing the behaviour of the 
minicluster to wait for timers before shutdown would be problematic, 
prolonging the tests. Keep in mind that as a workaround in tests, you can keep 
your artificial source alive until some timer fires, as sketched below.
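
A minimal sketch of such a test workaround, assuming the legacy {{SourceFunction}} API and an illustrative 30 second deadline chosen to lie past the last expected timer:

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative test-only source: emit the test data, then stay alive long
// enough for the pending processing time timers to fire naturally.
public class KeepAliveSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(1L); // the actual test data
        }
        long deadline = System.currentTimeMillis() + 30_000; // past the last timer
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}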

> How to handle processing time timers with bounded input
> ---
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size footprint of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, TimerAction>}}, with the lack of an entry meaning some default value.
> 2. 
> Another way to solve this problem might be to let the operator code decide 
> what to do with a given timer: either ask the operator what should happen 
> with a given timer (a), let the operator iterate over and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of (a), (b) and (c) would require breaking API changes, 
> state changes or additional overheads. Just the logic of what to do with the 
> timer would have to be “hardcoded” in the operator’s code (which btw might 
> even have the additional benefit of being easier to change in case of some 
> bugs, like a timer registered with a wrong/incorrect {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b) 
> or (c) should be exposed to UDFs. 
> 3. 
> Maybe we need a combination of both? Pre-existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)

2022-09-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27101:
---
Component/s: Runtime / REST

> Periodically break the chain of incremental checkpoint (trigger checkpoints 
> via REST API)
> -
>
> Key: FLINK-27101
> URL: https://issues.apache.org/jira/browse/FLINK-27101
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Steven Zhen Wu
>Assignee: Jiale Tan
>Priority: Major
>  Labels: pull-request-available
>
> Incremental checkpointing is almost a must for large-state jobs. It greatly 
> reduces the bytes uploaded to DFS per checkpoint. However, there are a few 
> implications of incremental checkpointing that are problematic for production 
> operations. I will use S3 as the example DFS for the rest of the description.
> 1. Because there is no way to deterministically know how far back the 
> incremental checkpoint can refer to files uploaded to S3, it is very 
> difficult to set an S3 bucket/object TTL. In one application, we have observed 
> a Flink checkpoint referring to files uploaded over 6 months ago. S3 TTL can 
> corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons:
> - Purge orphaned files (like external checkpoints from previous deployments) 
> to keep the storage cost in check. This problem can be addressed by 
> implementing proper garbage collection (similar to the JVM's) that traverses the 
> retained checkpoints from all jobs and their file references. But that 
> is an expensive solution from an engineering cost perspective.
> - Security and privacy. E.g., there may be a requirement that Flink state can't 
> keep data for more than some duration threshold (hours/days/weeks). 
> The application is expected to purge keys to satisfy the requirement. However, 
> with incremental checkpointing and how deletion works in RocksDB, it is hard to 
> set an S3 TTL to purge S3 files. Even though those old S3 files don't contain 
> live keys, they may still be referenced by retained Flink checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a 
> result, restoring from the checkpoint fails. With incremental checkpointing, it 
> usually doesn't help to try other, older checkpoints, because they may refer 
> to the same corrupted file. It is unclear whether the corruption happened 
> before or during the S3 upload. This risk can be mitigated with periodic 
> savepoints.
> It all boils down to a periodic full snapshot (checkpoint or savepoint) to 
> deterministically break the chain of incremental checkpoints. Searching the Jira 
> history, the behavior that FLINK-23949 [1] is trying to fix is actually close to 
> what we would need here.
> There are a few options:
> 1. Periodically trigger savepoints (via a control plane). This is actually not 
> a bad practice and might be appealing to some people. The problem is that it 
> requires a job deployment to break the chain of incremental checkpoints; 
> periodic job deployment may sound hacky. If we make the behavior of a full 
> checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be 
> an acceptable compromise. The benefit is that no job deployment is required 
> after savepoints.
> 2. Build the feature into Flink incremental checkpointing. Periodically (with some 
> cron-style config) trigger a full checkpoint to break the incremental chain. 
> If the full checkpoint fails (for whatever reason), the following 
> checkpoints should attempt a full checkpoint as well until one full 
> checkpoint completes successfully.
> 3. For the security/privacy requirement, the main thing is to apply 
> compaction on the deleted keys. That could probably avoid references to the 
> old files. Is there any RocksDB compaction that can achieve full compaction, 
> removing old delete markers? Recent delete markers are fine.
> [1] https://issues.apache.org/jira/browse/FLINK-23949
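
As a point of reference for option 1, periodically triggering a savepoint is already possible today through the existing REST API; a sketch (host, port, job id and bucket are placeholders):

{noformat}
curl -X POST http://jobmanager:8081/jobs/<jobId>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://bucket/savepoints", "cancel-job": false}'
{noformat}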



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)

2022-09-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27101:
---
Summary: Periodically break the chain of incremental checkpoint (trigger 
checkpoints via REST API)  (was: Periodically break the chain of incremental 
checkpoint)

> Periodically break the chain of incremental checkpoint (trigger checkpoints 
> via REST API)
> -
>
> Key: FLINK-27101
> URL: https://issues.apache.org/jira/browse/FLINK-27101
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Steven Zhen Wu
>Assignee: Jiale Tan
>Priority: Major
>  Labels: pull-request-available
>
> Incremental checkpointing is almost a must for large-state jobs. It greatly 
> reduces the bytes uploaded to DFS per checkpoint. However, there are a few 
> implications of incremental checkpointing that are problematic for production 
> operations. I will use S3 as the example DFS for the rest of the description.
> 1. Because there is no way to deterministically know how far back the 
> incremental checkpoint can refer to files uploaded to S3, it is very 
> difficult to set an S3 bucket/object TTL. In one application, we have observed 
> a Flink checkpoint referring to files uploaded over 6 months ago. S3 TTL can 
> corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons:
> - Purge orphaned files (like external checkpoints from previous deployments) 
> to keep the storage cost in check. This problem can be addressed by 
> implementing proper garbage collection (similar to the JVM's) that traverses the 
> retained checkpoints from all jobs and their file references. But that 
> is an expensive solution from an engineering cost perspective.
> - Security and privacy. E.g., there may be a requirement that Flink state can't 
> keep data for more than some duration threshold (hours/days/weeks). 
> The application is expected to purge keys to satisfy the requirement. However, 
> with incremental checkpointing and how deletion works in RocksDB, it is hard to 
> set an S3 TTL to purge S3 files. Even though those old S3 files don't contain 
> live keys, they may still be referenced by retained Flink checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a 
> result, restoring from the checkpoint fails. With incremental checkpointing, it 
> usually doesn't help to try other, older checkpoints, because they may refer 
> to the same corrupted file. It is unclear whether the corruption happened 
> before or during the S3 upload. This risk can be mitigated with periodic 
> savepoints.
> It all boils down to a periodic full snapshot (checkpoint or savepoint) to 
> deterministically break the chain of incremental checkpoints. Searching the Jira 
> history, the behavior that FLINK-23949 [1] is trying to fix is actually close to 
> what we would need here.
> There are a few options:
> 1. Periodically trigger savepoints (via a control plane). This is actually not 
> a bad practice and might be appealing to some people. The problem is that it 
> requires a job deployment to break the chain of incremental checkpoints; 
> periodic job deployment may sound hacky. If we make the behavior of a full 
> checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be 
> an acceptable compromise. The benefit is that no job deployment is required 
> after savepoints.
> 2. Build the feature into Flink incremental checkpointing. Periodically (with some 
> cron-style config) trigger a full checkpoint to break the incremental chain. 
> If the full checkpoint fails (for whatever reason), the following 
> checkpoints should attempt a full checkpoint as well until one full 
> checkpoint completes successfully.
> 3. For the security/privacy requirement, the main thing is to apply 
> compaction on the deleted keys. That could probably avoid references to the 
> old files. Is there any RocksDB compaction that can achieve full compaction, 
> removing old delete markers? Recent delete markers are fine.
> [1] https://issues.apache.org/jira/browse/FLINK-23949



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-6755) Allow triggering Checkpoints through command line client

2022-09-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458438#comment-17458438
 ] 

Piotr Nowojski edited comment on FLINK-6755 at 9/19/22 10:19 AM:
-

The motivation behind this feature request will be covered by FLINK-25276.

As mentioned above by Aljoscha, there might still be value in exposing a manual 
checkpoint triggering REST API hook, so I'm keeping this ticket open. However, 
it doesn't look like such a feature is well motivated. Implementing this 
should be quite straightforward since Flink internally already supports it 
(FLINK-24280); it's just not exposed in any way to the user.

edit: Although this idea might be still valid, I strongly think we should not 
expose checkpoint directory when triggering checkpoints as proposed in the 
description:
{noformat}
./bin/flink checkpoint <jobId> [checkpointDirectory]
{noformat}
Since checkpoints are fully owned by Flink, a CLI/REST API call to trigger 
checkpoints should not expose anything like that. If anything, it should be 
just a simple trigger with optional parameters, like whether the checkpoint 
should be full or incremental.

The same remark applies to:
{noformat}
./bin/flink cancel -c [targetDirectory] <jobId>
{noformat}
and I don't see a point in supporting cancelling/stopping a job with a checkpoint.



was (Author: pnowojski):
The motivation behind this feature request will be covered by FLINK-25276.

As mentioned above by Aljoscha, there might still be value in exposing a manual 
checkpoint triggering REST API hook, so I'm keeping this ticket open. However, 
it doesn't look like such a feature is well motivated. Implementing this 
should be quite straightforward since Flink internally already supports it 
(FLINK-24280); it's just not exposed in any way to the user.

edit: Although this idea might be still valid, I strongly think we should not 
expose checkpoint directory when triggering checkpoints as proposed in the 
description:
{noformat}
./bin/flink checkpoint <jobId> [checkpointDirectory]
{noformat}
Since checkpoints are owned fully by Flink, CLI/REST API call to trigger 
checkpoints should not expose anything like that. If anything, it should be 
just a simple trigger with optionally parameters like whether the checkpoint 
should be full or incremental.

The same remark applies to:
{noformat}
./bin/flink cancel -c [targetDirectory] <jobId>
{noformat}
and I don't see a point in supporting cancelling/stopping a job with a checkpoint.


> Allow triggering Checkpoints through command line client
> 
>
> Key: FLINK-6755
> URL: https://issues.apache.org/jira/browse/FLINK-6755
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-unassigned
>
> The command line client currently only allows triggering (and canceling with) 
> Savepoints. 
> While this is good if we want to fork or modify the pipelines in a 
> non-checkpoint compatible way, now with incremental checkpoints this becomes 
> wasteful for simple job restarts/pipeline updates. 
> I suggest we add a new command: 
> ./bin/flink checkpoint <jobId> [checkpointDirectory]
> and a new flag -c for the cancel command to indicate we want to trigger a 
> checkpoint:
> ./bin/flink cancel -c [targetDirectory] <jobId>
> Otherwise this can work similarly to the current savepoint-taking logic; we 
> could probably even piggyback on the current messages by adding a boolean flag 
> indicating whether it should be a savepoint or a checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27530) FLIP-227: Support overdraft buffer

2022-09-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606522#comment-17606522
 ] 

Piotr Nowojski commented on FLINK-27530:


[~godfreyhe] there were no breaking changes that users should be aware of after 
this change, so there is no need for release notes. Unless we have changed the 
rules of how we are using the release notes field and I have not been aware 
of it? If that's the case, sorry.

> FLIP-227: Support overdraft buffer
> --
>
> Key: FLINK-27530
> URL: https://issues.apache.org/jira/browse/FLINK-27530
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> This is the umbrella issue for the overdraft buffer feature. Refer to 
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
> for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-18647) How to handle processing time timers with bounded input

2022-09-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606503#comment-17606503
 ] 

Piotr Nowojski edited comment on FLINK-18647 at 9/19/22 8:59 AM:
-

{quote}
I would suggest going for an implementation that only implements Option 1 and 
Option 2 (as Option 1 is roughly the same as Option 3 without the complexity of 
timer cancels). User code can easily convert Option 1 to Option 3 if they so 
desire by skipping the timer body. 
{quote}
I would be very careful with such a change. The current behaviour makes sense for 
some use cases, like code that is supposed to emit something every N seconds, 
or TTLs. Also keep in mind that what you are suggesting would cause a breaking 
change to a stable `@Public` API.

{quote}
logically users usually have uniform requirements on the pending timers to 
keep the semantics consistent
{quote}
Is that true [~gaoyunhaii]? I've always thought that this is highly operator 
dependent. Apart from the testing purposes pointed out by [~dkapoor1], I can 
see that timers from different operators can have different use cases:
# mark the end of some windowed aggregation
# handle CEP-style timeouts, like emit record X if record Y hasn't arrived 
within 30 seconds after record Z
# handle timeouts when dealing with external systems, for example in an async 
function or maybe in some sinks: do something if an external system doesn't 
respond within 30 seconds
# clean up internal Flink state, like some form of TTL

1. should be fired immediately on EOF, or waited for, depending on the business 
logic. Indeed I could see this being correlated throughout the job - most 
likely all windowed operations should behave in the same way.
2. should be either dropped on EOF or fired immediately, depending on the 
business logic. If firing immediately is the correct thing to do, waiting would 
also be correct, but inefficient. 
3. most likely can be dropped, as this should have been dealt with by some kind of 
cleanup code. For example `AsyncWaitOperator` waits for all async 
operations to complete anyway. But theoretically I could see this depending on 
the business logic.
4. ideally should be dropped on EOF. It can also be fired or waited for, but either of 
those two is inefficient. When the TTL is huge (hours, days or months), waiting can 
be impractical.

I can maybe see that in the SQL/Table API we are dealing only with options 1. and 
maybe 4., so a global configuration might be acceptable if the TTL is small. 
But is that always the case? Do we have TTL-like use cases of timers in SQL 
[~gaoyunhaii]? [~dwysakowicz] [~twalthr]?

I could see that allowing users to set this globally might be 
harmful/confusing in the long run. For example, a user has an issue with some 
windowed operator and changes the global setting in order to fix it, but 
inadvertently breaks something else without realising it. 

Now that I think of potential solutions, maybe we should allow this to be 
configured via a call like 
`org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour`?
 In SQL we could do this either via some session variable or indeed via a 
global Table API config, if TTL is not an issue. If it is, maybe the SQL planner can 
set the windowed joins/aggregations behaviour to the global behaviour (from the 
session/config variable), but set all TTLs to be ignored anyway? But I don't 
like the idea of allowing a global config for DataStream users, since setting 
this per operator is very easy and IMO less error prone there.


was (Author: pnowojski):
{quote}
I would suggest going for an implementation that only implements Option 1 and 
Option 2 (as Option 1 is roughly the same as Option 3 without the complexity of 
timer cancels). User code can easily convert Option 1 to Option 3 if they so 
desire by skipping the timer body. 
{quote}
I would be very careful with such a change. The current behaviour makes sense for 
some use cases, like code that is supposed to emit something every N seconds, 
or TTLs. Also keep in mind that what you are suggesting would cause a breaking 
change to a stable `@Public` API.

{quote}
logically users usually have uniform requirements on the pending timers to 
keep the semantics consistent
{quote}
Is that true [~gaoyunhaii]? I've always thought that this is highly operator 
dependent. Apart from the testing purposes pointed out by [~dkapoor1], I can 
see that timers from different operators can have different use cases:
# mark the end of some windowed aggregation
# handle CEP-style timeouts, like emit record X if record Y hasn't arrived 
within 30 seconds after record Z
# handle timeouts when dealing with external systems, for example in an async 
function or maybe in some sinks: do something if an external system doesn't 
respond within 30 seconds
# clean up internal Flink state, like some form of TTL

1. should be fired immediately on EOF, 

[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input

2022-09-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606503#comment-17606503
 ] 

Piotr Nowojski commented on FLINK-18647:


{quote}
I would suggest going for an implementation that only implements Option 1 and 
Option 2 (as Option 1 is roughly the same as Option 3 without the complexity of 
timer cancels). User code can easily convert Option 1 to Option 3 if they so 
desire by skipping the timer body. 
{quote}
I would be very careful with such a change. The current behaviour makes sense for 
some use cases, like code that is supposed to emit something every N seconds, 
or TTLs. Also keep in mind that what you are suggesting would cause a breaking 
change to a stable `@Public` API.

{quote}
logically users usually have uniform requirements on the pending timers to 
keep the semantics consistent
{quote}
Is that true [~gaoyunhaii]? I've always thought that this is highly operator 
dependent. Apart from the testing purposes pointed out by [~dkapoor1], I can 
see that timers from different operators can have different use cases:
# mark the end of some windowed aggregation
# handle CEP-style timeouts, like emit record X if record Y hasn't arrived 
within 30 seconds after record Z
# handle timeouts when dealing with external systems, for example in an async 
function or maybe in some sinks: do something if an external system doesn't 
respond within 30 seconds
# clean up internal Flink state, like some form of TTL

1. should be fired immediately on EOF, or waited for, depending on the business 
logic. Indeed I could see this being correlated throughout the job - most 
likely all windowed operations should behave in the same way.
2. should be either dropped on EOF or fired immediately, depending on the 
business logic. If firing immediately is the correct thing to do, waiting would 
also be correct, but inefficient. 
3. most likely can be dropped, as this should have been dealt with by some kind of 
cleanup code. For example `AsyncWaitOperator` waits for all async 
operations to complete anyway. But theoretically I could see this depending on 
the business logic.
4. ideally should be dropped on EOF. It can also be fired or waited for, but either of 
those two is inefficient. When the TTL is huge (hours, days or months), waiting can 
be impractical.

I can maybe see that in the SQL/Table API we are dealing only with options 1. and 
maybe 4., so a global configuration might be acceptable if the TTL is small. 
But is that always the case? Do we have TTL-like use cases of timers in SQL 
[~gaoyunhaii]? [~dwysakowicz] [~twalthr]?

I could see that allowing users to set this globally might be 
harmful/confusing in the long run. For example, a user has an issue with some 
windowed operator and changes the global setting in order to fix it, but 
inadvertently breaks something else without realising it. 

Now that I think of potential solutions, maybe we should allow this to be 
configured via a call like 
`org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour`?
 In SQL we could do this either via some session variable or indeed via a 
global Table API config, if TTL is not an issue. If it is, maybe the SQL planner can 
set the windowed joins/aggregations behaviour to the global behaviour (from the 
session/config variable), but set all TTLs to be ignored anyway? But I don't 
like the idea of allowing a global config for DataStream users, since setting 
this per operator is very easy and IMO less error prone there.

> How to handle processing time timers with bounded input
> ---
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
> target, 

[jira] [Updated] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2022-09-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-23886:
---
Affects Version/s: 1.14.4
   1.13.5

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.3, 1.13.2, 1.13.5, 1.14.4
>Reporter: Jing Zhang
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png, segment-drop-corrupted-timer-state.diff
>
>
> A user reported the bug on the [mailing 
> list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. 
> I paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at 
> 

[jira] [Comment Edited] (FLINK-28975) withIdleness marks all streams from FLIP-27 sources as idle

2022-08-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598346#comment-17598346
 ] 

Piotr Nowojski edited comment on FLINK-28975 at 8/31/22 12:05 PM:
--

Thanks for the explanation.
{quote}WatermarksWithIdleness (which is an impl of WatermarkGenerator) 
periodically checks if there were any invocations of onEvent, and marks the 
watermark output as idle if onEvent is never called within the idleness 
timeout.{quote}

Wouldn't it be more correct to not call {{output.markIdle();}} if the previous 
{{onPeriodicEmit}} call has already done so (i.e. {{idlenessTimer.checkIfIdle()}} 
returned true in the previous call)?

{quote}A possible solution in my mind is to add another layer on the source's 
WatermarkOutput that only marks idle if both the main and the per-split outputs 
are idle. WDYT?{quote}
I think that sounds about right.
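For illustration, a minimal sketch of the first suggestion, assuming the shape 
of Flink's {{WatermarkGenerator}}/{{WatermarkOutput}} interfaces; the 
{{IdlenessTimer}} helper below stands in for the internal timer referenced 
above, so this is a hypothetical rewrite rather than the actual 
{{WatermarksWithIdleness}} code:

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Sketch only: mark the output idle on the active->idle transition instead of
// re-marking it on every periodic check while the output stays idle.
public class MarkIdleOnceGenerator<T> implements WatermarkGenerator<T> {

    private final WatermarkGenerator<T> delegate; // the real watermark logic
    private final IdlenessTimer idlenessTimer;    // stand-in for the internal timer
    private boolean wasIdle;

    public MarkIdleOnceGenerator(WatermarkGenerator<T> delegate, IdlenessTimer timer) {
        this.delegate = delegate;
        this.idlenessTimer = timer;
    }

    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        idlenessTimer.activity(); // record that input is active
        delegate.onEvent(event, timestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        boolean idleNow = idlenessTimer.checkIfIdle();
        if (idleNow && !wasIdle) {
            output.markIdle(); // only once, on the transition into idleness
        } else if (!idleNow) {
            delegate.onPeriodicEmit(output);
        }
        wasIdle = idleNow;
    }

    /** Hypothetical helper mirroring the idleness timer mentioned above. */
    public interface IdlenessTimer {
        void activity();
        boolean checkIfIdle();
    }
}
{code}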




was (Author: pnowojski):
{quote}WatermarksWithIdleness (which is an impl of WatermarkGenerator) 
periodically checks if there were any invocations of onEvent, and marks the 
watermark output as idle if onEvent is never called within the idleness 
timeout.{quote}

Wouldn't it be more correct to not call {{output.markIdle();}} if the previous 
{{onPeriodicEmit}} call has already done so (i.e. {{idlenessTimer.checkIfIdle()}} 
returned true in the previous call)?

{quote}A possible solution in my mind is to add another layer on the source's 
WatermarkOutput that only marks idle if both the main and the per-split outputs 
are idle. WDYT?{quote}
I think that sounds about right.



> withIdleness marks all streams from FLIP-27 sources as idle
> ---
>
> Key: FLINK-28975
> URL: https://issues.apache.org/jira/browse/FLINK-28975
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.1
>Reporter: David Anderson
>Assignee: Qingsheng Ren
>Priority: Blocker
> Fix For: 1.16.0, 1.15.3
>
>
> Using withIdleness with a FLIP-27 source leads to all of the streams from the 
> source being marked idle, which in turn leads to incorrect results, e.g., 
> from joins that rely on watermarks.
> Quoting from the user ML thread:
> In org.apache.flink.streaming.api.operators.SourceOperator, there are 
> separate instances of WatermarksWithIdleness created for each split output 
> and the main output. There is multiplexing of watermarks between split 
> outputs but no multiplexing between split output and main output.
>  
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there is only output from splits and no output from main. Hence the main 
> output will (after an initial timeout) be marked as idle.
> The implementation of WatermarksWithIdleness is such that once an output is 
> idle, it will periodically re-mark the output as idle. Since there is no 
> multiplexing between the split outputs and the main output, the idle marks 
> coming from the main output will repeatedly set the output to idle even 
> though there are events from the splits. The result is that the entire 
> source is repeatedly marked as idle.
> See this ML thread for more details: 
> [https://lists.apache.org/thread/bbokccohs16tzkdtybqtv1vx76gqkqj4]
> This probably affects older versions of Flink as well, but that needs to be 
> verified.
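To make the missing multiplexing concrete, a minimal sketch of an "idleness 
multiplexer" over Flink's {{WatermarkOutput}} interface; the class and its 
wiring are hypothetical, not the actual {{SourceOperator}} fix:

{code:java}
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Sketch only: forward markIdle() downstream only once every registered
// output (the main output and each per-split output) has become idle.
// Assumes single-threaded access, as in the source operator's mailbox model.
public class IdlenessMultiplexer {

    private final WatermarkOutput downstream;
    private int registered;
    private int idleCount;

    public IdlenessMultiplexer(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    /** Returns a per-output view; call once for main and once per split. */
    public WatermarkOutput register() {
        registered++;
        return new MemberOutput();
    }

    private final class MemberOutput implements WatermarkOutput {
        private boolean idle;

        @Override
        public void emitWatermark(Watermark watermark) {
            markActive();
            // A real implementation would also combine/align watermarks here.
            downstream.emitWatermark(watermark);
        }

        @Override
        public void markIdle() {
            if (!idle) {
                idle = true;
                if (++idleCount == registered) {
                    downstream.markIdle(); // every member idle -> source idle
                }
            }
        }

        @Override
        public void markActive() {
            if (idle) {
                idle = false;
                idleCount--;
                downstream.markActive();
            }
        }
    }
}
{code}

With such a wrapper, idleness of the main output alone could no longer force 
the whole source idle while splits are still producing events.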



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28975) withIdleness marks all streams from FLIP-27 sources as idle

2022-08-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598346#comment-17598346
 ] 

Piotr Nowojski commented on FLINK-28975:


{quote}WatermarksWithIdleness (which is an impl of WatermarkGenerator) 
periodically checks if there were any invocations of onEvent, and marks the 
watermark output as idle if onEvent is never called within the idleness 
timeout.{quote}

Wouldn't it be more correct to not call {{output.markIdle();}} if the previous 
{{onPeriodicEmit}} call has already done so (i.e. {{idlenessTimer.checkIfIdle()}} 
returned true in the previous call)?

{quote}A possible solution in my mind is to add another layer on the source's 
WatermarkOutput that only marks idle if both the main and the per-split outputs 
are idle. WDYT?{quote}
I think that sounds about right.



> withIdleness marks all streams from FLIP-27 sources as idle
> ---
>
> Key: FLINK-28975
> URL: https://issues.apache.org/jira/browse/FLINK-28975
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.1
>Reporter: David Anderson
>Assignee: Qingsheng Ren
>Priority: Blocker
> Fix For: 1.16.0, 1.15.3
>
>
> Using withIdleness with a FLIP-27 source leads to all of the streams from the 
> source being marked idle, which in turn leads to incorrect results, e.g., 
> from joins that rely on watermarks.
> Quoting from the user ML thread:
> In org.apache.flink.streaming.api.operators.SourceOperator, there are 
> separate instances of WatermarksWithIdleness created for each split output 
> and the main output. There is multiplexing of watermarks between split 
> outputs but no multiplexing between split output and main output.
>  
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there is only output from splits and no output from main. Hence the main 
> output will (after an initial timeout) be marked as idle.
> The implementation of WatermarksWithIdleness is such that once an output is 
> idle, it will periodically re-mark the output as idle. Since there is no 
> multiplexing between the split outputs and the main output, the idle marks 
> coming from the main output will repeatedly set the output to idle even 
> though there are events from the splits. The result is that the entire 
> source is repeatedly marked as idle.
> See this ML thread for more details: 
> [https://lists.apache.org/thread/bbokccohs16tzkdtybqtv1vx76gqkqj4]
> This probably affects older versions of Flink as well, but that needs to be 
> verified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28975) withIdleness marks all streams from FLIP-27 sources as idle

2022-08-30 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597833#comment-17597833
 ] 

Piotr Nowojski commented on FLINK-28975:


Why are we re-emitting idleness periodically? 

> withIdleness marks all streams from FLIP-27 sources as idle
> ---
>
> Key: FLINK-28975
> URL: https://issues.apache.org/jira/browse/FLINK-28975
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.1
>Reporter: David Anderson
>Assignee: Qingsheng Ren
>Priority: Blocker
> Fix For: 1.16.0, 1.15.3
>
>
> Using withIdleness with a FLIP-27 source leads to all of the streams from the 
> source being marked idle, which in turn leads to incorrect results, e.g., 
> from joins that rely on watermarks.
> Quoting from the user ML thread:
> In org.apache.flink.streaming.api.operators.SourceOperator, there are 
> separate instances of WatermarksWithIdleness created for each split output 
> and the main output. There is multiplexing of watermarks between split 
> outputs but no multiplexing between split output and main output.
>  
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there is only output from splits and no output from main. Hence the main 
> output will (after an initial timeout) be marked as idle.
> The implementation of WatermarksWithIdleness is such that once an output is 
> idle, it will periodically re-mark the output as idle. Since there is no 
> multiplexing between the split outputs and the main output, the idle marks 
> coming from the main output will repeatedly set the output to idle even 
> though there are events from the splits. The result is that the entire 
> source is repeatedly marked as idle.
> See this ML thread for more details: 
> [https://lists.apache.org/thread/bbokccohs16tzkdtybqtv1vx76gqkqj4]
> This probably affects older versions of Flink as well, but that needs to be 
> verified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26326) FLIP-203: Support native and incremental savepoints 1.1

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26326:
---
Fix Version/s: (was: 1.17.0)

> FLIP-203: Support native and incremental savepoints 1.1
> ---
>
> Key: FLINK-26326
> URL: https://issues.apache.org/jira/browse/FLINK-26326
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Anton Kalashnikov
>Priority: Major
>
> This is the second part of the implementation of 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints]
> The first one: FLINK-25276
>  
> Motivation: currently, with non-incremental, canonical-format savepoints and 
> very large state, both taking a savepoint and recovering from one can take a 
> very long time. Providing options to take native-format and incremental 
> savepoints would alleviate this problem.
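For context, part one of the FLIP (FLINK-25276, shipped in 1.15) already 
exposes the savepoint format on the client side. A hedged sketch, assuming the 
1.15 {{ClusterClient}} API; the target directory is a placeholder:

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;

// Sketch only: trigger a savepoint in the native (state-backend specific)
// format, which for large state is much faster to take and restore than the
// canonical format.
public class NativeSavepointExample {
    public static String triggerNativeSavepoint(ClusterClient<?> client, JobID jobId)
            throws Exception {
        return client
                .triggerSavepoint(jobId, "hdfs:///savepoints", SavepointFormatType.NATIVE)
                .get(); // completes with the path of the finished savepoint
    }
}
{code}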



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26683) Commit side effects if stop-with-savepoint failed while finishing

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26683:
---
Fix Version/s: (was: 1.17.0)

> Commit side effects if stop-with-savepoint failed while finishing
> -
>
> Key: FLINK-26683
> URL: https://issues.apache.org/jira/browse/FLINK-26683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.14.4, 1.15.0
>Reporter: Liu
>Priority: Major
>
> When we stop with a savepoint, the savepoint finishes, but some tasks fail 
> over for some reason and restart back into running. In the end, some tasks 
> are finished and some tasks are running. In this case, I think that we should 
> terminate all the tasks anyway instead of restarting, since the savepoint is 
> finished and the job has stopped consuming data. What do you think?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-22805) Dynamic configuration of Flink checkpoint interval

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-22805:
---
Fix Version/s: (was: 1.17.0)

> Dynamic configuration of Flink checkpoint interval
> --
>
> Key: FLINK-22805
> URL: https://issues.apache.org/jira/browse/FLINK-22805
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.1
>Reporter: Fu Kai
>Priority: Major
>  Labels: auto-deprioritized-critical
>
> Flink currently does not support changing the checkpoint interval on the fly. 
> This would be useful for use cases like backfill/cold-start from a stream 
> containing the whole history.
>  
> In the cold-start phase, resources are fully utilized and the back-pressure 
> is high for all upstream operators, constantly causing checkpoints to time 
> out. The real production traffic is far less than that, and the provisioned 
> resources are capable of handling it. 
>  
> With a dynamic checkpoint interval configuration, the cold-start process 
> could be sped up with a less frequent checkpoint interval, or checkpointing 
> could even be turned off. After the process is completed, the checkpoint 
> interval could be updated back to normal.
>  
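For reference, a minimal sketch of what can be configured today; the interval 
value is illustrative. The point of the ticket is that this value is fixed at 
submission time and cannot be changed while the job runs:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // The interval is baked into the job at submission time; adjusting it
        // (e.g. after a cold-start/backfill finishes) requires a job restart.
        env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
    }
}
{code}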



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-11499) Extend StreamingFileSink BulkFormats to support arbitrary roll policies

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-11499:
---
Priority: Not a Priority  (was: Minor)

> Extend StreamingFileSink BulkFormats to support arbitrary roll policies
> ---
>
> Key: FLINK-11499
> URL: https://issues.apache.org/jira/browse/FLINK-11499
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Seth Wiesman
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, usability
>
> Currently, when using the StreamingFileSink, bulk-encoding formats can only 
> be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress 
> part file on every checkpoint.
> However, many bulk formats such as Parquet are most efficient when written as 
> large files; this is not possible when frequent checkpointing is enabled. 
> Currently the only work-around is to use long checkpoint intervals, which is 
> not ideal.
>  
> The StreamingFileSink should be enhanced to support arbitrary roll policies 
> so users may write large bulk files while retaining frequent checkpoints.
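To illustrate the limitation, a hedged sketch of the bulk-format path as it 
exists today; the output path and the {{MyRecord}} POJO are placeholders, and 
the sketch assumes the flink-parquet module for the writer factory:

{code:java}
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class BulkSinkExample {

    /** Placeholder record type for the sketch. */
    public static class MyRecord {
        public String key;
        public double value;
    }

    public static StreamingFileSink<MyRecord> buildSink() {
        return StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///tmp/output"), // placeholder path
                        ParquetAvroWriters.forReflectRecord(MyRecord.class))
                // Bulk formats only accept checkpoint-based rolling policies:
                // every checkpoint rolls the in-progress part file, so frequent
                // checkpoints produce many small Parquet files.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}
{code}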



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-11499) Extend StreamingFileSink BulkFormats to support arbitrary roll policies

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-11499:
---
Fix Version/s: (was: 1.17.0)

> Extend StreamingFileSink BulkFormats to support arbitrary roll policies
> ---
>
> Key: FLINK-11499
> URL: https://issues.apache.org/jira/browse/FLINK-11499
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Seth Wiesman
>Priority: Minor
>  Labels: auto-deprioritized-major, usability
>
> Currently, when using the StreamingFileSink, bulk-encoding formats can only 
> be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress 
> part file on every checkpoint.
> However, many bulk formats such as Parquet are most efficient when written as 
> large files; this is not possible when frequent checkpointing is enabled. 
> Currently the only work-around is to use long checkpoint intervals, which is 
> not ideal.
>  
> The StreamingFileSink should be enhanced to support arbitrary roll policies 
> so users may write large bulk files while retaining frequent checkpoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15378) StreamFileSystemSink supported mutil hdfs plugins.

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-15378:
---
Fix Version/s: (was: 1.17.0)

> StreamFileSystemSink supported mutil hdfs plugins.
> --
>
> Key: FLINK-15378
> URL: https://issues.apache.org/jira/browse/FLINK-15378
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.9.2, 1.10.0
>Reporter: ouyangwulin
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Attachments: jobmananger.log
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> [As reported on the mailing 
> list|https://lists.apache.org/thread.html/7a6b1e341bde0ef632a82f8d46c9c93da358244b6bac0d8d544d11cb%40%3Cuser.flink.apache.org%3E]
> Request 1: FileSystem plugins should not affect the default YARN dependencies.
> Request 2: StreamFileSystemSink should support multiple HDFS plugins under 
> the same scheme.
> Problem description:
>     When I put a filesystem plugin into FLINK_HOME/plugins, and the class 
> '*com.filesystem.plugin.FileSystemFactoryEnhance*' implements 
> '*FileSystemFactory*', then when the JM starts it calls 
> FileSystem.initialize(configuration, 
> PluginUtils.createPluginManagerFromRootFolder(configuration)) to load the 
> factories into the map FileSystem#FS_FACTORIES, whose key is only the scheme. 
> When the TM/JM use local Hadoop conf A while the user code in the filesystem 
> plugin uses Hadoop conf B, with conf A and conf B pointing to different 
> Hadoop clusters, the JM fails to start, because the blob server in the JM 
> loads conf B to get the filesystem. The full log is attached.
>  
> Proposed resolution:
>     Use the scheme plus a specific identifier as the key for 
> FileSystem#FS_FACTORIES.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15012) Checkpoint directory not cleaned up

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-15012:
---
Fix Version/s: (was: 1.17.0)

> Checkpoint directory not cleaned up
> ---
>
> Key: FLINK-15012
> URL: https://issues.apache.org/jira/browse/FLINK-15012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Nico Kruber
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I started a Flink cluster with 2 TMs using {{start-cluster.sh}} and the 
> following config (in addition to the default {{flink-conf.yaml}})
> {code:java}
> state.checkpoints.dir: file:///path/to/checkpoints/
> state.backend: rocksdb {code}
> After submitting a job with checkpoints enabled (every 5s), checkpoints show 
> up, e.g.
> {code:java}
> bb969f842bbc0ecc3b41b7fbe23b047b/
> ├── chk-2
> │   ├── 238969e1-6949-4b12-98e7-1411c186527c
> │   ├── 2702b226-9cfc-4327-979d-e5508ab2e3d5
> │   ├── 4c51cb24-6f71-4d20-9d4c-65ed6e826949
> │   ├── e706d574-c5b2-467a-8640-1885ca252e80
> │   └── _metadata
> ├── shared
> └── taskowned {code}
> If I shut down the cluster via {{stop-cluster.sh}}, these files will remain 
> on disk and not be cleaned up.
> In contrast, if I cancel the job, at least {{chk-2}} will be deleted, but the 
> (empty) directories are still left behind.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-20103) Improve test coverage with chaos testing & side-by-side tests

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-20103:
---
Fix Version/s: (was: 1.17.0)

> Improve test coverage with chaos testing & side-by-side tests
> -
>
> Key: FLINK-20103
> URL: https://issues.apache.org/jira/browse/FLINK-20103
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network, Runtime / 
> State Backends, Tests
>Reporter: Roman Khachatryan
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> This is a follow-up ticket after FLINK-20097.
> With the current setup (UnalignedITCase):
>  - race conditions are not detected reliably (1 per tens of runs)
>  - require changing the configuration (low checkpoint timeout)
>  - adding a new job graph often reveals a new bug
> An additional issue with the current setup is that it's difficult to git 
> bisect (for long ranges). 
> Changes that might hide the bugs:
>  - having Preconditions in ChannelStatePersister (slow down processing)
>  - some Preconditions may mask errors by causing job restart
>  - timings in tests (UnalignedITCase)
>  Some options to consider
>  # chaos monkey tests including induced latency and/or CPU bursts - on 
> different workloads/configs
>  # side-by-side tests with randomized inputs/configs
> Extending Jepsen coverage further (validating output) does not seem promising 
> in the context of Flink because its output isn't linearisable.
>   
> Some tools for (1) that could be used:
> 1. https://github.com/chaosblade-io/chaosblade (docs need translation)
> 2. https://github.com/Netflix/chaosmonkey - requires spinnaker (CD)
> 3. jvm agent: https://github.com/mrwilson/byte-monkey
> 4. https://vmware.github.io/mangle/ - supports java method latency; ui 
> oriented?; not actively maintained?
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-18405) Add watermark support for unaligned checkpoints

2022-08-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-18405:
---
Fix Version/s: (was: 1.17.0)

> Add watermark support for unaligned checkpoints
> ---
>
> Key: FLINK-18405
> URL: https://issues.apache.org/jira/browse/FLINK-18405
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.12.0
>Reporter: Arvid Heise
>Priority: Not a Priority
>  Labels: auto-deprioritized-major
>
> Currently, Flink generates the watermark as a first step of recovery, instead 
> of storing the latest watermark in the operators to ease rescaling. With 
> unaligned checkpoints, that means that on recovery Flink generates watermarks 
> after it restores in-flight data. If your pipeline uses an operator that 
> applies the latest watermark to each record, it will produce incorrect 
> results during recovery if the watermark is not directly or indirectly part 
> of the operator state. Thus, the SQL OVER operator should not be used with 
> unaligned checkpoints, while window operators are safe to use. 
> A possible solution is to store the watermark in the operator state. If 
> rescaling may occur, watermarks should be stored per key-group in a 
> union-state. 
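As an illustration of that direction, a minimal sketch of a user function that 
snapshots the last observed watermark into union list state, assuming Flink's 
{{CheckpointedFunction}} API. This is not the actual runtime fix (which would 
also track watermarks per key-group); the class and state names are 
placeholders:

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: remember the last observed watermark across recovery, so logic
// that depends on it does not observe Long.MIN_VALUE while in-flight data
// from an unaligned checkpoint is being replayed.
public class WatermarkRememberingFunction
        extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Long> watermarkState;
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        lastWatermark = Math.max(lastWatermark, ctx.timerService().currentWatermark());
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        watermarkState.clear();
        watermarkState.add(lastWatermark);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        watermarkState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<>("last-watermark", Long.class));
        // Union state hands every subtask all entries on restore; take the
        // minimum so the restored watermark never overshoots what any subtask
        // had actually seen.
        long min = Long.MAX_VALUE;
        boolean restoredAny = false;
        for (Long restored : watermarkState.get()) {
            restoredAny = true;
            min = Math.min(min, restored);
        }
        if (restoredAny) {
            lastWatermark = min;
        }
    }
}
{code}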



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28853) FLIP-217 Support watermark alignment of source splits

2022-08-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-28853:
--

Assignee: Sebastian Mattheis

> FLIP-217 Support watermark alignment of source splits
> -
>
> Key: FLINK-28853
> URL: https://issues.apache.org/jira/browse/FLINK-28853
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common, Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Sebastian Mattheis
>Assignee: Sebastian Mattheis
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> This improvement implements 
> [FLIP-217|https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits]
>  to support watermark alignment of source splits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid

2022-08-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17583571#comment-17583571
 ] 

Piotr Nowojski commented on FLINK-28504:


{quote}
After giving this a second look I think the current code behaviour is indeed 
correct and I don't see any bug here.

Your idle source subtasks have never sent any watermark to some of the 
downstream local aggregations. So those LocalWindowAggregate instances 
correctly don't report any watermark, resulting in the Low Watermark in the 
webUI not being reported. Correspondingly, the low watermark is NaN for the 
task as a whole.

The current behaviour might be a bit confusing. Ideally the WebUI should maybe 
present the lowest non-NaN watermark, with a caveat/asterisk that some 
subtasks are idle. But to me the current behaviour is also valid.
{quote}

> Local-Global aggregation causes watermark alignment 
> (table.exec.source.idle-timeout) of idle partition invalid
> --
>
> Key: FLINK-28504
> URL: https://issues.apache.org/jira/browse/FLINK-28504
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.5
> Environment: flink 1.14
> kafka 2.4
>Reporter: nyingping
>Assignee: nyingping
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-07-12-15-11-51-653.png, 
> image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
>
>
> I have a window topN test task, the code is as follows
>  
> {code:java}
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> StreamExecutionEnvironment streamExecutionEnvironment =
>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> StreamTableEnvironment st =
>         StreamTableEnvironment.create(streamExecutionEnvironment);
>
> st.getConfig().getConfiguration()
>         .setString("table.exec.source.idle-timeout", "10s");
>
> st.executeSql(
>         "CREATE TABLE test (\n"
>                 + "  `key` STRING,\n"
>                 + "  `time` TIMESTAMP(3),\n"
>                 + "  `price` float,\n"
>                 + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
>                 + ") WITH (\n"
>                 + "  'connector' = 'kafka',\n"
>                 + "  'topic' = 'test',\n"
>                 + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
>                 + "  'properties.group.id' = 'windowGroup',\n"
>                 + "  'scan.startup.mode' = 'latest-offset',\n"
>                 + "  'format' = 'json'\n"
>                 + ")");
>
> String sqlWindowTopN =
>         "select * from (" +
>         "  select *, " +
>         "    ROW_NUMBER() over (partition by window_start, window_end " +
>         "        order by total desc) as rownum " +
>         "  from (" +
>         "    select key, window_start, window_end, count(key) as `count`, " +
>         "        sum(price) total " +
>         "    from table (tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)) " +
>         "    group by window_start, window_end, key" +
>         "  )" +
>         ") where rownum <= 3";
> st.executeSql(sqlWindowTopN).print(); {code}
>  
>  
> Running it, I do not get any results for a long time.
> The watermark appears as follows in the UI:
>  
> !image-2022-07-12-15-11-51-653.png|width=898,height=388!
> I didn't set the parallelism manually, so it defaults to 12. The Kafka data 
> source has only one partition, so there are idle partitions. To align the 
> watermarks for the entire task, I use the `table.exec.source.idle-timeout` 
> configuration.
>  
> As shown above, I found that the system automatically splits the window-TopN 
> SQL into local-global aggregation tasks. In the local phase, the watermark 
> didn't work as I expected.
>  
> Manually setting the parallelism to 1 did what I expected.
> {code:java}
> streamExecutionEnvironment.setParallelism(1); {code}
> !image-2022-07-12-15-19-29-950.png|width=872,height=384!
>  
> I can also manually configure the system not to split into local-global 
> phases. At this point, the `table.exec.source.idle-timeout` configuration 
> takes effect and the watermark is aligned.
> {code:java}
> st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy",
>  "ONE_PHASE"); {code}
> result:
> !image-2022-07-12-15-20-06-919.png|width=866,height=357!
>  
> To sum up, when the parallelism of Kafka partition is different from that of 
> Flink, and idle partitions are generated, I expect to use 

[jira] [Closed] (FLINK-28835) Savepoint and checkpoint capabilities and limitations table is incorrect

2022-08-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-28835.
--
Resolution: Fixed

fixed on release-1.15 as 39a737f31be
merged commit b1c40bd into apache:master

> Savepoint and checkpoint capabilities and limitations table is incorrect
> 
>
> Key: FLINK-28835
> URL: https://issues.apache.org/jira/browse/FLINK-28835
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.15.1, 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/
> is inconsistent with 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal.
>  "Non-arbitrary job upgrade" for unaligned checkpoints should be officially 
> supported. 
> It looks like a typo in the original PR FLINK-26134



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28835) Savepoint and checkpoint capabilities and limitations table is incorrect

2022-08-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-28835:
--

Assignee: Piotr Nowojski

> Savepoint and checkpoint capabilities and limitations table is incorrect
> 
>
> Key: FLINK-28835
> URL: https://issues.apache.org/jira/browse/FLINK-28835
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.15.1, 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.16.0, 1.15.2
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/
> is inconsistent with 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal.
>  "Non-arbitrary job upgrade" for unaligned checkpoints should be officially 
> supported. 
> It looks like a typo in the original PR FLINK-26134



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28835) Savepoint and checkpoint capabilities and limitations table is incorrect

2022-08-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-28835:
---
Priority: Critical  (was: Major)

> Savepoint and checkpoint capabilities and limitations table is incorrect
> 
>
> Key: FLINK-28835
> URL: https://issues.apache.org/jira/browse/FLINK-28835
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.15.1, 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.16.0, 1.15.2
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/
> is inconsistent with 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal.
>  "Non-arbitrary job upgrade" for unaligned checkpoints should be officially 
> supported. 
> It looks like a typo in the original PR FLINK-26134



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28835) Savepoint and checkpoint capabilities and limitations table is incorrect

2022-08-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-28835:
--

 Summary: Savepoint and checkpoint capabilities and limitations 
table is incorrect
 Key: FLINK-28835
 URL: https://issues.apache.org/jira/browse/FLINK-28835
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.15.1, 1.16.0
Reporter: Piotr Nowojski
 Fix For: 1.16.0, 1.15.2


https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/

is inconsistent with 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal.
 "Non-arbitrary job upgrade" for unaligned checkpoints should be officially 
supported. 

It looks like a typo in the original PR FLINK-26134



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-25256) Savepoints do not work with ExternallyInducedSources

2022-07-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-25256.
--
Fix Version/s: 1.15.0
   (was: 1.14.6)
   Resolution: Fixed

> Savepoints do not work with ExternallyInducedSources
> 
>
> Key: FLINK-25256
> URL: https://issues.apache.org/jira/browse/FLINK-25256
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.13.3, 1.12.7
>Reporter: Dawid Wysakowicz
>Assignee: Arvid Heise
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0, 1.15.0
>
>
> It is not possible to take a proper savepoint with 
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy 
> and FLIP-27 versions). The problem is that we're hardcoding 
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators would try to take 
> checkpoints in the checkpoint location, whereas the {{CheckpointCoordinator}} 
> will write the metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if 
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In 
> such a case the location and format in which the state of a particular task 
> is persisted depend on the order of barrier arrival. If a barrier from a 
> regular source arrives last, the task takes a savepoint; on the other hand, 
> if the last barrier is from an externally induced source, it will take a 
> checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28474) ChannelStateWriteResult may not fail after checkpoint abort

2022-07-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-28474:
---
Description: 
After a checkpoint abort, the ChannelStateWriteResult should fail.

But if _channelStateWriter.start(id, checkpointOptions);_ is executed after the 
checkpoint abort, the ChannelStateWriteResult will not fail.

 
h2. Cause Analysis:

When a checkpoint is aborted, channelStateWriter.start(id, checkpointOptions); 
may not have been executed yet. These checkpointIds will be stored in the 
abortedCheckpointIds of SubtaskCheckpointCoordinatorImpl, and when 
checkpointState is called, it will check whether the checkpointId should be 
aborted.

_ChannelStateWriter.abort(checkpointId, exception, true) should also be 
executed here._

The unit test can reproduce this bug.

!image-2022-07-09-22-21-24-417.png|width=803,height=307!

 

Note: channelStateWriter.abort is only called in notifyCheckpointAborted; it 
doesn't account for channelStateWriter.start being called after 
notifyCheckpointAborted.

JIRA: FLINK-17869

commit: 
https://github.com/apache/flink/pull/12478/commits/22c99845ef4f863f1753d17b109fd2faecc8201e

 

The bug will affect the new feature FLINK-26803, because the channel state file 
can be closed only after the checkpoints of all tasks sharing the file are 
complete or aborted. So when the checkpoint of some tasks fails, if abort is 
not called, the file cannot be closed, all tasks sharing the file cannot 
execute inputChannelStateHandles.completeExceptionally(e); and 
resultSubpartitionStateHandles.completeExceptionally(e);, and 
AsyncCheckpointRunnable will wait forever.

  was:
After a checkpoint abort, the ChannelStateWriteResult should fail.

But if _channelStateWriter.start(id, checkpointOptions);_ is executed after the 
checkpoint abort, the ChannelStateWriteResult will not fail.

 
h2. Cause Analysis:

When a checkpoint is aborted, channelStateWriter.start(id, checkpointOptions); 
may not have been executed yet. These checkpointIds will be stored in the 
abortedCheckpointIds of SubtaskCheckpointCoordinatorImpl, and when 
checkpointState is called, it will check whether the checkpointId should be 
aborted.

_ChannelStateWriter.abort(checkpointId, exception, true) should also be 
executed here._

The unit test can reproduce this bug.

!image-2022-07-09-22-21-24-417.png|width=803,height=307!

 

Note: channelStateWriter.abort is only called in notifyCheckpointAborted; it 
doesn't account for channelStateWriter.start being called after 
notifyCheckpointAborted.

JIRA: FLINK-17869

commit: 
https://github.com/apache/flink/pull/12478/commits/22c99845ef4f863f1753d17b109fd2faecc8201e

 

 


> ChannelStateWriteResult may not fail after checkpoint abort
> ---
>
> Key: FLINK-28474
> URL: https://issues.apache.org/jira/browse/FLINK-28474
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.5, 1.15.1
>Reporter: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
> Attachments: image-2022-07-09-22-21-24-417.png
>
>
> After a checkpoint abort, the ChannelStateWriteResult should fail.
> But if _channelStateWriter.start(id, checkpointOptions);_ is executed after 
> the checkpoint abort, the ChannelStateWriteResult will not fail.
>  
> h2. Cause Analysis:
> When a checkpoint is aborted, channelStateWriter.start(id, 
> checkpointOptions); may not have been executed yet. These checkpointIds will 
> be stored in the abortedCheckpointIds of SubtaskCheckpointCoordinatorImpl, 
> and when checkpointState is called, it will check whether the checkpointId 
> should be aborted.
> _ChannelStateWriter.abort(checkpointId, exception, true) should also be 
> executed here._
> The unit test can reproduce this bug.
> !image-2022-07-09-22-21-24-417.png|width=803,height=307!
>  
> Note: channelStateWriter.abort is only called in notifyCheckpointAborted; it 
> doesn't account for channelStateWriter.start being called after 
> notifyCheckpointAborted.
> JIRA: FLINK-17869
> commit: 
> https://github.com/apache/flink/pull/12478/commits/22c99845ef4f863f1753d17b109fd2faecc8201e
>  
> The bug will affect the new feature FLINK-26803, because the channel state 
> file can be closed only after the checkpoints of all tasks sharing the file 
> are complete or aborted. So when the checkpoint of some tasks fails, if abort 
> is not called, the file cannot be closed, all tasks sharing the file cannot 
> execute inputChannelStateHandles.completeExceptionally(e); and 
> resultSubpartitionStateHandles.completeExceptionally(e);, and 
> AsyncCheckpointRunnable will wait forever.
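To make the proposed fix concrete, a hedged sketch of the guard described 
above. The field and method names follow the description 
({{SubtaskCheckpointCoordinatorImpl#checkpointState}}, 
{{abortedCheckpointIds}}), but the code is illustrative, not the actual patch:

{code:java}
import java.util.Set;
import java.util.concurrent.CancellationException;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

// Sketch only: when checkpointState() discovers that the checkpoint was
// already aborted before start() could run, it should also abort the channel
// state writer instead of silently skipping it.
public class AbortGuardSketch {
    private Set<Long> abortedCheckpointIds; // as described above
    private ChannelStateWriter channelStateWriter;

    boolean abortIfAlreadyNotified(long checkpointId) {
        if (abortedCheckpointIds.remove(checkpointId)) {
            channelStateWriter.abort(
                    checkpointId,
                    new CancellationException(
                            "Checkpoint " + checkpointId + " aborted before start"),
                    /* cleanup */ true);
            return true; // caller must not invoke channelStateWriter.start()
        }
        return false;
    }
}
{code}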



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27165) Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString became unstable

2022-07-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27165:
---
Priority: Not a Priority  (was: Minor)

> Benchmark test SerializationFrameworkMiniBenchmarks#serializerHeavyString 
> became unstable
> -
>
> Key: FLINK-27165
> URL: https://issues.apache.org/jira/browse/FLINK-27165
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Matthias Pohl
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, test-stability
>
> The benchmark test 
> {{SerializationFrameworkMiniBenchmarks#serializerHeavyString}} became 
> unstable in mid-January 2022, which cannot be explained (see 
> [graph|http://codespeed.dak8s.net:8000/timeline/#/?exe=1=serializerHeavyString=on=on=off=2=1000]).
>  
> There is some suspicion that it was caused by FLINK-25246 because Java 11 was 
> added during that time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565043#comment-17565043
 ] 

Piotr Nowojski edited comment on FLINK-22643 at 7/11/22 2:55 PM:
-

When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png|width=1000!
!Screenshot 2022-07-11 at 16.55.02.png|width=1000! 
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.


was (Author: pnowojski):
When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png|width=1000!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png, Screenshot 
> 2022-07-11 at 16.55.02.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-22643:
---
Attachment: Screenshot 2022-07-11 at 16.55.02.png

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png, Screenshot 
> 2022-07-11 at 16.55.02.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565043#comment-17565043
 ] 

Piotr Nowojski edited comment on FLINK-22643 at 7/11/22 2:52 PM:
-

When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png|width=1000!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.


was (Author: pnowojski):
When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png|width=650!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565043#comment-17565043
 ] 

Piotr Nowojski edited comment on FLINK-22643 at 7/11/22 2:52 PM:
-

When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png|width=650!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.


was (Author: pnowojski):
When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565043#comment-17565043
 ] 

Piotr Nowojski commented on FLINK-22643:


When closing the ticket I forgot to mention that this seems to have improved 
the low-latency performance of the network stack by quite a bit:
!Screenshot 2022-07-11 at 16.49.42.png!
Previously the difference between 100ms and 1ms in the microbenchmark was ~18%; 
after merging this change it's around ~4%.

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2022-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-22643:
---
Attachment: Screenshot 2022-07-11 at 16.49.42.png

> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Zhilong Hong
>Assignee: fanrui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
> Attachments: Screenshot 2022-07-11 at 16.49.42.png
>
>
> For large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has a parallelism of 
> 500. We divide the vertices into 5 slot sharing groups. Each TaskManager has 
> 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume that 
> the job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of Linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8s cluster, the jobs always fail over due 
> to exceptions related to the network, such as {{Sending the partition request 
> to 'null' failed}}, etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of meeting exceptions related to the network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28487) Introduce configurable RateLimitingStrategy for Async Sink

2022-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-28487:
--

Assignee: Hong Liang Teoh

> Introduce configurable RateLimitingStrategy for Async Sink
> --
>
> Key: FLINK-28487
> URL: https://issues.apache.org/jira/browse/FLINK-28487
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
> Fix For: 1.16.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Introduce a configurable RateLimitingStrategy to the AsyncSinkWriter.
> This change will allow sink implementers using AsyncSinkWriter to configure 
> their own RateLimitingStrategy instead of using the default 
> AIMDRateLimitingStrategy.
> See [FLIP-242: Introduce configurable RateLimitingStrategy for Async 
> Sink|https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+RateLimitingStrategy+for+Async+Sink].
>  
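
For readers who don't follow the FLIP link, the general shape of a pluggable 
rate-limiting strategy might look like the sketch below. This is illustrative 
only; the names and signatures are hypothetical, not the interface actually 
defined in FLIP-242.

{code:java}
// Hypothetical sketch of a pluggable rate-limiting strategy; all names are
// made up for illustration and do not match the FLIP-242 interface verbatim.
public interface RateLimitingStrategySketch {
    /** Record that a request carrying the given number of messages was started. */
    void registerInFlightRequest(int batchSize);

    /** Record that a request completed, possibly unsuccessfully. */
    void registerCompletedRequest(int batchSize, boolean failed);

    /** Whether the writer should hold back the next request for now. */
    boolean shouldBlock(int nextBatchSize);
}
{code}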



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-09 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17564588#comment-17564588
 ] 

Piotr Nowojski commented on FLINK-28357:


Hi [~Sandys-Lumsdaine]
{quote}
Maybe my example above is actually the same scenario as before
{quote}
Yes, it looks like that. On the screenshot above, for whatever reason, the 
{{Empty Stream Map}} is not chained with the {{Short Running Source}}. I'm not 
sure why; maybe you have slot sharing disabled, or maybe there is a keyed 
exchange before the {{Empty Stream Map}} (note that in my ITCase I've removed 
keyed exchanges). You can see that, for example, the {{Join}} and {{Sink: Join 
Sink}} operators are chained together in a single task. 
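
For context, a minimal sketch (assumptions mine, plain DataStream API) of the 
two usual ways an operator ends up in its own task instead of being chained to 
its upstream:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Reason 1: chaining switched off globally.
        env.disableOperatorChaining();
        // Reason 2: a keyed exchange between two operators breaks the chain,
        // even when chaining is enabled.
        env.fromElements("a", "b", "a")
                .keyBy(value -> value)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();
        env.execute("chaining-sketch");
    }
}
{code}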

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, image-2022-07-08-17-06-01-256.png, 
> longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source 
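
For context, a minimal sketch (assumptions mine; the actual reproducer is the 
attached WatermarkDemoMain.java) of the “Short Lived Source” described above: 
it emits a single Long.MAX_VALUE watermark and returns, after which Flink moves 
the task to FINISHED.

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class ShortLivedEmptySource implements SourceFunction<Long> {
    @Override
    public void run(SourceContext<Long> ctx) {
        System.out.println("ShortLivedEmptySource emitting Long.MAX_VALUE watermark.");
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); // source finishes right after
    }

    @Override
    public void cancel() {}
}
{code}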

[jira] [Comment Edited] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-08 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563760#comment-17563760
 ] 

Piotr Nowojski edited comment on FLINK-28357 at 7/8/22 7:18 AM:


I've merged the non-chained tasks fix. [~Sandys-Lumsdaine] if you think there 
is still another bug present, please feel free to re-open the ticket.

merged commit 574ffa4 into apache:master
merged commit fa4b327 into apache:release-1.14
merged commit 5c1d412 into apache:release-1.15


was (Author: pnowojski):
I've fixed the non-chained tasks fix. [~Sandys-Lumsdaine] if you think there is 
still another bug present, please feel free to re-open the ticket.

merged commit 574ffa4 into apache:master
merged commit fa4b327 into apache:release-1.14
merged commit 5c1d412 into apache:release-1.15

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more 

[jira] [Comment Edited] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-07 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563760#comment-17563760
 ] 

Piotr Nowojski edited comment on FLINK-28357 at 7/7/22 7:18 PM:


I've fixed the non-chained tasks fix. [~Sandys-Lumsdaine] if you think there is 
still another bug present, please feel free to re-open the ticket.

merged commit 574ffa4 into apache:master
merged commit fa4b327 into apache:release-1.14
merged commit 5c1d412 into apache:release-1.15


was (Author: pnowojski):
I've fixed the non-chained tasks fix. [~Sandys-Lumsdaine] if you think there is 
still another bug present, please feel free to re-open the ticket.

merged commit 574ffa4 into apache:master
merged commit fa4b327 into apache:release-1.14

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the 

[jira] [Updated] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-07 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-28357:
---
Fix Version/s: 1.16.0

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state then the console message does 
> not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source 
> or Flink if I see two or more checkpoints logged in between recoveries. If 
> zero or one checkpoints are made, everything is fine – the join gets the 
> watermark and I see my console message. You can play with the checkpointing 
> frequency as per the code comments:
>     // Useful checkpoint interval options:
>     //    5 - see the problem after the first recovery
>     //   70 - useful to see bad behaviour 

[jira] [Closed] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-07 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-28357.
--
Fix Version/s: 1.15.2
   1.14.6
   Resolution: Fixed

I've merged the fix for the non-chained tasks. [~Sandys-Lumsdaine] if you think 
there is still another bug present, please feel free to re-open the ticket.

merged commit 574ffa4 into apache:master
merged commit fa4b327 into apache:release-1.14

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.2, 1.14.6
>
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state then the console message does 
> not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source 
> or Flink if I see two or more checkpoints logged in between recoveries. If 
> zero or one checkpoints are made, everything is fine – the join gets 

[jira] [Commented] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562485#comment-17562485
 ] 

Piotr Nowojski commented on FLINK-28357:


{quote}
Also - you say "non-chained map" but I believe this problem also occurred with 
operator chaining. I disabled operator chaining in my test program to make it 
clearer what was going on.
{quote}
Are you sure? I cannot reproduce this problem with or without my fix when 
chaining is enabled. There might be another issue lurking somewhere, but I 
cannot reproduce it at the moment. If you could confirm that there was no 
problem with chaining enabled, or if you could provide some reproduction steps 
(ideally modify [the ITCase that I'm adding in my 
PR|https://github.com/apache/flink/pull/20158/commits/2db367614bb16c6af714cb8c0cfefbb4ace272f3#diff-24e86c59d8547cbd9a95f798d3e0c300daa99dae9fba5be017ed4b2143bd5787];
 this ITCase would livelock/never finish if the watermarks stagnate after 
recovery), that would be great. Just note what I wrote before: verifying this 
issue based on messages printed from the sources won't work. 

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> 

[jira] [Comment Edited] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562234#comment-17562234
 ] 

Piotr Nowojski edited comment on FLINK-28357 at 7/4/22 7:38 PM:


Thanks for the bug report. Indeed there was a bug where the max watermark was 
being swallowed by this non-chained map.

The problem was that {{FinishedOnRestoreInput#FinishedOnRestoreInput}} was 
being constructed with the wrong number of inputs, because of an accidental 
{{null}} passed from the {{StreamGraphGenerator}}.

Just note that even with the bug fix (please see my PR), you will still not see 
the printed message from the short lived source:
{noformat}
System.out.println(String.format("%s: ShortLivedEmptySource emitting 
Long.MAX_VALUE watermark.", DateTime.now()));
{noformat}
as after recovery this source is never started, so the code never reaches the 
run() method. And that's fine; MAX_WATERMARK is emitted by the framework.




was (Author: pnowojski):
Thanks for the bug report. Indeed there was a bug where max watermark was being 
swallowed by this non chained map.

Just note that even with the bug fix (please see my PR), you will still not see 
the printed message from the short lived source:
{noformat}
System.out.println(String.format("%s: ShortLivedEmptySource emitting 
Long.MAX_VALUE watermark.", DateTime.now()));
{noformat}
as after recovery this source is never started, so the code never reaches the 
run method. And that's fine, MAX_WATERMARK is emitted by the framework.


> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the 

[jira] [Commented] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562234#comment-17562234
 ] 

Piotr Nowojski commented on FLINK-28357:


Thanks for the bug report. Indeed there was a bug where the max watermark was 
being swallowed by this non-chained map.

Just note that even with the bug fix (please see my PR), you will still not see 
the printed message from the short lived source:
{noformat}
System.out.println(String.format("%s: ShortLivedEmptySource emitting 
Long.MAX_VALUE watermark.", DateTime.now()));
{noformat}
as after recovery this source is never started, so the code never reaches the 
run() method. And that's fine; MAX_WATERMARK is emitted by the framework.


> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state then the console message does 
> not appear and the watermark is not emitted.
> 

[jira] [Assigned] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-04 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-28357:
--

Assignee: Piotr Nowojski

> Watermark issue when recovering Finished sources
> 
>
> Key: FLINK-28357
> URL: https://issues.apache.org/jira/browse/FLINK-28357
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
> Environment: This can be reproduced in an IDE with the attached 
> sample program.
>Reporter: James
>Assignee: Piotr Nowojski
>Priority: Major
> Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I've done a lot of experimentation and I'm convinced there is a problem with 
> Flink's handling of Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and then joins with the 
> “Long Running Source” via a KeyedCoProcessFunction. It is moved to the 
> “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [dashboard screenshot: broken inline Outlook attachment link removed]
> To the below after a recovery (with the currentInput1/2Watermark metrics 
> showing input 2 having not received a watermark from the map, saying 
> –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery then 
> everything works fine and the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console meaning the run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state then the console message does 
> not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source 
> or Flink if I see two or more checkpoints logged in between recoveries. If 
> zero or one checkpoints are made, everything is fine – the join gets the 
> watermark and I see my console message. You can play with the checkpointing 
> frequency as per the code comments:
>     // Useful checkpoint interval options:
>     //    5 - see the problem after the first recovery
>     //   70 - useful to see bad behaviour kick in after a recovery or two
>     //  120 - won't see the problem as we don't 

[jira] [Closed] (FLINK-27530) FLIP-227: Support overdraft buffer

2022-06-29 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27530
 ]

Piotr Nowojski closed FLINK-27530.
--
Fix Version/s: 1.16.0
   Resolution: Fixed
       Status: Open -> Closed

Again thanks for your efforts fanrui

--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)


[jira] [Closed] (FLINK-27789) LegacySource compatible with overdraft buffer

2022-06-29 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27789
 ]

Piotr Nowojski closed FLINK-27789.
--
Resolution: Fixed
    Status: Open -> Closed

merged commit a63b7dd into apache:master now

--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)


[jira] [Commented] (FLINK-26803) Merge small ChannelState file for Unaligned Checkpoint

2022-06-27 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17559261#comment-17559261
 ] 

Piotr Nowojski commented on FLINK-26803:


Hi [~fanrui], +1 from my side to the design doc.

> Merge small ChannelState file for Unaligned Checkpoint
> --
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Attachments: image-2022-05-05-12-36-09-969.png
>
>
> When making an unaligned checkpoint, the number of ChannelState files is 
> TaskNumber * subtaskNumber. For a high-parallelism job, this writes too many 
> small files and causes high load on the HDFS NameNode.
>  
> In our production environment, a job writes more than 50K small files for 
> each Unaligned Checkpoint. Could we merge these files before writing them to 
> the FileSystem? We could configure the maximum number of files each TM can 
> write in a single Unaligned Checkpoint.
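
A quick sketch (illustrative numbers, assumptions mine) of how the 
per-checkpoint file count scales with parallelism:

{code:java}
public class ChannelStateFileCountSketch {
    public static void main(String[] args) {
        // One channel state file per subtask: taskNumber * subtaskNumber files.
        int taskNumber = 100;     // hypothetical number of tasks
        int subtaskNumber = 500;  // hypothetical parallelism per task
        System.out.println(taskNumber * subtaskNumber); // 50000 files per checkpoint
    }
}
{code}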



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-26803) Merge small ChannelState file for Unaligned Checkpoint

2022-06-27 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-26803:
--

Assignee: fanrui

> Merge small ChannelState file for Unaligned Checkpoint
> --
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Attachments: image-2022-05-05-12-36-09-969.png
>
>
> When making an unaligned checkpoint, the number of ChannelState files is 
> TaskNumber * subtaskNumber. For a high-parallelism job, this writes too many 
> small files and causes high load on the HDFS NameNode.
>  
> In our production environment, a job writes more than 50K small files for 
> each Unaligned Checkpoint. Could we merge these files before writing them to 
> the FileSystem? We could configure the maximum number of files each TM can 
> write in a single Unaligned Checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-26762) Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being blocked

2022-06-27 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-26762.
--
Resolution: Fixed

Chinese documentation translation merged as commit e57f2cf into apache:master 
now

Thanks [~fanrui]!

> Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being 
> blocked
> ---
>
> Key: FLINK-26762
> URL: https://issues.apache.org/jira/browse/FLINK-26762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-04-18-11-45-14-700.png, 
> image-2022-04-18-11-46-03-895.png
>
>
> In some past Unaligned Checkpoint JIRAs, the community added 
> recordWriter.isAvailable() to reduce blocking for single-record writes. But 
> large records, flatMaps, or broadcast watermarks may need more buffers.
> Can we add an overdraft buffer to the BufferPool to reduce the chance of 
> unaligned checkpoints being blocked? 
> h2. Overdraft Buffer mechanism
> Add the configuration of 
> 'taskmanager.network.memory.overdraft-buffers-per-gate=5'. 
> When requestMemory is called and the bufferPool is insufficient, the 
> bufferPool will allow the Task to overdraw up to 5 MemorySegments. The 
> bufferPool will then be unavailable until all overdrawn buffers are consumed 
> by downstream tasks, and the task will wait for the bufferPool to become 
> available.
> From the above, we have the following benefits:
>  * For scenarios that require multiple buffers, the Task releases the 
> Checkpoint lock, so the Unaligned Checkpoint can complete quickly.
>  * We can control the memory usage to prevent memory leaks.
>  * It needs only a little memory, and can improve the stability of the Task 
> under backpressure.
>  * Users can increase the overdraft-buffers to adapt to scenarios that 
> require more buffers.
>  
> Masters, please correct me if I'm wrong, thanks a lot.
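
To make the accounting concrete, a minimal sketch (mine, not Flink's actual 
LocalBufferPool) of the overdraft mechanism described above:

{code:java}
public class OverdraftPoolSketch {
    private final int maxOverdraft; // 'overdraft-buffers-per-gate'
    private int available;          // segments currently in the normal pool
    private int overdrawn;          // segments handed out beyond the pool

    OverdraftPoolSketch(int poolSize, int maxOverdraft) {
        this.available = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    synchronized boolean requestSegment() {
        if (available > 0) {
            available--;
            return true;
        }
        if (overdrawn < maxOverdraft) {
            overdrawn++; // overdraw instead of blocking the task mid-record
            return true;
        }
        return false;    // pool and overdraft exhausted: the task must wait
    }

    synchronized void recycleSegment() {
        if (overdrawn > 0) {
            overdrawn--; // pay back the overdraft first
        } else {
            available++;
        }
    }

    public static void main(String[] args) {
        OverdraftPoolSketch pool = new OverdraftPoolSketch(2, 5);
        int granted = 0;
        while (pool.requestSegment()) {
            granted++;
        }
        System.out.println(granted); // 7 = 2 regular + 5 overdraft segments
    }
}
{code}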



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager

2022-06-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558053#comment-17558053
 ] 

Piotr Nowojski commented on FLINK-27921:


Maybe the benchmarks could be rewritten in such a way that measuring starts 
once, for example, all source subtasks are running, using some custom source. 
But if we can disable this feature for benchmarks, that would be simpler.

A separate thing to consider is whether we accept the increased start-up cost 
introduced by this ticket.

> Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
> --
>
> Key: FLINK-27921
> URL: https://issues.apache.org/jira/browse/FLINK-27921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> As discussed in 
> [https://github.com/apache/flink/pull/19840#discussion_r884242067]. This 
> ticket is meant to introduce the same mechanism of waiting for a slight delay 
> before processing the resource check with the {{FineGrainedSlotManager}}. It 
> will reduce the frequency of unnecessary re-allocations.
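
A minimal sketch (assumptions mine) of the "check with a slight delay" idea: 
bursts of requirement changes are coalesced into a single delayed check 
instead of triggering one check per change:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedResourceCheckSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pendingCheck;

    synchronized void scheduleResourceCheck(Runnable check, long delayMillis) {
        if (pendingCheck != null && !pendingCheck.isDone()) {
            return; // a check is already pending; it will see the latest state
        }
        pendingCheck = executor.schedule(check, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        DelayedResourceCheckSketch sketch = new DelayedResourceCheckSketch();
        for (int i = 0; i < 100; i++) { // 100 rapid-fire requirement changes...
            sketch.scheduleResourceCheck(() -> System.out.println("checking"), 50);
        }
        Thread.sleep(200);              // ...lead to a single "checking" line
        sketch.executor.shutdown();
    }
}
{code}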



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25454) Negative time in throughput calculator

2022-06-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557971#comment-17557971
 ] 

Piotr Nowojski commented on FLINK-25454:


merged commit 10e6341 into apache:release-1.14

This fix will be released as part of 1.14.6 version [~aliazov] (whenever this 
happens)

> Negative time in throughput calculator
> --
>
> Key: FLINK-25454
> URL: https://issues.apache.org/jira/browse/FLINK-25454
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.6
>
>
> Found during the random test:
> {noformat}
> 2021-12-23 11:52:01,645 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - KeyedProcess -> Sink: Unnamed (3/3)#0 
> (1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414)
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}
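
For context, a minimal sketch (assumptions mine) of why wall-clock based 
elapsed-time measurements can go negative, and the usual monotonic-clock 
remedy:

{code:java}
public class ElapsedTimeSketch {
    public static void main(String[] args) {
        // Wall clock: can jump backwards (e.g. NTP adjustments), so the
        // difference below may be negative, as in the precondition failure above.
        long startMillis = System.currentTimeMillis();
        long elapsedMillis = System.currentTimeMillis() - startMillis;

        // Monotonic clock: guaranteed non-decreasing within one JVM.
        long startNanos = System.nanoTime();
        long safeElapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;

        System.out.println(elapsedMillis + " " + safeElapsedMillis);
    }
}
{code}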



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-25454) Negative time in throughput calculator

2022-06-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557971#comment-17557971
 ] 

Piotr Nowojski edited comment on FLINK-25454 at 6/23/22 9:33 AM:
-

merged commit 10e6341 into apache:release-1.14

[~aliazov]: This fix will be released as part of 1.14.6 version (whenever this 
happens)


was (Author: pnowojski):
merged commit 10e6341 into apache:release-1.14

This fix will be released as part of 1.14.6 version [~aliazov] (whenever this 
happens)

> Negative time in throughput calculator
> --
>
> Key: FLINK-25454
> URL: https://issues.apache.org/jira/browse/FLINK-25454
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.6
>
>
> Found during the random test:
> {noformat}
> 2021-12-23 11:52:01,645 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - KeyedProcess -> Sink: Unnamed (3/3)#0 
> (1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414)
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}
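
For context on the failing precondition: an elapsed-time measurement taken from 
a non-monotonic wall clock can go backwards (e.g. after a clock adjustment), 
which is one way a negative duration can reach the check above. Below is a 
minimal, self-contained sketch, assuming nothing about the actual fix, of 
computing throughput against a monotonic clock instead (hypothetical class, not 
Flink's ThroughputCalculator):

{code:java}
// Sketch: System.nanoTime() is monotonic, so the elapsed time handed to the
// throughput formula can never be negative.
public final class MonotonicThroughput {
    private long startNanos = System.nanoTime();
    private long accumulatedBytes;

    public void incoming(long bytes) {
        accumulatedBytes += bytes;
    }

    /** Returns bytes per second since the last call and resets the window. */
    public double calculateThroughput() {
        long now = System.nanoTime();
        long elapsedNanos = Math.max(1, now - startNanos); // guard against 0
        double bytesPerSecond = accumulatedBytes * 1_000_000_000.0 / elapsedNanos;
        startNanos = now;
        accumulatedBytes = 0;
        return bytesPerSecond;
    }
}
{code}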



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-25454) Negative time in throughput calculator

2022-06-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-25454:
---
Fix Version/s: 1.14.6

> Negative time in throughput calculator
> --
>
> Key: FLINK-25454
> URL: https://issues.apache.org/jira/browse/FLINK-25454
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.6
>
>
> Found during the random test:
> {noformat}
> 2021-12-23 11:52:01,645 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - KeyedProcess -> Sink: Unnamed (3/3)#0 
> (1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414)
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-26762) Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being blocked

2022-06-21 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556873#comment-17556873
 ] 

Piotr Nowojski edited comment on FLINK-26762 at 6/21/22 3:48 PM:
-

Feature merged commit 0b60ee8 into apache:master
documentations merged as a17d824e179 into apache:master


was (Author: pnowojski):
Feature merged commit 0b60ee8 into apache:master

> Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being 
> blocked
> ---
>
> Key: FLINK-26762
> URL: https://issues.apache.org/jira/browse/FLINK-26762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-04-18-11-45-14-700.png, 
> image-2022-04-18-11-46-03-895.png
>
>
> In some past JIRAs about Unaligned Checkpoint, the community added 
> recordWriter.isAvailable() to reduce blocking for single-record writes. But 
> large records, flatMap operators, or broadcast watermarks may need more 
> buffers.
> Can we add an overdraft buffer to the BufferPool to reduce how often 
> unaligned checkpoints get blocked?
> h2. Overdraft Buffer mechanism
> Add the configuration option 
> 'taskmanager.network.memory.overdraft-buffers-per-gate=5'. 
> When requestMemory is called and the bufferPool is insufficient, the 
> bufferPool will allow the Task to overdraw up to 5 MemorySegments, and the 
> bufferPool will be unavailable until all overdrawn buffers are consumed by 
> downstream tasks. The task will then wait for the bufferPool to become 
> available.
> From the above, we get the following benefits:
>  * For scenarios that require multiple buffers, the Task releases the 
> Checkpoint lock, so the Unaligned Checkpoint can complete quickly.
>  * We can control the memory usage to prevent memory leaks.
>  * It needs only a little memory, and can improve the stability of the Task 
> under back pressure.
>  * Users can increase the overdraft-buffers setting to adapt to scenarios 
> that require more buffers.
>  
> Masters, please correct me if I'm wrong, thanks a lot.
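
To make the overdraft semantics above concrete, here is a toy model of the 
proposed mechanism (hypothetical names and heavy simplification; a "buffer" is 
just an Object, and this is not LocalBufferPool's real code):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the overdraft proposal: when the regular pool is exhausted,
// hand out up to maxOverdraft extra buffers instead of blocking the task;
// the pool then stays unavailable until all overdrawn buffers come back.
public final class OverdraftPool {
    private final Deque<Object> available = new ArrayDeque<>();
    private final int maxOverdraft; // e.g. 5, as proposed
    private int overdrawn;

    public OverdraftPool(int regularBuffers, int maxOverdraft) {
        for (int i = 0; i < regularBuffers; i++) {
            available.add(new Object());
        }
        this.maxOverdraft = maxOverdraft;
    }

    /** Returns a buffer, overdrawing up to maxOverdraft when the pool is empty. */
    public Object requestMemory() {
        Object buffer = available.poll();
        if (buffer == null && overdrawn < maxOverdraft) {
            overdrawn++; // overdraw instead of blocking the task
            buffer = new Object();
        }
        return buffer; // null means the task has to wait for availability
    }

    /** Unavailable until every overdrawn buffer has been recycled. */
    public boolean isAvailable() {
        return overdrawn == 0 && !available.isEmpty();
    }

    public void recycle(Object buffer) {
        if (overdrawn > 0) {
            overdrawn--;
        } else {
            available.add(buffer);
        }
    }
}
{code}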



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26762) Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being blocked

2022-06-21 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556873#comment-17556873
 ] 

Piotr Nowojski commented on FLINK-26762:


Feature merged commit 0b60ee8 into apache:master

> Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being 
> blocked
> ---
>
> Key: FLINK-26762
> URL: https://issues.apache.org/jira/browse/FLINK-26762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-04-18-11-45-14-700.png, 
> image-2022-04-18-11-46-03-895.png
>
>
> In some past JIRAs about Unaligned Checkpoint, the community added 
> recordWriter.isAvailable() to reduce blocking for single-record writes. But 
> large records, flatMap operators, or broadcast watermarks may need more 
> buffers.
> Can we add an overdraft buffer to the BufferPool to reduce how often 
> unaligned checkpoints get blocked?
> h2. Overdraft Buffer mechanism
> Add the configuration option 
> 'taskmanager.network.memory.overdraft-buffers-per-gate=5'. 
> When requestMemory is called and the bufferPool is insufficient, the 
> bufferPool will allow the Task to overdraw up to 5 MemorySegments, and the 
> bufferPool will be unavailable until all overdrawn buffers are consumed by 
> downstream tasks. The task will then wait for the bufferPool to become 
> available.
> From the above, we get the following benefits:
>  * For scenarios that require multiple buffers, the Task releases the 
> Checkpoint lock, so the Unaligned Checkpoint can complete quickly.
>  * We can control the memory usage to prevent memory leaks.
>  * It needs only a little memory, and can improve the stability of the Task 
> under back pressure.
>  * Users can increase the overdraft-buffers setting to adapt to scenarios 
> that require more buffers.
>  
> Masters, please correct me if I'm wrong, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-26762) Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being blocked

2022-06-21 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26762:
---
Release Note: 
A new concept of overdraft network buffers was introduced to mitigate the 
effects of uninterruptibly blocking a subtask thread during back pressure. 
Starting from 1.16.0, a Flink subtask can by default request up to 5 extra 
(overdraft) buffers over the regularly configured amount (you can read more 
about this in the documentation: 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#overdraft-buffers).
This change can slightly increase the memory consumption of a Flink job. To 
restore the older behaviour, you can set 
`taskmanager.network.memory.max-overdraft-buffers-per-gate` to zero.
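
For example, a sketch of restoring the pre-1.16 behaviour programmatically 
(the option key is the one quoted above, set here via Flink's Configuration 
API; setting the same key in flink-conf.yaml should be equivalent):

{code:java}
import org.apache.flink.configuration.Configuration;

public final class DisableOverdraft {
    public static void main(String[] args) {
        // Turn overdraft buffers off to restore the pre-1.16 behaviour.
        Configuration conf = new Configuration();
        conf.setInteger("taskmanager.network.memory.max-overdraft-buffers-per-gate", 0);
        System.out.println(conf);
    }
}
{code}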



> Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being 
> blocked
> ---
>
> Key: FLINK-26762
> URL: https://issues.apache.org/jira/browse/FLINK-26762
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-04-18-11-45-14-700.png, 
> image-2022-04-18-11-46-03-895.png
>
>
> In some past JIRAs about Unaligned Checkpoint, the community added 
> recordWriter.isAvailable() to reduce blocking for single-record writes. But 
> large records, flatMap operators, or broadcast watermarks may need more 
> buffers.
> Can we add an overdraft buffer to the BufferPool to reduce how often 
> unaligned checkpoints get blocked?
> h2. Overdraft Buffer mechanism
> Add the configuration option 
> 'taskmanager.network.memory.overdraft-buffers-per-gate=5'. 
> When requestMemory is called and the bufferPool is insufficient, the 
> bufferPool will allow the Task to overdraw up to 5 MemorySegments, and the 
> bufferPool will be unavailable until all overdrawn buffers are consumed by 
> downstream tasks. The task will then wait for the bufferPool to become 
> available.
> From the above, we get the following benefits:
>  * For scenarios that require multiple buffers, the Task releases the 
> Checkpoint lock, so the Unaligned Checkpoint can complete quickly.
>  * We can control the memory usage to prevent memory leaks.
>  * It needs only a little memory, and can improve the stability of the Task 
> under back pressure.
>  * Users can increase the overdraft-buffers setting to adapt to scenarios 
> that require more buffers.
>  
> Masters, please correct me if I'm wrong, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27792) InterruptedException thrown by ChannelStateWriterImpl

2022-06-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556311#comment-17556311
 ] 

Piotr Nowojski commented on FLINK-27792:


Thanks for the analysis [~fanrui]. It makes sense to me. Let's close this one 
once FLINK-28077 is merged.

> InterruptedException thrown by ChannelStateWriterImpl
> -
>
> Key: FLINK-27792
> URL: https://issues.apache.org/jira/browse/FLINK-27792
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Atri Sharma
>Priority: Blocker
>  Labels: test-stability
> Attachments: image-2022-06-18-16-39-54-402.png, 
> image-2022-06-18-16-42-13-669.png, image-2022-06-18-16-42-46-283.png
>
>
> {code:java}
> 2022-05-25T15:45:17.7584795Z May 25 15:45:17 [ERROR] 
> WindowDistinctAggregateITCase.testTumbleWindow_Rollup  Time elapsed: 1.522 s  
> <<< ERROR!
> 2022-05-25T15:45:17.7586025Z May 25 15:45:17 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-05-25T15:45:17.7587205Z May 25 15:45:17  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-05-25T15:45:17.7588649Z May 25 15:45:17  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> 2022-05-25T15:45:17.7589984Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2022-05-25T15:45:17.7603647Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2022-05-25T15:45:17.7605042Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7605750Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7606751Z May 25 15:45:17  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
> 2022-05-25T15:45:17.7607513Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7608232Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7608953Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7614259Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7615777Z May 25 15:45:17  at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
> 2022-05-25T15:45:17.7617284Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> 2022-05-25T15:45:17.7618847Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 2022-05-25T15:45:17.7620579Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> 2022-05-25T15:45:17.7622674Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7624066Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7625352Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7626524Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7627743Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> 2022-05-25T15:45:17.7628913Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:300)
> 2022-05-25T15:45:17.7629902Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:297)
> 2022-05-25T15:45:17.7630891Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> 2022-05-25T15:45:17.7632074Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> 2022-05-25T15:45:17.7654202Z May 25 15:45:17  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2022-05-25T15:45:17.7655764Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> 2022-05-25T15:45:17.7657231Z May 25 15:45:17  at 
> 

[jira] [Commented] (FLINK-27792) InterruptedException thrown by ChannelStateWriterImpl

2022-06-17 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1725#comment-1725
 ] 

Piotr Nowojski commented on FLINK-27792:


Thanks for pointing this out. So what's happening in this issue? The streaming 
job completes while a checkpoint is in progress, job closing sends the 
interrupt, and that interrupt is mishandled? CC [~fanrui]

> InterruptedException thrown by ChannelStateWriterImpl
> -
>
> Key: FLINK-27792
> URL: https://issues.apache.org/jira/browse/FLINK-27792
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Atri Sharma
>Priority: Blocker
>  Labels: test-stability
>
> {code:java}
> 2022-05-25T15:45:17.7584795Z May 25 15:45:17 [ERROR] 
> WindowDistinctAggregateITCase.testTumbleWindow_Rollup  Time elapsed: 1.522 s  
> <<< ERROR!
> 2022-05-25T15:45:17.7586025Z May 25 15:45:17 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-05-25T15:45:17.7587205Z May 25 15:45:17  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-05-25T15:45:17.7588649Z May 25 15:45:17  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> 2022-05-25T15:45:17.7589984Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2022-05-25T15:45:17.7603647Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2022-05-25T15:45:17.7605042Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7605750Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7606751Z May 25 15:45:17  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
> 2022-05-25T15:45:17.7607513Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7608232Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7608953Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7614259Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7615777Z May 25 15:45:17  at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
> 2022-05-25T15:45:17.7617284Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> 2022-05-25T15:45:17.7618847Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 2022-05-25T15:45:17.7620579Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> 2022-05-25T15:45:17.7622674Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7624066Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7625352Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7626524Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7627743Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> 2022-05-25T15:45:17.7628913Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:300)
> 2022-05-25T15:45:17.7629902Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:297)
> 2022-05-25T15:45:17.7630891Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> 2022-05-25T15:45:17.7632074Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> 2022-05-25T15:45:17.7654202Z May 25 15:45:17  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2022-05-25T15:45:17.7655764Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> 2022-05-25T15:45:17.7657231Z May 25 15:45:17  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2022-05-25T15:45:17.7658586Z May 25 

[jira] [Commented] (FLINK-27792) InterruptedException thrown by ChannelStateWriterImpl

2022-06-17 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555461#comment-17555461
 ] 

Piotr Nowojski commented on FLINK-27792:


What is causing this `InterruptedException`? Where does it originate from?

> InterruptedException thrown by ChannelStateWriterImpl
> -
>
> Key: FLINK-27792
> URL: https://issues.apache.org/jira/browse/FLINK-27792
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Atri Sharma
>Priority: Blocker
>  Labels: test-stability
>
> {code:java}
> 2022-05-25T15:45:17.7584795Z May 25 15:45:17 [ERROR] 
> WindowDistinctAggregateITCase.testTumbleWindow_Rollup  Time elapsed: 1.522 s  
> <<< ERROR!
> 2022-05-25T15:45:17.7586025Z May 25 15:45:17 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-05-25T15:45:17.7587205Z May 25 15:45:17  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-05-25T15:45:17.7588649Z May 25 15:45:17  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> 2022-05-25T15:45:17.7589984Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2022-05-25T15:45:17.7603647Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2022-05-25T15:45:17.7605042Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7605750Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7606751Z May 25 15:45:17  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
> 2022-05-25T15:45:17.7607513Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7608232Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7608953Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7614259Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7615777Z May 25 15:45:17  at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
> 2022-05-25T15:45:17.7617284Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> 2022-05-25T15:45:17.7618847Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 2022-05-25T15:45:17.7620579Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> 2022-05-25T15:45:17.7622674Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2022-05-25T15:45:17.7624066Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2022-05-25T15:45:17.7625352Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2022-05-25T15:45:17.7626524Z May 25 15:45:17  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2022-05-25T15:45:17.7627743Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> 2022-05-25T15:45:17.7628913Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:300)
> 2022-05-25T15:45:17.7629902Z May 25 15:45:17  at 
> akka.dispatch.OnComplete.internal(Future.scala:297)
> 2022-05-25T15:45:17.7630891Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> 2022-05-25T15:45:17.7632074Z May 25 15:45:17  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> 2022-05-25T15:45:17.7654202Z May 25 15:45:17  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2022-05-25T15:45:17.7655764Z May 25 15:45:17  at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> 2022-05-25T15:45:17.7657231Z May 25 15:45:17  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2022-05-25T15:45:17.7658586Z May 25 15:45:17  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 

[jira] [Commented] (FLINK-26993) CheckpointCoordinatorTest#testMinCheckpointPause

2022-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555078#comment-17555078
 ] 

Piotr Nowojski commented on FLINK-26993:


merged commit 3e86cf2 into apache:release-1.15 now

> CheckpointCoordinatorTest#testMinCheckpointPause
> 
>
> Key: FLINK-26993
> URL: https://issues.apache.org/jira/browse/FLINK-26993
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
>
> The test triggers checkpoints, waits for the CC to have stored a pending 
> checkpoint, and then sends an acknowledgement.
> The acknowledgement can fail with an NPE because 
> PendingCheckpoint#checkpointTargetLocation hasn't been set yet; setting it 
> doesn't happen synchronously with the PendingCheckpoint being added to 
> CheckpointCoordinator#pendingCheckpoints.
> {code}
> Apr 01 19:57:36 [ERROR] 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.testMinCheckpointPause
>   Time elapsed: 0.012 s  <<< ERROR!
> Apr 01 19:57:36 org.apache.flink.runtime.checkpoint.CheckpointException: 
> Could not finalize the pending checkpoint 1. Failure reason: Failure to 
> finalize checkpoint.
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241)
> Apr 01 19:57:36   at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> ...
> Apr 01 19:57:36 Caused by: java.lang.NullPointerException
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:327)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337)
> Apr 01 19:57:36   ... 50 more
> {code}
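
One common way to make a test robust against this kind of race (a sketch of 
the general technique, not the fix that was actually merged) is to poll until 
the pending checkpoint is fully initialized before sending the acknowledgement:

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Generic polling helper (plain JDK, hypothetical): spin until the condition
// holds, e.g. "the pending checkpoint's target location is set", before the
// test sends the acknowledgement.
public final class WaitUntil {
    public static void waitUntil(BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(10);
        }
    }
}
{code}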



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-26993) CheckpointCoordinatorTest#testMinCheckpointPause

2022-06-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26993:
---
Fix Version/s: 1.15.2

> CheckpointCoordinatorTest#testMinCheckpointPause
> 
>
> Key: FLINK-26993
> URL: https://issues.apache.org/jira/browse/FLINK-26993
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
>
> The test triggers checkpoints, waits for the CC to have stored a pending 
> checkpoint, and then sends an acknowledgement.
> The acknowledgement can fail with an NPE because 
> PendingCheckpoint#checkpointTargetLocation hasn't been set yet; setting it 
> doesn't happen synchronously with the PendingCheckpoint being added to 
> CheckpointCoordinator#pendingCheckpoints.
> {code}
> Apr 01 19:57:36 [ERROR] 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.testMinCheckpointPause
>   Time elapsed: 0.012 s  <<< ERROR!
> Apr 01 19:57:36 org.apache.flink.runtime.checkpoint.CheckpointException: 
> Could not finalize the pending checkpoint 1. Failure reason: Failure to 
> finalize checkpoint.
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241)
> Apr 01 19:57:36   at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> ...
> Apr 01 19:57:36 Caused by: java.lang.NullPointerException
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:327)
> Apr 01 19:57:36   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337)
> Apr 01 19:57:36   ... 50 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25454) Negative time in throughput calculator

2022-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555077#comment-17555077
 ] 

Piotr Nowojski commented on FLINK-25454:


I've looked into backporting this fix, but it's not that simple. The fix would 
have to be re-implemented, as the code has changed quite a lot between 1.14.x 
and 1.15.x. If someone ([~aliazov]? [~akalashnikov]?) would like to pick up 
this work and re-implement [~akalashnikov]'s fix for 1.14, I would be happy to 
review it.

> Negative time in throughput calculator
> --
>
> Key: FLINK-25454
> URL: https://issues.apache.org/jira/browse/FLINK-25454
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.15.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Found during the random test:
> {noformat}
> 2021-12-23 11:52:01,645 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - KeyedProcess -> Sink: Unnamed (3/3)#0 
> (1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalArgumentException: Time should be non negative
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90)
> at 
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414)
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27114) On JM restart, the information about the initial checkpoints can be lost

2022-06-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27114:
---
Fix Version/s: (was: 1.16.0)
   (was: 1.15.1)
   (was: 1.14.6)

> On JM restart, the information about the initial checkpoints can be lost
> 
>
> Key: FLINK-27114
> URL: https://issues.apache.org/jira/browse/FLINK-27114
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.14.4, 1.16.0
>Reporter: Roman Khachatryan
>Priority: Major
>
> Scenario (1.14):
>  # A job starts from an existing checkpoint 1, with incremental checkpoints 
> enabled
>  # Checkpoint 1 is loaded with discardOnSubsume=false by 
> CheckpointCoordinator.restoreSavepoint
>  # A new checkpoint 2 completes, it reuses some state from the initial 
> checkpoint
>  # At some point, checkpoint 1 is subsumed, but the state is not discarded 
> (thanks to discardOnSubsume=false, ref counts stay 1)
>  # JM crashes
>  # JM restarts, loads the checkpoints 2..x from ZK (or other store) -   
> discardOnSubsume=true (as deserialized from handles)
>  # At some point, checkpoint 2 is subsumed and the initial shared state is 
> not used anymore; because checkpoint 2 has discardOnSubsume=true, shared 
> state will be erroneously discarded
> In 1.15, there were the following changes:
>  # RestoreMode was added; only LEGACY mode is affected (in NO_CLAIM mode, 
> checkpoint 2 can't reuse any initial state; and in CLAIM mode, it's fine to 
> discard the initial state)
>  # SharedStateRegistry was changed from refCounts to highest checkpoint ID 
> (FLINK-24611)
>  # In step (7), state will not be discarded (FLINK-26985); however, because 
> it's impossible to distinguish initial state from the state created by this 
> job, the latter will not be discarded as well, leading to left-over state 
> artifacts.
> The proposed solution is to store the initial checkpoint ID (in a store such 
> as ZK, or in the checkpoints themselves) and adjust step 6 or 7.
> Storing the restore information in the checkpoint allows handling multiple 
> restore modes in the "lineage", e.g.:
> Initial run -> restore in NO_CLAIM -> restore in CLAIM
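
To make the failure mode concrete, a toy model of steps 2, 6 and 7 
(hypothetical names, only the flag loss is modelled, not Flink's actual 
CheckpointCoordinator): the in-memory discardOnSubsume flag protecting the 
initial checkpoint disappears when the handles are re-read after a JM restart.

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class DiscardFlagLossToy {
    static final class Checkpoint {
        final long id;
        final boolean discardOnSubsume; // not persisted to ZK in this scenario
        Checkpoint(long id, boolean discardOnSubsume) {
            this.id = id;
            this.discardOnSubsume = discardOnSubsume;
        }
    }

    public static void main(String[] args) {
        Map<Long, Checkpoint> store = new HashMap<>();
        // Step 2: the initial checkpoint is loaded with discardOnSubsume=false.
        store.put(1L, new Checkpoint(1L, false));
        // Step 6: after the JM restart, handles deserialize with the default.
        store.put(1L, new Checkpoint(1L, true));
        // Step 7: subsumption now wrongly believes it may discard shared state.
        System.out.println("would discard = " + store.get(1L).discardOnSubsume);
    }
}
{code}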



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-24483) Document what is Public API and what compatibility guarantees Flink is providing

2022-06-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-24483:
---
Fix Version/s: (was: 1.16.0)
   (was: 1.13.7)
   (was: 1.14.6)

> Document what is Public API and what compatibility guarantees Flink is 
> providing
> 
>
> Key: FLINK-24483
> URL: https://issues.apache.org/jira/browse/FLINK-24483
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Documentation, Table SQL / API
>Affects Versions: 1.14.0, 1.12.5, 1.13.2
>Reporter: Piotr Nowojski
>Priority: Major
>
> We should document:
> * What constitutes the Public API, and what the 
> Public/PublicEvolving/Experimental/Internal annotations mean.
> * What compatibility guarantees we are providing: forward (backward?) 
> functional/compile/binary compatibility for {{@Public}} interfaces?
> A good starting point: 
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=44302796#content/view/62686683
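
For illustration, this is roughly how the annotations in question appear on 
API types (the annotations are Flink's real ones from flink-annotations; the 
annotated interface is made up):

{code:java}
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

// A made-up API type showing the stability annotations to be documented.
@Public
public interface MyConnector {

    @PublicEvolving // may still change between minor releases
    void setExperimentalOption(boolean enabled);
}
{code}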



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27522) Ignore max buffers per channel when allocate buffer

2022-05-31 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-27522.
--
Resolution: Fixed

merged commit 9146220 into apache:master

> Ignore max buffers per channel when allocate buffer
> ---
>
> Key: FLINK-27522
> URL: https://issues.apache.org/jira/browse/FLINK-27522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This is the first task of 
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
> Currently the LocalBufferPool becomes unavailable when maxBuffersPerChannel 
> is reached for a channel, or when availableMemorySegments is empty.
> The proposal: if we request a memory segment from the LocalBufferPool and 
> maxBuffersPerChannel is reached for this channel, we just ignore that limit 
> and continue to allocate buffers as long as availableMemorySegments isn't 
> empty in the LocalBufferPool.
>  
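
A minimal sketch of that decision logic (hypothetical and simplified, not 
LocalBufferPool's actual code):

{code:java}
import java.util.ArrayDeque;

final class RequestDecision {
    // The per-channel quota no longer blocks a request on its own; only an
    // empty pool makes the caller wait. The quota parameters are intentionally
    // ignored here, which is exactly the proposed change.
    static Object request(ArrayDeque<Object> availableMemorySegments,
                          int buffersForChannel, int maxBuffersPerChannel) {
        if (!availableMemorySegments.isEmpty()) {
            return availableMemorySegments.poll();
        }
        return null; // pool exhausted: caller waits for availability
    }
}
{code}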



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27667) YARNHighAvailabilityITCase fails with "Failed to delete temp directory /tmp/junit1681"

2022-05-30 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543996#comment-17543996
 ] 

Piotr Nowojski commented on FLINK-27667:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36183=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461

> YARNHighAvailabilityITCase fails with "Failed to delete temp directory 
> /tmp/junit1681"
> --
>
> Key: FLINK-27667
> URL: https://issues.apache.org/jira/browse/FLINK-27667
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35733=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=29208
>  
> {code:bash}
> May 17 08:36:22 [INFO] Results: 
> May 17 08:36:22 [INFO] 
> May 17 08:36:22 [ERROR] Errors: 
> May 17 08:36:22 [ERROR] YARNHighAvailabilityITCase » IO Failed to delete temp 
> directory /tmp/junit1681... 
> May 17 08:36:22 [INFO] 
> May 17 08:36:22 [ERROR] Tests run: 28, Failures: 0, Errors: 1, Skipped: 0 
> May 17 08:36:22 [INFO] 
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27667) YARNHighAvailabilityITCase fails with "Failed to delete temp directory /tmp/junit1681"

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27667:
---
Priority: Critical  (was: Major)

> YARNHighAvailabilityITCase fails with "Failed to delete temp directory 
> /tmp/junit1681"
> --
>
> Key: FLINK-27667
> URL: https://issues.apache.org/jira/browse/FLINK-27667
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35733=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=29208
>  
> {code:bash}
> May 17 08:36:22 [INFO] Results: 
> May 17 08:36:22 [INFO] 
> May 17 08:36:22 [ERROR] Errors: 
> May 17 08:36:22 [ERROR] YARNHighAvailabilityITCase » IO Failed to delete temp 
> directory /tmp/junit1681... 
> May 17 08:36:22 [INFO] 
> May 17 08:36:22 [ERROR] Tests run: 28, Failures: 0, Errors: 1, Skipped: 0 
> May 17 08:36:22 [INFO] 
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27825) Update the doc of aligned checkpoint timeout

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-27825.
--
Resolution: Fixed

Thanks for the update [~fanrui]. 

merged commit 317b230 into apache:master 

> Update the doc of aligned checkpoint timeout
> 
>
> Key: FLINK-27825
> URL: https://issues.apache.org/jira/browse/FLINK-27825
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation, Runtime / 
> Checkpointing
>Affects Versions: 1.16.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-05-28-18-00-13-276.png
>
>
> We improved the mechanism of aligned checkpoint timeout in FLINK-27251.
> The doc of aligned checkpoint timeout should be updated. 
> Doc link: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout]
>  
> !image-2022-05-28-18-00-13-276.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27825) Update the doc of aligned checkpoint timeout

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27825:
--

Assignee: fanrui  (was: Piotr Nowojski)

> Update the doc of aligned checkpoint timeout
> 
>
> Key: FLINK-27825
> URL: https://issues.apache.org/jira/browse/FLINK-27825
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation, Runtime / 
> Checkpointing
>Affects Versions: 1.16.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-05-28-18-00-13-276.png
>
>
> We improved the mechanism of aligned checkpoint timeout in FLINK-27251.
> The doc of aligned checkpoint timeout should be updated. 
> Doc link: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout]
>  
> !image-2022-05-28-18-00-13-276.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27825) Update the doc of aligned checkpoint timeout

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27825:
--

Assignee: Piotr Nowojski

> Update the doc of aligned checkpoint timeout
> 
>
> Key: FLINK-27825
> URL: https://issues.apache.org/jira/browse/FLINK-27825
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation, Runtime / 
> Checkpointing
>Affects Versions: 1.16.0
>Reporter: fanrui
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: image-2022-05-28-18-00-13-276.png
>
>
> We improved the mechanism of aligned checkpoint timeout in FLINK-27251.
> The doc of aligned checkpoint timeout should be updated. 
> Doc link: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout]
>  
> !image-2022-05-28-18-00-13-276.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27789) LegacySource compatible with overdraft buffer

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27789:
--

Assignee: fanrui

> LegacySource compatible with overdraft buffer
> -
>
> Key: FLINK-27789
> URL: https://issues.apache.org/jira/browse/FLINK-27789
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Checkpointing, Runtime / 
> Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> Since LegacySource does not have an availability check (checkAvailable), it 
> will use all of the overdraft buffers by default, which is not what we 
> expect.
> So we'll set overdraft=0 for the SourceStreamTask, as sketched below.
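
A one-line sketch of that special-casing (hypothetical names):

{code:java}
final class OverdraftForTask {
    // Legacy sources never check recordWriter availability, so they would
    // silently drain the whole overdraft allowance; give them none.
    static int effectiveOverdraft(boolean isSourceStreamTask, int configuredOverdraft) {
        return isSourceStreamTask ? 0 : configuredOverdraft;
    }
}
{code}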



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27530) FLIP-227: Support overdraft buffer

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27530:
--

Assignee: fanrui

> FLIP-227: Support overdraft buffer
> --
>
> Key: FLINK-27530
> URL: https://issues.apache.org/jira/browse/FLINK-27530
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>
> This is the umbrella issue for the overdraft buffer feature. Refer to 
> the 
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
>   for more details.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27522) Ignore max buffers per channel when allocate buffer

2022-05-30 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27522:
--

Assignee: fanrui

> Ignore max buffers per channel when allocate buffer
> ---
>
> Key: FLINK-27522
> URL: https://issues.apache.org/jira/browse/FLINK-27522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This is the first task of 
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
> Currently the LocalBufferPool becomes unavailable when maxBuffersPerChannel 
> is reached for a channel, or when availableMemorySegments is empty.
> The proposal: if we request a memory segment from the LocalBufferPool and 
> maxBuffersPerChannel is reached for this channel, we just ignore that limit 
> and continue to allocate buffers as long as availableMemorySegments isn't 
> empty in the LocalBufferPool.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-25 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-27251.
--
Resolution: Fixed

Merged to master as e68d679e8ac^^..e68d679e8ac.

Thank you [~fanrui] for a very useful contribution :)

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can be switched to UC when 
> {_}currentTime - triggerTime > timeout{_}. But the downstream task still 
> needs to wait for all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support the upstream Task switching from Aligned to UC? That means: 
> when the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data queued in 
> front of the barrier in the output buffer.
>  
> Hi [~akalashnikov], please help take a look in your free time, thanks a lot.
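
The check being proposed for the output side is essentially the same one that 
FLINK-23041 introduced on the input side; a minimal sketch (hypothetical 
names, not the merged implementation):

{code:java}
import java.time.Clock;

final class BarrierTimeout {
    // If a barrier has been stuck in the output buffers longer than the
    // aligned-checkpoint timeout, the upstream task switches it to unaligned
    // and snapshots the data queued in front of it.
    static boolean shouldSwitchToUnaligned(
            long barrierTriggerTimeMillis,
            long alignedCheckpointTimeoutMillis,
            Clock clock) {
        return clock.millis() - barrierTriggerTimeMillis > alignedCheckpointTimeoutMillis;
    }
}
{code}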



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27724) The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere

2022-05-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-27724.
--
Resolution: Not A Problem

As [~fanrui] pointed out in an offline discussion, 
{{SubtaskCheckpointCoordinatorImpl}} is actually closed via the following call 
stack:
# org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUp
# resourceCloser.close();
# the combination of {{resourceCloser.registerCloseable(cancelables);}} and 
registering {{SubtaskCheckpointCoordinatorImpl}} in {{cancelables}} (see the 
sketch below).

Thanks for spotting this.
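
A toy version of that clean-up path (hypothetical names, plain JDK), to make 
the registration pattern explicit:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: the coordinator is registered once; StreamTask#cleanUp closes the
// closer, which closes everything registered with it in reverse order.
final class ResourceCloser implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    void registerCloseable(AutoCloseable resource) {
        resources.push(resource);
    }

    @Override
    public void close() throws Exception {
        while (!resources.isEmpty()) {
            resources.pop().close(); // e.g. SubtaskCheckpointCoordinatorImpl
        }
    }
}
{code}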

> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere
> -
>
> Key: FLINK-27724
> URL: https://issues.apache.org/jira/browse/FLINK-27724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27724) The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere

2022-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27724:
--

Assignee: fanrui

> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere
> -
>
> Key: FLINK-27724
> URL: https://issues.apache.org/jira/browse/FLINK-27724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27724) The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere

2022-05-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540947#comment-17540947
 ] 

Piotr Nowojski commented on FLINK-27724:


{quote}
In StreamTask#cancel(), SubtaskCheckpointCoordinatorImpl#close() should 
probably be called.
{quote}
Yes, but I think on the clean shutdown path 
{{SubtaskCheckpointCoordinatorImpl}} is never closed, right?

> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere
> -
>
> Key: FLINK-27724
> URL: https://issues.apache.org/jira/browse/FLINK-27724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> The SubtaskCheckpointCoordinatorImpl#close() isn't called in anywhere



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26803) Merge small ChannelState file for Unaligned Checkpoint

2022-05-18 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538819#comment-17538819
 ] 

Piotr Nowojski commented on FLINK-26803:


Sorry for the late response [~fanrui]. Thanks for the explanation, I now 
understand the motivation. Indeed, for mostly stateless jobs this can be an 
issue.

However I'm still not sure if this is the right thing to do, or at least the 
right way to solve this problem. 
{quote}
For large parallelism flink jobs, this size is usually more than 1M.
{quote}
1M is not that small a file. I think most of the stateful jobs actually have 
smaller state files.

Maybe there should be some more generic mechanism for aggregating small state 
handles, one that would solve this problem and the small state files problem 
at the same time? CC [~ym] But maybe that's not feasible, as we probably 
cannot merge two RocksDB SST files from different operators/tasks into a 
single state handle the same way we could do it for the in-flight data?

> Merge small ChannelState file for Unaligned Checkpoint
> --
>
> Key: FLINK-26803
> URL: https://issues.apache.org/jira/browse/FLINK-26803
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Priority: Major
> Attachments: image-2022-05-05-12-36-09-969.png
>
>
> When making an unaligned checkpoint, the number of ChannelState files is 
> TaskNumber * subtaskNumber. For a high-parallelism job this means writing 
> too many small files, which causes high load on the HDFS NameNode.
>  
> In our production, a job writes more than 50K small files for each Unaligned 
> Checkpoint. Could we merge these files before writing them to the FileSystem? 
> We could configure the maximum number of files each TM can write in a single 
> Unaligned Checkpoint.
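
To make the numbers concrete, and to sketch one possible shape of the merging 
(hypothetical; a real design would also need to handle file ownership and 
cleanup):

{code:java}
final class ChannelStateFileCount {
    public static void main(String[] args) {
        // E.g. 50 tasks with parallelism 1000 => 50,000 files per checkpoint.
        int numTasks = 50;
        int parallelism = 1_000;
        System.out.println("files per UC = " + numTasks * parallelism);

        // Sketch of a cap: map subtasks onto a fixed set of shared files per TM.
        int maxFilesPerTaskManager = 10;
        int subtaskIndex = 423;
        System.out.println(
                "shared file = " + Math.floorMod(subtaskIndex, maxFilesPerTaskManager));
    }
}
{code}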



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-18 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538730#comment-17538730
 ] 

Piotr Nowojski edited comment on FLINK-27251 at 5/18/22 11:01 AM:
--

Thanks [~fanrui] for the update. I will take a look :)

{quote}
But I don't understand "when that thread is blocked by the timeout, it's queue 
of requests should be completely empty.", could your share more details? Which 
thread? Is the ChannelStateWriteThread?
{quote}
Yes, I meant the {{ChannelStateWriterThread}}. If we are enqueuing a 
timeoutable, but not yet timed out, checkpoint barrier on the outputs, it means 
that we have already received AND processed ALL of the checkpoint barriers on 
the input channels. In other words, under no circumstances will there be a need 
to spill/persist any in-flight data from the outputs for this checkpoint. So if 
we are blocking the {{ChannelStateWriterThread}} for this subtask by waiting 
for the future (for checkpoint barriers to time out on the output or be sent 
to the downstream task), this {{ChannelStateWriterThread}} doesn't have 
anything else to do. It doesn't matter if we block it or not. New write 
requests to this {{ChannelStateWriterThread}} can only happen for the next 
checkpoint, which won't happen until the current checkpoint completes.


was (Author: pnowojski):
Thanks [~fanrui] for the update. I will take a look :)

{quote}
But I don't understand "when that thread is blocked by the timeout, its queue 
of requests should be completely empty." Could you share more details? Which 
thread? Is it the ChannelStateWriteThread?
{quote}
Yes, I meant the {{ChannelStateWriterThread}}. If we are enqueuing a 
timeoutable, but not yet timed out, checkpoint barrier on the outputs, it means 
that we have already received AND processed ALL of the checkpoint barriers on 
the input channels. In other words, under no circumstances will there be a need 
to spill/persist any in-flight data from the outputs for this checkpoint. So if 
we are blocking the {{ChannelStateWriterThread}} for this subtask by waiting 
for the future (for checkpoint barriers to time out on the output or be sent 
to the downstream task), this {{ChannelStateWriterThread}} doesn't have 
anything else to do. It doesn't matter if we block it or not. New write 
requests to this {{ChannelStateWriterThread}} can only happen for the next 
checkpoint, which won't happen until the current checkpoint completes.

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.
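For reference, the downstream-side timeout that this proposal builds on is 
enabled via configuration roughly like this (a minimal {{flink-conf.yaml}} 
sketch; the values are illustrative):
{noformat}
execution.checkpointing.unaligned: true
execution.checkpointing.aligned-checkpoint-timeout: 30 s
{noformat}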



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-18 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538730#comment-17538730
 ] 

Piotr Nowojski commented on FLINK-27251:


Thanks [~fanrui] for the update. I will take a look :)

{quote}
But I don't understand "when that thread is blocked by the timeout, its queue 
of requests should be completely empty." Could you share more details? Which 
thread? Is it the ChannelStateWriteThread?
{quote}
Yes, I meant the {{ChannelStateWriterThread}}. If we are enqueuing a 
timeoutable, but not yet timed out, checkpoint barrier on the outputs, it means 
that we have already received AND processed ALL of the checkpoint barriers on 
the input channels. In other words, under no circumstances will there be a need 
to spill/persist any in-flight data from the outputs for this checkpoint. So if 
we are blocking the {{ChannelStateWriterThread}} for this subtask by waiting 
for the future (for checkpoint barriers to time out on the output or be sent 
to the downstream task), this {{ChannelStateWriterThread}} doesn't have 
anything else to do. It doesn't matter if we block it or not. New write 
requests to this {{ChannelStateWriterThread}} can only happen for the next 
checkpoint, which won't happen until the current checkpoint completes.

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-05-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27640:
---
Summary: Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde   (was: Flink not compiling, 
flink-connector-hive_2.12 is missing 
pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde )

> Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
> pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
> --
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Critical
>
> When clean-installing the whole project after cleaning the local {{.m2}} 
> directory, I encountered the following error while compiling 
> flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to 
> collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
> artifact descriptor for 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
> artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
> maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for 
> repositories: [conjars (http://conjars.org/repo, default, 
> releases+snapshots), apache.snapshots 
> (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding 
> {noformat}
> <repository>
>   <id>spring-repo-plugins</id>
>   <url>https://repo.spring.io/ui/native/plugins-release/</url>
> </repository>
> {noformat}
> to ~/.m2/settings.xml file. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-05-16 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-27640:
--

 Summary: Flink not compiling, flink-connector-hive_2.12 is missing 
pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
 Key: FLINK-27640
 URL: https://issues.apache.org/jira/browse/FLINK-27640
 Project: Flink
  Issue Type: Bug
  Components: Build System, Connectors / Hive
Affects Versions: 1.16.0
Reporter: Piotr Nowojski


When clean-installing the whole project after cleaning the local {{.m2}} 
directory, I encountered the following error while compiling 
flink-connector-hive_2.12:
{noformat}
[ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not 
resolve dependencies for project 
org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to collect 
dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
artifact descriptor for 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: 
[conjars (http://conjars.org/repo, default, releases+snapshots), 
apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] 
-> [Help 1]
{noformat}
I've solved this by adding 
{noformat}
<repository>
  <id>spring-repo-plugins</id>
  <url>https://repo.spring.io/ui/native/plugins-release/</url>
</repository>
{noformat}
to ~/.m2/settings.xml file. 
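A minimal sketch of a complete {{settings.xml}} hosting such an entry (the 
profile id {{flink-deps}} is a made-up name for this example):
{noformat}
<settings>
  <profiles>
    <profile>
      <id>flink-deps</id>
      <repositories>
        <repository>
          <id>spring-repo-plugins</id>
          <url>https://repo.spring.io/ui/native/plugins-release/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>flink-deps</activeProfile>
  </activeProfiles>
</settings>
{noformat}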



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536089#comment-17536089
 ] 

Piotr Nowojski edited comment on FLINK-27251 at 5/12/22 12:34 PM:
--

Thanks for the answers. Re 3., thanks for the pointers, I had missed that. I 
think 1. and 2. could be fixed in the PR. It will be easier to discuss them and 
other issues there.

Re 4., I'm not sure how complicated the FIFO queue solution would be. How many 
writer threads do we have right now? There is one 
{{ChannelStateWriteRequestExecutor}} per subtask, and each instance has its own 
single thread? If so, maybe your current proposal is actually fine? We do not 
support concurrent unaligned checkpoints, so when that thread is blocked by the 
timeout, its queue of requests should be completely empty.

Maybe one missing thing is support for aborting checkpoints. If a checkpoint is 
being aborted, it would be good to cancel those futures?

All in all, I think +1 for this feature. It looks easier than I thought/feared. 
(I think we don't need a new FLIP for this)


was (Author: pnowojski):
Thanks for the answers. Re 3., thanks for the pointers, I had missed that. I 
think 1. and 2. could be fixed in the PR. It will be easier to discuss them and 
other issues there.

Re 4., I'm not sure how complicated the FIFO queue solution would be. How many 
writer threads do we have right now? There is one 
{{ChannelStateWriteRequestExecutor}} per subtask, and each instance has its own 
single thread? If so, maybe your current proposal is actually fine? We do not 
support concurrent unaligned checkpoints, so when that thread is blocked by the 
timeout, its queue of requests should be completely empty.

Maybe one missing thing is support for aborting checkpoints. If a checkpoint is 
being aborted, it would be good to cancel those futures?

All in all, I think +1 for this feature. It looks easier than I thought/feared. 

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-27251:
--

Assignee: fanrui

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536089#comment-17536089
 ] 

Piotr Nowojski commented on FLINK-27251:


Thanks for the answers. Re 3., thanks for the pointers, I had missed that. I 
think 1. and 2. could be fixed in the PR. It will be easier to discuss them and 
other issues there.

Re 4., I'm not sure how complicated the FIFO queue solution would be. How many 
writer threads do we have right now? There is one 
{{ChannelStateWriteRequestExecutor}} per subtask, and each instance has its own 
single thread? If so, maybe your current proposal is actually fine? We do not 
support concurrent unaligned checkpoints, so when that thread is blocked by the 
timeout, its queue of requests should be completely empty.

Maybe one missing thing is support for aborting checkpoints. If a checkpoint is 
being aborted, it would be good to cancel those futures?

All in all, I think +1 for this feature. It looks easier than I thought/feared. 

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25872) Restoring from non-changelog checkpoint with changelog state-backend enabled in CLAIM mode discards state in use

2022-05-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534881#comment-17534881
 ] 

Piotr Nowojski commented on FLINK-25872:


I would still wait and see how many people end up using the changelog state 
backend, and whether they indeed complain about having to take a savepoint. I 
have a feeling that the set of users that falls into both of those categories 
might be so small, especially initially, that it's not worth increasing the 
complexity of the system. 

> Restoring from non-changelog checkpoint with changelog state-backend enabled 
> in CLAIM mode discards state in use
> 
>
> Key: FLINK-25872
> URL: https://issues.apache.org/jira/browse/FLINK-25872
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> If we restore from checkpoint with changelog state-backend enabled in 
> snapshot CLAIM mode, the restored checkpoint would be discarded on subsume. 
> This invalidates newer/active checkpoints because their materialized part is 
> discarded (for incremental wrapped checkpoints, their private state is 
> discarded). This bug is like FLINK-25478.
>  
> Design doc: 
> [https://docs.google.com/document/d/1KSFWc0gL7HkhC-JNrnsp06TLnsTmZOTHITQDcGMo0cI/edit?usp=sharing,]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

2022-05-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534876#comment-17534876
 ] 

Piotr Nowojski edited comment on FLINK-27251 at 5/11/22 12:53 PM:
--

I've looked at the code and I have a couple of high level comments/questions:

# Instead of overriding {{PipelinedResultPartition#broadcastEvent}} with a 
quite ugly {{instanceof}} check, I would try to move this code to 
{{SubtaskCheckpointCoordinatorImpl#checkpointState}}, shortly after 
broadcasting the {{CheckpointBarrier}}.
# {{AlignedCheckpointTimeoutHandle}} should not be executed in a non-task 
thread. {{InputGates}}, {{InputChannels}}, {{ResultPartitions}} and 
{{ResultSubpartition}} are mostly non-thread-safe classes, and all interactions 
with them should happen through the task thread, with the exception of the 
netty threads pulling data from subpartitions or putting data into channels. I 
would change this and make {{AlignedCheckpointTimeoutHandle}} a 
{{StreamTask#mainMailboxExecutor}} action/mail (see the sketch after this 
list). [*] 
# It looks like the completion of an aligned checkpoint will always be blocked 
on the timeout in your PoC, regardless of whether the checkpoint barrier is 
sent downstream before the timeout? So without any backpressure, enabling the 
timeout will extend the time of the aligned checkpoint?
# {{ChannelStateWriter}} might be blocked waiting for the output data future to 
complete, wasting resources/cycles?

[*] If you haven't seen this, the main task thread runs a loop that prioritises 
processing new mails/actions over processing input records. The task thread 
spends all of its time in {{StreamTask#invoke}}, and in 
{{StreamTask#runMailboxLoop}} in particular.
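For illustration, enqueuing such a mail could look roughly like this (a sketch 
only; {{timeoutAlignment}} and the surrounding wiring are made up, not the 
PoC's actual API):
{code:java}
import org.apache.flink.api.common.operators.MailboxExecutor;

final class TimeoutScheduling {

    /** Hypothetical handle; timeoutAlignment is a made-up method name. */
    interface AlignedCheckpointTimeoutHandle {
        void timeoutAlignment(long checkpointId) throws Exception;
    }

    /** Sketch: enqueue the timeout handling as a mail, so that it runs in the
     *  task thread (inside StreamTask#runMailboxLoop) rather than in a timer
     *  or netty thread. */
    static void scheduleInTaskThread(
            MailboxExecutor mainMailboxExecutor,
            AlignedCheckpointTimeoutHandle handle,
            long checkpointId) {
        mainMailboxExecutor.execute(
                () -> handle.timeoutAlignment(checkpointId),
                "Timeout aligned barriers of checkpoint %d on the outputs",
                checkpointId);
    }
}
{code}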


was (Author: pnowojski):
I've looked at the code and I have a couple of high level comments/questions:

# Instead of overriding {{PipelinedResultPartition#broadcastEvent}} with a 
quite ugly {{instanceof}} check, I would try to move this code to 
{{SubtaskCheckpointCoordinatorImpl#checkpointState}}, shortly after 
broadcasting the {{CheckpointBarrier}}.
# {{AlignedCheckpointTimeoutHandle}} should not be executed in a non-task 
thread. {{InputGates}}, {{InputChannels}}, {{ResultPartitions}} and 
{{ResultSubpartition}} are mostly non-thread-safe classes, and all interactions 
with them should happen through the task thread, with the exception of the 
netty threads pulling data from subpartitions or putting data into channels. I 
would change this and make {{AlignedCheckpointTimeoutHandle}} a 
{{StreamTask#mainMailboxExecutor}} action/mail. [1] 
# It looks like the completion of an aligned checkpoint will always be blocked 
on the timeout in your PoC, regardless of whether the checkpoint barrier is 
sent downstream before the timeout? So without any backpressure, enabling the 
timeout will extend the time of the aligned checkpoint?
# {{ChannelStateWriter}} might be blocked waiting for the output data future to 
complete, wasting resources/cycles?

[1] If you haven't seen this, the main task thread runs a loop that prioritises 
processing new mails/actions over processing input records. The task thread 
spends all of its time in {{StreamTask#invoke}}, and in 
{{StreamTask#runMailboxLoop}} in particular.

> Timeout aligned to unaligned checkpoint barrier in the output buffers of an 
> upstream subtask
> 
>
> Key: FLINK-27251
> URL: https://issues.apache.org/jira/browse/FLINK-27251
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC when {_}currentTime 
> - triggerTime > timeout{_}. But the downstream task still needs to wait for 
> all barriers from upstream. 
> If the back pressure is severe, the downstream task cannot receive all 
> barriers within the CP timeout, which causes the CP to fail.
>  
> Can we support upstream Task switching from Aligned to UC? It means that when 
> the barrier cannot be sent from the output buffer to the downstream task 
> within the 
> [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout],
>  the upstream task switches to UC and takes a snapshot of the data before the 
> barrier in the output buffer.
>  
> Hi [~akalashnikov] , please help take a look in your free time, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

