Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-27 Thread Gen Luo
is regard. A pluggable restart
> > > > > strategy
> > > > > >> sounds reasonable. But an error classifier and a restart
> strategy
> > > are
> > > > > still
> > > > > >> different enough to justify separate plugins, IMHO. And
> therefore, I
> > > > > would
> > > > > >> think that covering the restart strategy in a separate FLIP is
> the
> > > > > better
> > > > > >> option for the sake of simplicity.
> > > > > >>
> > > > > >> - immutable context: Passing in an immutable context and
> returning
> > > > data
> > > > > >> through the interface method's return value sounds like a better
> > > > > approach
> > > > > >> to harden the contract of the interface. +1 for that proposal
> > > > > >>
> > > > > >> - async operation: I think David is right. An async interface
> makes
> > > > the
> > > > > >> listener implementations more robust when it comes to heavy IO
> > > > > operations.
> > > > > >> The ioExecutor can be passed through the context object. +1
> > > > > >>
> > > > > >> Matthias
> > > > > >>
> > > > > >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <
> > > > david.mora...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> *@Piotr*
> > > > > >>>
> > > > > >>>
> > > > > >>>> I was thinking about actually defining the order of the
> > > > > >>>> classifiers/handlers and not allowing them to be asynchronous.
> > > > > >>>> Asynchronicity would create some problems: when to actually
> > > return
> > > > > the
> > > > > >>>> error to the user? After all async responses will get back?
> > > Before,
> > > > > but
> > > > > >>>> without classified exception? It would also add implementation
> > > > > >> complexity
> > > > > >>>> and I think we can always expand the API with async version
> in the
> > > > > >> future
> > > > > >>>> if needed.
> > > > > >>>
> > > > > >>>
> > > > > >>> As long as the classifiers need to talk to an external system,
> we
> > > by
> > > > > >>> definition need to allow them to be asynchronous to unblock the
> > > main
> > > > > >> thread
> > > > > >>> for handling other RPCs. Exposing ioExecutor via the context
> > > proposed
> > > > > >> above
> > > > > >>> would be great.
> > > > > >>>
> > > > > >>> After all async responses will get back
> > > > > >>>
> > > > > >>>
> > > > > >>> This would be the same if we trigger them synchronously one by
> one,
> > > > > with
> > > > > >> a
> > > > > >>> caveat that synchronous execution might take significantly
> longer
> > > and
> > > > > >>> introduce unnecessary downtime to a job.
> > > > > >>>
> > > > > >>> D.
> > > > > >>>
> > > > > >>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu 
> wrote:
> > > > > >>>
> > > > > >>>> Hi Piotr,
> > > > > >>>>
> > > > > >>>> It's fine to me to have a separate FLIP to extend this
> > > > > >> `FailureListener`
> > > > > >>>> to support a custom restart strategy.
> > > > > >>>>
> > > > > >>>> What I was a bit concerned about is that if we just treat the
> > > > > >> `FailureListener`
> > > > > >>>> as an error classifier which is not crucial to the Flink framework
> > > > > process,
> > > > > >>>> we may design it to run asynchronously and not trigger Flink
> > > > failures.
> > > > > >>>> This may be a blocker if later we want to enable it to support
> > > > custom
> > > > > >>>> restart s

Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-21 Thread Gen Luo
Hi Panagiotis,

Thanks for the proposal.

It's useful to enrich the information so that users can be clearer
about why the job is failing, especially platform developers who
need to provide the information to their end users.
As for this FLIP in particular, I'd prefer the naming `FailureEnricher`
proposed by David, as the plugin doesn't really handle the failure.

However, like Zhu and Lijie said, I also recently joined a discussion
about customized failure handling, e.g. counting the failure rate of
pipeline regions separately, failing the job when a specific error
occurs, and so on.
I suppose a custom restart strategy, or as I'd call it, a custom
failure "handler", is indeed necessary. It can also enrich the
information as the currently proposed handler does.

To avoid adding too many plugin interfaces, which may confuse users
and make the ExecutionFailureHandler more complex,
I think it'd be better to consider these requirements together.

IMO, we can add a handler interface, then make the current restart
strategy and the enricher both types of the handler. The handlers
execute in sequence, and the failure is considered unrecoverable if
any of the handlers decides so.
In this way, users can also implement a handler using the enriched
information provided by the previous handlers, e.g. fail the job and
send a notification if too many failures are caused by the end users.
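
To make this concrete, here is a minimal sketch of what such a handler chain
could look like. All names below (FailureHandler, FailureContext, the fields
of FailureHandlingResult) are illustrative assumptions for this discussion,
not the FLIP-304 API:

{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch only: handlers run in sequence; each sees the labels produced
 *  by the previous handlers through the immutable context. */
interface FailureHandler {
    CompletableFuture<FailureHandlingResult> handle(FailureContext context);
}

/** Immutable view of the failure passed to each handler. */
interface FailureContext {
    Throwable getFailure();
    Map<String, String> getLabels(); // labels added by previous handlers
    Executor ioExecutor();           // for heavy or blocking work
}

/** Immutable result returned by a handler. */
final class FailureHandlingResult {
    private final Map<String, String> labels;
    private final boolean canRestart;
    private final long restartBackoffTimeMillis;

    FailureHandlingResult(
            Map<String, String> labels, boolean canRestart, long restartBackoffTimeMillis) {
        this.labels = labels;
        this.canRestart = canRestart;
        this.restartBackoffTimeMillis = restartBackoffTimeMillis;
    }

    Map<String, String> getLabels() { return labels; }
    boolean canRestart() { return canRestart; }
    long getRestartBackoffTimeMillis() { return restartBackoffTimeMillis; }
}
{code}

With a shape like this, the framework would merge each handler's result into
the context before invoking the next handler, and the failure would be
treated as unrecoverable as soon as any handler returns canRestart = false.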

Best,
Gen


On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu  wrote:

> Hi Panagiotis,
>
> Thanks for your proposal. It is valuable to analyze the reason for a
> failure with a user plug-in.
>
> Making the context immutable could make the contract stronger.
> Letting the listener return an enriching result may be a better way.
>
> IIUC, listeners could do two things: enrich more information (tags/labels)
> into the FailureHandlingResult, and push data out of Flink (metrics or
> something).
> IMO, we could split these two types into Listener and Advisor (maybe
> other names). The Listener just pushes the data out and returns nothing to
> Flink, so we can run these async and don't have to wait for the Listener's
> result.
> The Advisor returns rich information to the FailureHandlingResult, and it
> should have lighter logic.
>
>
> Supporting a custom restart strategy is also valuable. In this design, we
> use the RestartStrategy to construct a FailureHandlingResult, and then pass
> it to the Listener.
> My question is: should we change the restart strategy interface to support
> a custom restart strategy, or keep the current restart strategy and let a
> later Listener enrich the restartable information in the
> FailureHandlingResult? The latter may cause some confusion when we use a
> custom restart strategy: the default Flink restart strategy also runs but
> does not take effect.
>
>
> Best,
> Weihua
>
>
> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang 
> wrote:
>
> > Hi Panagiotis,
> >
> > Thanks for driving this.
> >
> > +1 for supporting custom restart strategy, we did receive such requests
> > from the user mailing list [1][2].
> >
> > Besides, in the current design, the plugin will only do some statistical
> > and classification work, and will not affect the *FailureHandlingResult*.
> > Just listening, no handling: it doesn't quite match the title.
> >
> > [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
> > [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
> >
> > Best,
> > Lijie
> >
> > > On Mon, Mar 20, 2023 at 9:39 PM Zhu Zhu  wrote:
> >
> > > Hi Panagiotis,
> > >
> > > Thanks for creating this proposal! It's good to enable Flink to handle
> > > different errors in different ways through a pluggable mechanism.
> > >
> > > There are requests for flexible restart strategies from time to time,
> for
> > > different strategies of restart backoff time, or to suppress restarting
> > > on certain errors. Therefore, I think it's better that the proposed
> > > failure handling plugin can also support custom restart strategies.
> > >
> > > Maybe we can call it FailureHandlingAdvisor which provides more
> > > information (labels) and gives advice (restart backoff time, whether
> > > to restart)? I do not have a strong opinion though, any explanatory
> > > name would be good.
> > >
> > > To avoid unexpected mutation, how about making the context immutable
> > > and let the plugin return an immutable result? i.e. remove the setters
> > > from the context, and let the plugin method return a result which
> > > contains `labels`, `canRestart` and `restartBackoffTime`. Flink should
> > > apply the result to the context before invoking the next plugin, so
> > > that the next plugin will see the updated context.
> > >
> > > The plugin should avoid taking too much time to return the result,
> > because
> > > it will block the RPC and result in instability. However, it can still
> > > perform heavy actions in a different thread. The context can provide an
> > > `ioExecutor` to the plugins for reuse.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Shammon FY  

Re: [DISCUSS]Introduce a time-segment based restart strategy

2022-11-25 Thread Gen Luo
Hi all,

Sorry for the late jumping in.

To meet Weihua's need, Dong's proposal seems pretty fine, but the
modification it requires, I'm afraid, is not really easy.
RestartBackoffTimeStrategy is quite a simple interface. The strategy
doesn't even know which task is failing, not to mention the division of
pipeline regions.
To distinguish the failure counts of different regions, the strategy lacks
too much information, which is not easy for it to acquire.
One approach I can figure out is to create different strategy instances for
different regions, as sketched below. In this way we do not need to modify
the strategy, but we do need to modify the schedulers or the
ExecutionFailureHandler.
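
For illustration only, a rough sketch of that idea follows; the names here
are placeholders, not Flink internals, though the strategy methods mirror
the existing RestartBackoffTimeStrategy contract:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only: track failures per failover region by giving each region
 *  its own strategy instance, without changing the strategy interface. */
class PerRegionRestartStrategies {

    interface RestartBackoffTimeStrategy {
        boolean canRestart();
        long getBackoffTime();
        void notifyFailure(Throwable cause);
    }

    private final Map<String, RestartBackoffTimeStrategy> strategiesByRegion = new HashMap<>();
    private final Supplier<RestartBackoffTimeStrategy> factory;

    PerRegionRestartStrategies(Supplier<RestartBackoffTimeStrategy> factory) {
        this.factory = factory;
    }

    // Called by the scheduler / ExecutionFailureHandler with the id of the
    // failing region; a strategy instance is created lazily per region.
    RestartBackoffTimeStrategy forRegion(String regionId) {
        return strategiesByRegion.computeIfAbsent(regionId, id -> factory.get());
    }
}
{code}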

On the other hand, I realize another case where the restart strategy may
need to be aware of the types and occurrence rate of the exceptions: to
avoid failing over and instead directly fail the job when certain errors
happen.
I know that there's an annotation
`@ThrowableAnnotation(ThrowableType.NonRecoverableError)` that can fail the
job, but I'm afraid there can be scenarios that cannot annotate the
exceptions, or catch and wrap them in an annotated exception.
In such cases, handling this in the restart strategy can be a good choice.
Such a strategy can even be combined with other existing strategies which
handle the failure rate rather than the cause type.

Besides, given that new strategies may be necessary, and existing
strategies may also need to be enhanced, maybe we should make the
RestartBackoffTimeStrategy a plugin rather than an enumeration, or
introduce a new custom strategy type which can load customized
implementations.
This cannot solve the problem immediately, but it makes the choice of
restart strategy more flexible.
What do you think about this?

Thanks.

On Mon, Nov 21, 2022 at 5:46 PM Paul Lam  wrote:

> Dong’s proposal LGTM.
>
> Best,
> Paul Lam
>
> > On Nov 19, 2022, at 10:50, Dong Lin  wrote:
> >
> > Hey Weihua,
> >
> > Thanks for proposing the new strategy!
> >
> > If I understand correctly, the main issue is that different failover
> > regions can be restarted independently, but they share the same counter
> > when counting the number of failures in an interval. So the number of
> > failures a given region can tolerate is less than what users expect.
> >
> > Given that regions can be restarted independently, it might be more
> usable
> > and intuitive to count the number of failures for each region when
> > executing the failover strategy. Thus, instead of adding a new failover
> > strategy, how about we update the existing failure-rate strategy, and
> > probably other existing strategies as well, to use the following
> semantics:
> >
> > - For any given region in the job, its number of failures in
> > failure-rate-interval should not exceed max-failures-per-interval.
> > Otherwise, the job will fail without being restarted.
> >
> > By using this updated semantics, the keyby-connected job will have the
> same
> > behavior as the existing Flink when we use failure-rate strategy. For
> > the rescale-connected
> > job, in the case you described above, after the TM fails, each of the 3
> > regions will increment its failure count from 0 to 1, which is still less
> > than max-failures-per-interval. Thus the rescale-connected job can
> continue
> > to work.
> >
> > This alternative approach can solve the problem without increasing the
> > complexity of the failover strategy choice. And this approach does not
> > require us to check whether two exceptions belong to the same root cause.
> > Do you think it can work?
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Nov 4, 2022 at 4:46 PM Weihua Hu  wrote:
> >
> >> Hi, everyone
> >>
> >> I'd like to bring up a discussion about restart strategy. Flink
> supports 3
> kinds of restart strategies. These work very well for jobs with specific
> >> configs, but for platform users who manage hundreds of jobs, there is no
> >> common strategy to use.
> >>
> >> Let me explain the reason. We manage a lot of jobs, some are
> >> keyby-connected with one region per job, some are rescale-connected with
> >> many regions per job, and when using the failure rate restart strategy,
> we
> >> cannot achieve the same control with the same configuration.
> >>
> >> For example, if I want the job to fail when there are 3 exceptions
> within 5
> >> minutes, the config would look like this:
> >>
> >>> restart-strategy.failure-rate.max-failures-per-interval: 3
> >>>
> >>> restart-strategy.failure-rate.failure-rate-interval: 5 min
> >>>
> >> For the keyby-connected job, this config works well.
> >>
> >> However, for the rescale-connected job, we need to consider the number
> of
> >> regions and the number of slots per TaskManager. If each TM has 3 slots,
> >> and these 3 slots run the tasks of 3 regions, then when one TaskManager
> >> crashes, it will trigger 3 regions to fail, and the job will fail
> because
> >> it exceeds the threshold of the restart strategy. To avoid the effect of
> >> a single TM crash, I must increase the max-failures-per-interval to 9,
> but

[jira] [Created] (FLINK-29927) AkkaUtils#getAddress may cause memory leak

2022-11-08 Thread Gen Luo (Jira)
Gen Luo created FLINK-29927:
---

 Summary: AkkaUtils#getAddress may cause memory leak
 Key: FLINK-29927
 URL: https://issues.apache.org/jira/browse/FLINK-29927
 Project: Flink
  Issue Type: Bug
Reporter: Gen Luo
 Attachments: 截屏2022-11-08 下午5.19.38.png

We found a slow memory leak in the JM. When MetricFetcherImpl tries to retrieve 
metrics, it always calls MetricQueryServiceRetriever#retrieveService first, and 
that method acquires the address of a task manager, which uses 
AkkaUtils#getAddress internally. The getAddress method is implemented like 
this:

{code:java}
public static Address getAddress(ActorSystem system) {
return new RemoteAddressExtension().apply(system).getAddress();
}
{code}

and the RemoteAddressExtension#apply is like this:

{code:scala}
  def apply(system: ActorSystem): T = {
    java.util.Objects
      .requireNonNull(system, "system must not be null!")
      .registerExtension(this)
  }
{code}

This means every call of AkkaUtils#getAddress will register a new extension to 
the ActorSystem, which can never be released until the ActorSystem exits.

Most usages of the method happen only once during initialization, but as 
described above, MetricFetcherImpl also uses the method. This can happen 
periodically while users have the WebUI open, or whenever users call the 
RESTful API directly to get metrics. This means the memory may keep leaking.

The leak may have been introduced in FLINK-23662 when porting the Scala 
version of AkkaUtils to the Java one, though I'm not sure whether the Scala 
version has the same issue.

The leak seems very slow. We observed it on a job running for more than one 
month with only 1G of memory for the job manager. So I suppose it's not an 
emergency, but it still needs to be fixed.
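
One possible direction for a fix, sketched below on the assumption that Akka 
deduplicates extensions by the ExtensionId instance used to register them: 
reuse a single RemoteAddressExtension instance, so that repeated calls resolve 
to the already-registered extension instead of registering a fresh one.

{code:java}
import akka.actor.ActorSystem;
import akka.actor.Address;

public class AkkaUtils {
    // Sketch only: one shared ExtensionId instance means every getAddress
    // call resolves to the same registered extension instead of registering
    // a new one that is never released.
    private static final RemoteAddressExtension REMOTE_ADDRESS_EXTENSION =
            new RemoteAddressExtension();

    public static Address getAddress(ActorSystem system) {
        return REMOTE_ADDRESS_EXTENSION.apply(system).getAddress();
    }
}
{code}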




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28864) DynamicPartitionPruningRule#isNewSource should check if the source used by the DataStreamScanProvider is actually a new source

2022-08-08 Thread Gen Luo (Jira)
Gen Luo created FLINK-28864:
---

 Summary: DynamicPartitionPruningRule#isNewSource should check if 
the source used by the DataStreamScanProvider is actually a new source
 Key: FLINK-28864
 URL: https://issues.apache.org/jira/browse/FLINK-28864
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Gen Luo


DynamicPartitionPruningRule#isNewSource assumes that a DataStreamScanProvider 
which supports dynamic filtering will use the new source API as its source, 
but this is not reliable. For better compatibility, the method should acquire 
the source transformation from the translated DataStream and check whether the 
source is actually a new source.
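
A rough sketch of the proposed check is below (simplified; the actual planner 
code and the exact produceDataStream signature may differ between Flink 
versions):

{code:java}
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

class NewSourceCheckSketch {
    // Translate the provider into a DataStream and inspect the underlying
    // transformation: FLIP-27 "new" sources are planned as a
    // SourceTransformation, while legacy sources are not.
    static boolean isActuallyNewSource(
            DataStreamScanProvider provider, StreamExecutionEnvironment env) {
        DataStream<RowData> dataStream = provider.produceDataStream(env);
        Transformation<?> transformation = dataStream.getTransformation();
        return transformation instanceof SourceTransformation;
    }
}
{code}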



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Replace Attempt column with Attempt Number on the subtask list page of the Web UI

2022-07-20 Thread Gen Luo
Hi user mail list,

I'm also forwarding this thread to you. Please let me know if you have any
comments or feedback!

Best,
Gen

On Wed, Jul 20, 2022 at 4:25 PM Zhu Zhu  wrote:

> Thanks for starting this discussion, Gen!
> I agree it is confusing or even troublesome to show an attempt id that is
> different from the corresponding attempt number in REST, metrics and logs.
> It adds a burden on users to do the mapping in troubleshooting. Mis-mapping
> can easily happen and result in wasted effort and wrong conclusions.
>
> Therefore, +1 for this proposal.
>
> Thanks,
> Zhu
>
> On Wed, Jul 20, 2022 at 3:24 PM Gen Luo  wrote:
> >
> > Hi everyone,
> >
> > I'd like to propose a change on the Web UI to replace the Attempt column
> > with an Attempt Number column on the subtask list page.
> >
> > From the very beginning, the attempt number shown is calculated at the
> > frontend by subtask.attempt + 1, which means the attempt number shown on
> > the web UI is not the same as the one in the runtime, the logs, and
> > the metrics. Users may get confused since they can't find logs or metrics
> > of the subtask with the same attempt number.
> >
> > Fortunately, for now users don't need to care about the attempt
> number,
> > since there can be only one attempt of each subtask. However, the
> confusion
> > seems inevitable once the speculative execution[1] or the attempt history
> > is introduced, since multiple attempts of the same subtask can be
> executed
> > or presented at the same time.
> >
> > I suggest that the attempt number shown on the web UI should be changed
> to
> > align with that on the runtime side, which is used in logging and metrics
> > reporting. To avoid confusion, the column should also be renamed as
> > "Attempt Number". The changes should only affect the Web UI. No REST API
> > needs to change. What do you think?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> > Best,
> > Gen
>


[DISCUSS] Replace Attempt column with Attempt Number on the subtask list page of the Web UI

2022-07-20 Thread Gen Luo
Hi everyone,

I'd like to propose a change on the Web UI to replace the Attempt column
with an Attempt Number column on the subtask list page.

From the very beginning, the attempt number shown is calculated at the
frontend by subtask.attempt + 1, which means the attempt number shown on
the web UI is not the same as the one in the runtime, the logs, and
the metrics. Users may get confused since they can't find logs or metrics
of the subtask with the same attempt number.

Fortunately, for now users don't need to care about the attempt number,
since there can be only one attempt of each subtask. However, the confusion
seems inevitable once the speculative execution[1] or the attempt history
is introduced, since multiple attempts of the same subtask can be executed
or presented at the same time.

I suggest that the attempt number shown on the web UI should be changed to
align with that on the runtime side, which is used in logging and metrics
reporting. To avoid confusion, the column should also be renamed as
"Attempt Number". The changes should only affect the Web UI. No REST API
needs to change. What do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job

Best,
Gen


[jira] [Created] (FLINK-28589) Enhance Web UI for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28589:
---

 Summary: Enhance Web UI for Speculative Execution
 Key: FLINK-28589
 URL: https://issues.apache.org/jira/browse/FLINK-28589
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28588) Enhance REST API for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28588:
---

 Summary: Enhance REST API for Speculative Execution
 Key: FLINK-28588
 URL: https://issues.apache.org/jira/browse/FLINK-28588
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / REST
Affects Versions: 1.16.0
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28587) FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-18 Thread Gen Luo (Jira)
Gen Luo created FLINK-28587:
---

 Summary: FLIP-249: Flink Web UI Enhancement for Speculative 
Execution
 Key: FLINK-28587
 URL: https://issues.apache.org/jira/browse/FLINK-28587
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Gen Luo


As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be 
enhanced to display the related information if the speculative execution 
mechanism is enabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-15 Thread Gen Luo
Hi everyone,

I’m happy to announce that FLIP-249[1] has been accepted, with 4 approving
votes, 3 of which are binding[2]:
- Zhu Zhu (binding)
- Lijie Wang
- Jing Zhang (binding)
- Yun Gao (binding)

There is no disapproving vote.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution
[2] https://lists.apache.org/thread/0xonn4y8lkj49comors0z86tpyzkvvqg

Best,
Gen


Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Gen Luo
Hi Jing,

I agree that in this version it's not really convenient to check out if
there are
speculative attempts. I think what we need to improve this is not only the
enhancement of the current Web UI, but a new page summarizing the
information of speculative execution on the history server, or both. More
useful information can be found there like the history timeline and
progress
of speculative executions. That can be a big project.

I suppose in the FLIP we'd like to make sure of the usability of
speculative
execution, and we can then consider further improvements after this is done.

Best,
Gen

On Wed, Jul 13, 2022 at 7:05 PM Jing Zhang  wrote:

> Hi Gen,
>
> > The way the speculative executions are presented should be almost the
> same as the
> job was running. Users can still find the executions folded in the subtask
> list page.
>
> It's a more complicated operation to check every vertex and every subtask
> list page.
> It's better to have an easier way to know whether the job contains
> speculative executions even after the job has finished.
> Maybe this point could be taken into consideration in the next version.
>
> Best,
> Jing Zhang
>
>
> On Wed, Jul 13, 2022 at 2:47 PM Gen Luo  wrote:
>
> > Hi Jing,
> >
> > Thanks for joining the discussion. It's a very good point to figure out
> the
> > possible influence on the history server.
> >
> > > 1. Does the improvement also cover history server or just Web UI?
> > As far as I know most Web UI components are shared between
> > runtime and history server, so the improvement is expected to cover both.
> >
> > We will make sure the changes proposed in this FLIP do not conflict with
> > the ongoing FLIP-241 which is working on the enhancement of completed
> > job information.
> >
> > > 2. How to know whether the job contains speculative execution
> > > instances after the job finished? Do we have to check each subtask
> > > of every vertex one by one?
> >
> > When one attempt of a subtask finishes, all other concurrent attempts
> > will be canceled, but still treated as the current executions. The way
> the
> > speculative executions are presented should be almost the same as the
> > job was running. Users can still find the executions folded in the
> subtask
> > list page.
> >
> > As we mentioned in the FLIP, all changes are expected to be transparent
> > to users who don't use speculative execution. And to users who do use
> > speculative execution, the experience should be almost the same
> > when watching a running job or a completed job in the history server.
> >
> > Best,
> > Gen
> >
> > On Tue, Jul 12, 2022 at 8:41 PM Jing Zhang  wrote:
> >
> > > Thanks for driving this discussion. It's a very helpful improvement.
> > > I only have two minor questions:
> > > 1. Does the improvement also cover history server or just Web UI?
> > > 2. How to know whether the job contains speculative execution instances
> > > after the job finished?
> > > Do we have to check each subtask of every vertex one by one?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > On Mon, Jul 11, 2022 at 10:31 PM Gen Luo  wrote:
> > >
> > > > Hi, everyone.
> > > >
> > > > Thanks for your feedback.
> > > > If there are no more concerns or comments, I will start the vote
> > > tomorrow.
> > > >
> > > > On Mon, Jul 11, 2022 at 11:12 AM Gen Luo  wrote:
> > > >
> > > > > Hi Lijie and Zhu,
> > > > >
> > > > > Thanks for the suggestion. I agree that the name "Blocked Free
> Slots"
> > > is
> > > > > more clear to users.
> > > > > I'll take the suggestion and update the FLIP.
> > > > >
> > > > > On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:
> > > > >
> > > > >> I agree that it can be more useful to show the number of slots
> that
> > > are
> > > > >> free but blocked. Currently users infer the slots in use by
> > > subtracting
> > > > >> available slots from the total slots. With blocked slots
> introduced,
> > > > this
> > > > >> can be achieved by subtracting available slots and blocked free
> > slots
> > > > >> from the total slots.
> > > > >>
> > > > >> Therefore, +1 to show "Blocked Free Slots" on the resource card.
> > > > >>
> > > > >> Thanks,
> > > > >> Zhu
> > > > >>
>

Re: [VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Gen Luo
Hi Jing,

I have replied in the discussion thread about the questions. Hope that
would be helpful.

Best,
Gen

On Tue, Jul 12, 2022 at 8:43 PM Jing Zhang  wrote:

> Hi, Gen Luo,
>
> I left  two minor questions in the DISCUSS thread.
> Sorry for jumping into the discussion so late.
>
> Best,
> Jing Zhang
>
> On Tue, Jul 12, 2022 at 7:29 PM Lijie Wang  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Lijie
> >
> > On Tue, Jul 12, 2022 at 5:38 PM Zhu Zhu  wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Jul 12, 2022 at 1:46 PM Gen Luo  wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > >
> > > > Thanks for all the feedback so far. Based on the discussion [1], we
> > seem
> > > to
> > > > have consensus. So, I would like to start a vote on FLIP-249 [2].
> > > >
> > > >
> > > > The vote will last for at least 72 hours unless there is an objection
> > or
> > > > insufficient votes.
> > > >
> > > >
> > > > [1] https://lists.apache.org/thread/832tk3zvysg45vrqrv5tgbdqx974pm3m
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution
> > >
> >
>


Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Gen Luo
Hi Jing,

Thanks for joining the discussion. It's a very good point to figure out the
possible influence on the history server.

> 1. Does the improvement also cover history server or just Web UI?
As far as I know most Web UI components are shared between
runtime and history server, so the improvement is expected to cover both.

We will make sure the changes proposed in this FLIP do not conflict with
the ongoing FLIP-241 which is working on the enhancement of completed
job information.

> 2. How to know whether the job contains speculative execution
instances after the job finished? Do we have to check each subtask
of every vertex one by one?

When one attempt of a subtask finishes, all other concurrent attempts
will be canceled, but still treated as the current executions. The way the
speculative executions are presented should be almost the same as the
job was running. Users can still find the executions folded in the subtask
list page.

As we mentioned in the FLIP, all changes are expected to be transparent
to users who don't use speculative execution. And to users who do use
speculative execution, the experience should be almost the same
when watching a running job or a completed job in the history server.

Best,
Gen

On Tue, Jul 12, 2022 at 8:41 PM Jing Zhang  wrote:

> Thanks for driving this discussion. It's a very helpful improvement.
> I only have two minor questions:
> 1. Does the improvement also cover history server or just Web UI?
> 2. How to know whether the job contains speculative execution instances
> after the job finished?
> Do we have to check each subtask of every vertex one by one?
>
> Best,
> Jing Zhang
>
> On Mon, Jul 11, 2022 at 10:31 PM Gen Luo  wrote:
>
> > Hi, everyone.
> >
> > Thanks for your feedback.
> > If there are no more concerns or comments, I will start the vote
> tomorrow.
> >
> > On Mon, Jul 11, 2022 at 11:12 AM Gen Luo  wrote:
> >
> > > Hi Lijie and Zhu,
> > >
> > > Thanks for the suggestion. I agree that the name "Blocked Free Slots"
> is
> > > more clear to users.
> > > I'll take the suggestion and update the FLIP.
> > >
> > > On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:
> > >
> > >> I agree that it can be more useful to show the number of slots that
> are
> > >> free but blocked. Currently users infer the slots in use by
> subtracting
> > >> available slots from the total slots. With blocked slots introduced,
> > this
> > >> can be achieved by subtracting available slots and blocked free slots
> > >> from the total slots.
> > >>
> > >> Therefore, +1 to show "Blocked Free Slots" on the resource card.
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >> On Fri, Jul 8, 2022 at 5:39 PM Lijie Wang  wrote:
> > >> >
> > >> > Hi Gen & Zhu,
> > >> >
> > >> > -> 1. Can we also show "Blocked Slots" in the resource card, so that
> > >> users
> > >> > can easily figure out how many slots are available/blocked/in-use?
> > >> >
> > >> > I think we should describe the "available" and "blocked" more
> clearly.
> > >> In
> > >> > my opinion, I think users should be interested in the number of
> slots
> > in
> > >> > the following 3 state:
> > >> > 1. free and unblocked, I think it's OK to call this state
> "available".
> > >> > 2. free and blocked, I think it's not appropriate to call "blocked"
> > >> > directly, because "blocked" should include both the "free and
> blocked"
> > >> and
> > >> > "in-use and blocked".
> > >> > 3. in-use
> > >> >
> > >> > And the sum of the above 3 kinds of slots should be the total number
> > of
> > >> > slots in this cluster.
> > >> >
> > >> > WDYT?
> > >> >
> > >> > Best,
> > >> > Lijie
> > >> >
> > >> > On Fri, Jul 8, 2022 at 4:14 PM Gen Luo  wrote:
> > >> >
> > >> > > Hi Zhu,
> > >> > > Thanks for the feedback!
> > >> > >
> > >> > > 1.Good idea. Users should be more familiar with the slots as the
> > >> resource
> > >> > > units.
> > >> > >
> > >> > > 2.You remind me that the "speculative attempts" are execution
> > attempts
> > >> > > started by the SpeculativeScheduler when slow tasks are d

[VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-11 Thread Gen Luo
Hi everyone,


Thanks for all the feedback so far. Based on the discussion [1], we seem to
have consensus. So, I would like to start a vote on FLIP-249 [2].


The vote will last for at least 72 hours unless there is an objection or
insufficient votes.


[1] https://lists.apache.org/thread/832tk3zvysg45vrqrv5tgbdqx974pm3m
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution


Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-11 Thread Gen Luo
Hi, everyone.

Thanks for your feedback.
If there are no more concerns or comments, I will start the vote tomorrow.

On Mon, Jul 11, 2022 at 11:12 AM Gen Luo  wrote:

> Hi Lijie and Zhu,
>
> Thanks for the suggestion. I agree that the name "Blocked Free Slots" is
> more clear to users.
> I'll take the suggestion and update the FLIP.
>
> On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:
>
>> I agree that it can be more useful to show the number of slots that are
>> free but blocked. Currently users infer the slots in use by subtracting
>> available slots from the total slots. With blocked slots introduced, this
>> can be achieved by subtracting available slots and blocked free slots
>> from the total slots.
>>
>> Therefore, +1 to show "Blocked Free Slots" on the resource card.
>>
>> Thanks,
>> Zhu
>>
>> On Fri, Jul 8, 2022 at 5:39 PM Lijie Wang  wrote:
>> >
>> > Hi Gen & Zhu,
>> >
>> > -> 1. Can we also show "Blocked Slots" in the resource card, so that
>> users
>> > can easily figure out how many slots are available/blocked/in-use?
>> >
>> > I think we should describe the "available" and "blocked" more clearly.
>> In
>> > my opinion, I think users should be interested in the number of slots in
>> > the following 3 states:
>> > 1. free and unblocked, I think it's OK to call this state "available".
>> > 2. free and blocked, I think it's not appropriate to call "blocked"
>> > directly, because "blocked" should include both the "free and blocked"
>> and
>> > "in-use and blocked".
>> > 3. in-use
>> >
>> > And the sum of the above 3 kinds of slots should be the total number of
>> > slots in this cluster.
>> >
>> > WDYT?
>> >
>> > Best,
>> > Lijie
>> >
>> > On Fri, Jul 8, 2022 at 4:14 PM Gen Luo  wrote:
>> >
>> > > Hi Zhu,
>> > > Thanks for the feedback!
>> > >
>> > > 1.Good idea. Users should be more familiar with the slots as the
>> resource
>> > > units.
>> > >
>> > > 2.You remind me that the "speculative attempts" are execution attempts
>> > > started by the SpeculativeScheduler when slow tasks are detected,
>> while the
>> > > current execution attempts other than the "most current" one are not
>> really
>> > > the speculative attempts. I agree we should modify the field name.
>> > >
>> > > 3.ArchivedSpeculativeExecutionVertex seems to be introduced with the
>> > > speculative execution to handle the speculative attempts as a part of
>> the
>> > > execution history. Since this FLIP is handling the attempts in a
>> more
>> > > proper way, I agree that we can remove the
>> > > ArchivedSpeculativeExecutionVertex.
>> > >
>> > > Thanks again and I'll update the FLIP later according to these
>> suggestions.
>> > >
>> > > On Thu, Jul 7, 2022 at 4:35 PM Zhu Zhu  wrote:
>> > >
>> > > > Thanks for writing this FLIP and initiating the discussion, Gen,
>> Yun and
>> > > > Junhan!
>> > > > It will be very useful to have these improvements on the web UI for
>> > > > speculative execution users, allowing them to know what is
>> happening.
>> > > > I just have a few comments regarding the design details:
>> > > >
>> > > > 1. Can we also show "Blocked Slots" in the resource card, so that
>> users
>> > > > can easily figure out how many slots are available/blocked/in-use?
>> > > > 2. I think "speculative-attempts" is not accurate, because the
>> > > > root/fastest current one can be a speculative execution attempt, and in
>> > > > this case "speculative-attempts" will contain the initial execution
>> > > > attempt. How about naming it "other-concurrent-attempts"?
>> > > > 3. I think ArchivedSpeculativeExecutionVertex is not necessarily
>> > > > needed. We can rework the ArchivedExecutionVertex to contain a set
>> of
>> > > > current execution attempts. The set will have only one element in
>> > > > non-speculative cases though. In this way, we can have a unified
>> > > > processing for ArchivedExecutionVertex in
>> speculative/non-speculative
>> > > > cases.
>> > > >
>> > > >

Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-10 Thread Gen Luo
Hi Lijie and Zhu,

Thanks for the suggestion. I agree that the name "Blocked Free Slots" is
more clear to users.
I'll take the suggestion and update the FLIP.

On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:

> I agree that it can be more useful to show the number of slots that are
> free but blocked. Currently users infer the slots in use by subtracting
> available slots from the total slots. With blocked slots introduced, this
> can be achieved by subtracting available slots and blocked free slots
> from the total slots.
>
> Therefore, +1 to show "Blocked Free Slots" on the resource card.
>
> Thanks,
> Zhu
>
> On Fri, Jul 8, 2022 at 5:39 PM Lijie Wang  wrote:
> >
> > Hi Gen & Zhu,
> >
> > -> 1. Can we also show "Blocked Slots" in the resource card, so that
> users
> > can easily figure out how many slots are available/blocked/in-use?
> >
> > I think we should describe the "available" and "blocked" more clearly. In
> > my opinion, I think users should be interested in the number of slots in
> > the following 3 states:
> > 1. free and unblocked, I think it's OK to call this state "available".
> > 2. free and blocked, I think it's not appropriate to call "blocked"
> > directly, because "blocked" should include both the "free and blocked"
> and
> > "in-use and blocked".
> > 3. in-use
> >
> > And the sum of the above 3 kinds of slots should be the total number of
> > slots in this cluster.
> >
> > WDYT?
> >
> > Best,
> > Lijie
> >
> > On Fri, Jul 8, 2022 at 4:14 PM Gen Luo  wrote:
> >
> > > Hi Zhu,
> > > Thanks for the feedback!
> > >
> > > 1.Good idea. Users should be more familiar with the slots as the
> resource
> > > units.
> > >
> > > 2.You remind me that the "speculative attempts" are execution attempts
> > > started by the SpeculativeScheduler when slow tasks are detected,
> while the
> > > current execution attempts other than the "most current" one are not
> really
> > > the speculative attempts. I agree we should modify the field name.
> > >
> > > 3.ArchivedSpeculativeExecutionVertex seems to be introduced with the
> > > speculative execution to handle the speculative attempts as a part of
> the
> > > execution history. Since this FLIP is handling the attempts in a more
> > > proper way, I agree that we can remove the
> > > ArchivedSpeculativeExecutionVertex.
> > >
> > > Thanks again and I'll update the FLIP later according to these
> suggestions.
> > >
> > > On Thu, Jul 7, 2022 at 4:35 PM Zhu Zhu  wrote:
> > >
> > > > Thanks for writing this FLIP and initiating the discussion, Gen, Yun
> and
> > > > Junhan!
> > > > It will be very useful to have these improvements on the web UI for
> > > > speculative execution users, allowing them to know what is happening.
> > > > I just have a few comments regarding the design details:
> > > >
> > > > 1. Can we also show "Blocked Slots" in the resource card, so that
> users
> > > > can easily figure out how many slots are available/blocked/in-use?
> > > > 2. I think "speculative-attempts" is not accurate, because the
> > > > root/fastest current one can be a speculative execution attempt, and in
> > > > this case "speculative-attempts" will contain the initial execution
> > > > attempt. How about naming it "other-concurrent-attempts"?
> > > > 3. I think ArchivedSpeculativeExecutionVertex is not necessarily
> > > > needed. We can rework the ArchivedExecutionVertex to contain a set
> of
> > > > current execution attempts. The set will have only one element in
> > > > non-speculative cases though. In this way, we can have a unified
> > > > processing for ArchivedExecutionVertex in speculative/non-speculative
> > > > cases.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Tue, Jul 5, 2022 at 3:10 PM Gen Luo  wrote:
> > > >
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > The speculative execution for batch jobs has been proposed and
> accepted
> > > > in
> > > > > FLIP-168[1], as well as the related blocklist mechanism in
> FLIP-224[2].
> > > > As
> > > > > a follow-up step, the Flink Web UI needs to be enhanced to display
> 

Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-08 Thread Gen Luo
Hi Zhu,
Thanks for the feedback!

1.Good idea. Users should be more familiar with the slots as the resource
units.

2.You remind me that the "speculative attempts" are execution attempts
started by the SpeculativeScheduler when slow tasks are detected, while the
current execution attempts other than the "most current" one are not really
the speculative attempts. I agree we should modify the field name.

3.ArchivedSpeculativeExecutionVertex seems to be introduced with the
speculative execution to handle the speculative attempts as a part of the
execution history. Since this FLIP is handling the attempts in a more
proper way, I agree that we can remove the
ArchivedSpeculativeExecutionVertex.

Thanks again and I'll update the FLIP later according to these suggestions.
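
As an illustration of point 3, the reworked archived vertex could look
roughly like the sketch below (names and layout are illustrative only, not
the actual class):

{code:java}
import java.util.Collection;
import java.util.List;

/** Sketch only: the archived vertex keeps all currently "live" attempts;
 *  for non-speculative jobs the collection simply contains one element,
 *  so both cases can be processed uniformly. */
class ArchivedExecutionVertexSketch {
    static class ArchivedExecution { /* attempt number, state, metrics, ... */ }

    // All current attempts: one without speculative execution, possibly
    // several with it; one of them is the representative/fastest attempt.
    private Collection<ArchivedExecution> currentExecutions;
    // Prior attempts kept as the execution history.
    private List<ArchivedExecution> executionHistory;

    Collection<ArchivedExecution> getCurrentExecutions() {
        return currentExecutions;
    }
}
{code}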

On Thu, Jul 7, 2022 at 4:35 PM Zhu Zhu  wrote:

> Thanks for writing this FLIP and initiating the discussion, Gen, Yun and
> Junhan!
> It will be very useful to have these improvements on the web UI for
> speculative execution users, allowing them to know what is happening.
> I just have a few comments regarding the design details:
>
> 1. Can we also show "Blocked Slots" in the resource card, so that users
> can easily figure out how many slots are available/blocked/in-use?
> 2. I think "speculative-attempts" is not accurate, because the
> root/fastest current one can be a speculative execution attempt, and in
> this case "speculative-attempts" will contain the initial execution
> attempt. How about naming it "other-concurrent-attempts"?
> 3. I think ArchivedSpeculativeExecutionVertex is not necessarily
> needed. We can rework the ArchivedExecutionVertex to contain a set of
> current execution attempts. The set will have only one element in
> non-speculative cases though. In this way, we can have a unified
> processing for ArchivedExecutionVertex in speculative/non-speculative
> cases.
>
> Thanks,
> Zhu
>
> On Tue, Jul 5, 2022 at 3:10 PM Gen Luo  wrote:
>
> >
> > Hi everyone,
> >
> > The speculative execution for batch jobs has been proposed and accepted
> in
> > FLIP-168[1], as well as the related blocklist mechanism in FLIP-224[2].
> As
> > a follow-up step, the Flink Web UI needs to be enhanced to display the
> > related information if the speculative execution mechanism is enabled.
> >
> > Junhan Yang, Yun Gao and I would like to start the discussion about the
> Web
> > UI enhancement and the corresponding REST API changes in FLIP-249[3],
> > including:
> > - show the speculative executions in the subtask list and the
> backpressure
> > page, where the fastest is shown directly while others are folded;
> > - show the number of the blocked task managers in the Task Managers and
> > Slots card, when the number is not 0;
> > - show the BLOCKED label in the task manager list and the task manager
> > detail page for the blocked task managers.
> >
> > All changes are expected to be transparent to users who don’t use
> > speculative execution.
> >
> > Please see the FLIP page[3] for more details. Looking forward to your
> > feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism
> > [3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution
>


[DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-05 Thread Gen Luo
Hi everyone,

The speculative execution for batch jobs has been proposed and accepted in
FLIP-168[1], as well as the related blocklist mechanism in FLIP-224[2]. As
a follow-up step, the Flink Web UI needs to be enhanced to display the
related information if the speculative execution mechanism is enabled.

Junhan Yang, Yun Gao and I would like to start the discussion about the Web
UI enhancement and the corresponding REST API changes in FLIP-249[3],
including:
- show the speculative executions in the subtask list and the backpressure
page, where the fastest is shown directly while others are folded;
- show the number of the blocked task managers in the Task Managers and
Slots card, when the number is not 0;
- show the BLOCKED label in the task manager list and the task manager
detail page for the blocked task managers.

All changes are expected to be transparent to users who don’t use
speculative execution.

Please see the FLIP page[3] for more details. Looking forward to your
feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution


[jira] [Created] (FLINK-28240) NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue may throw ArithmeticException when the total segments of NetworkBufferPool is 0

2022-06-24 Thread Gen Luo (Jira)
Gen Luo created FLINK-28240:
---

 Summary: 
NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue may throw 
ArithmeticException when the total segments of NetworkBufferPool is 0
 Key: FLINK-28240
 URL: https://issues.apache.org/jira/browse/FLINK-28240
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.15.0
Reporter: Gen Luo


In a single-vertex job, the network memory can be set to 0 since the job 
doesn't need it, and in this case the totalNumberOfMemorySegments of the 
NetworkBufferPool will also be 0.

However, NettyShuffleMetricFactory#RequestedMemoryUsageMetric#getValue uses 
the totalNumberOfMemorySegments of the NetworkBufferPool as the divisor 
without validating it, so an ArithmeticException will be thrown when the 
totalNumberOfMemorySegments is 0.

Since 0 network memory is in fact valid for a single-vertex job, I suppose 
RequestedMemoryUsageMetric#getValue should check whether the divisor is 0, 
and return 0 as the usage directly in such cases.
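
A minimal sketch of the suggested guard is below; the accessor names on the
buffer pool are assumptions for illustration, not necessarily the actual
methods:

{code:java}
// Sketch only: a gauge that guards against a zero-sized buffer pool.
class RequestedMemoryUsageGaugeSketch {
    interface BufferPoolView {
        long getTotalNumberOfMemorySegments();
        long getNumberOfRequestedMemorySegments();
    }

    private final BufferPoolView networkBufferPool;

    RequestedMemoryUsageGaugeSketch(BufferPoolView networkBufferPool) {
        this.networkBufferPool = networkBufferPool;
    }

    public int getValue() {
        long totalSegments = networkBufferPool.getTotalNumberOfMemorySegments();
        if (totalSegments == 0) {
            // A single-vertex job may legitimately have 0 network memory,
            // so report 0% usage instead of dividing by zero.
            return 0;
        }
        return (int) (100L * networkBufferPool.getNumberOfRequestedMemorySegments()
                / totalSegments);
    }
}
{code}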



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [VOTE] FLIP-232: Add Retry Support For Async I/O In DataStream API

2022-05-30 Thread Gen Luo
+1 (non-binding)

On Mon, May 30, 2022 at 3:50 PM Jark Wu  wrote:

> +1 (binding)
>
> Best,
> Jark
>
> On Mon, 30 May 2022 at 15:40, Lincoln Lee  wrote:
>
> > Dear Flink developers,
> >
> > Thanks for all your feedback for FLIP-232: Add Retry Support For Async
> I/O
> > In DataStream API[1] on the discussion thread[2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours unless there is an objection or not enough votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > [2] https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
> >
> >
> > Best,
> > Lincoln Lee
> >
>


Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API

2022-05-23 Thread Gen Luo
Hi Lincoln,

Thanks for the explanation. I understand your thought, but I'm a little
confused by the additional detail.
Is the startTime the time when the record is processed for the first time?
And is the cost time counted based on it even after a job recovers from a
failover or is restarted? In the failover case, the records may be processed
successfully while the job is running normally, but if after some time
(probably longer than the timeout) the job fails and restores, the records
in the retry state will be timed out and discarded immediately. The same
situation exists in the restarting case. I suppose in many cases the timeout
will be less than the time a job needs to restart, so in these cases the
stored in-flight retry attempts will time out immediately after the restart,
making the retry state meaningless. Please let me know if I mistake
something.

On Tue, May 24, 2022 at 10:20 AM Lincoln Lee  wrote:

> Thanks Gen Luo!
>
> Agree with you on preferring the simpler design.
>
> I'd like to share my thoughts on this choice: whether to store the retry
> state or not only affects the recovery logic, not the per-record
> processing, so I just compare the two:
> 1. w/o retry state: simple recovery but lost precision
> 2. w/ retry state: one more state and a little more complexity, but
> precise for users
> I prefer the second one from the user perspective; the additional
> complexity is manageable.
>
> One detail not mentioned in the FLIP: we will check whether any time is
> left (now() - startTime < timeout) before the next attempt, so the real
> total attempts will always be less than or equal to maxAttempts and the
> total cost time <= timeout (one special case is when a job failover takes
> too long).
>
> For the api, I've updated the FLIP[1]
>
> [1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
>
> Best,
> Lincoln Lee
>
>
> On Mon, May 23, 2022 at 4:54 PM Gen Luo  wrote:
>
> > Hi Lincoln,
> >
> > Thanks for the quick reply.
> >
> >
> >
> > 1. I understand that when restarting a job with a savepoint, the retry
> > state can ensure the total retry attempts and delay are as expected.
> > However, when failover happens while a job is running, the remaining
> > attempts recorded in the state are actually redone, and of course the
> > total attempts are more than expected. The delay is indeed one of the
> > concerns, but I'm wondering whether the retry state kept here is really
> > important to users or not. In my opinion its benefit is limited, but it
> > makes the change much more complex. I would prefer a simpler solution, in
> > which the retry state is still possible to add if the need really arises
> > in the future, but I respect your decision.
> >
> >
> >
> > 2. I think adding a currentAttempts parameter to the method is good
> enough.
> >
> > On Mon, May 23, 2022 at 2:52 PM Lincoln Lee  wrote:
> >
> > > Hi Gen Luo,
> > > Thanks a lot for your feedback!
> > >
> > > 1. About the retry state:
> > > I considered dropping the retry state which really simplifies state
> > changes
> > > and avoids compatibility handling. The only reason I changed my mind
> was
> > > that it might be lossy to the user. Elements that have been tried
> > > several times but have not exhausted their retry opportunities would
> > > reset the retry state after a job failover-restart and start the retry
> > > process again (if the retry condition persists), which may cause a
> > > greater delay for the retried elements, actually retrying more times
> > > and for longer than expected. (Although the PoC may also have a special
> > > case when recovering: if the remaining timeout is exhausted by the
> > > recalculation, it will execute immediately but will have to register a
> > > timeout timer for the async operation, here using an extra
> > > backoffTimeMillis.)
> > > For example, take '60s fixed-delay retry if empty result, max-attempts:
> > > 5, timeout 300s':
> > > when checkpointing, some data has been retried 2 times; then suppose
> > > the job is restarted and the restart takes 2min to succeed. If we drop
> > > the retry state, the worst case will add 240s (60s * 2 + 2min) of delay
> > > for users to finish the retry.
> > >
> > > For my understanding (please correct me if I missed something), if a
> > > job is resumed from a previous state and the retry strategy is changed,
> > > the elements that need to be recovered in the retry state just need the
> > > new strategy to take over the current attempts and time that has been
> > > used,
> > or
> > > giv

Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API

2022-05-23 Thread Gen Luo
Hi Lincoln,

Thanks for the quick reply.



1. I understand that when restarting a job with a savepoint, the retry
state can ensure the total retry attempts and delay are as expected.
However, when failover happens while a job is running, the remaining
attempts recorded in the state are actually redone, and of course the total
attempts are more than expected. The delay is indeed one of the concerns,
but I'm wondering whether the retry state kept here is really important to
users or not. In my opinion its benefit is limited, but it makes the change
much more complex. I would prefer a simpler solution, in which the retry
state is still possible to add if the need really arises in the future, but
I respect your decision.



2. I think adding a currentAttempts parameter to the method is good enough.
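
For illustration, here is a minimal sketch of an exponential backoff
strategy written against that extended signature. The interface shape below
is an assumption for this discussion, not the final FLIP-232 API:

{code:java}
/** Sketch only: assumes the extended method getBackoffTimeMillis(int currentAttempts). */
public class ExponentialBackoffRetryStrategy {
    private final int maxAttempts;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    public ExponentialBackoffRetryStrategy(
            int maxAttempts, long initialDelayMs, long maxDelayMs, double multiplier) {
        this.maxAttempts = maxAttempts;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    public boolean canRetry(int currentAttempts) {
        return currentAttempts <= maxAttempts;
    }

    public long getBackoffTimeMillis(int currentAttempts) {
        // delay = initialDelay * multiplier^(currentAttempts - 1), capped at
        // maxDelay; no new strategy instance is needed per attempt.
        double delay = initialDelayMs * Math.pow(multiplier, Math.max(0, currentAttempts - 1));
        return (long) Math.min(delay, maxDelayMs);
    }
}
{code}

For instance, new ExponentialBackoffRetryStrategy(5, 100, 10_000, 2.0) would
yield backoff delays of 100, 200, 400, 800 and 1600 ms without creating a
new strategy instance per attempt.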

On Mon, May 23, 2022 at 2:52 PM Lincoln Lee  wrote:

> Hi Gen Luo,
> Thanks a lot for your feedback!
>
> 1. About the retry state:
> I considered dropping the retry state which really simplifies state changes
> and avoids compatibility handling. The only reason I changed my mind was
> that it might be lossy to the user. Elements that have been tried several
> times but have not exhausted their retry opportunities would reset the
> retry state after a job failover-restart and start the retry process again
> (if the retry condition persists), which may cause a greater delay for the
> retried elements, actually retrying more times and for longer than
> expected. (Although the PoC may also have a special case when recovering:
> if the remaining timeout is exhausted by the recalculation, it will
> execute immediately but will have to register a timeout timer for the
> async operation, here using an extra backoffTimeMillis.)
> For example, take '60s fixed-delay retry if empty result, max-attempts: 5,
> timeout 300s':
> when checkpointing, some data has been retried 2 times; then suppose the
> job is restarted and the restart takes 2min to succeed. If we drop the
> retry state, the worst case will add 240s (60s * 2 + 2min) of delay for
> users to finish the retry.
>
> For my understanding (please correct me if I missed something), if a job
> is resumed from a previous state and the retry strategy is changed, the
> elements that need to be recovered in the retry state just need the new
> strategy to take over the current attempts and time that has been used, or
> give up the retry if no retry strategy was set.
> > and can be more compatible when the user restarts a job with a changed
> > retry strategy.
>
> 2. About the interface, do you think it would be helpful if we add the
> currentAttempts into getBackoffTimeMillis()? e.g., long
> getBackoffTimeMillis(int currentAttempts)
> The existing RetryStrategy and RestartBackoffTimeStrategy were on my
> candidate list but don't exactly match, and I want to avoid creating new
> instances for every attempt as in RetryStrategy.
>
> WDYT?
>
> Best,
> Lincoln Lee
>
>
> On Mon, May 23, 2022 at 11:37 AM Gen Luo  wrote:
>
> > Thank Lincoln for the proposal!
> >
> > The FLIP looks good to me. I'm in favor of the timer based
> implementation,
> > and I'd like to share some thoughts.
> >
> > I'm wondering whether we have to store the retry status in the state. I
> > suppose the retrying requests can just be submitted as first attempts
> > when the job restores from a checkpoint, since in fact the side effects
> > of the retries cannot be rolled back by the restore. This makes the state
> > simpler, makes the state migration unnecessary, and can be more
> > compatible when the user restarts a job with a changed retry strategy.
> >
> > Besides, I find it hard to implement a flexible backoff strategy with the
> > current AsyncRetryStrategy interface, for example an
> > ExponentialBackoffRetryStrategy. Maybe we can add an attempt parameter or
> > just use the org.apache.flink.util.concurrent.RetryStrategy to
> > take the place of the retry strategy part in the AsyncRetryStrategy?
> >
> > On Fri, May 20, 2022 at 2:24 PM Lincoln Lee  wrote:
> >
> > > Hi everyone,
> > >
> > >By comparing the two internal implementations of delayed retries, we
> > > prefer the timer-based solution, which obtains precise delay control
> > > through simple logic and only needs to pay (what we consider to be
> > > acceptable) timer instance cost for the retry element.  The FLIP[1] doc
> > has
> > > been updated.
> > >
> > > [1]:
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > On Mon, May 16, 2022 at 3:09 PM Lincoln Lee  wrote:
> > >
> > > > Hi Jinsong,
> > >

Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API

2022-05-22 Thread Gen Luo
Thank Lincoln for the proposal!

The FLIP looks good to me. I'm in favor of the timer based implementation,
and I'd like to share some thoughts.

I'm thinking if we have to store the retry status in the state. I suppose
the retrying requests can just submit as the first attempt when the job
restores from a checkpoint, since in fact the side effect of the retries
can not draw back by the restoring. This makes the state simpler and makes
it unnecessary to do the state migration, and can be more compatible when
the user restart a job with a changed retry strategy.

Besides, I find it hard to implement a flexible backoff strategy with the
current AsyncRetryStrategy interface, for example an
ExponentialBackoffRetryStrategy. Maybe we can add a parameter for the
current attempt, or just use org.apache.flink.util.concurrent.RetryStrategy
in place of the retry strategy part of the AsyncRetryStrategy?
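
As a hedged illustration of the attempt-aware idea discussed here, the sketch
below shows what an exponential-backoff strategy could look like if the current
attempt count were passed in, as proposed later in this thread via
getBackoffTimeMillis(int currentAttempts). The class and method names are
illustrative only, not the actual FLIP-232 API.

{code:java}
/**
 * Illustrative sketch only (not the FLIP-232 API): an exponential-backoff
 * retry strategy whose backoff is derived from the current attempt, so no
 * new strategy instance is needed per attempt.
 */
public class ExponentialBackoffRetryStrategy {

    private final int maxAttempts;       // e.g. 5
    private final long initialBackoffMs; // e.g. 1_000
    private final long maxBackoffMs;     // e.g. 60_000

    public ExponentialBackoffRetryStrategy(
            int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
        this.maxAttempts = maxAttempts;
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    public boolean canRetry(int currentAttempts) {
        return currentAttempts <= maxAttempts;
    }

    /** Doubles the backoff per attempt, capped at maxBackoffMs. */
    public long getBackoffTimeMillis(int currentAttempts) {
        // Shift by at most 62 bits and guard against overflow to negative.
        long backoff = initialBackoffMs << Math.min(currentAttempts - 1, 62);
        return (backoff < 0) ? maxBackoffMs : Math.min(backoff, maxBackoffMs);
    }
}
{code}

Being stateless per attempt, such a strategy would also avoid the per-attempt
instance creation concern raised above.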

On Fri, May 20, 2022 at 2:24 PM Lincoln Lee  wrote:

> Hi everyone,
>
> By comparing the two internal implementations of delayed retries, we
> prefer the timer-based solution, which obtains precise delay control
> through simple logic and only needs to pay a (what we consider to be
> acceptable) timer-instance cost per retry element. The FLIP [1] doc has
> been updated.
>
> [1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
>
> Best,
> Lincoln Lee
>
>
> On Mon, May 16, 2022 at 3:09 PM Lincoln Lee  wrote:
>
> > Hi Jinsong,
> >
> > Good question!
> >
> > The delayQueue is very similar to incompleteElements in
> > UnorderedStreamElementQueue; it only records references to in-flight
> > retry elements. Its core value is enabling a fast scan when force-flushing
> > during endInput, with less refactoring of the existing logic.
> >
> > Users needn't configure a new capacity for the delayQueue, just turn the
> > original one up (if needed).
> > And storing the input data and the retry state separately is mainly to
> > keep backwards compatibility. In the first version of the PoC I used a
> > single combined state to reduce state costs, but it was hard to keep
> > compatibility, so I changed it into two states based on Yun Gao's concern
> > about compatibility.
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, May 16, 2022 at 2:48 PM Jingsong Li  wrote:
> >
> >> Thanks, Lincoln, for your reply.
> >>
> >> I'm a little confused about the relationship between Ordered/Unordered
> >> Queue and DelayQueue. Why do we need to have a DelayQueue?
> >> Can we remove the DelayQueue and put the state of the retry in the
> >> StreamRecordQueueEntry (seems like it's already in the FLIP)?
> >> The advantages of doing this are:
> >> 1. half as much data is stored in state
> >> 2. the concept is unified: the user only needs to configure one queue
> >> capacity
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee 
> >> wrote:
> >>
> >> > Hi Jinsong,
> >> > Thanks for your feedback! Let me try to answer the two questions:
> >> >
> >> > For q1: Motivation
> >> > Yes, users can implement retries themselves based on the external
> >> > async client, but this requires each user to do similar things, and if
> >> > we can support retries uniformly, user code would become much simpler.
> >> >
> >> > > The real external call should happen in the asynchronous thread.
> >> > My question is: if the user makes a retry in this asynchronous thread
> >> > by themselves, is there a difference between this and the current
> >> > FLIP's approach?
> >> >
> >> >
> >> > For q2: Block Main Thread
> >> > You're right, the queue data will be stored in the ListState, which is
> >> > an OperatorState. In fact, for ListState storage the theoretical upper
> >> > limit is Integer.MAX_VALUE, but we can't make the queue capacity too
> >> > big in production because the risk of OOM increases as the queue
> >> > capacity grows; increasing the task parallelism may be a more viable
> >> > way when a single task encounters too many retry items.
> >> > We recommend estimating the queue capacity with a formula like
> >> > 'inputRate * retryRate * avgRetryDuration', taking the actual
> >> > checkpoint duration at runtime into account as well (see the worked
> >> > example below).
> >> >
> >> > > If I understand correctly, the retry queue will be put into
> >> > > ListState; is this state OperatorState? As far as I know,
> >> > > OperatorState does not have the ability to store a lot of data.
> >> > > So when we need to retry more data, would we need to block the main
> >> > > thread? What is the maximum size of the default retry queue?
> >> >
> >> >
> >> >
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> >
> >> > On Mon, May 16, 2022 at 10:31 AM Jingsong Li  wrote:
> >> >
> >> > > Thank Lincoln for the proposal.
> >> > >
> >> > > ## Motivation:
> >> > >
> >> > > asyncInvoke and callback functions are executed synchronously by the
> >> > > main thread, which is not suitable for adding long blocking
> >> > > operations, and introducing an additional thread will bring extra
> >> > > complexity 

[jira] [Created] (FLINK-26610) FileSink can not upgrade from 1.13 if the uid of the origin sink is not set.

2022-03-11 Thread Gen Luo (Jira)
Gen Luo created FLINK-26610:
---

 Summary: FileSink can not upgrade from 1.13 if the uid of the 
origin sink is not set.
 Key: FLINK-26610
 URL: https://issues.apache.org/jira/browse/FLINK-26610
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26580) FileSink CompactCoordinator add illegal committable as toCompacted.

2022-03-10 Thread Gen Luo (Jira)
Gen Luo created FLINK-26580:
---

 Summary: FileSink CompactCoordinator add illegal committable as 
toCompacted.
 Key: FLINK-26580
 URL: https://issues.apache.org/jira/browse/FLINK-26580
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26564) CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests.

2022-03-09 Thread Gen Luo (Jira)
Gen Luo created FLINK-26564:
---

 Summary: CompactCoordinatorStateHandler doesn't properly handle 
the cleanup-in-progress requests.
 Key: FLINK-26564
 URL: https://issues.apache.org/jira/browse/FLINK-26564
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Gen Luo
 Fix For: 1.15.0


It is found in FLINK-26322 that the CompactCoordinatorStateHandler doesn't 
properly handle the cleanup-in-progress requests but submits them as compacting 
requests. The issue happens when a job with compaction enabled is stopped with 
a savepoint and restarted with compaction disabled.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26440) CompactorOperatorStateHandler can not work with unaligned checkpoint

2022-03-01 Thread Gen Luo (Jira)
Gen Luo created FLINK-26440:
---

 Summary: CompactorOperatorStateHandler can not work with unaligned 
checkpoint
 Key: FLINK-26440
 URL: https://issues.apache.org/jira/browse/FLINK-26440
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Gen Luo


As mentioned in FLINK-26314, CompactorOperatorStateHandler can not work with 
unaligned checkpoint currently. Though FLINK-26314 is actually caused by 
another issue in the writer, we should still fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-02-28 Thread Gen Luo (Jira)
Gen Luo created FLINK-26394:
---

 Summary: CheckpointCoordinator.isTriggering can not be reset if a 
checkpoint expires while the checkpointCoordinator task is queuing in the 
SourceCoordinator executor.
 Key: FLINK-26394
 URL: https://issues.apache.org/jira/browse/FLINK-26394
 Project: Flink
  Issue Type: Bug
Reporter: Gen Luo


We found that a job can no longer trigger checkpoints or savepoints after 
recovering from a checkpoint timeout failure. After investigation, we found 
that the `isTriggering` flag in CheckpointCoordinator is true while no 
checkpoint is actually running, and the root cause is as follows:

 
 # The job uses a source whose coordinator needs to scan a table while 
requesting splits, which may cost more than 10min. The source coordinator 
executor thread will be occupied by `handleSplitRequest`, and the 
`checkpointCoordinator` task of the first checkpoint will be queued behind it.
 # 10min later, the checkpoint expires, removing the pending checkpoint from 
the coordinator and triggering a global failover. But the `isTriggering` flag 
is not reset here. It can only be reset after the checkpoint completable future 
is done, and that future is now held only by the `checkpointCoordinator` task 
in the queue, along with the PendingCheckpoint.
 # Then the job fails over, and the RecreateOnResetOperatorCoordinator will 
recreate a new SourceCoordinator and close the previous coordinator 
asynchronously. The timeout for the closing is fixed at 60s. SourceCoordinator 
will try to `shutdown` the coordinator executor and then `awaitTermination`. If 
the tasks finish within 60s, nothing wrong will happen.
 # But if the closing method is stuck for more than 60s (in this case it is 
actually stuck in `handleSplitRequest`), the async closing thread will be 
interrupted and SourceCoordinator will `shutdownNow` the executor. All queued 
tasks will be discarded, including the `checkpointCoordinator` task.
 # Then the checkpoint completable future will never complete and the 
`isTriggering` flag will never be reset.

 

I see that the closing part of SourceCoordinator has recently been refactored, 
but the new implementation also has this issue. And since it calls 
`shutdownNow` directly, the issue should be even easier to encounter.
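
For illustration, a minimal, self-contained sketch (not Flink code; the names 
and the short sleep are stand-ins) of the failure mode described above: once 
`shutdownNow` discards the queued task, the future it would have completed can 
never complete.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownNowDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> checkpointFuture = new CompletableFuture<>();

        // Stands in for the long-running handleSplitRequest occupying the thread.
        executor.submit(() -> {
            Thread.sleep(10_000);
            return null;
        });

        // Stands in for the queued checkpointCoordinator task that would
        // complete the future (and allow isTriggering to be reset).
        executor.submit(() -> checkpointFuture.complete(null));

        // Interrupts the running task and discards the queued one: the future
        // now never completes, mirroring the stuck isTriggering flag.
        executor.shutdownNow();
        System.out.println("completed? " + checkpointFuture.isDone()); // false
    }
}
{code}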



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26235) CompactingFileWriter and PendingFileRecoverable should not be exposed to users.

2022-02-17 Thread Gen Luo (Jira)
Gen Luo created FLINK-26235:
---

 Summary: CompactingFileWriter and PendingFileRecoverable should 
not be exposed to users.
 Key: FLINK-26235
 URL: https://issues.apache.org/jira/browse/FLINK-26235
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26180) Update docs to introduce the compaction for FileSink

2022-02-16 Thread Gen Luo (Jira)
Gen Luo created FLINK-26180:
---

 Summary: Update docs to introduce the compaction for FileSink
 Key: FLINK-26180
 URL: https://issues.apache.org/jira/browse/FLINK-26180
 Project: Flink
  Issue Type: Sub-task
Reporter: Gen Luo






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Gen Luo
Hi Chesney and Piotr,

I have seen some jobs with tens of independent vertices that process data
for the same business. The sub jobs should be started or stopped together.
Splitting them into separate jobs means the user has to manage them
separately. But in fact the jobs were running in per-job mode, and maybe
there's now a better choice. Let's see if others have some more valuable
cases.

By the way, I'd like to point out that if we can checkpoint pipeline
regions individually, even a job with only one job graph may benefit from
this effort, as long as it has no all-to-all edges connecting all vertices
into one pipeline region, since any failure, long pause, or backpressure in
one pipeline region will no longer block the checkpointing of other
regions.

And @Piotr, this is why I think that this discussion may relate to the
task-local checkpoints. Both of them require checkpointing parts of a job
individually, and can restore only a part of the job without breaking
consistency. The main difference is that, to maintain consistency,
task-local checkpoints have to handle the channel data. This is omitted in
the approximate task-local recovery since the consistency is not
guaranteed, and this is why the approximate task-local recovery may use a
part of the global snapshot, rather than individually checkpointing each
subtask. However, in the pipeline region checkpoints, consistency is
guaranteed naturally. We can focus on how to checkpoint individually, the
effort of which is probably necessary if we want to implement the
task-local checkpointing with consistency guarantee.

On Tue, Feb 8, 2022 at 7:41 PM 丛鹏  wrote:

> hi guys, if I understand it correctly, will only some checkpoints be
> recovered when there is an error in the Flink batch?
>
> On Tue, Feb 8, 2022 at 7:05 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> I second Chesnay's comment and would like to better understand the
>> motivation behind this. At the surface it sounds to me like this might
>> require quite a bit of work for a very narrow use case.
>>
>> At the same time I have a feeling that Yuan, you are mixing this feature
>> request (checkpointing subgraphs/pipeline regions independently) and a
>> very
>> very different issue of "task local checkpoints"? Those problems are kind
>> of similar, but not quite.
>>
>> Best,
>> Piotrek
>>
>> On Tue, Feb 8, 2022 at 11:44 AM Chesnay Schepler  wrote:
>>
>> > Could someone expand on these operational issues you're facing when
>> > achieving this via separate jobs?
>> >
>> > I feel like we're skipping a step, arguing about solutions without even
>> > having discussed the underlying problem.
>> >
>> > On 08/02/2022 11:25, Gen Luo wrote:
>> > > Hi,
>> > >
>> > > @Yuan
>> > > Do you mean that there should be no shared state between source
>> subtasks?
>> > > Sharing state between checkpoints of a specific subtask should be
>> fine.
>> > >
>> > > Sharing state between subtasks of a task can be an issue, no matter
>> > whether
>> > > it's a source. That's also what I was afraid of in the previous
>> replies.
>> > In
>> > > one word, if the behavior of a pipeline region can somehow influence
>> the
>> > > state of other pipeline regions, their checkpoints have to be aligned
>> > > before rescaling.
>> > >
>> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei 
>> wrote:
>> > >
>> > >> Hey Folks,
>> > >>
>> > >> Thanks for the discussion!
>> > >>
>> > >> *Motiviation and use cases*
>> > >> I think motiviation and use cases are very clear and I do not have
>> > doubts
>> > >> on this part.
>> > >> A typical use case is ETL with two-phase-commit, hundreds of
>> partitions
>> > can
>> > >> be blocked by a single straggler (a single task's checkpoint abortion
>> > can
>> > >> affect all, not necessary failure).
>> > >>
>> > >> *Source offset redistribution*
>> > >> As for the known sources & implementation for Flink, I can not find a
>> > case
>> > >> that does not work, *for now*.
>> > >> I need to dig a bit more: how splits are tracked assigned, not
>> > successfully
>> > >> processed, succesffully processed e.t.c.
>> > >> I guess it is a single shared source OPCoordinator. And how this
>> > *shared*
>> > >> state (between tasks) is preserved?
>> > >>
>> > >> *Input partition/splits treat

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Gen Luo
Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, no matter whether
it's a source. That's also what I was afraid of in the previous replies. In
one word, if the behavior of a pipeline region can somehow influence the
state of other pipeline regions, their checkpoints have to be aligned
before rescaling.

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei  wrote:

> Hey Folks,
>
> Thanks for the discussion!
>
> *Motiviation and use cases*
> I think motiviation and use cases are very clear and I do not have doubts
> on this part.
> A typical use case is ETL with two-phase-commit, hundreds of partitions can
> be blocked by a single straggler (a single task's checkpoint abortion can
> affect all, not necessary failure).
>
> *Source offset redistribution*
> As for the known sources & implementation for Flink, I can not find a case
> that does not work, *for now*.
> I need to dig a bit more: how splits are tracked (assigned, not successfully
> processed, successfully processed, etc.)
> I guess it is a single shared source OPCoordinator. And how this *shared*
> state (between tasks) is preserved?
>
> *Input partition/splits treated completely independent from each other*
> This part I am still not sure, as mentioned if we have shared state of
> source in the above section.
>
> To Thomas:
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> >  and CP10?
>
> Not sure what "promoted" means here, but
> 1. I guess it does not matter whether it is CP8 or CP10 any more,
> if there is no shared state in the source, exactly as you mentioned:
> "it should be no different than no data processed between CP8 and CP10"
>
> 2. I've noticed that from this question there is a gap between
> "*allow aborted/failed checkpoint in independent sub-graph*" and
> my intention: "*independent sub-graph checkpointing independently*"
>
> Best
> Yuan
>
>
> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:
>
> > Hi,
> >
> > I'm thinking about Yuan's case. Let's assume that the case is running in
> > current Flink:
> > 1. CP8 finishes
> > 2. For some reason, PR2 stops consuming records from the source (but is
> not
> > stuck), and PR1 continues consuming new records.
> > 3. CP9 and CP10 finish
> > 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> same
> > final status with that in Yuan's case before CP11 starts.
> >
> > I suppose that in this case, the status of the job can be the same as in
> > Yuan's case, and the snapshot (including source states) taken at CP10
> > should be the same as the composed global snapshot in Yuan's case, which
> is
> > combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> failed
> > checkpointing nor uncommitted consuming have side effects, both of which
> > can break the exactly-once semantics when replaying. So I think there
> > should be no difference between rescaling the combined global snapshot or
> > the globally taken one, i.e. if the input partitions are not independent,
> > we are probably not able to rescale the source state in the current Flink
> > either.
> >
> > And @Thomas, I do agree that the operational burden is
> > significantly reduced, while I'm a little afraid that checkpointing the
> > subgraphs individually may increase most of the runtime overhead back
> > again. Maybe we can find a better way to implement this.
> >
> > On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:
> >
> > > Hi,
> > >
> > > Thanks for opening this discussion! The proposed enhancement would be
> > > interesting for use cases in our infrastructure as well.
> > >
> > > There are scenarios where it makes sense to have multiple disconnected
> > > subgraphs in a single job because it can significantly reduce the
> > > operational burden as well as the runtime overhead. Since we allow
> > > subgraphs to recover independently, then why not allow them to make
> > > progress independently also, which would imply that checkpointing must
> > > succeed for non affected subgraphs as certain behavior is tied to
> > > checkpoint completion, like Kafka offset commit, file output etc.
> > >
> > > As for sourc

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Gen Luo
> > > But I am not sure whether we can make that assumption.
> > >
> > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with
> CPs.
> > >
> > > Best
> > > -Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann 
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > > that
> > > > separate pipelined regions can take checkpoints independently?
> > > > Conceptually, I somehow think that a pipelined region that is failed
> and
> > > > cannot create a new checkpoint is more or less the same as a
> pipelined
> > > > region that didn't get new input or a very very slow pipelined region
> > > which
> > > > couldn't read new records since the last checkpoint (assuming that
> the
> > > > checkpoint coordinator can create a global checkpoint by combining
> > > > individual checkpoints (e.g. taking the last completed checkpoint
> from
> > > each
> > > > pipelined region)). If this comparison is correct, then this would
> mean
> > > > that we have rescaling problems under the latter two cases.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
> > > >
> > > > > Hi Gyula,
> > > > >
> > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can
> discuss
> > > > this
> > > > > within two scopes. One is the job subgraph, the other is the
> execution
> > > > > subgraph, which I suppose is the same as PipelineRegion.
> > > > >
> > > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > > recovering in a single run.
> > > > >
> > > > > Flink has now supported PipelineRegion based failover, with a
> subset
> > > of a
> > > > > global checkpoint snapshot. The checkpoint barriers are spread
> within a
> > > > > PipelineRegion, so the checkpointing of individual PipelineRegions
> is
> > > > > actually independent. Since in a single run of a job, the
> > > PipelineRegions
> > > > > are fixed, we can individually checkpoint separated
> PipelineRegions,
> > > > > despite what status the other PipelineRegions are, and use a
> snapshot
> > > of
> > > > a
> > > > > failing region to recover, instead of the subset of a global
> snapshot.
> > > > This
> > > > > can support separated job subgraphs as well, since they will also
> be
> > > > > separated into different PipelineRegions. I think this can fulfill
> your
> > > > > needs.
> > > > >
> > > > > In fact the individual snapshots of all PipelineRegions can form a
> > > global
> > > > > snapshot, and the alignment of snapshots of individual regions is
> not
> > > > > necessary. But rescaling this global snapshot can be potentially
> > > > complex. I
> > > > > think it's better to use the individual snapshots in a single run,
> and
> > > > take
> > > > > a global checkpoint/savepoint before restarting the job, rescaling
> it
> > > or
> > > > > not.
> > > > >
> > > > > A major issue of this plan is that it breaks the checkpoint
> mechanism
> > > of
> > > > > Flink. As far as I know, even in the approximate recovery, the
> snapshot
> > > > > used to recover a single task is still a part of a global
> snapshot. To
> > > > > implement the individual checkpointing of PipelineRegions, there
> may
> > > need
> > > > > to be a checkpoint coordinator for each PipelineRegion, and a new
> > > global
> > > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > > individual regions, which can be a big burden to the job manager.
> The
> > > > > meaning of the checkpoint id will also be changed, which can affect
> > > many
> > > > > aspects. There can be lots of work and risks, and the risks still
> exist
> > > > if
> > > > > we only individually checkpoint separated job subgraphs, since the
> > > > > mechanism is still broken.

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-07 Thread Gen Luo
Hi Till,

I agree that, for checkpointing, a failing task is much like a very slow or
deadlocked task. The main difference is whether a checkpoint of the region
the task is in can be triggered. Triggering a checkpoint on a failing
region makes no sense since the checkpoint would be discarded right away.
But we can still compose a global checkpoint from the new snapshots of the
other regions taken while the region is failing, plus the last successful
snapshot of the failing region. This global snapshot is still valid and
can be rescaled like a normal one, if normal ones can be rescaled at all
(see the sketch below).
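
A minimal sketch of that composition idea, with hypothetical types (Flink has
no such component; this is purely illustrative): keep the latest completed
checkpoint per pipeline region and combine them into a global snapshot.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch only: compose a "global" snapshot by taking, for each
 *  pipeline region, its latest completed checkpoint, even if a failing region
 *  is stuck on an older checkpoint id than the healthy ones. */
final class GlobalSnapshotComposer {

    private final Map<String, Long> latestCompletedPerRegion = new HashMap<>();

    void onRegionCheckpointCompleted(String regionId, long checkpointId) {
        // Remember only the newest completed checkpoint of each region.
        latestCompletedPerRegion.merge(regionId, checkpointId, Math::max);
    }

    /** One (possibly older) completed checkpoint per region forms the
     *  composed global snapshot described above. */
    Map<String, Long> composeGlobalSnapshot() {
        return new HashMap<>(latestCompletedPerRegion);
    }
}
{code}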

As far as I know, some snapshotting methods use or depend on the ascending
checkpoint id. Checkpointing regions individually probably means counting the
checkpoint id individually per region, so composing snapshots with different
checkpoint ids may cause errors.

I am also afraid that there might be issues with the shared states, though
I can't figure out a case right now.

On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Yuan and Gen could you elaborate why rescaling is a problem if we say that
> separate pipelined regions can take checkpoints independently?
> Conceptually, I somehow think that a pipelined region that is failed and
> cannot create a new checkpoint is more or less the same as a pipelined
> region that didn't get new input or a very very slow pipelined region which
> couldn't read new records since the last checkpoint (assuming that the
> checkpoint coordinator can create a global checkpoint by combining
> individual checkpoints (e.g. taking the last completed checkpoint from each
> pipelined region)). If this comparison is correct, then this would mean
> that we have rescaling problems under the latter two cases.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo  wrote:
>
> > Hi Gyula,
> >
> > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> this
> > within two scopes. One is the job subgraph, the other is the execution
> > subgraph, which I suppose is the same as PipelineRegion.
> >
> > An idea is to individually checkpoint the PipelineRegions, for the
> > recovering in a single run.
> >
> > Flink has now supported PipelineRegion based failover, with a subset of a
> > global checkpoint snapshot. The checkpoint barriers are spread within a
> > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > actually independent. Since in a single run of a job, the PipelineRegions
> > are fixed, we can individually checkpoint separated PipelineRegions,
> > despite what status the other PipelineRegions are, and use a snapshot of
> a
> > failing region to recover, instead of the subset of a global snapshot.
> This
> > can support separated job subgraphs as well, since they will also be
> > separated into different PipelineRegions. I think this can fulfill your
> > needs.
> >
> > In fact the individual snapshots of all PipelineRegions can form a global
> > snapshot, and the alignment of snapshots of individual regions is not
> > necessary. But rescaling this global snapshot can be potentially
> complex. I
> > think it's better to use the individual snapshots in a single run, and
> take
> > a global checkpoint/savepoint before restarting the job, rescaling it or
> > not.
> >
> > A major issue of this plan is that it breaks the checkpoint mechanism of
> > Flink. As far as I know, even in the approximate recovery, the snapshot
> > used to recover a single task is still a part of a global snapshot. To
> > implement the individual checkpointing of PipelineRegions, there may need
> > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > checkpoint coordinator. When the scale goes up, there can be many
> > individual regions, which can be a big burden to the job manager. The
> > meaning of the checkpoint id will also be changed, which can affect many
> > aspects. There can be lots of work and risks, and the risks still exist
> if
> > we only individually checkpoint separated job subgraphs, since the
> > mechanism is still broken. If that is what you need, maybe separating
> them
> > into different jobs is an easier and better choice, as Caizhi and Yuan
> > mentioned.
> >
> > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei  wrote:
> >
> > > Hey Gyula,
> > >
> > > That's a very interesting idea. The discussion about the `Individual`
> vs
> > > `Global` checkpoint was raised before, but the main concern was from
> two
> > > aspects:
> > >
> > > - Non-deterministic replaying may lead to an inconsistent view of
> > > checkpoint
> 

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-06 Thread Gen Luo
Hi Gyula,

Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
within two scopes. One is the job subgraph, the other is the execution
subgraph, which I suppose is the same as PipelineRegion.

An idea is to individually checkpoint the PipelineRegions, for the
recovering in a single run.

Flink has now supported PipelineRegion based failover, with a subset of a
global checkpoint snapshot. The checkpoint barriers are spread within a
PipelineRegion, so the checkpointing of individual PipelineRegions is
actually independent. Since in a single run of a job, the PipelineRegions
are fixed, we can individually checkpoint separated PipelineRegions,
regardless of the status of the other PipelineRegions, and use a snapshot of a
failing region to recover, instead of the subset of a global snapshot. This
can support separated job subgraphs as well, since they will also be
separated into different PipelineRegions. I think this can fulfill your
needs.

In fact the individual snapshots of all PipelineRegions can form a global
snapshot, and the alignment of snapshots of individual regions is not
necessary. But rescaling this global snapshot can be potentially complex. I
think it's better to use the individual snapshots in a single run, and take
a global checkpoint/savepoint before restarting the job, rescaling it or
not.

A major issue of this plan is that it breaks the checkpoint mechanism of
Flink. As far as I know, even in the approximate recovery, the snapshot
used to recover a single task is still a part of a global snapshot. To
implement the individual checkpointing of PipelineRegions, there may need
to be a checkpoint coordinator for each PipelineRegion, and a new global
checkpoint coordinator. When the scale goes up, there can be many
individual regions, which can be a big burden to the job manager. The
meaning of the checkpoint id will also be changed, which can affect many
aspects. There can be lots of work and risks, and the risks still exist if
we only individually checkpoint separated job subgraphs, since the
mechanism is still broken. If that is what you need, maybe separating them
into different jobs is an easier and better choice, as Caizhi and Yuan
mentioned.

On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei  wrote:

> Hey Gyula,
>
> That's a very interesting idea. The discussion about the `Individual` vs
> `Global` checkpoint was raised before, but the main concern was from two
> aspects:
>
> - Non-deterministic replaying may lead to an inconsistent view of
> checkpoint
> - It is not easy to form a clear cut of past and future and hence no clear
> cut of where the start point of the source should begin to replay from.
>
> Starting from independent subgraphs as you proposed may be a good starting
> point. However, when we talk about subgraph, do we mention it as a job
> subgraph (each vertex is one or more operators) or execution subgraph (each
> vertex is a task instance)?
>
> If it is a job subgraph, then indeed, why not separate it into multiple
> jobs as Caizhi mentioned.
> If it is an execution subgraph, then it is difficult to handle rescaling
> due to inconsistent views of checkpoints between tasks of the same
> operator.
>
> `Individual/Subgraph Checkpointing` is definitely an interesting direction
> to think of, and I'd love to hear more from you!
>
> Best,
>
> Yuan
>
>
>
>
>
>
>
> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng  wrote:
>
> > Hi Gyula!
> >
> > Thanks for raising this discussion. I agree that this will be an
> > interesting feature but I actually have some doubts about the motivation
> > and use case. If there are multiple individual subgraphs in the same job,
> > why not just distribute them to multiple jobs so that each job contains
> > only one individual graph and can now fail without disturbing the others?
> >
> >
> > > On Mon, Feb 7, 2022 at 5:22 AM Gyula Fóra  wrote:
> >
> > > Hi all!
> > >
> > > At the moment checkpointing only works for healthy jobs with all
> running
> > > (or some finished) tasks. This sounds reasonable in most cases but
> there
> > > are a few applications where it would make sense to checkpoint failing
> > jobs
> > > as well.
> > >
> > > Due to how the checkpointing mechanism works, subgraphs that have a
> > failing
> > > task cannot be checkpointed without violating the exactly-once
> semantics.
> > > However if the job has multiple independent subgraphs (that are not
> > > connected to each other), even if one subgraph is failing, the other
> > > completely running one could be checkpointed. In these cases the tasks
> of
> > > the failing subgraph could simply inherit the last successful
> checkpoint
> > > metadata (before they started failing). This logic would produce a
> > > consistent checkpoint.
> > >
> > > The job as a whole could now make stateful progress even if some
> > subgraphs
> > > are constantly failing. This can be very valuable if for some reason
> the
> > > job has a larger number of independent subgraphs that are expected to
> > fail
> > > 

Re: [VOTE] FLIP-205: Support cache in DataStream for Batch Processing

2022-01-20 Thread Gen Luo
+1 (non-binding)

On Thu, Jan 20, 2022 at 3:26 PM Yun Gao 
wrote:

> +1 (binding)
>
> Thanks Xuannan for driving this!
>
> Best,
> Yun
>
>
> --
> From:David Morávek 
> Send Time:2022 Jan. 20 (Thu.) 15:12
> To:dev 
> Subject:Re: [VOTE] FLIP-205: Support cache in DataStream for Batch
> Processing
>
> +1 (non-binding)
>
> D.
>
> On Thu 20. 1. 2022 at 4:09, Xuannan Su  wrote:
>
> > Hi devs,
> >
> > I would like to start the vote for FLIP-205 [1], which was discussed and
> > reached a consensus in the discussion thread [2].
> >
> > The vote will be open for at least 72h, unless there is an objection or
> not
> > enough votes.
> >
> > Best,
> > Xuannan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > [2] https://lists.apache.org/thread/sqxyb5bh99f904g8zpj8tv60m75dmyox
> >
>


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2022-01-06 Thread Gen Luo
t; > it should not be a problem with CachedDataStream.
> >
> > For Gen:
> >
> > - Relation between FLIP-205 and FLIP-188
> > Although it feels like dynamic table and caching are similar in the
> > sense that they let users reuse some intermediate result, they target
> > different use cases. The dynamic table is targeting the use case where
> > users want to share a dynamic updating intermediate result across
> > multiple applications. It is some meaningful data that can be consumed
> > by different Flink applications and Flink jobs. While caching is
> > targeting the use case where users know that all the sources are
> > bounded and static, and caching is only used to avoid re-computing the
> > intermediate result. And the cached intermediate result is only
> > meaningful crossing jobs in the same application.
> >
> > Dynamic table and caching can be used together. For example, in a
> > machine learning scenario, we can have a Stream job that is generating
> > some training samples. And we can create a dynamic table for the
> > training sample. And we run a Flink application every hour to do some
> > data analysis on the training sample generated in the last hour. The
> > Flink application consists of multiple batch jobs and the batch jobs
> > share some intermediate results, so users can use cache to avoid
> > re-computation. The intermediate result is not meaningful outside of
> > the application. And the cache will be discarded after the application
> > is finished.
> >
> > [1]
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> >
> >
> > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > I found FLIP-188[1] that is aiming to introduce a built-in dynamic
> table
> > > storage, which provides a unified changelog & table representation.
> > Tables
> > > stored there can be used in further ad-hoc queries. To my
> understanding,
> > > it's quite like an implementation of caching in Table API, and the
> ad-hoc
> > > queries are somehow like further steps in an interactive program.
> > >
> > > As you replied, caching at Table/SQL API is the next step, as a part of
> > > interactive programming in Table API, which we all agree is the major
> > > scenario. What do you think about the relation between it and FLIP-188?
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > >
> > >
> > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su 
> > wrote:
> > >
> > > > Hi David,
> > > >
> > > > Thanks for sharing your thoughts.
> > > >
> > > > You are right that most people tend to use high-level API for
> > > > interactive data exploration. Actually, there is
> > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > > > know, it has been accepted but hasn’t been implemented. At the time
> > > > when it was drafted, DataStream did not support batch mode but the
> > > > Table API did.
> > > >
> > > > Now that the DataStream API does support batch processing, I think we
> > > > can focus on supporting cache at DataStream first. It is still
> > > > valuable for DataStream users and most of the work we do in this FLIP
> > > > can be reused. So I want to limit the scope of this FLIP.
> > > >
> > > > After caching is supported at DataStream, we can continue from where
> > > > FLIP-36 left off to support caching at Table/SQL API. We might have
> to
> > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > [1]
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > >
> > > >
> > > >
> > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek 
> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > thanks for drafting this FLIP.
> > > > >
> > > > > One immediate thought, from what I've seen for interactive data
> > > > exploration
> > > > > with Spark, most people tend to use the higher level APIs, that
> > allow for
> > > > > faster prot

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2021-12-30 Thread Gen Luo
Hi Xuannan,

I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
storage, which provides a unified changelog & table representation. Tables
stored there can be used in further ad-hoc queries. To my understanding,
it's quite like an implementation of caching in Table API, and the ad-hoc
queries are somehow like further steps in an interactive program.

As you replied, caching at Table/SQL API is the next step, as a part of
interactive programming in Table API, which we all agree is the major
scenario. What do you think about the relation between it and FLIP-188?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
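
To make the caching scenario concrete, below is a minimal sketch of how the
FLIP-205 style API could be used in batch mode. The cache()/CachedDataStream
calls are the API proposed in the FLIP and are shown purely as an illustration
of the intent, not as released Flink API at the time of this thread.

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Caching only applies to bounded (batch) execution.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Some expensive-to-compute bounded intermediate result.
        SingleOutputStreamOperator<Long> expensive =
                env.fromSequence(0, 1_000_000).filter(x -> x % 7 == 0);

        // Proposed FLIP-205 API (illustrative): the first execution
        // materializes the cache; later jobs in the same application reuse it.
        CachedDataStream<Long> cached = expensive.cache();

        cached.print();
        env.execute("job 1: computes and caches");

        cached.filter(x -> x > 1_000).print();
        env.execute("job 2: reuses the cached result instead of recomputing");
    }
}
{code}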


On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su  wrote:

> Hi David,
>
> Thanks for sharing your thoughts.
>
> You are right that most people tend to use high-level API for
> interactive data exploration. Actually, there is
> the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> know, it has been accepted but hasn’t been implemented. At the time
> when it was drafted, DataStream did not support batch mode but the Table
> API did.
>
> Now that the DataStream API does support batch processing, I think we
> can focus on supporting cache at DataStream first. It is still
> valuable for DataStream users and most of the work we do in this FLIP
> can be reused. So I want to limit the scope of this FLIP.
>
> After caching is supported at DataStream, we can continue from where
> FLIP-36 left off to support caching at Table/SQL API. We might have to
> re-vote FLIP-36 or draft a new FLIP. What do you think?
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
>
>
> On Wed, Dec 29, 2021 at 6:08 PM David Morávek  wrote:
> >
> > Hi Xuannan,
> >
> > thanks for drafting this FLIP.
> >
> > One immediate thought, from what I've seen for interactive data
> exploration
> > with Spark, most people tend to use the higher level APIs, that allow for
> > faster prototyping (Table API in Flink's case). Should the Table API also
> > be covered by this FLIP?
> >
> > Best,
> > D.
> >
> > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su 
> wrote:
> >
> > > Hi devs,
> > >
> > > I’d like to start a discussion about adding support to cache the
> > > intermediate result at DataStream API for batch processing.
> > >
> > > As the DataStream API now supports batch execution mode, we see users
> > > using the DataStream API to run batch jobs. Interactive programming is
> > > an important use case of Flink batch processing. And the ability to
> > > cache intermediate results of a DataStream is crucial to the
> > > interactive programming experience.
> > >
> > > Therefore, we propose to support caching a DataStream in Batch
> > > execution. We believe that users can benefit a lot from the change and
> > > encourage them to use DataStream API for their interactive batch
> > > processing work.
> > >
> > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > thread. Looking forward to your feedback!
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > >
> > > Best,
> > > Xuannan
> > >
>


[jira] [Created] (FLINK-24965) Improper usage of Map.Entry after Entry Iterator.remove in TaskLocalStateStoreImpl#pruneCheckpoints

2021-11-19 Thread Gen Luo (Jira)
Gen Luo created FLINK-24965:
---

 Summary: Improper usage of Map.Entry after Entry Iterator.remove 
in TaskLocalStateStoreImpl#pruneCheckpoints
 Key: FLINK-24965
 URL: https://issues.apache.org/jira/browse/FLINK-24965
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Gen Luo


In TaskLocalStateStoreImpl#pruneCheckpoints, a list is created to store 
snapshots which should be removed, and the entries to remove are directly added 
into the list. The code looks like this:
{code:java}
Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
        storedTaskStateByCheckpointID.entrySet().iterator();

while (entryIterator.hasNext()) {
    Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    long entryCheckpointId = snapshotEntry.getKey();
    if (pruningChecker.test(entryCheckpointId)) {
        toRemove.add(snapshotEntry);
        entryIterator.remove();
    } else if (breakOnceCheckerFalse) {
        break;
    }
} {code}
 

 

However, according to the javadoc of Map.Entry,
{code:java}
the behavior of a map entry is undefined if the backing map has been modified 
after the entry was returned by the iterator, except through the setValue 
operation on the map entry. {code}
entries should not be kept for further use after iterator.remove() is called. 
In this case, where the map is a TreeMap, if the first entry is skipped and 
removal happens from the second element of `storedTaskStateByCheckpointID`, the 
entries returned by entryIterator.next() will all be the same object and the 
list will be filled with it.

 

A possible fix is to add `new AbstractMap.SimpleEntry<>(snapshotEntry.getKey(), 
snapshotEntry.getValue())` into the list instead of snapshotEntry itself, as 
sketched below.
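
Applied to the loop above, the fix would look like this (requires 
java.util.AbstractMap):
{code:java}
if (pruningChecker.test(entryCheckpointId)) {
    // Copy the entry before removing it: a detached SimpleEntry stays valid
    // even after the backing TreeMap is modified through the iterator.
    toRemove.add(
            new AbstractMap.SimpleEntry<>(
                    snapshotEntry.getKey(), snapshotEntry.getValue()));
    entryIterator.remove();
}
{code}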

 

The issue is a minor one since all usage of this method seems safe so far.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Gen Luo
>> heartbeat
>> to time out? Is it because you have a lot of GC or that the heartbeat
>> thread does not get enough CPU cycles?
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
>> wrote:
>>
>> > Hello,
>> >
>> >
>> >
>> > From a user perspective: we have some (rare) use cases where we use
>> > “coarse grain” datasets, with big beans and tasks that do lengthy
>> > operations (such as ML training). In these cases we had to increase the
>> > timeout to
>> > huge values (heartbeat.timeout: 50) so that our app is not killed.
>> >
>> > I’m aware this is not the way Flink was meant to be used, but it’s a
>> > convenient way to distribute our workload on datanodes without having to
>> > use another concurrency framework (such as M/R) that would require the
>> > recoding of sources and sinks.
>> >
>> >
>> >
>> > In some other (most common) cases, our tasks do some R/W accesses to
>> > RAM-cached repositories backed by a key-value storage such as Kudu (or
>> > Hbase). While most of those calls are very fast, sometimes when the
>> > system is under heavy load they may block for more than a few seconds,
>> > and having our app killed because of a short timeout is not an option.
>> >
>> >
>> >
>> > That’s why I’m not in favor of very short timeouts… Because in my
>> > experience it really depends on what user code does in the tasks. (I
>> > understand that normally, as user code is not a JVM-blocking activity
>> such
>> > as a GC, it should have no impact on heartbeats, but from experience, it
>> > really does)
>> >
>> >
>> >
>> > Cheers,
>> >
>> > Arnaud
>> >
>> >
>> >
>> >
>> >
>> > *From:* Gen Luo 
>> > *Sent:* Thursday, July 22, 2021 05:46
>> > *To:* Till Rohrmann 
>> > *Cc:* Yang Wang ; dev ;
>> > user 
>> > *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
>> > default values
>> >
>> >
>> >
>> > Hi,
>> >
>> > Thanks for driving this @Till Rohrmann  . I would
>> > give +1 on reducing the heartbeat timeout and interval, though I'm not
>> sure
>> > if 15s and 3s would be enough either.
>> >
>> >
>> >
>> > IMO, except for the standalone cluster, where the heartbeat mechanism in
>> > Flink is fully relied upon, reducing the heartbeat can also help the JM
>> > find out faster about TaskExecutors in abnormal conditions that cannot
>> > respond to heartbeat requests, e.g., continuous Full GC, where the
>> > TaskExecutor process is alive but the problem may not be known by the
>> > deployment system. Since there are cases that can benefit from this
>> > change, I think it could be done if it won't break the experience in
>> > other scenarios.
>> >
>> >
>> >
>> > If we can identify what blocks the main threads from processing
>> > heartbeats, or what enlarges the GC costs, we can try to get rid of
>> > those causes to make the heartbeat response time more predictable, or
>> > give some advice to users whose jobs may encounter these issues. For
>> > example, as far as I know the JM of a large-scale job will be busier and
>> > may not be able to process heartbeats in time, so we could advise that
>> > users working with jobs larger than 5000 tasks enlarge their heartbeat
>> > interval to 10s and the timeout to 50s. The numbers are written casually.
>> >
>> >
>> >
>> > As for the issue in FLINK-23216, I think it should be fixed and may not
>> be
>> > a main concern for this case.
>> >
>> >
>> >
>> > On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann 
>> > wrote:
>> >
>> > Thanks for sharing these insights.
>> >
>> >
>> >
>> > I think it is no longer true that the ResourceManager notifies the
>> > JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more
>> details.
>> >
>> >
>> >
>> > Given the GC pauses, would you then be ok with decreasing the heartbeat
>> > timeout to 20 seconds? This should give enough time to do the GC and
>> then
>> > still send/receive a heartbeat request.
>> >
>> >
>> >
>> > I also wanted to a

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Gen Luo
Hi,
Thanks for driving this @Till Rohrmann  . I would
give +1 on reducing the heartbeat timeout and interval, though I'm not sure
if 15s and 3s would be enough either.

IMO, except for the standalone cluster, where the heartbeat mechanism in
Flink is fully relied upon, reducing the heartbeat can also help the JM find
out faster about TaskExecutors in abnormal conditions that cannot respond to
heartbeat requests, e.g., continuous Full GC, where the TaskExecutor process
is alive but the problem may not be known by the deployment system. Since
there are cases that can benefit from this change, I think it could be done
if it won't break the experience in other scenarios.

If we can identify what blocks the main threads from processing
heartbeats, or what enlarges the GC costs, we can try to get rid of those
causes to make the heartbeat response time more predictable, or give some
advice to users whose jobs may encounter these issues. For example, as far
as I know the JM of a large-scale job will be busier and may not be able to
process heartbeats in time, so we could advise that users working with jobs
larger than 5000 tasks enlarge their heartbeat interval to 10s and the
timeout to 50s. The numbers are written casually (see the sketch below).
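
As a concrete (and equally casual) illustration of such advice, the option
keys below are Flink's real heartbeat.interval / heartbeat.timeout settings,
while the values are just the example numbers from the paragraph above:

{code:java}
import org.apache.flink.configuration.Configuration;

public class HeartbeatTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Values are in milliseconds: 10s interval / 50s timeout, the
        // illustrative numbers suggested above for jobs with >5000 tasks.
        conf.setString("heartbeat.interval", "10000");
        conf.setString("heartbeat.timeout", "50000");
        // The same two keys can instead be set in flink-conf.yaml.
        System.out.println(conf);
    }
}
{code}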

As for the issue in FLINK-23216, I think it should be fixed and may not be
a main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann  wrote:

> Thanks for sharing these insights.
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operations from the main thread. With FLINK-22483 [2] we will
> get rid of Filesystem accesses to retrieve completed checkpoints. This
> leaves us with one additional file system access from the main thread which
> is the one completing a pending checkpoint. I think it should be possible
> to get rid of this access because as Stephan said it only writes
> information to disk that is already written before. Maybe solving these two
> issues could ease concerns about long pauses of unresponsiveness of Flink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
> [2] https://issues.apache.org/jira/browse/FLINK-22483
>
> Cheers,
> Till
>
> On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:
>
>> Thanks @Till Rohrmann   for starting this
>> discussion
>>
>> Firstly, I try to understand the benefit of shorter heartbeat timeout.
>> IIUC, it will make the JobManager aware of
>> TaskManager faster. However, it seems that only the standalone cluster
>> could benefit from this. For Yarn and
>> native Kubernetes deployment, the Flink ResourceManager should get the
>> TaskManager lost event in a very short time.
>>
>> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
>> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>>
>> Secondly, I am not very confident about decreasing the timeout to 15s. I
>> have quickly checked the TaskManager GC logs of our internal Flink
>> workloads over the past week and found more than 100 Full GCs of around
>> 10 seconds, but none longer than 15s.
>> We are using CMS GC for the old generation.
>>
>>
>> Best,
>> Yang
>>
>> On Sat, Jul 17, 2021 at 1:05 AM Till Rohrmann  wrote:
>>
>>> Hi everyone,
>>>
>>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>>> and blocking operations that were executed in the main threads of Flink's
>>> components. Since then, there were quite some advancements wrt the JVM's
>>> GCs and we also got rid of a lot of blocking calls that were executed in
>>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>>> times in case of a TaskManager loss because the system can only properly
>>> recover after the dead TaskManager has been removed from the scheduler.
>>> Hence, I wanted to propose to change the timeout and interval to:
>>>
>>> heartbeat.timeout: 15s
>>> heartbeat.interval: 3s
>>>
>>> Since there is no perfect solution that fits all use cases, I would
>>> really
>>> like to hear from you what you think about it and how you configure these
>>> heartbeat options. Based on your experience we might actually come up
>>> with
>>> better default values that allow us to be resilient but also to detect
>>> failed components fast. FLIP-185 can be found here [1].
>>>
>>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>>
>>> Cheers,
>>> Till
>>>
>>


Re: Job Recovery Time on TM Lost

2021-07-08 Thread Gen Luo
utor.runAllTasks(SingleThreadEventExecutor.java:416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> ```
> 1. It's a bit inconvenient to debug such an exception because it doesn't
> report the exact container id. Right now we have to look for `
> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539`
> <http://xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539>
> in JobManager log to find that.
> 2. The task manager log doesn't show anything suspicious. Also, no major
> GC. So it might imply a flaky connection in this case.
> 3. Is there any short term workaround we can try? any config tuning? Also,
> what's the long term solution?
>
> Best
> Lu
>
>
>
>
> On Tue, Jul 6, 2021 at 11:45 PM 刘建刚  wrote:
>
>> It is really helpful to find the lost container quickly. In our internal
>> Flink version, we optimize it via the task's report and the jobmaster's
>> probe. When a task fails because of the connection, it reports to the
>> jobmaster. The jobmaster will try to confirm the liveness of the
>> unconnected taskmanager a configurable number of times. If the jobmaster
>> finds the taskmanager unconnected or dead, it releases the taskmanager.
>> This works for most cases. For an unstable environment, the config needs
>> adjustment.
>>
>> On Tue, Jul 6, 2021 at 8:41 PM Gen Luo  wrote:
>>
>>> Yes, I have noticed the PR and commented there with some consideration
>>> about the new option. We can discuss further there.
>>>
>>> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann 
>>> wrote:
>>>
>>> > This is actually a very good point Gen. There might not be a lot to
>>> gain
>>> > for us by implementing a fancy algorithm for figuring out whether a TM
>>> is
>>> > dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
>>> > communication does not tolerate failures and directly fails the
>>> affected
>>> > tasks. This assumes that the JM and TM run in the same environment.
>>> >
>>> > One simple approach could be to make the number of failed heartbeat
>>> RPCs
>>> > until a target is marked as unreachable configurable because what
>>> > represents a good enough criterion in one user's environment might
>>> produce
>>> > too many false-positives in somebody else's environment. Or even
>>> simpler,
>>> > one could say that one can disable reacting to a failed heartbeat RPC
>>> as it
>>> > is currently the case.
>>> >
>>> > We currently have a discussion about this on this PR [1]. Maybe you
>>> wanna
>>> > join the discussion there and share your insights.
>>> >
>>> > [1] https://github.com/apache/flink/pull/16357
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:
>>> >
>>> >> I know that there are retry strategies for akka rpc frameworks. I was
>>> >> just considering that, since the environment is shared by JM and TMs,
>>> and
>>> >> the connections among TMs (using netty) are flaky in unstable
>>> environments,
>>> >> which will also cause job failures, is it necessary to build a
>>> >> strongly guaranteed connection between JM and TMs, or could it be as
>>> >> flaky as the connections among TMs?
>>> >>
>>> >> As far as I know, connections among TMs will just fail on their first
>>> >> connection loss, so behaving like this in JM just means "as flaky as
>>> >> connections among TMs". In a stable environment it's good enough, but
>>> in an
>>> >> unstable environment, it indeed increases the instability. IMO,
>>> though a
>>> >> single connection loss is not reliable, a double check should be good
>>> >> enough. But since I'm not experienced with an unstable environment, I
>>> can't
>>> >> tell whether that's also enough for it.
>>> >>
>>> >> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann 
>>> >> wrote:
>>> >>
>>> >>> I think for RPC commun

Re: Job Recovery Time on TM Lost

2021-07-06 Thread Gen Luo
Yes, I have noticed the PR and commented there with some consideration
about the new option. We can discuss further there.
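
For readers following along, here is a tiny sketch of the kind of
"configurable number of failed heartbeat RPCs" option discussed below. All
names are hypothetical, not Flink code: a threshold of 1 reproduces the
fail-on-first-loss behavior, while a larger value gives the double-check
semantics mentioned in this thread.

{code:java}
/** Hypothetical sketch: mark a heartbeat target unreachable only after N
 *  consecutive failed heartbeat RPCs, instead of after the first failure. */
class HeartbeatTargetMonitor {

    private final int maxConsecutiveFailures; // the proposed configurable option
    private int consecutiveFailures;

    HeartbeatTargetMonitor(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Returns true when the target should now be treated as unreachable. */
    boolean reportRpcFailure() {
        return ++consecutiveFailures >= maxConsecutiveFailures;
    }

    void reportRpcSuccess() {
        consecutiveFailures = 0; // any success resets the count
    }
}
{code}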

On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann  wrote:

> This is actually a very good point Gen. There might not be a lot to gain
> for us by implementing a fancy algorithm for figuring out whether a TM is
> dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
> communication does not tolerate failures and directly fails the affected
> tasks. This assumes that the JM and TM run in the same environment.
>
> One simple approach could be to make the number of failed heartbeat RPCs
> until a target is marked as unreachable configurable because what
> represents a good enough criterion in one user's environment might produce
> too many false-positives in somebody else's environment. Or even simpler,
> one could say that one can disable reacting to a failed heartbeat RPC as it
> is currently the case.
>
> We currently have a discussion about this on this PR [1]. Maybe you wanna
> join the discussion there and share your insights.
>
> [1] https://github.com/apache/flink/pull/16357
>
> Cheers,
> Till
>
> On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:
>
>> I know that there are retry strategies for akka rpc frameworks. I was
>> just considering that, since the environment is shared by JM and TMs, and
>> the connections among TMs (using netty) are flaky in unstable environments,
>> which will also cause job failures, is it necessary to build a
>> strongly guaranteed connection between the JM and TMs, or could it be as flaky
>> as the connections among TMs?
>>
>> As far as I know, connections among TMs will just fail on their first
>> connection loss, so behaving like this in JM just means "as flaky as
>> connections among TMs". In a stable environment it's good enough, but in an
>> unstable environment, it indeed increases the instability. IMO, though a
>> single connection loss is not reliable, a double check should be good
>> enough. But since I'm not experienced with an unstable environment, I can't
>> tell whether that's also enough for it.
>>
>> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann 
>> wrote:
>>
>>> I think for RPC communication there are retry strategies used by the
>>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>>> ActorSystem and resume communication. Moreover, there are also
>>> reconciliation protocols in place which reconcile the states between the
>>> components because of potentially lost RPC messages. So the main question
>>> would be whether a single connection loss is good enough for triggering the
>>> timeout or whether we want a more elaborate mechanism to reason about the
>>> availability of the remote system (e.g. a couple of lost heartbeat
>>> messages).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>>>
>>>> As far as I know, a TM will report connection failure once its
>>>> connected TM is lost. I suppose JM can believe the report and fail the
>>>> tasks in the lost TM if it also encounters a connection failure.
>>>>
>>>> Of course, it won't work if the lost TM is standalone. But I suppose we
>>>> can use the same strategy as the connected scenario. That is, consider it
>>>> possibly lost on the first connection loss, and fail it if double check
>>>> also fails. The major difference is the senders of the probes are the same
>>>> one rather than two different roles, so the results may tend to be the 
>>>> same.
>>>>
>>>> On the other hand, the fact also means that the jobs can be fragile in
>>>> an unstable environment, no matter whether the failover is triggered by TM
>>>> or JM. So maybe it's not worthwhile to introduce extra configurations for
>>>> fault tolerance of heartbeat, unless we also introduce some retry
>>>> strategies for netty connections.
>>>>
>>>>
>>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Could you share the full logs with us for the second experiment, Lu? I
>>>>> cannot tell from the top of my head why it should take 30s unless you have
>>>>> configured a restart delay of 30s.
>>>>>
>>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>>>
>>>>> I've now implemented FLINK-23209 [1] but it somehow has the problem
>>>>> that in a flakey envi

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
I know that there are retry strategies for akka rpc frameworks. I was just
considering that, since the environment is shared by the JM and TMs, and the
connections among TMs (using netty) are flaky in unstable environments,
which will also cause job failures, is it necessary to build a strongly
guaranteed connection between the JM and TMs, or could it be as flaky as
the connections among TMs?

As far as I know, connections among TMs will simply fail on their first
connection loss, so behaving like this in the JM just means being "as flaky
as the connections among TMs". In a stable environment that's good enough,
but in an unstable environment it indeed increases the instability. IMO,
though a single connection loss is not a reliable signal, a double check
should be good enough. But since I'm not experienced with unstable
environments, I can't tell whether that's also enough there.
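
To make the double-check idea concrete, here is a minimal sketch of a
detector that suspects a TM on the first connection loss and only treats it
as dead after a second independent failure. All class and method names are
hypothetical illustrations, not Flink's actual internals:

```
// Hypothetical sketch of the "double check": suspect a TM on the first
// connection loss, and only mark it dead after a second failure; any
// successful contact clears the suspicion.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DoubleCheckLivenessDetector {

    // TMs that have failed one probe and are awaiting confirmation.
    private final Map<String, Long> suspects = new ConcurrentHashMap<>();

    /** Called whenever a heartbeat RPC or data connection to a TM fails. */
    void onConnectionFailure(String tmId) {
        Long firstFailure = suspects.putIfAbsent(tmId, System.currentTimeMillis());
        if (firstFailure != null) {
            // Second independent failure: treat the TM as dead.
            markDead(tmId);
        }
    }

    /** Called when any communication with the TM succeeds again. */
    void onConnectionSuccess(String tmId) {
        suspects.remove(tmId); // false alarm, clear the suspicion
    }

    private void markDead(String tmId) {
        suspects.remove(tmId);
        // ... release the TM's slots and fail its tasks ...
    }
}
```

The same shape covers the probe approach described earlier in the thread:
the second "failure" can come from an active probe instead of a passively
observed connection loss.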

On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann  wrote:

> I think for RPC communication there are retry strategies used by the
> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
> ActorSystem and resume communication. Moreover, there are also
> reconciliation protocols in place which reconcile the states between the
> components because of potentially lost RPC messages. So the main question
> would be whether a single connection loss is good enough for triggering the
> timeout or whether we want a more elaborate mechanism to reason about the
> availability of the remote system (e.g. a couple of lost heartbeat
> messages).
>
> Cheers,
> Till
>
> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>
>> As far as I know, a TM will report connection failure once its connected
>> TM is lost. I suppose JM can believe the report and fail the tasks in the
>> lost TM if it also encounters a connection failure.
>>
>> Of course, it won't work if the lost TM is standalone. But I suppose we
>> can use the same strategy as the connected scenario. That is, consider it
>> possibly lost on the first connection loss, and fail it if double check
>> also fails. The major difference is the senders of the probes are the same
>> one rather than two different roles, so the results may tend to be the same.
>>
>> On the other hand, the fact also means that the jobs can be fragile in an
>> unstable environment, no matter whether the failover is triggered by TM or
>> JM. So maybe it's not worthwhile to introduce extra configurations for
>> fault tolerance of heartbeat, unless we also introduce some retry
>> strategies for netty connections.
>>
>>
>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>> wrote:
>>
>>> Could you share the full logs with us for the second experiment, Lu? I
>>> cannot tell from the top of my head why it should take 30s unless you have
>>> configured a restart delay of 30s.
>>>
>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>
>>> I've now implemented FLINK-23209 [1] but it somehow has the problem that
>>> in a flakey environment you might not want to mark a TaskExecutor dead on
>>> the first connection loss. Maybe this is something we need to make
>>> configurable (e.g. introducing a threshold which admittedly is similar to
>>> the heartbeat timeout) so that the user can configure it for her
>>> environment. On the upside, if you mark the TaskExecutor dead on the first
>>> connection loss (assuming you have a stable network environment), then it
>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>>>
>>>> Thanks for sharing, Till and Yang.
>>>>
>>>> @Lu
>>>> Sorry but I don't know how to explain the new test with the log. Let's
>>>> wait for others' reply.
>>>>
>>>> @Till
>>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
>>>> them.
>>>>
>>>> In addition, I was tracking an issue where the RM keeps allocating and
>>>> freeing slots after a TM is lost until its heartbeat timeout, when I found
>>>> the recovery taking as long as the heartbeat timeout. That should be a
>>>> minor bug introduced by declarative resource management. I have created a
>>>> JIRA about the problem [1] and we can discuss it there if necessary.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>>
>>>> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>>>>
>>>>> An

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
As far as I know, a TM will report a connection failure once a TM it is
connected to is lost. I suppose the JM can believe the report and fail the
tasks in the lost TM if it also encounters a connection failure.

Of course, this won't work if the lost TM is standalone. But I suppose we can
use the same strategy as the connected scenario. That is, consider it
possibly lost on the first connection loss, and fail it if a double check
also fails. The major difference is that the senders of the probes are the
same one rather than two different roles, so the results may tend to be the
same.

On the other hand, this fact also means that jobs can be fragile in an
unstable environment, no matter whether the failover is triggered by the TM
or the JM. So maybe it's not worthwhile to introduce extra configurations for
heartbeat fault tolerance, unless we also introduce some retry strategies
for netty connections.
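
Till's reply quoted below suggests making the number of failed heartbeat RPCs
configurable before a target is marked unreachable. A sketch of such a
counter, again with purely hypothetical names (with a threshold of 1 this
degenerates to failing on the first loss; treating 0 as "disabled" matches
the option of not reacting to failed heartbeat RPCs at all):

```
// Hypothetical sketch: count consecutive failed heartbeat RPCs per target
// and mark it unreachable once a configurable threshold is reached.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class FailedRpcThresholdDetector {

    private final int threshold; // e.g. read from a config option
    private final Map<String, AtomicInteger> failureCounts = new ConcurrentHashMap<>();

    FailedRpcThresholdDetector(int threshold) {
        this.threshold = threshold;
    }

    /** Returns true if the target should now be considered unreachable. */
    boolean onHeartbeatRpcFailure(String targetId) {
        if (threshold <= 0) {
            return false; // reacting to failed heartbeat RPCs is disabled
        }
        int failures = failureCounts
                .computeIfAbsent(targetId, id -> new AtomicInteger())
                .incrementAndGet();
        return failures >= threshold;
    }

    /** Any successful heartbeat resets the counter. */
    void onHeartbeatRpcSuccess(String targetId) {
        failureCounts.remove(targetId);
    }
}
```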


On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann  wrote:

> Could you share the full logs with us for the second experiment, Lu? I
> cannot tell from the top of my head why it should take 30s unless you have
> configured a restart delay of 30s.
>
> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>
> I've now implemented FLINK-23209 [1] but it somehow has the problem that
> in a flakey environment you might not want to mark a TaskExecutor dead on
> the first connection loss. Maybe this is something we need to make
> configurable (e.g. introducing a threshold which admittedly is similar to
> the heartbeat timeout) so that the user can configure it for her
> environment. On the upside, if you mark the TaskExecutor dead on the first
> connection loss (assuming you have a stable network environment), then it
> can now detect lost TaskExecutors as fast as the heartbeat interval.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>
>> Thanks for sharing, Till and Yang.
>>
>> @Lu
>> Sorry but I don't know how to explain the new test with the log. Let's
>> wait for others' reply.
>>
>> @Till
>> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>>
>> In addition, I was tracking an issue where the RM keeps allocating and
>> freeing slots after a TM is lost until its heartbeat timeout, when I found
>> the recovery taking as long as the heartbeat timeout. That should be a minor
>> bug introduced by declarative resource management. I have created a JIRA
>> about the problem [1] and we can discuss it there if necessary.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>
>> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>>
>>> Another side question: shall we add a metric to cover the complete
>>> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
>>> only covers phase 1. Thanks!
>>>
>>> Best
>>> Lu
>>>
>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>>
>>>> Thanks Till and Yang for the help! Also thanks Till for a quick fix!
>>>>
>>>> I did another test yesterday. In this test, I intentionally threw an
>>>> exception from the source operator:
>>>> ```
>>>> // Throw from one subtask at a fixed interval to force a failover
>>>> // without actually losing the TM.
>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>     && errorFrequencyInMin > 0
>>>>     && System.currentTimeMillis() - lastStartTime
>>>>         >= errorFrequencyInMin * 60 * 1000) {
>>>>   lastStartTime = System.currentTimeMillis();
>>>>   throw new RuntimeException(
>>>>       "Trigger expected exception at: " + lastStartTime);
>>>> }
>>>> ```
>>>> In this case, I found phase 1 still takes about 30s while phase 2 dropped
>>>> to 1s (because there is no need for container allocation). Why does phase 1
>>>> still take 30s even though no TM is lost?
>>>>
>>>> Related logs:
>>>> ```
>>>> 2021-06-30 00:55:07,463 INFO
>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>>> 2021-06-30 00:55:07,509 INFO
>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>>> RESTARTING.
>>>> 2021-06-30 00:55:37,596 INFO
>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>>> NRTG_USER_

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Gen Luo
>>>>> ... completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>>>> which is 8 seconds by default.
>>>>> And Flink ResourceManager will release the dead TaskManager container
>>>>> once it receives the completion event.
>>>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>>
>>>>>
>>>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>>>> on the dead TaskManagers.
>>>>>
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>>
>>>>> Till Rohrmann wrote on Thu, Jul 1, 2021 at 4:49 PM:
>>>>>
>>>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>>>> take at least `heartbeat.timeout` time before the system recovers. Even 
>>>>>> if
>>>>>> the cancellation happens fast (e.g. by having configured a low
>>>>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the 
>>>>>> dead
>>>>>> TaskManager until it is marked as dead and its slots are released (unless
>>>>>> the ResourceManager does not get a signal from the underlying resource
>>>>>> management system that a container/pod has died). One way to improve the
>>>>>> situation is to introduce logic which can react to a ConnectionException
>>>>>> and then black lists or releases a TaskManager, for example. This is
>>>>>> currently not implemented in Flink, though.
>>>>>>
>>>>>> Concerning the cancellation operation: Flink currently does not
>>>>>> listen to the dead letters of Akka. This means that the 
>>>>>> `akka.ask.timeout`
>>>>>> is the primary means to fail the future result of an RPC which could not
>>>>>> be
>>>>>> sent. This is also an improvement we should add to Flink's RpcService. 
>>>>>> I've
>>>>>> created a JIRA issue for this problem [1].
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>>>>>
>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm also wondering here.
>>>>>>>>
>>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>>>> lost or it's temporary network trouble that will recover soon, since
>>>>>>>> I can see in the log that akka has got a Connection refused but the JM
>>>>>>>> still sends heartbeat requests to the lost TM until it reaches the
>>>>>>>> heartbeat timeout. But I'm not sure if it's indeed designed like this.
>>>>>>>>
>>>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>>>> answer. Thanks.
>>>>>>>>
>>>>>>>
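
As a practical note on the config-tuning question raised at the top of this
thread: the knobs discussed here are the heartbeat and RPC timeouts. Below is
a hedged example of tightening them when building a configuration
programmatically; the keys are real Flink options (as of 1.13), but the
values are only illustrative and trade faster detection for more false
positives on a flaky network:

```
// Sketch: tightening failure-detection timeouts. Values are illustrative
// only; lower timeouts mean faster failover but more false positives.
import org.apache.flink.configuration.Configuration;

public class FastFailoverConfig {

    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.setString("heartbeat.interval", "5000");  // default: 10000 ms
        conf.setString("heartbeat.timeout", "20000");  // default: 50000 ms
        conf.setString("akka.ask.timeout", "10 s");    // RPC ask timeout
        // A restart delay also adds to the observed recovery time (see the
        // 30 s phase-1 discussion above).
        conf.setString("restart-strategy.fixed-delay.delay", "1 s");
        return conf;
    }
}
```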


[jira] [Created] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-02 Thread Gen Luo (Jira)
Gen Luo created FLINK-23216:
---

 Summary: RM keeps allocating and freeing slots after a TM lost 
until its heartbeat timeout
 Key: FLINK-23216
 URL: https://issues.apache.org/jira/browse/FLINK-23216
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.1
Reporter: Gen Luo


In Flink 1.13, it's observed that the ResourceManager keeps allocating and
freeing slots with a new TM when it's notified by Yarn that a TM is lost. The
behavior continues until the JM marks the TM as FAILED when its heartbeat
timeout is reached. It can easily be reproduced by enlarging
akka.ask.timeout and heartbeat.timeout, for example to 10 min.

 

After tracking, we find the procedure is as follows:

1. When a TM is killed, Yarn first receives the event and notifies the RM.

2. In Flink 1.13, the RM uses declarative resource management to manage the
slots. It detects a lack of resources when receiving the notification, and
then requests a new TM from Yarn.

3. The RM then requires the new TM to connect and offer slots to the JM.

4. But from the JM's point of view, all slots are fulfilled, since the lost TM
is not considered disconnected until the heartbeat timeout is reached, so the
JM rejects all slot offers.

5. The new TM finds no slot serving the JM, then disconnects from the JM.

6. The RM detects a lack of resources again and goes back to step 3, requiring
the new TM to connect and offer slots to the JM, but it won't request another
new TM from Yarn.

 

The original log is lost but is like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(repeated several lines for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
container_xxx for job xxx.

...(repeated several lines for different slots)...

 

This could be fixed in several ways, such as notifying the JM as well when the
RM receives a TM-lost notification, or having TMs not offer slots until
required. But all these approaches have side effects, so they may need further
discussion.

Besides, this should no longer be an issue after FLINK-23209 is done.

 





Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-06-17 Thread Gen Luo
Hi all,

In the review of the PR for FLINK-12473, there were a few comments regarding
pipeline exportation. We would like to start a follow-up discussion to
address some related comments.

Currently, the FLIP-39 proposal gives a way for users to persist a pipeline in
JSON format. But it does not specify how users can export a pipeline for
serving purposes. We summarized some thoughts on this in the following doc.

https://docs.google.com/document/d/1B84b-1CvOXtwWQ6_tQyiaHwnSeiRqh-V96Or8uHqCp8/edit?usp=sharing

After we reach consensus on the pipeline exportation, we will add a
corresponding section in FLIP-39.
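
For readers who have not followed FLINK-12473: persisting and restoring a
pipeline as JSON looks roughly like the sketch below. The Pipeline methods
follow the merged flink-ml-api, but the exact signatures may differ slightly;
MyEstimator and the tables are placeholders, not part of Flink:

```
// Hedged sketch of FLIP-39 pipeline persistence via toJson()/loadJson().
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

class PipelinePersistenceExample {

    static Table fitSaveRestore(TableEnvironment tEnv, Table train, Table input) {
        Pipeline pipeline = new Pipeline();
        pipeline.appendStage(new MyEstimator()); // hypothetical user-defined Estimator

        // Fitting replaces the Estimator stages with fitted Model stages.
        Pipeline fitted = pipeline.fit(tEnv, train);

        // Persist the fitted pipeline, then restore it elsewhere.
        String json = fitted.toJson();
        Pipeline restored = new Pipeline();
        restored.loadJson(json);
        return restored.transform(tEnv, input);
    }
}
```

Note that this persistence does not by itself answer the
exportation-for-serving question above: a JSON pipeline still needs Flink to
run, which is exactly what the linked doc discusses.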


Shaoxuan Wang wrote on Wed, Jun 5, 2019 at 8:47 AM:

> Stavros,
> They have the similar logic concept, but the implementation details are
> quite different. It is hard to migrate the interface with different
> implementations. The built-in algorithms are useful legacy that we will
> consider migrate to the new API (but still with different implementations).
> BTW, the new API has already been merged via FLINK-12473.
>
> Thanks,
> Shaoxuan
>
>
>
> On Mon, Jun 3, 2019 at 6:08 PM Stavros Kontopoulos <
> st.kontopou...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Some portion of the code could be migrated to the new Table API, no?
> > I am saying that because the new API design is based on scikit-learn and
> > the old one was also inspired by it.
> >
> > Best,
> > Stavros
> > On Wed, May 22, 2019 at 1:24 PM Shaoxuan Wang 
> wrote:
> >
> > > Another consensus (from the offline discussion) is that we will
> > > delete/deprecate flink-libraries/flink-ml. I have started a survey and
> > > discussion [1] in dev/user-ml to collect the feedback. Depending on the
> > > replies, we will decide whether we shall delete it in Flink 1.9 or
> > > deprecate it in the next release after 1.9.
> > >
> > > [1]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Tue, May 21, 2019 at 9:22 PM Gen Luo  wrote:
> > >
> > > > Yes, this is our conclusion. I'd like to add only one point:
> > > > registering user-defined aggregators is also needed, which is currently
> > > > provided by the 'bridge' and will eventually be merged into the Table
> > > > API. It's the same as collect().
> > > >
> > > > I will add a TableEnvironment argument in Estimator.fit() and
> > > > Transformer.transform() to get rid of the dependency on
> > > > flink-table-planner. This will be committed soon.
> > > >
> > > > > Aljoscha Krettek wrote on Tue, May 21, 2019 at 7:31 PM:
> > > >
> > > > > We discussed this in private and came to the conclusion that we
> > > > > should (for now) have the dependency on flink-table-api-xxx-bridge
> > > > > because we need access to the collect() method, which is not yet
> > > > > available in the Table API. Once that is available the code can be
> > > > > refactored, but for now we want to unblock work on this new module.
> > > > >
> > > > > We also agreed that we don’t need a direct dependency on
> > > > > flink-table-planner.
> > > > >
> > > > > I hope I summarised our discussion correctly.
> > > > >
> > > > > > On 17. May 2019, at 12:20, Gen Luo  wrote:
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > For the first question, it's not strictly necessary. But I prefer
> > > > > > not to have a TableEnvironment argument in Estimator.fit() or
> > > > > > Transformer.transform(), which is not part of the machine learning
> > > > > > concept, and may make our API not as clean and pretty as other
> > > > > > systems'. I would like another way other than introducing
> > > > > > flink-table-planner to do this. If it's impossible or severely
> > > > > > opposed, I may make the concession to add the argument.
> > > > > >
> > > > > > Other than that, the "flink-table-api-xxx-bridge"s are still
> > > > > > needed. A very common case is that an algorithm needs to guarantee
> > > > > > that it's running under a BatchTableEnvironment, which makes it
> > > > > > possible to collect results each iteration. A typical algorithm
> > > > > > like this is ALS. On Flink 1.8, this can only be achieved by
> > > > > > converting the Table to a DataSet and then calling
> > > > > > DataSet.collect(), which is available in
> > > > > > flink-table-api-xxx-bridge. Besides, registering a UDAGG also
> > > > > > depends on it.
> > > > > >
> > > > > > In conclusion, "planner" can be removed from the dependencies, but
> > > > > > introducing the "bridge"s is inevitable. Whether and how to acquire
> > > > > > the TableEnvironment from a Table can be discussed.
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-05-21 Thread Gen Luo
Yes, this is our conclusion. I'd like to add only one point: registering
user-defined aggregators is also needed, which is currently provided by the
'bridge' and will eventually be merged into the Table API. It's the same as
collect().

I will add a TableEnvironment argument in Estimator.fit() and
Transformer.transform() to get rid of the dependency on
flink-table-planner. This will be committed soon.
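
For reference, the change amounts to stage interfaces shaped roughly like
this (a sketch that is close to, but not necessarily identical to, the
signatures eventually merged):

```
// Hedged sketch of the FLIP-39 stage interfaces once the TableEnvironment
// is passed in explicitly instead of being derived from the Table.
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

interface PipelineStage<T extends PipelineStage<T>> { /* params, toJson, ... */ }

interface Transformer<T extends Transformer<T>> extends PipelineStage<T> {
    Table transform(TableEnvironment tEnv, Table input);
}

interface Model<M extends Model<M>> extends Transformer<M> { }

interface Estimator<E extends Estimator<E, M>, M extends Model<M>>
        extends PipelineStage<E> {
    M fit(TableEnvironment tEnv, Table input);
}
```

Passing the tEnv explicitly is what removes the need to recover an
environment from a Table, and hence the flink-table-planner dependency.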

Aljoscha Krettek wrote on Tue, May 21, 2019 at 7:31 PM:

> We discussed this in private and came to the conclusion that we should
> (for now) have the dependency on flink-table-api-xxx-bridge because we need
> access to the collect() method, which is not yet available in the Table
> API. Once that is available the code can be refactored but for now we want
> to unblock work on this new module.
>
> We also agreed that we don’t need a direct dependency on
> flink-table-planner.
>
> I hope I summarised our discussion correctly.
>
> > On 17. May 2019, at 12:20, Gen Luo  wrote:
> >
> > Thanks for your reply.
> >
> > For the first question, it's not strictly necessary. But I prefer not to
> > have a TableEnvironment argument in Estimator.fit() or
> > Transformer.transform(), which is not part of the machine learning
> > concept, and may make our API not as clean and pretty as other systems'.
> > I would like another way other than introducing flink-table-planner to do
> > this. If it's impossible or severely opposed, I may make the concession
> > to add the argument.
> >
> > Other than that, the "flink-table-api-xxx-bridge"s are still needed. A
> > very common case is that an algorithm needs to guarantee that it's running
> > under a BatchTableEnvironment, which makes it possible to collect results
> > each iteration. A typical algorithm like this is ALS. On Flink 1.8, this
> > can only be achieved by converting the Table to a DataSet and then calling
> > DataSet.collect(), which is available in flink-table-api-xxx-bridge.
> > Besides, registering a UDAGG also depends on it.
> >
> > In conclusion, "planner" can be removed from the dependencies, but
> > introducing the "bridge"s is inevitable. Whether and how to acquire the
> > TableEnvironment from a Table can be discussed.
>
>


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-05-17 Thread Gen Luo
Thanks for your reply.

For the first question, it's not strictly necessary. But I prefer not to
have a TableEnvironment argument in Estimator.fit() or
Transformer.transform(), which is not part of the machine learning concept,
and may make our API not as clean and pretty as other systems'. I would like
another way other than introducing flink-table-planner to do this. If it's
impossible or severely opposed, I may make the concession to add the
argument.

Other than that, the "flink-table-api-xxx-bridge"s are still needed. A very
common case is that an algorithm needs to guarantee that it's running under
a BatchTableEnvironment, which makes it possible to collect results each
iteration. A typical algorithm like this is ALS. On Flink 1.8, this can only
be achieved by converting the Table to a DataSet and then calling
DataSet.collect(), which is available in flink-table-api-xxx-bridge. Besides,
registering a UDAGG also depends on it.

In conclusion, "planner" can be removed from the dependencies, but
introducing the "bridge"s is inevitable. Whether and how to acquire the
TableEnvironment from a Table can be discussed.
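
The Table-to-DataSet detour mentioned above looks like this on Flink 1.8 (a
sketch; tEnv and the table t are assumed to be in scope, and each collect()
triggers a separate batch job):

```
// Hedged sketch: collecting a Table's rows on Flink 1.8 via the DataSet
// bridge, the pattern an iterative algorithm like ALS relies on.
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

class CollectPerIteration {

    static List<Row> collect(BatchTableEnvironment tEnv, Table t) throws Exception {
        DataSet<Row> rows = tEnv.toDataSet(t, Row.class);
        return rows.collect(); // materializes the table on the client
    }
}
```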


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-05-17 Thread Gen Luo
Indeed, it's better not to depend on flink-table-planner. It's currently
needed for three things: registering UDAGGs, judging whether the tableEnv is
batch or streaming, and converting a Table to a DataSet to collect data. Most
of these requirements can be fulfilled by flink-table-api-java-bridge and
flink-table-api-scala-bridge.

But there is one gap: without the current flink-table-planner, it's
impossible to acquire the tableEnv from a table. If so, all interfaces would
have to require an extra tableEnv argument.

This does make sense, but personally I don't like it because it has nothing
to do with the machine learning concept. flink-ml is mainly aimed at
algorithm engineers and scientists, and I believe it's better to keep the API
clean and hide the implementation details as much as possible. Hopefully
there is another way to acquire the tableEnv so the API can stay clean.
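
Of the three points above, judging whether the tableEnv is batch or streaming
reduces to a type check once the environment is in hand (a sketch against the
pre-Blink Flink 1.8 API):

```
// Hedged sketch: only a batch environment supports the per-iteration
// Table -> DataSet -> collect() path described in this thread.
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

class EnvCheck {

    static boolean isBatch(TableEnvironment tEnv) {
        return tEnv instanceof BatchTableEnvironment;
    }
}
```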

Aljoscha Krettek wrote on Thu, May 16, 2019 at 8:16 PM:

> Hi,
>
> I had a look at the document mostly from a module structure/dependency
> structure perspective.
>
> We should make the expected dependency structure explicit in the document.
>
> From the discussion in the doc it seems that the intention is that
> flink-ml-lib should depend on flink-table-planner (the current, pre-blink
> Table API planner that has a dependency on the DataSet API and DataStream
> API). I think we should not have this because it ties the Flink ML
> implementation to a module that is going to be deprecated. As far as I
> understood, the intention for this new Flink ML module is to be the next
> generation approach, based on the Table API. If this is true, we should
> make sure that this only depends on the Table API and is independent of the
> underlying planner implementation. Especially if we want this to work with
> the new Blink-based planner that is currently being added to Flink.
>
> What do you think?
>
> Best,
> Aljoscha
>
> > On 10. May 2019, at 11:22, Shaoxuan Wang  wrote:
> >
> > Hi everyone,
> >
> > I created umbrella Jira FLINK-12470
> >  for FLIP39 and
> added an
> > "implementation plan" section in the google doc
> > (
> https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo/edit#heading=h.pggjwvwg8mrx
> )
> > We need your special attention on the organization of the modules/packages
> > of flink-ml. @Aljoscha, @Till, @Rong, @Jincheng, @Becket, and all.
> >
> > We anticipate rapid growth of Flink ML development in the next several
> > releases. Several components (for instance, pipeline, mllib, model
> > serving, ml integration test) need to be separated into different
> > submodules. Therefore, we propose to create a new flink-ml module at the
> > root, and add sub-modules for ml-pipeline and ml-lib of FLIP39, and
> > potentially we can also design FLIP23 as another sub-module under this new
> > flink-ml module (I will raise a discussion in the FLIP23 ML thread about
> > this). The legacy flink-ml module (under flink-libraries) can remain as it
> > is and await deprecation in the future, or alternatively we can move it
> > under this new flink-ml module and rename it to flink-dataset-ml. What do
> > you think?
> >
> > Looking forward to your feedback.
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Tue, May 7, 2019 at 8:42 AM Rong Rong  wrote:
> >
> >> Thanks for following up promptly and sharing the feedback @shaoxuan.
> >>
> >> Yes I share the same view with you on the convergence of these 2 FLIPs
> >> eventually. I also have some questions regarding the API as well as the
> >> possible convergence challenges (especially current Co-processor
> approach
> >> vs. FLIP-39's table API approach), I will follow up on the discussion
> >> thread and the PR on FLIP-23 with you and Boris :-)
> >>
> >> --
> >> Rong
> >>
> >> On Mon, May 6, 2019 at 3:30 AM Shaoxuan Wang 
> wrote:
> >>
> >>>
> >>> Thanks for the feedback, Rong and Flavio.
> >>>
> >>> @Rong Rong
>  There's another thread regarding a close to merge FLIP-23
> implementation
>  [1]. I agree this might still be early stage to talk about
> >>> productionizing
>  and model-serving. But I would be nice to keep the
> >>> design/implementation in
>  mind that: ease of use for productionizing a ML pipeline is also very
>  important.
>  And if we can leverage the implementation in FLIP-23 in the future,
> >>> (some
>  adjustment might be needed) that would be super helpful.
> >>> You raised a very good point. Actually I have been reviewing FLIP23
> for
> >>> a while (mostly offline to help Boris polish the PR). FMPOV, FLIP23 and
> >>> FLIP39 can be well unified at some point. Model serving in FLIP23 is
> >>> actually a special case of “transformer/model” proposed in FLIP39.
> Boris's
> >>> implementation of model serving can be designed as an abstract class
> on top
> >>> of transformer/model 

Apply for contributor permission

2019-05-09 Thread Gen Luo
Hi,

Could someone add me as a contributor? My JIRA username is c4e.
Thanks!