Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-21 Thread Guowei Ma
Hi,

Thanks to Dawid and Piotr for sharing the problem. +1 to allowing
EndInput/Finish to be called repeatedly.

Let me share some of my understanding of this problem, hoping to help
people facing a similar one.

At the beginning, what bothered me the most was: if the framework has
notified a task with the `Finish & Snapshot & CheckpointComplete` sequence,
why would it notify the task with an `EndInput` message again, and why could
there even be new data? The reason I found it strange at first was that I
thought the semantics of CheckpointComplete(X) were: the data before X will
not come again. If this X represents the end "position", then that means all
the data has been processed and there will be no more data.

After having some offline discussions with Yun, I think the "position"
represented by X is actually related to the topology of the job.
CheckpointComplete(X) only means that the data before position X of an
operator in a certain topology has been processed; it does not mean that no
data of the entire job will ever come again (especially now that we have
introduced the Final Checkpoint). Therefore, if the topology changes, the
previous position has a different meaning under the new topology, so it may
be reasonable to continue to receive messages.

Best,
Guowei


On Thu, Jul 22, 2021 at 2:46 AM Piotr Nowojski  wrote:

> Hi Steven,
>
> > I probably missed sth here. isn't this the case today already? Why is it
> a concern for the proposed change?
>
> The problem is with the newly added `finish()` method and the already
> existing `endInput()` call. Currently on master there are no issues,
> because we are not checkpointing any operators after some operators have
> finished. The purpose of FLIP-147 is exactly to enable this, which
> opens a new problem described by Dawid.
>
> To paraphrase and give a concrete example, assume we have an operator
> with a parallelism of two: subtask 0 and subtask 1.
>
> 1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1
> hasn't (yet).
> 2. Checkpoint 42 is triggered, and it completes.
> 3. The job fails and is restarted, but at the same time it's rescaled: the
> user has chosen to scale the operator down to a parallelism of 1.
>
> Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has
> been processed or not, so while recovering to checkpoint 42 we have to
> recover both the finished subtask 0's (#1) state and the not yet finished
> subtask 1's (#1) state. But at the same time they are scaled down, so we
> only have a single subtask 0 (#2) that has a combined state from both of
> the previous instances. The potentially confusing issue is that the state
> from subtask 0 (#1) was checkpointed AFTER the `endInput()` and `finish()`
> calls, but it's recovered to an operator that still has some records to
> process. In step 1, a user could for example store in the operator's state
> a bit of information like "end input has already been called!", which
> after recovery would no longer be true.
>
> Hence the question about `finish()` and `endInput()` semantics: should they
> be tied to state, or just to an operator instance/execution attempt?
>
> Piotrek
>
> On Wed, 21 Jul 2021 at 19:00, Steven Wu wrote:
>
> > > if a failure happens after sequence of finish() -> snapshotState(), but
> > before notifyCheckpointComplete(), we will restore such a state and we
> > might end up sending some more records to such an operator.
> >
> > I probably missed sth here. isn't this the case today already? Why is it
> a
> > concern for the proposed change?
> >
> > On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Dawid,
> > >
> > > Thanks for writing down those concerns.
> > >
> > > I think the first issue boils down to what should be the contract of
> > > lifecycle methods like open(), close(), initializeState() etc., and
> > > especially the new additions finish() and endInput(). And what should
> > > be their relation with the operator state (regardless of its type:
> > > keyed, non-keyed, union, ...). Should those methods be tied to state
> > > or not? After thinking about it for a while (and discussing it offline
> > > with Dawid), I think the answer might be no, they shouldn't. I mean
> > > maybe we should just openly say that all of those methods relate to
> > > this single particular instance and execution of the operator. And if
> > > a job is recovered/rescaled, we would be allowed to freely resume
> > > consumption, ignoring the fact that maybe some parts of the state have
> > > previously seen `endInput()`. Why?
> > >
> > > 0. Yes, it might be confusing. Especially with `endInput()`: we call
> > > `endInput()`, we store something in state, and later, after recovery
> > > combined with rescaling, that state can see more records? Indeed weird.
> > > 1. I haven't yet come up with a counterexample that would break and
> > > make it impossible to implement a real-life use case. Theoretically
> > > yes, the user can store `endInput()` on state, and after rescaling
> > > this state would be incon

[jira] [Created] (FLINK-23465) JobListener#onJobExecuted cannot be called when sql job finished

2021-07-21 Thread camilesing (Jira)
camilesing created FLINK-23465:
--

 Summary: JobListener#onJobExecuted cannot be called when sql job 
finished
 Key: FLINK-23465
 URL: https://issues.apache.org/jira/browse/FLINK-23465
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: camilesing


In my code, using the SQL API, the source is Kafka and the sink can be
anything. The job goes to FINISHED when isEndOfStream returns true, but
JobListener#onJobExecuted is not executed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23464) Benchmarks aren't compiling

2021-07-21 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-23464:


 Summary: Benchmarks aren't compiling
 Key: FLINK-23464
 URL: https://issues.apache.org/jira/browse/FLINK-23464
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


{code}
19:07:09  [INFO] Compiling 1 Scala source and 64 Java sources to 
/home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/target/classes 
...
19:07:13  [ERROR] 
/home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java:26:43:
  error: cannot find symbol
19:07:13  [WARNING] javac exited with exit code 1
{code}





Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Gen Luo
Hi,
Thanks for driving this, @Till Rohrmann. I would give a +1 on reducing the
heartbeat timeout and interval, though I'm not sure whether 15s and 3s would
be enough either.

IMO, apart from the standalone cluster, where Flink's heartbeat mechanism is
fully relied upon, reducing the heartbeat settings can also help the JM find
out faster about TaskExecutors in abnormal conditions that cannot respond to
heartbeat requests, e.g., continuous Full GC, where the TaskExecutor process
is alive but the deployment system may not know about its condition. Since
there are cases that can benefit from this change, I think it could be done
as long as it doesn't break the experience in other scenarios.

If we can identify what blocks the main threads from processing heartbeats,
or what enlarges the GC costs, we can try to get rid of those causes to get
a more predictable heartbeat response time, or give some advice to users
whose jobs may encounter these issues. For example, as far as I know the JM
of a large-scale job will be busier and may not be able to process
heartbeats in time, so we could advise users running jobs with more than
5000 tasks to enlarge their heartbeat interval to 10s and the timeout to
50s. The numbers are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be
a main concern for this case.
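To make the trade-off being discussed concrete: with a timeout T and an interval I, a component tolerates roughly T/I consecutive unanswered heartbeats, and a pause (e.g. a Full GC) is only survivable if it stays below T. A small self-contained sketch (plain Java; the class and method names are illustrative, not Flink code):

```java
public class HeartbeatMath {

    /** Number of consecutive heartbeat requests that may go unanswered
     *  before the timeout fires. */
    static long toleratedMissedHeartbeats(long timeoutMillis, long intervalMillis) {
        return timeoutMillis / intervalMillis;
    }

    /** A pause (e.g. a Full GC) is survivable if a heartbeat can still be
     *  answered before the timeout expires. */
    static boolean survivesPause(long pauseMillis, long timeoutMillis) {
        return pauseMillis < timeoutMillis;
    }

    public static void main(String[] args) {
        // Current defaults: 50s timeout, 10s interval.
        System.out.println(toleratedMissedHeartbeats(50_000, 10_000)); // 5
        // Proposed: 15s timeout, 3s interval -- same tolerated misses.
        System.out.println(toleratedMissedHeartbeats(15_000, 3_000));  // 5
        // A 10s Full GC (as observed in the GC logs mentioned above):
        System.out.println(survivesPause(10_000, 15_000));             // true
        System.out.println(survivesPause(16_000, 15_000));             // false
    }
}
```

Note that both the current and the proposed values keep the same timeout/interval ratio; what shrinks is the absolute pause a component can survive.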

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann  wrote:

> Thanks for sharing these insights.
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operations from the main thread. With FLINK-22483 [2] we will
> get rid of Filesystem accesses to retrieve completed checkpoints. This
> leaves us with one additional file system access from the main thread which
> is the one completing a pending checkpoint. I think it should be possible
> to get rid of this access because as Stephan said it only writes
> information to disk that is already written before. Maybe solving these two
> issues could ease concerns about long pauses of unresponsiveness of Flink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
> [2] https://issues.apache.org/jira/browse/FLINK-22483
>
> Cheers,
> Till
>
> On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:
>
>> Thanks @Till Rohrmann   for starting this
>> discussion
>>
>> Firstly, I try to understand the benefit of shorter heartbeat timeout.
>> IIUC, it will make the JobManager aware of lost
>> TaskManagers faster. However, it seems that only the standalone cluster
>> could benefit from this. For Yarn and
>> native Kubernetes deployment, the Flink ResourceManager should get the
>> TaskManager lost event in a very short time.
>>
>> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
>> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>>
>> Secondly, I am not very confident about decreasing the timeout to 15s. I
>> quickly checked the TaskManager GC logs from the past week of our internal
>> Flink workloads and found more than 100 10-second Full GCs, but none
>> longer than 15s. We are using CMS GC for the old generation.
>>
>>
>> Best,
>> Yang
>>
>> On Sat, 17 Jul 2021 at 1:05 AM, Till Rohrmann wrote:
>>
>>> Hi everyone,
>>>
>>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>>> and blocking operations that were executed in the main threads of Flink's
>>> components. Since then, there were quite some advancements wrt the JVM's
>>> GCs and we also got rid of a lot of blocking calls that were executed in
>>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>>> times in case of a TaskManager loss because the system can only properly
>>> recover after the dead TaskManager has been removed from the scheduler.
>>> Hence, I wanted to propose to change the timeout and interval to:
>>>
>>> heartbeat.timeout: 15s
>>> heartbeat.interval: 3s
>>>
>>> Since there is no perfect solution that fits all use cases, I would
>>> really
>>> like to hear from you what you think about it and how you configure these
>>> heartbeat options. Based on your experience we might actually come up
>>> with
>>> better default values that allow us to be resilient but also to detect
>>> failed components fast. FLIP-185 can be found here [1].
>>>
>>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>>
>>> Cheers,
>>> Till
>>>
>>


[jira] [Created] (FLINK-23463) Replace the div tag with ShortCodes in document

2021-07-21 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-23463:
--

 Summary: Replace the div tag with ShortCodes in document
 Key: FLINK-23463
 URL: https://issues.apache.org/jira/browse/FLINK-23463
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.14.0
Reporter: Yangze Guo
 Fix For: 1.14.0


In FLINK-22922, we migrated the Flink website to Hugo. At the moment, most of 
the div tags in the user docs no longer take effect. We need to replace them 
with ShortCodes.





[jira] [Created] (FLINK-23462) Translate the abfs documentation to chinese

2021-07-21 Thread Srinivasulu Punuru (Jira)
Srinivasulu Punuru created FLINK-23462:
--

 Summary: Translate the abfs documentation to chinese
 Key: FLINK-23462
 URL: https://issues.apache.org/jira/browse/FLINK-23462
 Project: Flink
  Issue Type: Bug
  Components: chinese-translation, Documentation
Reporter: Srinivasulu Punuru


Translate the documentation changes that were made in this PR to chinese 
https://github.com/apache/flink/pull/16559/ 





Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-21 Thread Piotr Nowojski
Hi Steven,

> I probably missed sth here. isn't this the case today already? Why is it
a concern for the proposed change?

The problem is with the newly added `finish()` method and the already
existing `endInput()` call. Currently on master there are no issues,
because we are not checkpointing any operators after some operators have
finished. The purpose of FLIP-147 is exactly to enable this, which
opens a new problem described by Dawid.

To paraphrase and give a concrete example, assume we have an operator
with a parallelism of two: subtask 0 and subtask 1.

1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1
hasn't (yet).
2. Checkpoint 42 is triggered, and it completes.
3. The job fails and is restarted, but at the same time it's rescaled: the
user has chosen to scale the operator down to a parallelism of 1.

Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has
been processed or not, so while recovering to checkpoint 42 we have to
recover both the finished subtask 0's (#1) state and the not yet finished
subtask 1's (#1) state. But at the same time they are scaled down, so we
only have a single subtask 0 (#2) that has a combined state from both of
the previous instances. The potentially confusing issue is that the state
from subtask 0 (#1) was checkpointed AFTER the `endInput()` and `finish()`
calls, but it's recovered to an operator that still has some records to
process. In step 1, a user could for example store in the operator's state
a bit of information like "end input has already been called!", which
after recovery would no longer be true.
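To illustrate this hazard with a toy model (plain Java; the class and field names are hypothetical, this is not Flink's actual state API), consider what a user-stored "end input seen" flag looks like after a scale-down merge:

```java
import java.util.List;

public class RescaleExample {

    /** Hypothetical checkpointed operator state; "endInputSeen" is the
     *  kind of flag a user might store from within endInput(). */
    static class SubtaskState {
        final boolean endInputSeen;
        SubtaskState(boolean endInputSeen) { this.endInputSeen = endInputSeen; }
    }

    /** On scale-down, the states of the old subtasks are merged into one.
     *  Any reasonable merge keeps the flag if any instance had set it. */
    static SubtaskState merge(List<SubtaskState> old) {
        boolean seen = old.stream().anyMatch(s -> s.endInputSeen);
        return new SubtaskState(seen);
    }

    public static void main(String[] args) {
        // Checkpoint 42: subtask 0 already saw endInput()/finish(),
        // subtask 1 did not.
        SubtaskState subtask0 = new SubtaskState(true);
        SubtaskState subtask1 = new SubtaskState(false);

        // The job fails and restarts, rescaled to parallelism 1.
        SubtaskState merged = merge(List.of(subtask0, subtask1));

        // The flag claims input has ended...
        System.out.println(merged.endInputSeen); // true
        // ...yet the single new subtask inherits subtask 1's unfinished
        // input and will still receive records, contradicting the flag.
    }
}
```

This is exactly the inconsistency described above: the flag is true in the merged state while the operator instance still has records to process.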

Hence the question about `finish()` and `endInput()` semantics: should they
be tied to state, or just to an operator instance/execution attempt?

Piotrek

On Wed, 21 Jul 2021 at 19:00, Steven Wu wrote:

> > if a failure happens after sequence of finish() -> snapshotState(), but
> before notifyCheckpointComplete(), we will restore such a state and we
> might end up sending some more records to such an operator.
>
> I probably missed sth here. isn't this the case today already? Why is it a
> concern for the proposed change?
>
> On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski 
> wrote:
>
> > Hi Dawid,
> >
> > Thanks for writing down those concerns.
> >
> > I think the first issue boils down to what should be the contract of
> > lifecycle methods like open(), close(), initializeState() etc., and
> > especially the new additions finish() and endInput(). And what should be
> > their relation with the operator state (regardless of its type: keyed,
> > non-keyed, union, ...). Should those methods be tied to state or not?
> > After thinking about it for a while (and discussing it offline with
> > Dawid), I think the answer might be no, they shouldn't. I mean maybe we
> > should just openly say that all of those methods relate to this single
> > particular instance and execution of the operator. And if a job is
> > recovered/rescaled, we would be allowed to freely resume consumption,
> > ignoring the fact that maybe some parts of the state have previously
> > seen `endInput()`. Why?
> >
> > 0. Yes, it might be confusing. Especially with `endInput()`: we call
> > `endInput()`, we store something in state, and later, after recovery
> > combined with rescaling, that state can see more records? Indeed weird.
> > 1. I haven't yet come up with a counterexample that would break and make
> > it impossible to implement a real-life use case. Theoretically yes, the
> > user can store `endInput()` on state, and after rescaling this state
> > would be inconsistent with what is actually happening with the operator,
> > but I haven't found a use case that would break because of that.
> > 2. Otherwise, the implementation would be very difficult.
> > 3. It's difficult to access keyed state from within
> > `endInput()`/`finish()` calls, as they do not have a key context.
> > 4. After all, openly defining `endInput()` and `finish()` to be tied to
> > the operator's execution instance lifecycle is not that strange and is
> > quite simple to explain. Sure, it can lead to a bit of confusion (0.),
> > but that doesn't sound that bad in comparison with the alternatives that
> > I'm aware of. Also, methods like `open()` and `close()` are currently
> > tied to the operator execution instance, not to the state. Operators can
> > be opened and closed multiple times; it doesn't mean that the state is
> > lost after closing an operator.
> >
> > For the UnionListState problem I have posted my proposal in the ticket
> [1],
> > so maybe let's move that particular discussion there?
> >
> > Piotrek
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21080
> >
> > On Wed, 21 Jul 2021 at 12:39, Dawid Wysakowicz wrote:
> >
> > > Hey all,
> > >
> > > To make the issues that were found transparent to the community, I want
> > to
> > > post an update:
> > >
> > > *1. Committing side-effects*
> > > We do want to make sure that all side effects are committed before
> > > bringing tasks down. Sid

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-21 Thread Steven Wu
> if a failure happens after sequence of finish() -> snapshotState(), but
before notifyCheckpointComplete(), we will restore such a state and we
might end up sending some more records to such an operator.

I probably missed sth here. isn't this the case today already? Why is it a
concern for the proposed change?

On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski  wrote:

> Hi Dawid,
>
> Thanks for writing down those concerns.
>
> I think the first issue boils down to what should be the contract of
> lifecycle methods like open(), close(), initializeState() etc., and
> especially the new additions finish() and endInput(). And what should be
> their relation with the operator state (regardless of its type: keyed,
> non-keyed, union, ...). Should those methods be tied to state or not?
> After thinking about it for a while (and discussing it offline with
> Dawid), I think the answer might be no, they shouldn't. I mean maybe we
> should just openly say that all of those methods relate to this single
> particular instance and execution of the operator. And if a job is
> recovered/rescaled, we would be allowed to freely resume consumption,
> ignoring the fact that maybe some parts of the state have previously seen
> `endInput()`. Why?
>
> 0. Yes, it might be confusing. Especially with `endInput()`: we call
> `endInput()`, we store something in state, and later, after recovery
> combined with rescaling, that state can see more records? Indeed weird.
> 1. I haven't yet come up with a counterexample that would break and make
> it impossible to implement a real-life use case. Theoretically yes, the
> user can store `endInput()` on state, and after rescaling this state would
> be inconsistent with what is actually happening with the operator, but I
> haven't found a use case that would break because of that.
> 2. Otherwise, the implementation would be very difficult.
> 3. It's difficult to access keyed state from within `endInput()`/`finish()`
> calls, as they do not have a key context.
> 4. After all, openly defining `endInput()` and `finish()` to be tied to
> the operator's execution instance lifecycle is not that strange and is
> quite simple to explain. Sure, it can lead to a bit of confusion (0.), but
> that doesn't sound that bad in comparison with the alternatives that I'm
> aware of. Also, methods like `open()` and `close()` are currently tied to
> the operator execution instance, not to the state. Operators can be opened
> and closed multiple times; it doesn't mean that the state is lost after
> closing an operator.
>
> For the UnionListState problem I have posted my proposal in the ticket [1],
> so maybe let's move that particular discussion there?
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-21080
>
> On Wed, 21 Jul 2021 at 12:39, Dawid Wysakowicz wrote:
>
> > Hey all,
> >
> > To make the issues that were found transparent to the community, I want
> to
> > post an update:
> >
> > *1. Committing side-effects*
> > We do want to make sure that all side effects are committed before
> > bringing tasks down. Side effects are committed when calling
> > notifyCheckpointComplete. For the final checkpoint we introduced the
> method
> > finish(). This notifies the operator that we have consumed all incoming
> > records and we are preparing to close the Task. In turn we should flush
> any
> > pending buffered records and prepare to commit last transactions. The
> goal
> > is that after a successful sequence of finish() -> snapshotState() ->
> > notifyCheckpointComplete(), the remaining state can be considered
> > empty/finished and may be discarded.
> >
> > *Failure before notifyCheckpointComplete()*
> >
> > The question is what the contract of the endInput()/finish() methods is,
> > and how calling these methods affects the operator's keyed, non-keyed,
> > and external state. Is it allowed to restore a state snapshot taken
> > after calling endInput()/finish() and process more records? Or do we
> > assume that after a restore from such a state taken after finish() we
> > should not call any of the lifecycle methods, or at least make sure
> > those methods do not emit records/interact with the mailbox etc.?
> >
> > Currently it is possible that if a failure happens after sequence of
> > finish() -> snapshotState(), but before notifyCheckpointComplete(), we
> will
> > restore such a state and we might end up sending some more records to
> such
> > an operator. It is possible if we rescale and this state is merged with a
> > state of a subtask that has not called finish() yet. It can also happen
> if
> > we rescale the upstream operator and the subtask of interest becomes
> > connected to a newly added non finished subtask.
> >
> > *Snapshotting StreamTasks on which finish() has been called*
> >
> >
> > We thought about putting a flag into the snapshot of a subtask produced
> > after the finish() method. This would make it possible to skip execution
> of
> > certain lifecycle methods. Unfortunately this creates problem

[jira] [Created] (FLINK-23461) Consider disallowing in-memory state handles for materialized state

2021-07-21 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23461:
-

 Summary: Consider disallowing in-memory state handles for 
materialized state
 Key: FLINK-23461
 URL: https://issues.apache.org/jira/browse/FLINK-23461
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.14.0


*For the non-materialized part*, FLINK-21353 uses neither
FsCheckpointStreamFactory nor PlaceholderStreamStateHandle, so it's not an 
issue. Adding it in the future doesn't make sense, as for such small changes 
an incremental checkpoint might work better.

*For the materialized part*, ByteStreamStateHandle can currently be used. This 
can bring back to life issues like FLINK-21351 if checkpoint subsumption on 
the TM *is* decoupled from the state backend's state. Removing those 
assumptions is one of the goals of changing the ownership.
 An easy way to solve it is to simply enforce a zero threshold, so that state 
is written to DFS instead of memory.
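A sketch of the "zero threshold" idea (plain Java with hypothetical stand-in types, not Flink's actual FsCheckpointStreamFactory/ByteStreamStateHandle API): state smaller than the threshold is normally inlined in memory, so forcing the threshold to zero makes every materialized write go to DFS:

```java
import java.util.Arrays;

public class ThresholdSketch {
    /** Hypothetical stand-ins for the two kinds of state handles. */
    interface StateHandle {}
    record InMemoryHandle(byte[] bytes) implements StateHandle {}  // ByteStreamStateHandle-like
    record FileHandle(String path) implements StateHandle {}      // file/DFS-backed

    /** Below the threshold the snapshot is inlined in memory,
     *  otherwise it is written out to DFS. */
    static StateHandle write(byte[] snapshot, int fileStateThreshold) {
        if (snapshot.length < fileStateThreshold) {
            return new InMemoryHandle(Arrays.copyOf(snapshot, snapshot.length));
        }
        return new FileHandle("dfs://checkpoints/chk/" + snapshot.length);
    }

    public static void main(String[] args) {
        byte[] small = new byte[100];
        // A default-style threshold keeps small state in memory:
        System.out.println(write(small, 20_480) instanceof InMemoryHandle); // true
        // A zero threshold, as proposed, sends even tiny state to DFS,
        // so subsumption never has to track in-memory handles:
        System.out.println(write(small, 0) instanceof FileHandle);          // true
    }
}
```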

 

*PlaceholderStreamStateHandle* can be used for the materialized state 
(regardless of ByteStreamStateHandle; unless SnapshotStrategy is changed). 
However, it shouldn't cause any issues:
 - if the file is shared (i.e. after recovery) then by definition it should be 
managed by JM
 - otherwise, JM should still replace placeholders (FLINK-23137); and it should 
have received the original state objects before; no re-upload should happen 
(FLINK-23344) - so JM and TM will always refer to the same file





[jira] [Created] (FLINK-23460) Add a global flag for enabling/disabling final checkpoints

2021-07-21 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23460:


 Summary: Add a global flag for enabling/disabling final checkpoints
 Key: FLINK-23460
 URL: https://issues.apache.org/jira/browse/FLINK-23460
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


We should have a feature toggle for the final checkpoint story.





[jira] [Created] (FLINK-23459) New metrics for dynamic buffer size

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23459:
-

 Summary: New metrics for dynamic buffer size
 Key: FLINK-23459
 URL: https://issues.apache.org/jira/browse/FLINK-23459
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


The following metrics can be added:
 * The total size of buffered in-flight data (both detailed per gate + sum per 
subtask)

 * Total estimated time to process the data (both detailed per gate + max per 
subtask)

 * Actual value of the calculated dynamic buffer size (only detailed per gate)

All of those metrics should be exposed only on the input side, taking into 
account only input in-flight data.





[jira] [Created] (FLINK-23458) Document required number of buffers in the network stack

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23458:
-

 Summary: Document required number of buffers in the network stack
 Key: FLINK-23458
 URL: https://issues.apache.org/jira/browse/FLINK-23458
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


Currently it seems that the required number of buffers is not mentioned 
anywhere, nor is it mentioned that this number is different from the 
configured number of exclusive/floating buffers. It should be documented 
that both {{taskmanager.network.memory.floating-buffers-per-gate}} and 
{{taskmanager.network.memory.buffers-per-channel}} are best effort and not 
guaranteed.





[jira] [Created] (FLINK-23457) Sending the buffer of the right size for broadcast

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23457:
-

 Summary: Sending the buffer of the right size for broadcast
 Key: FLINK-23457
 URL: https://issues.apache.org/jira/browse/FLINK-23457
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


It is not enough to know just the number of available buffers (credits) for 
the downstream, because the size of these buffers can be different. So we 
propose to resolve this problem in the following way: if the downstream 
buffer size is changed, then the upstream should send buffers of a size not 
greater than the new one, regardless of how big the current buffer on the 
upstream is (pollBuffer should receive a parameter like bufferSize and 
return a buffer not greater than it).

Downstream will be able to support any buffer size < max buffer size, so it 
should be good enough to request a BufferBuilder with the new size after 
getting the announcement, leaving existing BufferBuilder/BufferConsumers 
unchanged. In other words, the code in {{PipelinedSubpartition(View)}} 
doesn't need to be changed (apart from forwarding the new buffer size to the 
{{BufferWritingResultPartition}}). All buffer size adjustments can be 
implemented exclusively in {{BufferWritingResultPartition}}.

If different downstream subtasks have different throughput and hence different 
desired buffer sizes, then a single upstream subtask has to support having two 
different subpartitions with different buffer sizes.
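A minimal sketch of the proposed pollBuffer contract (plain Java; the names are hypothetical, not the actual ResultSubpartition API): the upstream never hands out more than the announced buffer size, splitting a larger queued buffer if necessary:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class PollBufferSketch {
    /** Hypothetical queue of serialized data waiting in a subpartition. */
    private final Deque<byte[]> backlog = new ArrayDeque<>();

    void add(byte[] data) {
        backlog.add(data);
    }

    /**
     * Proposed contract: return at most {@code maxBufferSize} bytes, even if
     * the buffer at the head of the queue is larger; the remainder is
     * requeued at the front and shipped by subsequent polls.
     */
    byte[] pollBuffer(int maxBufferSize) {
        byte[] head = backlog.poll();
        if (head == null || head.length <= maxBufferSize) {
            return head;
        }
        // Split: ship the first maxBufferSize bytes, requeue the rest.
        backlog.addFirst(Arrays.copyOfRange(head, maxBufferSize, head.length));
        return Arrays.copyOf(head, maxBufferSize);
    }

    public static void main(String[] args) {
        PollBufferSketch p = new PollBufferSketch();
        p.add(new byte[32_768]); // a full 32 KiB buffer already queued upstream
        // Downstream announced a new, smaller buffer size of 8 KiB:
        System.out.println(p.pollBuffer(8_192).length); // 8192
        System.out.println(p.pollBuffer(8_192).length); // 8192 (from the remainder)
    }
}
```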





[jira] [Created] (FLINK-23456) Manually test on cluster

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23456:
-

 Summary: Manually test on cluster
 Key: FLINK-23456
 URL: https://issues.apache.org/jira/browse/FLINK-23456
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


Test different jobs:
 * a job with static throughput *(must work for successful MVP)*

 * a job with gracefully varying load (sinus shape load?) *(should work for 
successful MVP)*

 * a job with erratic load (emulating evil window operator firing) *(would be 
nice to work for MVP)*

Test all jobs on a cluster and check whether the buffer size adjustment behaves erratically.

Verify our final goals:
 * that aligned checkpoint time in back pressured jobs is reduced to sane 
values (roughly {{numberOfExchanges * configuredBufferedDataTime * 2}})

 * check if our changes affected the throughput or not.
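As a worked example of the target bound stated above (the numbers are arbitrary, purely illustrative):

```java
public class CheckpointTimeBound {
    /** Rough upper bound from this ticket:
     *  numberOfExchanges * configuredBufferedDataTime * 2. */
    static long alignedCheckpointTimeMillis(int numberOfExchanges,
                                            long bufferedDataTimeMillis) {
        return (long) numberOfExchanges * bufferedDataTimeMillis * 2;
    }

    public static void main(String[] args) {
        // E.g. 5 network exchanges and 1s of configured buffered-data time
        // give an expected aligned checkpoint time of about 10s:
        System.out.println(alignedCheckpointTimeMillis(5, 1_000)); // 10000
    }
}
```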





[jira] [Created] (FLINK-23455) Remove the usage of the yaml file in SQLJobSubmission

2021-07-21 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-23455:
-

 Summary: Remove the usage of the yaml file in SQLJobSubmission
 Key: FLINK-23455
 URL: https://issues.apache.org/jira/browse/FLINK-23455
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.14.0
Reporter: Shengkai Fang
 Fix For: 1.14.0








[jira] [Created] (FLINK-23454) Sending the buffer of the right size for unicast

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23454:
-

 Summary: Sending the buffer of the right size for unicast
 Key: FLINK-23454
 URL: https://issues.apache.org/jira/browse/FLINK-23454
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


It is not enough to know just the number of available buffers (credits) for 
the downstream, because the size of these buffers can be different. So we 
propose to resolve this problem in the following way: if the downstream 
buffer size is changed, then the upstream should send buffers of a size not 
greater than the new one, regardless of how big the current buffer on the 
upstream is (pollBuffer should receive a parameter like bufferSize and 
return a buffer not greater than it).

Downstream will be able to support any buffer size < max buffer size, so it 
should be good enough to request a BufferBuilder with the new size after 
getting the announcement, leaving existing BufferBuilder/BufferConsumers 
unchanged. In other words, the code in {{PipelinedSubpartition(View)}} 
doesn't need to be changed (apart from forwarding the new buffer size to the 
{{BufferWritingResultPartition}}). All buffer size adjustments can be 
implemented exclusively in {{BufferWritingResultPartition}}.

If different downstream subtasks have different throughput and hence different 
desired buffer sizes, then a single upstream subtask has to support having two 
different subpartitions with different buffer sizes.





[jira] [Created] (FLINK-23453) Dynamic calculation of the buffer size

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23453:
-

 Summary: Dynamic calculation of the buffer size
 Key: FLINK-23453
 URL: https://issues.apache.org/jira/browse/FLINK-23453
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


To calculate the desired buffer size we need to take into account the 
throughput, the configuration (timeInInputBuffer), and the actual number of 
buffers in use. It makes sense to use an exponential moving average (EMA) for 
this calculation to smooth out intermittent spikes.

The calculation based on the actual number of buffers in use helps to avoid 
problems with data skew (when only a couple of channels out of thousands have 
any data). So the solution needs to reliably and efficiently calculate either 
the estimated or the average number of buffers in use.

For the MVP, the buffer size may be erratic if it turns out to be non-trivial 
to make it stable.
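As an illustration, the calculation described above could look roughly like this. This is a hedged sketch with hypothetical names and a simple formula (throughput spread over the buffers in use for the configured time-in-input-buffer); the actual FLIP may choose different smoothing or bounds.

```java
// Hypothetical sketch of an EMA-smoothed desired-buffer-size calculation.
public class BufferSizeEma {
    private final double alpha;   // EMA smoothing factor in (0, 1]
    private double throughputEma; // smoothed throughput, bytes per second

    public BufferSizeEma(double alpha) {
        this.alpha = alpha;
    }

    /** Feeds one raw throughput sample and returns the smoothed value. */
    public double updateThroughput(double bytesPerSecond) {
        throughputEma = alpha * bytesPerSecond + (1 - alpha) * throughputEma;
        return throughputEma;
    }

    /**
     * Desired size of a single buffer: the data expected during the configured
     * time-in-input-buffer, spread across the buffers currently in use.
     */
    public long desiredBufferSize(double timeInInputBufferSec,
                                  int buffersInUse,
                                  long maxBufferSize) {
        double perBuffer =
                throughputEma * timeInInputBufferSec / Math.max(buffersInUse, 1);
        return Math.min((long) perBuffer, maxBufferSize);
    }
}
```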





[jira] [Created] (FLINK-23452) Measuring subtask throughput

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23452:
-

 Summary: Measuring subtask throughput
 Key: FLINK-23452
 URL: https://issues.apache.org/jira/browse/FLINK-23452
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


In the first implementation, throughput could be measured for the whole 
subtask. The throughput calculation should take into account the number of 
bytes handled and the backpressure time, and ignore the idle time. The main 
idea is to keep a balance between idle and backpressure time: if the 
backpressure time is high, we should decrease the buffer size to meet the 
configured handling time; conversely, if the subtask is idle, that period 
should be excluded from the throughput calculation. Otherwise, in the case of 
a network bottleneck, we might end up with a small buffer size that is itself 
causing the bottleneck, yet be unable to increase it because idle time reduces 
the measured throughput and keeps lowering the buffer size.
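The accounting described above can be sketched as follows; this is an illustrative simplification with hypothetical names, not Flink's metric implementation. Busy time (processing plus backpressure) is accumulated, while idle time is deliberately dropped so it cannot dilute the rate.

```java
// Hypothetical throughput meter: bytes over non-idle time only.
public class ThroughputMeter {
    private long bytesHandled;
    private long busyTimeNanos; // processing + backpressure time; idle excluded

    /** Accounts a busy period: bytes handled over non-idle time. */
    public void addBusyPeriod(long bytes, long nanos) {
        bytesHandled += bytes;
        busyTimeNanos += nanos;
    }

    /** Idle periods are deliberately ignored so they don't dilute the rate. */
    public void addIdlePeriod(long nanos) {
        // no-op: idle time must not lower the measured throughput
    }

    /** Bytes per second over busy time only. */
    public double bytesPerSecond() {
        return busyTimeNanos == 0 ? 0.0 : bytesHandled * 1e9 / busyTimeNanos;
    }
}
```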





[jira] [Created] (FLINK-23451) FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23451:
-

 Summary: FLIP-183: Dynamic buffer size adjustment
 Key: FLINK-23451
 URL: https://issues.apache.org/jira/browse/FLINK-23451
 Project: Flink
  Issue Type: New Feature
Reporter: Anton Kalashnikov


Umbrella ticket for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment





[jira] [Created] (FLINK-23450) Properties map is not set in DebeziumAvroFormatFactory

2021-07-21 Thread Timo Walther (Jira)
Timo Walther created FLINK-23450:


 Summary: Properties map is not set in DebeziumAvroFormatFactory
 Key: FLINK-23450
 URL: https://issues.apache.org/jira/browse/FLINK-23450
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-21229 did not set the properties map correctly in 
DebeziumAvroFormatFactory.





[VOTE] FLIP-182: Support watermark alignment of FLIP-27 Sources

2021-07-21 Thread Piotr Nowojski
Hi everyone,

I would like to start a vote on the FLIP-182 [1] which was discussed in this
thread [2].
The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

Best,
Piotrek


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
[2]
https://mail-archives.apache.org/mod_mbox/flink-dev/202107.mbox/%3CCABpD1RUHJURJg7Vkbq4Tjz2yecd8Wv8kJiT46M11F-ODSakgcw%40mail.gmail.com%3E


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-07-21 Thread Piotr Nowojski
Hi,

>  I would not fully advertise this before we have the second part
implemented as well.

I'm not sure; maybe we could advertise it with a big warning about this
limitation. I mean, it's not as if this change would break something. At
worst it just wouldn't fully solve the problem with multiple splits per a
single operator, but only limit the scope of that problem. At the same time,
I don't have strong feelings about this. If the consensus is not to
advertise it, I'm also fine with that. Only in that case we should probably
quickly follow up with the per-split solution.
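For context, the per-operator blocking this thread discusses can be sketched roughly like this. All names are hypothetical and this is not the actual FLIP-182 API: the idea is simply that a source subtask pauses emitting once its local watermark runs ahead of the global minimum by more than the allowed drift.

```java
// Hypothetical sketch of watermark alignment at source-operator granularity.
public class WatermarkAligner {
    private final long maxAllowedDriftMillis;
    private long globalMinWatermark = Long.MIN_VALUE;

    public WatermarkAligner(long maxAllowedDriftMillis) {
        this.maxAllowedDriftMillis = maxAllowedDriftMillis;
    }

    /** Periodically updated with the minimum watermark across all subtasks. */
    public void onGlobalWatermarkUpdate(long minWatermarkAcrossSubtasks) {
        this.globalMinWatermark = minWatermarkAcrossSubtasks;
    }

    /** A source subtask pauses emitting while it is too far ahead of the rest. */
    public boolean shouldBlock(long localWatermark) {
        return globalMinWatermark != Long.MIN_VALUE
                && localWatermark > globalMinWatermark + maxAllowedDriftMillis;
    }
}
```

Note that, as the thread points out, this granularity only helps while a subtask handles at most one split; with multiple splits per subtask, the most advanced split cannot be blocked individually.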

Anyway, thanks for voicing your support and the discussions. I'm going to
start a voting thread for this feature soon.

Best,
Piotrek

wt., 13 lip 2021 o 19:09 Stephan Ewen  napisał(a):

> @Eron Wright   The per-split watermarks are the
> default in the new source interface (FLIP-27) and come for free if you use
> the SplitReader.
>
> Based on that, it is also possible to unsubscribe individual splits to
> solve the alignment in the case where operators have multiple splits
> assigned.
> Piotr and I already discussed that, but concluded that the implementation
> of that is largely orthogonal.
>
> I am a bit worried, though, that if we release and advertise the alignment
> without handling this case, we create a surprise for quite a few users.
> While this is admittedly valuable for some users, I think we need to
> position this accordingly. I would not fully advertise this before we have
> the second part implemented as well.
>
>
>
> On Mon, Jul 12, 2021 at 7:18 PM Eron Wright wrote:
>
> > The notion of per-split watermarks seems quite interesting.  I think the
> > idleness feature could benefit from a per-split approach too, because
> > idleness is typically related to whether any splits are assigned to a
> given
> > operator instance.
> >
> >
> > On Mon, Jul 12, 2021 at 3:06 AM 刘建刚  wrote:
> >
> > > +1 for the source watermark alignment.
> > > In previous Flink versions, the source connectors differed in
> > > implementation, which made this feature hard to build. When the consumed
> > > data is not aligned, or when consuming historical data, it is very easy
> > > to end up unaligned. Source alignment can resolve many instability
> > > problems.
> > >
> > > Seth Wiesman  于2021年7月9日周五 下午11:25写道:
> > >
> > > > +1
> > > >
> > > > In my opinion, this limitation is perfectly fine for the MVP.
> > > > Watermark alignment is a long-standing issue and this already moves
> > > > the ball so far forward.
> > > >
> > > > I don't expect this will cause many issues in practice. As I
> > > > understand it, the FileSource always processes one split at a time,
> > > > and in my experience, 90% of Kafka users have a small number of
> > > > partitions or scale their pipelines to have one reader per partition.
> > > > Obviously, there are larger-scale Kafka topics and more sources that
> > > > will be ported over in the future, but I think there is an implicit
> > > > understanding that aligning sources adds latency to pipelines, and we
> > > > can frame the follow-up "per-split" alignment as an optimization.
> > > >
> > > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski
> > > > <piotr.nowoj...@gmail.com> wrote:
> > > >
> > > > > Hey!
> > > > >
> > > > > A couple of weeks ago, Arvid Heise and I played around with an idea
> > > > > to address a long-standing issue of Flink: the lack of
> > > > > watermark/event time alignment between different parallel instances
> > > > > of sources, which can lead to ever-growing state size for
> > > > > downstream operators like WindowOperator.
> > > > >
> > > > > We had an impression that this is relatively low-hanging fruit that
> > > > > can be quite easily implemented - at least partially (the first
> > > > > part mentioned in the FLIP document). I have written down our
> > > > > proposal [1] and you can also check out the PoC that we have
> > > > > implemented [2].
> > > > >
> > > > > We think that this is a quite easy proposal that has in large part
> > > > > already been implemented. There is one obvious limitation of our
> > > > > PoC. Namely, we can only easily block individual SourceOperators.
> > > > > This works perfectly fine as long as there is at most one split per
> > > > > SourceOperator. However, it doesn't work with multiple splits. In
> > > > > that case, if a single `SourceOperator` is responsible for
> > > > > processing both the least and the most advanced splits, we won't be
> > > > > able to block this most advanced split from generating new records.
> > > > > I'm proposing to solve this problem in the future in a follow-up
> > > > > FLIP, as a solution that works with a single split per operator is
> > > > > easier and already valuable for some of the users.
> > > > >
> > > > > What do you think about this proposal?
> > > > > Best, Piotrek
> > > > >
> > > > > [1]

Re: [VOTE] FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Yuan Mei
+1 (binding)

Best
Yuan

On Wed, Jul 21, 2021 at 7:40 PM Piotr Nowojski  wrote:

> +1 (binding)
>
> Piotrek
>
> śr., 21 lip 2021 o 13:21 Anton Kalashnikov 
> napisał(a):
>
> > Hi everyone,
> >
> > I would like to start a vote on FLIP-183 [1] which was discussed in this
> > thread [2].
> > The vote will be open for at least 72 hours unless there is an objection
> > or not enough votes.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > [2]
> >
> >
> https://lists.apache.org/thread.html/r0d06131b35fe641df787c16e8bcd3784161f901062c25778ed92871b%40%3Cdev.flink.apache.org%3E
> > --
> > Best regards,
> > Anton Kalashnikov
> >
>


Re: [VOTE] FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Piotr Nowojski
+1 (binding)

Piotrek

śr., 21 lip 2021 o 13:21 Anton Kalashnikov  napisał(a):

> Hi everyone,
>
> I would like to start a vote on FLIP-183 [1] which was discussed in this
> thread [2].
> The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> [2]
>
> https://lists.apache.org/thread.html/r0d06131b35fe641df787c16e8bcd3784161f901062c25778ed92871b%40%3Cdev.flink.apache.org%3E
> --
> Best regards,
> Anton Kalashnikov
>


Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-21 Thread Piotr Nowojski
Hi Dawid,

Thanks for writing down those concerns.

I think the first issue boils down to what the contract of lifecycle
methods like open(), close(), initializeState() etc. should be, especially
for the new additions finish() and endInput(), and what their relation to
the operator state should be (regardless of its type: keyed, non-keyed,
union, ...). Should those methods be tied to state or not? After thinking
about it for a while (and discussing it offline with Dawid), I think the
answer might be no, they shouldn't. I mean, maybe we should just openly say
that all of those methods relate to this single particular instance and
execution of the operator. And if a job is recovered/rescaled, we would be
allowed to freely resume consumption, ignoring the fact that maybe some
parts of the state have previously seen `endInput()`. Why?
0. Yes, it might be confusing, especially with `endInput()`. We call
`endInput()`, we store something in state, and later, after recovery
combined with rescaling, that state can see more records? Indeed weird.
1. I haven't yet come up with a counterexample that would make a real-life
use case impossible to implement. Theoretically yes, the user can store the
`endInput()` signal in state, and after rescaling this state would be
inconsistent with what is actually happening in the operator, but I haven't
found a use case that breaks because of that.
2. Otherwise, implementation would be very difficult.
3. It's difficult to access keyed state from within `endInput()`/`finish()`
calls, as they do not have a key context.
4. After all, openly defining `endInput()` and `finish()` as tied to their
operator execution instance's lifecycle is not that strange and is quite
simple to explain. Sure, it can lead to a bit of confusion (0.), but that
doesn't sound that bad in comparison with the alternatives I'm aware of.
Also, currently methods like `open()` and `close()` are tied to the
operator execution instance, not to the state. Operators can be opened and
closed multiple times; it doesn't mean that the state is lost after closing
an operator.

For the UnionListState problem I have posted my proposal in the ticket [1],
so maybe let's move that particular discussion there?

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-21080

śr., 21 lip 2021 o 12:39 Dawid Wysakowicz 
napisał(a):

> Hey all,
>
> To make the issues that were found transparent to the community, I want to
> post an update:
>
> *1. Committing side-effects*
> We do want to make sure that all side effects are committed before
> bringing tasks down. Side effects are committed when calling
> notifyCheckpointComplete. For the final checkpoint we introduced the method
> finish(). This notifies the operator that we have consumed all incoming
> records and we are preparing to close the Task. In turn we should flush any
> pending buffered records and prepare to commit last transactions. The goal
> is that after a successful sequence of finish() -> snapshotState() ->
> notifyCheckpointComplete(), the remaining state can be considered
> empty/finished and may be discarded.
>
> *Failure before notifyCheckpointComplete()*
>
> The question is what is the contract of the endInput()/finish() methods
> and how calling these methods affects the operator's keyed state,
> non-keyed state, and external state. Is it allowed to restore a state
> snapshot taken after calling endInput()/finish() and process more records?
> Or do we assume that after a restore from such a state taken after
> finish() we should not call any of the lifecycle methods, or at least make
> sure those methods do not emit records/interact with the mailbox, etc.?
>
> Currently it is possible that if a failure happens after sequence of
> finish() -> snapshotState(), but before notifyCheckpointComplete(), we will
> restore such a state and we might end up sending some more records to such
> an operator. It is possible if we rescale and this state is merged with a
> state of a subtask that has not called finish() yet. It can also happen if
> we rescale the upstream operator and the subtask of interest becomes
> connected to a newly added non finished subtask.
>
> *Snapshotting StreamTasks on which finish() has been called*
>
>
> We thought about putting a flag into the snapshot of a subtask produced
> after the finish() method. This would make it possible to skip execution
> of certain lifecycle methods. Unfortunately this creates problems for
> rescaling: how do we deal with a situation where subtask states with the
> feature flag both set and unset end up in a single StreamTask? An
> additional problem is that we merge those states into a single
> OperatorSubtaskState on the CheckpointCoordinator.
>
> *Finishing upon receiving notifyCheckpointComplete() of not the latest
> checkpoint*
>
> We need to wait for a checkpoint to complete, that started after the
> finish() method. However, we support concurrent checkpoints therefore,
> there might be later checkpoints that completed, but the 

Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics

2021-07-21 Thread Becket Qin
Hey Chesnay,

I think I got what that method was designed for now. Basically, the
motivation is to let the SourceOutput report the eventTimeFetchLag for
users. At this point, the SourceOutput only has the EventTime, so this
method provides a way for users to pass the FetchTime to the SourceOutput.
This is essentially a context associated with each record emitted to the
SourceOutput.

It might be slightly better if we let the method accept a Supplier in this
case. However, it seems to introduce a parallel channel, or a side path,
between the user implementation and the SourceOutput. I am not sure if this
is the right way to go. Would it be more intuitive if we just added a new
method to the SourceOutput, to allow the FetchTime to be passed in
explicitly? This would work well with the change I suggested above, which
adds a generic metadata type to RecordsWithSplits and passes it to
RecordEmitter.emitRecord() as an argument.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler  wrote:

> Would it be easier to understand if the method would accept a Supplier
> instead?
>
> On 20/07/2021 05:36, Becket Qin wrote:
> > In that case, do we still need the metric here? It seems we are creating
> a
> > "global variable" which users may potentially use. I am wondering how
> much
> > additional convenience it provides because it seems easy for people to
> > simply pass the fetch time by themselves if they have decided to not use
> > SourceReaderBase. Also, it looks like we do not have an API pattern that
> > lets users get the value of a metric and derive another metric. So I
> think
> > it is easier for people to understand if LastFetchTimeGauge() is just an
> > independent metric by itself, instead of being a part of the
> > eventTimeFetchLag computation.
>
>
>


[VOTE] FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Anton Kalashnikov

Hi everyone,

I would like to start a vote on FLIP-183 [1] which was discussed in this 
thread [2].
The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.



[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
[2] 
https://lists.apache.org/thread.html/r0d06131b35fe641df787c16e8bcd3784161f901062c25778ed92871b%40%3Cdev.flink.apache.org%3E

--
Best regards,
Anton Kalashnikov


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Anton Kalashnikov
Thanks everyone for sharing your opinions. I updated the FLIP according to 
the discussion, and I'm going to start the vote on this FLIP.


--
Best regards,
Anton Kalashnikov

16.07.2021 09:23, Till Rohrmann wrote:

I think this is a good idea. +1 for this approach. Are you gonna update the
FLIP accordingly?

Cheers,
Till

On Thu, Jul 15, 2021 at 9:33 PM Steven Wu  wrote:


I really like the new idea.

On Thu, Jul 15, 2021 at 11:51 AM Piotr Nowojski 
wrote:


Hi Till,

> I assume that buffer sizes are only changed for newly assigned
> buffers/credits, right? Otherwise, the data could already be on the wire
> and then it wouldn't fit on the receiver side. Or do we have a back
> channel mechanism to tell the sender that a part of a buffer needs to be
> resent once more capacity is available?

Initially, our implementation proposal intended to implement the first
option. The buffer size would be attached to a credit message, so the
receiver would first need to allocate a buffer with the updated size, send
the credit upstream, and the sender would be allowed to send only as much
data as fits the credit. So there would be no way and no problem of
changing buffer sizes while something is "on the wire".

However, Anton suggested an even simpler idea to me today. There is
actually no problem with receivers supporting all buffer sizes up to the
maximum allowed size (the currently configured memory segment size). Thus
the new buffer size can be treated as a recommendation by the sender. We
can announce a new buffer size, and the sender will start capping the newly
requested buffers to that size, but we can still send already filled
buffers in chunks of any size, as long as it's below the max memory segment
size. In this way we can leave any already filled buffers on the sender
side untouched, and we do not need to partition/slice them before sending
them down, making at least the initial version even simpler. This way we
also do not need to differentiate that different credits have different
sizes. We just announce a single value: the "recommended/requested buffer
size".

Piotrek

czw., 15 lip 2021 o 17:27 Till Rohrmann napisał(a):

Hi everyone,

Thanks a lot for creating this FLIP, Anton and Piotr. I think it looks
like a very promising solution for speeding up our checkpoints and being
able to create them more reliably.

Following up on Steven's question: I assume that buffer sizes are only
changed for newly assigned buffers/credits, right? Otherwise, the data
could already be on the wire and then it wouldn't fit on the receiver
side. Or do we have a back channel mechanism to tell the sender that a
part of a buffer needs to be resent once more capacity is available?

Cheers,
Till

On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski 
wrote:


Hi Steven,

As downstream/upstream nodes are decoupled, if a downstream node adjusts
its buffer size first, there will be a lag until the updated buffer size
information reaches the upstream node. It is a problem, but it has a quite
simple solution that we described in the FLIP document:

> Sending the buffer of the right size.
> It is not enough to know just the number of available buffers (credits)
> for the downstream because the size of these buffers can be different.
> So we are proposing to resolve this problem in the following way: if the
> downstream buffer size is changed, then the upstream should send a
> buffer of a size not greater than the new one, regardless of how big the
> current buffer on the upstream side is. (pollBuffer should receive a
> parameter like bufferSize and return a buffer not greater than it)

So apart from adding buffer size information to the `AddCredit` message,
we will need to support a case where an upstream subpartition has already
produced a buffer with the older size (for example 32KB), while the next
credit arrives with an allowance for a smaller size (16KB). In that case,
we are only allowed to send the portion of the data from this buffer that
fits into the new updated buffer size, and keep announcing the remaining
part as available backlog.
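The 32KB-buffer / 16KB-credit case described above could be handled roughly as follows. This is a sketch with hypothetical names, modeling buffers only by their readable byte counts; note that the simpler approach proposed later in this thread avoids slicing already-filled buffers entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: send an oversized buffer in slices, keeping the
// remainder announced as backlog.
public class SlicingSubpartition {
    // Each entry models the readable bytes of one finished, unsent buffer.
    private final Deque<Integer> backlog = new ArrayDeque<>();

    public void addFinishedBuffer(int readableBytes) {
        backlog.addLast(readableBytes);
    }

    /**
     * Returns the number of bytes to send next, never exceeding the size
     * allowed by the latest credit; a too-large buffer is sent in slices.
     */
    public int pollBuffer(int allowedSize) {
        if (backlog.isEmpty()) {
            return 0;
        }
        int head = backlog.pollFirst();
        if (head <= allowedSize) {
            return head;
        }
        backlog.addFirst(head - allowedSize); // remainder stays as backlog
        return allowedSize;
    }

    public int backlogSize() {
        return backlog.size();
    }
}
```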

Best,
Piotrek


śr., 14 lip 2021 o 08:33 Steven Wu 

napisał(a):

- The subtask observes the changes in the throughput and changes the
  buffer size during the whole life period of the task.
- The subtask sends the buffer size and the number of available buffers to
  the upstream for the corresponding subpartition.
- Upstream changes the buffer size corresponding to the received
  information.
- Upstream sends the data and the number of filled buffers to the
  downstream.

Will the above steps of buffer size adjustment cause problems with
credit-based flow control (mainly for downsizing), since the downstream
adjusts down first?

Here is the quote from the blog [1]:
"Credit-based flow control makes sure that whatever is "on the wire" will
have capacity at the receiver to handle."

[1]
https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control


On Tue, Jul 13, 2

Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-21 Thread Dawid Wysakowicz
Hey all,

To make the issues that were found transparent to the community, I want
to post an update:

*1. Committing side-effects*
We do want to make sure that all side effects are committed before
bringing tasks down. Side effects are committed when calling
notifyCheckpointComplete. For the final checkpoint we introduced the
method finish(). This notifies the operator that we have consumed all
incoming records and we are preparing to close the Task. In turn we
should flush any pending buffered records and prepare to commit last
transactions. The goal is that after a successful sequence of finish()
-> snapshotState() -> notifyCheckpointComplete(), the remaining state
can be considered empty/finished and may be discarded.

*Failure before notifyCheckpointComplete()*

The question is what is the contract of the endInput()/finish() methods
and how calling these methods affects the operator's keyed state,
non-keyed state, and external state. Is it allowed to restore a state
snapshot taken after calling endInput()/finish() and process more records?
Or do we assume that after a restore from such a state taken after
finish() we should not call any of the lifecycle methods, or at least make
sure those methods do not emit records/interact with the mailbox, etc.?

Currently it is possible that if a failure happens after sequence of
finish() -> snapshotState(), but before notifyCheckpointComplete(), we
will restore such a state and we might end up sending some more records
to such an operator. It is possible if we rescale and this state is
merged with a state of a subtask that has not called finish() yet. It
can also happen if we rescale the upstream operator and the subtask of
interest becomes connected to a newly added non finished subtask.

*Snapshotting StreamTasks on which finish() has been called*


We thought about putting a flag into the snapshot of a subtask produced
after the finish() method. This would make it possible to skip execution
of certain lifecycle methods. Unfortunately this creates problems for
rescaling: how do we deal with a situation where subtask states with the
feature flag both set and unset end up in a single StreamTask? An
additional problem is that we merge those states into a single
OperatorSubtaskState on the CheckpointCoordinator.

*Finishing upon receiving notifyCheckpointComplete() of not the latest
checkpoint*

We need to wait for a checkpoint to complete, that started after the
finish() method. However, we support concurrent checkpoints therefore,
there might be later checkpoints that completed, but the notification
has not arrived. We must make sure those checkpoints do not leave
lingering external resources.

*Checkpointing from a single subtask / UnionListState case*
There are operators that checkpoint from a single subtask only, usually
from the subtask with index 0. If we let those subtasks finish, subsequent
checkpoints will miss this information.
Esp. Legacy sources problem:
https://issues.apache.org/jira/browse/FLINK-21080

Best,

Dawid

On 19/07/2021 15:10, Yun Gao wrote:
> Hi Till, Dawid,
>
> Very thanks for the comments and discussion! Glad that it seems we
> have come to a convergence, and I also agree with that we could not
> include the optimization in the first version. 
>
> Best
> Yun
>
> --
> From:Dawid Wysakowicz 
> Send Time:2021 Jul. 19 (Mon.) 17:51
> To:dev ; Till Rohrmann 
> Cc:Yun Gao ; Yun Gao
> 
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
> Tasks Finished
>
> Small correction. I meant we need to adjust the EndOfInputEvent of course.
>
> Best,
>
> Dawid
>
> On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> > Hey Till,
> >
> > Yes, you're right we will have to adjust the current state of
> > EndOfPartitionEvent and move the moment when we emit it to have what
> > we're discussing here. We are aware of that.
> >
> > As for the MAX_WATERMARK vs finish(). My take is that we should always
> > emit MAX_WATERMARK before calling finish() on an operator. At the same
> > time finish() should not leave behind anything in state, as the
> > intention is that we never restore from the taken savepoint/checkpoint
> > (savepoint w drain or bounded data consumed).
> >
> > Best,
> >
> > Dawid
> >
> > On 19/07/2021 11:33, Till Rohrmann wrote:
> >> Hi Yun and Dawid,
> >>
> >> Thanks for your comments. I do agree with your comments that finish() 
> can
> >> do more than MAX_WATERMARK. I guess we should then explain how
> >> MAX_WATERMARK and finish() play together and what kind of
> >> order guarantees we provide.
> >>
> >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it 
> would
> >> work in its current state because we send this event when the Task is 
> about
> >> to shut down if I am not mistaken. What we want to have is to bring the
> >> Stre

[jira] [Created] (FLINK-23449) YarnTaskExecutorRunner does not contain MapReduce classes

2021-07-21 Thread Kai Chen (Jira)
Kai Chen created FLINK-23449:


 Summary: YarnTaskExecutorRunner does not contain MapReduce classes
 Key: FLINK-23449
 URL: https://issues.apache.org/jira/browse/FLINK-23449
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.11.3
 Environment: flink-1.11

flink on yarn cluster

jdk1.8

hive1.2.1

hadoop2.7

Hadoop classes are provided with {{export HADOOP_CLASSPATH=`hadoop classpath`}} 
when submitting the test app (as described in 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html])
Reporter: Kai Chen


I followed the instructions described in 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive] 
and tested the Hive streaming sink, and hit this exception:

      Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.mapred.JobConf

The same problem was reported at 
[http://apache-flink.147419.n8.nabble.com/Flink-td7866.html].

 

I checked the TM JVM environment and the code, and found that Flink only sets 
up YARN_APPLICATION_CLASSPATH, but not MAPREDUCE_APPLICATION_CLASSPATH.

See: 
[https://github.com/apache/flink/blob/ed39fb2efc790af038c1babd4a48847b7b39f91e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L119]

 

I think we should add MAPREDUCE_APPLICATION_CLASSPATH as well, the same as 
Spark does.





Re: Flink 1.14. Bi-weekly 2021-07-20

2021-07-21 Thread Till Rohrmann
Thanks for the update, Joe, Xintong and Dawid. This is very helpful.

Cheers,
Till

On Wed, Jul 21, 2021 at 9:27 AM Johannes Moser  wrote:

> Dear Flink Community,
>
> here's the latest update from the bi-weekly.
> Short summary: still a lot of stuff is going on, the feature freeze has
> been
> pushed by two weeks to the >>>16th of August<<< and we are working
> on the release communication. Good stuff: the test and blockers are
> looking very good.
>
> Get all the updates on the release page. [1]
>
> *Feature freeze:*
> Initially because of issues with FLIP-147 we have been considering
> moving the feature freeze, when we did a general update it became
> obvious that a lot of teams/efforts would benefit from more time.
> So we did what has to be done and moved it by two weeks.
>
> *On-going efforts:*
> What also became obvious: the pure list of links to Jira issues
> doesn't provide a clear overview of the progress of the efforts.
> Often the linked issues have a subset of other issues, that also
> went into other releases. Some of the subtasks are optional.
> We, as a community, need to improve that going forward. For now
> I introduced a traffic light style state for each row in the
> feature list as a quick fix. I will make sure this will be
> updated before the next bi-weekly.
> The update has shown that ~20 of 36 planned features have a
> positive state. 6 are rather looking bad.
>
> *Test instabilities:*
> The tests became more stable recently, which is a really, really good
> thing.
> Thanks for all the efforts that went into this.
>
> *Highlight features:*
> We are working on the release communication in general. So we are
> collecting
> the highlights of this release and did a quick brainstorming which
> brought us to this list: ML, Table API, Log based checkpoints...
> If there's something worth mentioning please add it on the
> release page.
>
> What can you do to make 1.14 a good release:
> * Get a realistic state of the ongoing efforts and update the page
> * Start testing the "done" features
> * Think of the things that go beyond the implementation: documentation and
>   release/feature evangelism.
>
> Sincerely, your release managers
> Xintong, Dawid & Joe
>
> -
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
>


Re: [DISCUSS] Releasing Flink 1.11.4

2021-07-21 Thread Till Rohrmann
Great, thanks Godfrey.

Cheers,
Till

On Wed, Jul 21, 2021 at 7:42 AM godfrey he  wrote:

> Hi Till,
>
> Sorry for the late reply. In the previous period, I focused on another
> urgent piece of work and suspended the release work. I've recently
> restarted it.
>
> Best,
> Godfrey
>
> Till Rohrmann  于2021年7月13日周二 下午8:36写道:
>
> > Hi Godfrey,
> >
> > Are you continuing with the 1.11.4 release process?
> >
> > Cheers,
> > Till
> >
> > On Tue, Jul 6, 2021 at 1:15 PM Chesnay Schepler 
> > wrote:
> >
> > > Since 1.11.4 is about releasing the commits we already have merged
> > > between 1.11.3 and 1.13.0, I would suggest to not add additional fixes.
> > >
> > > On 06/07/2021 12:47, Matthias Pohl wrote:
> > > > Hi Godfrey,
> > > > Thanks for volunteering to be the release manager for 1.11.4.
> > > > FLINK-21445 [1] has a backport PR for 1.11.4 [2] prepared. I wouldn't
> > > > label it as a blocker, but it would be nice to have it included in
> > > > 1.11.4, considering that it's quite unlikely there will be a 1.11.5
> > > > release. Right now, AzureCI is running as a final step. I'm CC'ing
> > > > Chesnay because he would be in charge of merging the PR.
> > > >
> > > > Matthias
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-21445
> > > > [2] https://github.com/apache/flink/pull/16387
> > > >
> > > > On Wed, Jun 30, 2021 at 2:15 PM godfrey he 
> > wrote:
> > > >
> > > >> Hi devs,
> > > >>
> > > >> As discussed in [1], I would like to start a discussion for
> > > >> releasing Flink 1.11.4.
> > > >>
> > > >> I would like to volunteer as the release manager for 1.11.4, and
> > > >> will start the release process next Wednesday (July 7th).
> > > >>
> > > >> There are 75 issues that have been closed or resolved [2],
> > > >> and no blocker issues left [3] so far.
> > > >>
> > > >> If any issues need to be marked as blockers for 1.11.4, please let
> > > >> me know in this thread!
> > > >>
> > > >> Best,
> > > >> Godfrey
> > > >>
> > > >>
> > > >> [1]
> > > >>
> > > >>
> > >
> >
> https://lists.apache.org/thread.html/r40a541027c6a04519f37c61f2a6f3dabdb821b3760cda9cc6ebe6ce9%40%3Cdev.flink.apache.org%3E
> > > >> [2]
> > > >>
> > > >>
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.11.4%20AND%20status%20in%20(Closed%2C%20Resolved)
> > > >> [3]
> > > >>
> > > >>
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.11.4%20AND%20status%20not%20in%20(Closed%2C%20Resolved)%20ORDER%20BY%20priority%20DESC
> > > >>
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Till Rohrmann
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the
JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat
timeout to 20 seconds? This should give enough time to do the GC and then
still send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of
blocking I/O operations from the main thread. With FLINK-22483 [2] we will
get rid of Filesystem accesses to retrieve completed checkpoints. This
leaves us with one additional file system access from the main thread which
is the one completing a pending checkpoint. I think it should be possible
to get rid of this access because as Stephan said it only writes
information to disk that is already written before. Maybe solving these two
issues could ease concerns about long pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:

> Thanks @Till Rohrmann   for starting this discussion
>
> Firstly, I tried to understand the benefit of a shorter heartbeat timeout.
> IIUC, it will make the JobManager aware of a lost
> TaskManager faster. However, it seems that only the standalone cluster
> could benefit from this. For Yarn and
> native Kubernetes deployment, the Flink ResourceManager should get the
> TaskManager lost event in a very short time.
>
> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>
> Secondly, I am not very confident about decreasing the timeout to 15s. I
> quickly checked the TaskManager GC logs of our internal Flink workloads
> from the past week and found more than 100 Full GC pauses of around 10
> seconds, but none longer than 15s.
> We are using CMS GC for the old generation.
>
>
> Best,
> Yang
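Yang's quick check of the GC logs can be sketched as a small script. The log-line format below is an assumption (CMS and unified GC logs differ a lot between JVM versions and flags), so the regex would need adjusting for real TaskManager logs:

```python
import re

# Assumed CMS-style lines ending in "... <seconds> secs]"; real formats vary.
PAUSE_RE = re.compile(r"Full GC.*?([\d.]+) secs")

def long_full_gc_pauses(lines, threshold_secs=10.0):
    """Return Full GC pause durations (seconds) at or above the threshold."""
    pauses = []
    for line in lines:
        m = PAUSE_RE.search(line)
        if m and float(m.group(1)) >= threshold_secs:
            pauses.append(float(m.group(1)))
    return pauses

log = [
    "2021-07-15T08:00:01 [GC (Allocation Failure) 0.0310 secs]",
    "2021-07-15T08:10:44 [Full GC (Ergonomics) 10.2345 secs]",
    "2021-07-15T09:02:13 [Full GC (Allocation Failure) 3.0012 secs]",
]
print(long_full_gc_pauses(log))  # prints [10.2345]
```

A pause longer than the heartbeat timeout is what would cause a false failure detection, which is why pauses near 15s make that proposed value uncomfortable.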
>
> Till Rohrmann  于2021年7月17日周六 上午1:05写道:
>
>> Hi everyone,
>>
>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>> and blocking operations that were executed in the main threads of Flink's
>> components. Since then, there were quite some advancements wrt the JVM's
>> GCs and we also got rid of a lot of blocking calls that were executed in
>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>> times in case of a TaskManager loss because the system can only properly
>> recover after the dead TaskManager has been removed from the scheduler.
>> Hence, I wanted to propose to change the timeout and interval to:
>>
>> heartbeat.timeout: 15s
>> heartbeat.interval: 3s
>>
>> Since there is no perfect solution that fits all use cases, I would really
>> like to hear from you what you think about it and how you configure these
>> heartbeat options. Based on your experience we might actually come up with
>> better default values that allow us to be resilient but also to detect
>> failed components fast. FLIP-185 can be found here [1].
>>
>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>
>> Cheers,
>> Till
>>
>
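For reference, the values discussed in this thread side by side, as they would appear in flink-conf.yaml (a sketch assembled from the numbers above; whether the timeout lands at 15s or the 20s floated later is still open):

```yaml
# flink-conf.yaml -- defaults since Flink 1.5
heartbeat.timeout: 50s
heartbeat.interval: 10s

# values proposed in FLIP-185
# heartbeat.timeout: 15s
# heartbeat.interval: 3s
```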


Re: [VOTE] FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-07-21 Thread Jingsong Li
+1 (binding)

Looking forward to this being production ready!

Best,
Jingsong

On Mon, Jul 19, 2021 at 3:25 PM Zhu Zhu  wrote:

> +1 (binding)
>
> Thanks,
> Zhu
>
> XING JIN  于2021年7月19日周一 上午10:29写道:
>
> > +1 (non-binding)
> >
> > Best,
> > Jin
> >
> > Guowei Ma  于2021年7月19日周一 上午9:41写道:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Fri, Jul 16, 2021 at 5:36 PM Yingjie Cao 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a vote on FLIP-184 [1] which was
> > > > discussed in [2] [3]. The vote will be open for at least 72 hours
> > > > until 7.21 unless there is an objection.
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework
> > > > [2]
> > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/radbbabfcfb6bec305ddf7aeefb983232f96b18ba013f0ae2ee500288%40%3Cdev.flink.apache.org%3E
> > > > [3]
> > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r93e3a72506f3e7ffd3c1ab860b5d1a21f8a47b059f2f2fdd05ca1d46%40%3Cdev.flink.apache.org%3E
> > > >
> > >
> >
>


-- 
Best, Jingsong Lee


[jira] [Created] (FLINK-23448) Update and fix up README

2021-07-21 Thread Daisy Tsang (Jira)
Daisy Tsang created FLINK-23448:
---

 Summary: Update and fix up README
 Key: FLINK-23448
 URL: https://issues.apache.org/jira/browse/FLINK-23448
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Daisy Tsang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Flink 1.14. Bi-weekly 2021-07-20

2021-07-21 Thread Johannes Moser
Dear Flink Community,

here's the latest update from the bi-weekly.
Short summary: there is still a lot going on, the feature freeze has been
pushed back by two weeks to the >>>16th of August<<<, and we are working
on the release communication. Good news: the tests and blockers are
looking very good.

Get all the updates on the release page. [1]

*Feature freeze:*
Initially, because of issues with FLIP-147, we had been considering
moving the feature freeze. When we did a general update, it became
obvious that a lot of teams/efforts would benefit from more time.
So we did what had to be done and moved it by two weeks.

*On-going efforts:*
What also became obvious is that the pure list of links to Jira issues
doesn't provide a clear overview of the progress of the efforts.
Often the linked issues include a subset of other issues that also
went into other releases. Some of the subtasks are optional.
We, as a community, need to improve that going forward. For now
I have introduced a traffic-light-style state for each row in the
feature list as a quick fix. I will make sure this is
updated before the next bi-weekly.
The update has shown that ~20 of 36 planned features have a
positive state. 6 are looking rather bad.

*Test instabilities:*
The tests became more stable recently, which is a really, really good thing.
Thanks for all the efforts that went into this.

*Highlight features:*
We are working on the release communication in general. So we are collecting
the highlights of this release and did a quick brainstorming which
brought us to this list: ML, Table API, Log based checkpoints...
If there's something worth mentioning please add it on the
release page.

What can you do to make 1.14 a good release:
* Get a realistic state of the ongoing efforts and update the page
* Start testing the "done" features
* Think of the things that go beyond the implementation: documentation and
  release/feature evangelism.

Sincerely, your release managers
Xintong, Dawid & Joe

-

[1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release


[RESULT] [VOTE] Release flink-shaded 14.0, release candidate 1

2021-07-21 Thread Chesnay Schepler

I'm happy to announce that we have unanimously approved this release.
There are 3 approving votes, 3 of which are binding:
* Timo
* Dawid
* Chesnay
There are no disapproving votes.
Thanks everyone!

On 15/07/2021 09:02, Chesnay Schepler wrote:

Hi everyone,
Please review and vote on the release candidate #1 for the version 
14.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org
[2], which is signed with the key with fingerprint C2EED7B111D464BA [3],

* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


Thanks,
Chesnay

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350408

[2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-14.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] 
https://repository.apache.org/content/repositories/orgapacheflink-1435

[5] https://github.com/apache/flink-shaded/commits/release-14.0-rc1
[6] https://github.com/apache/flink-web/pull/458





Re: [VOTE] Release flink-shaded 14.0, release candidate 1

2021-07-21 Thread Chesnay Schepler

+1

On 20/07/2021 19:54, Dawid Wysakowicz wrote:


+1 (binding)

  * checked the website PR
  * verified changed versions and NOTICE files since version 13.0
  * verified the checksum and the signature

Best,

Dawid

On 19/07/2021 10:58, Timo Walther wrote:

+1 (binding)

I went through all commits one more time and could not spot anything 
that would block a release.


Thanks Chesnay!

Timo

On 15.07.21 09:02, Chesnay Schepler wrote:

Hi everyone,
Please review and vote on the release candidate #1 for the version 
14.0, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to
dist.apache.org [2], which is signed with the key with fingerprint
C2EED7B111D464BA [3],

* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by 
majority approval, with at least 3 PMC affirmative votes.


Thanks,
Chesnay

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350408 


[2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-14.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] 
https://repository.apache.org/content/repositories/orgapacheflink-1435

[5] https://github.com/apache/flink-shaded/commits/release-14.0-rc1
[6] https://github.com/apache/flink-web/pull/458
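Dawid's checklist above (checksum, signature) boils down to comparing digests. A minimal, self-contained sketch of the checksum part — the real process runs `sha512sum -c` and `gpg --verify` against the KEYS file on the staged artifacts; the bytes here are a stand-in, not a real RC file:

```python
import hashlib

# Stand-in for a downloaded release artifact and its published .sha512 digest.
artifact = b"dummy release artifact"
published_digest = hashlib.sha512(artifact).hexdigest()

def verify(data: bytes, expected_hex: str) -> bool:
    """Recompute the sha512 digest and compare it to the published one."""
    return hashlib.sha512(data).hexdigest() == expected_hex

print(verify(artifact, published_digest))                 # True: artifact intact
print(verify(artifact + b"tampered", published_digest))   # False: bytes changed
```

Any single flipped byte changes the digest, which is why a matching checksum (plus a valid GPG signature) is required before casting a +1.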







[jira] [Created] (FLINK-23447) Bump lz4-java to 1.8

2021-07-21 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-23447:


 Summary: Bump lz4-java to 1.8
 Key: FLINK-23447
 URL: https://issues.apache.org/jira/browse/FLINK-23447
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Runtime / Network
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.14.0


Bump lz4 to the latest version for bug fixes and performance improvements.
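For context, in a Maven build such a bump is a one-line version change; a sketch of the dependency coordinates (the version number and its exact placement in Flink's poms are assumptions):

```xml
<!-- lz4-java on Maven Central; Flink manages the version centrally,
     so in practice only the version property changes. -->
<dependency>
  <groupId>org.lz4</groupId>
  <artifactId>lz4-java</artifactId>
  <version>1.8.0</version>
</dependency>
```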



--
This message was sent by Atlassian Jira
(v8.3.4#803005)