AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
Hi, Flink dev and users

If I want to async write to an external service, which API shall I use,
AsyncFunction or Async Sink?

My understanding after checking the code is:

   1. Both APIs guarantee at-least-once writes to the external service, as
   both internally store in-flight requests in checkpoints.
   2. Async Sink provides a request-batching feature. This can also be
   implemented with Map + AsyncFunction: a Map function groups requests into
   batches and passes them to the AsyncFunction (see the sketch below). The
   batching implementation can refer to AbstractMapBundleOperator if you
   don't want to use state.
   3. Async Sink supports retrying failed requests. AsyncFunction also
   supports retries in the latest Flink version.
   4. Async Sink supports rate limiting; AsyncFunction doesn't.
   5. AsyncFunction can be used to implement read-update-write. Async Sink
   cannot.
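
Here is a rough sketch of the Map/batching + AsyncFunction combination from
point 2. Only the Flink calls (countWindowAll, AsyncDataStream.unorderedWait,
AsyncFunction, ResultFuture) are real API; Request and
externalClient.writeBatchAsync(...) are hypothetical placeholders:

```
// Rough sketch only. countWindowAll(100) is just one simple way to form
// batches (it runs with parallelism 1; a keyed count window avoids that).
DataStream<Request> requests = ...;

DataStream<List<Request>> batches = requests
    .countWindowAll(100)
    .apply(new AllWindowFunction<Request, List<Request>, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Request> values,
                          Collector<List<Request>> out) {
            List<Request> batch = new ArrayList<>();
            values.forEach(batch::add);
            out.collect(batch);
        }
    });

AsyncDataStream.unorderedWait(
    batches,
    new AsyncFunction<List<Request>, Void>() {
        @Override
        public void asyncInvoke(List<Request> batch, ResultFuture<Void> resultFuture) {
            // hypothetical async client call returning a CompletableFuture
            externalClient.writeBatchAsync(batch).whenComplete((ignored, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    // nothing to emit downstream; complete with an empty result
                    resultFuture.complete(Collections.emptyList());
                }
            });
        }
    },
    30, TimeUnit.SECONDS, 100); // async timeout and capacity (max in-flight batches)
```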

Best

Lu


Re: AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
Thanks, Hong!

I understand that if the use case is to simply write something to an external
service, Async Sink is a good option that provides features like batching,
state management and rate limiting. I have some follow-up questions:

1. Is there any problem if we use AsyncFunction for such a use case? We
can simply drop the output and use Unordered mode (see the sketch below).
2. For AsyncFunction and Async Sink, does it make sense that both could
share the same underlying implementation, so that features like batching and
rate limiting can benefit both?
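
For question 1, this is roughly what I mean (a minimal sketch; only the
AsyncDataStream / AsyncFunction / DiscardingSink calls are Flink API, while
Request and externalClient.writeAsync(...) are hypothetical placeholders):

```
// Use the async operator purely for its side effect and discard its output.
DataStream<Request> input = ...;

AsyncDataStream
    .unorderedWait(input, new AsyncFunction<Request, Void>() {
        @Override
        public void asyncInvoke(Request request, ResultFuture<Void> resultFuture) {
            externalClient.writeAsync(request).whenComplete((ignored, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error); // surfaces the error, triggers failover
                } else {
                    resultFuture.complete(Collections.emptyList());
                }
            });
        }
    }, 30, TimeUnit.SECONDS, 1000)
    .addSink(new DiscardingSink<>()); // explicitly drop the (empty) output
```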

Best
Lu


On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong  wrote:

> Hi Lu,
>
> Thanks for your question. See below for my understanding.
>
> I would recommend using the Async Sink if you are writing to the external
> service as the final output of your job graph, and if you don’t have an
> ordering requirement that updates to the external system must be done before
> updates to some other external system within the same job graph (more on
> this below).
>
> The abstraction of the Async Sink is a sink, meaning it is a terminal
> operator in the job graph. The abstraction is intended to simplify the
> writing of a sink - meaning the base implementation will handle batching,
> state management and rate limiting. You only need to provide the client and
> request structure to be used to interact with the external service. This
> makes writing and maintaining the sink easier (if you simply want to write
> to a destination with at least once processing).
>
> The AsyncFunction, as I understand it, is more used for data enrichment,
> and is not a terminal operator in the job graph. This means the return
> value from the external service will continue to be passed on down the
> Flink Job graph. This is useful for data enrichment using the external
> service, or if we want to ensure the system being called in the
> AsyncFunction is updated BEFORE any data is written to the sinks further
> down the job graph.
>
> For example:
>
> Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink
>
> We can be sure that the updates to DynamoDB for a particular record
> happens before the record is written to the Kinesis Sink.
>
>
> Hope the above clarifies your question!
>
> Regards,
> Hong
>
>
> On 14 Jun 2023, at 19:27, Lu Niu  wrote:
>
> Hi, Flink dev and users
>
> If I want to async write to an external service, which API shall I use,
> AsyncFunction or Async Sink?
>
> My understanding after checking the code are:
>
>1. Both APIs guarantee at least once write to external service. As
>both API internally stores in-flight requests in the checkpoint.
>2. Async Sink provides a batching request feature. This can be
>implemented with Map + AsyncFunction. Map function groups requests in
>batches and pass it to AsyncFunction.The batching implementation can refer
>to AbstractMapBundleOperator if don’t want to use state.
>3. Async Sink supports retry on failed requests. AsyncFunction also
>supports retry in latest flink version.
>4. Async Sink supports rate limiting, AsyncFunction doesn’t.
>5. AsyncFunction can be used to implement read-update-write. Async
>Sink cannot.
>
> Best
>
> Lu
>
>
>


When does backpressure matter

2023-06-22 Thread Lu Niu
For example, suppose a Flink job reads from Kafka, does some processing, and
writes back to Kafka. Do we need to take any action when the job's Kafka
consumer lag is low or 0 but some tasks have constant backpressure? Do we need
to increase the parallelism or do some network tuning so that backpressure is
constantly 0? If so, would that lead to resource overprovisioning?

Or do we only need to take action when Kafka lag keeps increasing while
backpressure is happening at the same time?


Best

Lu


status of FLINK-32476

2023-12-19 Thread Lu Niu
Hi, Flink dev

What's the current status of FLINK-32476?
https://issues.apache.org/jira/browse/FLINK-32476 . I see this feature is
deprioritized. We are interested in this feature and willing to work with
the community on this if no one is actively working on it.

Best
Lu


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-04 Thread Lu Niu
Hi,

Is this still under active development? I notice
https://issues.apache.org/jira/browse/FLINK-32476 is labeled as
deprioritized. If this is the case, would it be acceptable for us to take
on the task?

Best
Lu



On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler 
wrote:

> Hi Dong,
>
> Sorry for not seeing this initially. I did have one question about the
> description of the issue in the FLIP:
>
> > However, in cases where the upstream and downstream operators do not
> store or access references to the input or output records, this deep-copy
> overhead becomes unnecessary
>
> I was interested in getting clarification as to what you meant by “or
> access references…”, to see if it covered this situation:
>
> StreamX —forward--> operator1
> StreamX —forward--> operator2
>
> If operator1 modifies the record, and object re-use is enabled, then
> operator2 will see the modified version, right?
>
> Thanks,
>
> — Ken
>
> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
> >
> > Hi all,
> >
> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > add operator attribute to allow operator to specify support for
> > object-reuse [1].
> >
> > Currently, the default configuration for pipeline.object-reuse is set
> > to false to avoid data corruption, which can result in suboptimal
> > performance. We propose adding APIs that operators can utilize to
> > inform the Flink runtime whether it is safe to reuse the emitted
> > records. This enhancement would enable Flink to maximize its
> > performance using the default configuration.
> >
> > Please refer to the FLIP document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on
> > this proposal.
> >
> > Best regards,
> >
> > Dong and Xuannan
> >
> > [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink & Pinot
>
>
>
>


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-05 Thread Lu Niu
Thank you Dong and Xuannan!

Yes. We can take on this task. Any help during bootstrapping would be
greatly appreciated! I realize there is already a voting thread "[VOTE]
FLIP-329: Add operator attribute to specify support for object-reuse". What
else do we need?

Best
Lu

On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su  wrote:

> Hi Lu,
>
> I believe this feature is very useful. However, I currently lack the
> capacity to work on it in the near future. I think it would be great
> if you could take on the task. I am willing to offer assistance if
> there are any questions about the FLIP, or to review the PR if needed.
>
> Please let me know if you are interested in taking over this task. And
> also think that we should start the voting thread if no future
> comments on this FLIP.
>
> Best,
> Xuannan
>
>
>
> On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
> >
> > Hi Lu,
> >
> > I am not actively working on Flink and this JIRA recently. If Xuannan
> does not plan to work on this anytime soon, I personally think it will be
> great if you can help work on this FLIP. Maybe we can start the voting
> thread if there is no further comment on this FLIP.
> >
> > Xuannan, what do you think?
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
> >>
> >> Hi,
> >>
> >> Is this still under active development? I notice
> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as
> deprioritized. If this is the case, would it be acceptable for us to take
> on the task?
> >>
> >> Best
> >> Lu
> >>
> >>
> >>
> >> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler <
> kkrugler_li...@transpac.com> wrote:
> >>>
> >>> Hi Dong,
> >>>
> >>> Sorry for not seeing this initially. I did have one question about the
> description of the issue in the FLIP:
> >>>
> >>> > However, in cases where the upstream and downstream operators do not
> store or access references to the input or output records, this deep-copy
> overhead becomes unnecessary
> >>>
> >>> I was interested in getting clarification as to what you meant by “or
> access references…”, to see if it covered this situation:
> >>>
> >>> StreamX —forward--> operator1
> >>> StreamX —forward--> operator2
> >>>
> >>> If operator1 modifies the record, and object re-use is enabled, then
> operator2 will see the modified version, right?
> >>>
> >>> Thanks,
> >>>
> >>> — Ken
> >>>
> >>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su 
> wrote:
> >>> >
> >>> > Hi all,
> >>> >
> >>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> >>> > add operator attribute to allow operator to specify support for
> >>> > object-reuse [1].
> >>> >
> >>> > Currently, the default configuration for pipeline.object-reuse is set
> >>> > to false to avoid data corruption, which can result in suboptimal
> >>> > performance. We propose adding APIs that operators can utilize to
> >>> > inform the Flink runtime whether it is safe to reuse the emitted
> >>> > records. This enhancement would enable Flink to maximize its
> >>> > performance using the default configuration.
> >>> >
> >>> > Please refer to the FLIP document for more details about the proposed
> >>> > design and implementation. We welcome any feedback and opinions on
> >>> > this proposal.
> >>> >
> >>> > Best regards,
> >>> >
> >>> > Dong and Xuannan
> >>> >
> >>> > [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> >>>
> >>> --
> >>> Ken Krugler
> >>> http://www.scaleunlimited.com
> >>> Custom big data solutions
> >>> Flink & Pinot
> >>>
> >>>
> >>>
>


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-08 Thread Lu Niu
Sounds good. Is the requirement to start an email thread for the vote?
What else is needed? What are the passing criteria?

Best
Lu

On Sun, Jan 7, 2024 at 5:41 PM Xuannan Su  wrote:

> Hi Lu,
>
> The voting thread has been open for a long time. We may want to start
> a new voting thread. WDYT?
>
> Best,
> Xuannan
>
> On Sat, Jan 6, 2024 at 1:51 AM Lu Niu  wrote:
> >
> > Thank you Dong and Xuannan!
> >
> > Yes. We can take on this task. Any help during bootstrapping would be
> greatly appreciated! I realize there is already a voting thread "[VOTE]
> FLIP-329: Add operator attribute to specify support for object-reuse". What
> else do we need?
> >
> > Best
> > Lu
> >
> > On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su 
> wrote:
> >>
> >> Hi Lu,
> >>
> >> I believe this feature is very useful. However, I currently lack the
> >> capacity to work on it in the near future. I think it would be great
> >> if you could take on the task. I am willing to offer assistance if
> >> there are any questions about the FLIP, or to review the PR if needed.
> >>
> >> Please let me know if you are interested in taking over this task. And
> >> also think that we should start the voting thread if no future
> >> comments on this FLIP.
> >>
> >> Best,
> >> Xuannan
> >>
> >>
> >>
> >> On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
> >> >
> >> > Hi Lu,
> >> >
> >> > I am not actively working on Flink and this JIRA recently. If Xuannan
> does not plan to work on this anytime soon, I personally think it will be
> great if you can help work on this FLIP. Maybe we can start the voting
> thread if there is no further comment on this FLIP.
> >> >
> >> > Xuannan, what do you think?
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> >
> >> > On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Is this still under active development? I notice
> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as
> deprioritized. If this is the case, would it be acceptable for us to take
> on the task?
> >> >>
> >> >> Best
> >> >> Lu
> >> >>
> >> >>
> >> >>
> >> >> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler <
> kkrugler_li...@transpac.com> wrote:
> >> >>>
> >> >>> Hi Dong,
> >> >>>
> >> >>> Sorry for not seeing this initially. I did have one question about
> the description of the issue in the FLIP:
> >> >>>
> >> >>> > However, in cases where the upstream and downstream operators do
> not store or access references to the input or output records, this
> deep-copy overhead becomes unnecessary
> >> >>>
> >> >>> I was interested in getting clarification as to what you meant by
> “or access references…”, to see if it covered this situation:
> >> >>>
> >> >>> StreamX —forward--> operator1
> >> >>> StreamX —forward--> operator2
> >> >>>
> >> >>> If operator1 modifies the record, and object re-use is enabled,
> then operator2 will see the modified version, right?
> >> >>>
> >> >>> Thanks,
> >> >>>
> >> >>> — Ken
> >> >>>
> >> >>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su 
> wrote:
> >> >>> >
> >> >>> > Hi all,
> >> >>> >
> >> >>> > Dong(cc'ed) and I are opening this thread to discuss our proposal
> to
> >> >>> > add operator attribute to allow operator to specify support for
> >> >>> > object-reuse [1].
> >> >>> >
> >> >>> > Currently, the default configuration for pipeline.object-reuse is
> set
> >> >>> > to false to avoid data corruption, which can result in suboptimal
> >> >>> > performance. We propose adding APIs that operators can utilize to
> >> >>> > inform the Flink runtime whether it is safe to reuse the emitted
> >> >>> > records. This enhancement would enable Flink to maximize its
> >> >>> > performance using the default configuration.
> >> >>> >
> >> >>> > Please refer to the FLIP document for more details about the
> proposed
> >> >>> > design and implementation. We welcome any feedback and opinions on
> >> >>> > this proposal.
> >> >>> >
> >> >>> > Best regards,
> >> >>> >
> >> >>> > Dong and Xuannan
> >> >>> >
> >> >>> > [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> >> >>>
> >> >>> --
> >> >>> Ken Krugler
> >> >>> http://www.scaleunlimited.com
> >> >>> Custom big data solutions
> >> >>> Flink & Pinot
> >> >>>
> >> >>>
> >> >>>
>


About JobManager metrics scope

2021-01-07 Thread Lu Niu
Hi, Flink Dev

First of all, Happy New Year! I have a question about JM monitoring.

According to
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html,
metrics.scope.jm only has one variable, <host>, which seems to be not
enough for the YARN deployment mode:

1. The metric doesn't contain identifiers like application id or job id.
2. Since multiple Flink jobs can run in a single YARN cluster, it is
possible that multiple JMs run on a single host, but inside different
containers. Under the current scope, metrics belonging to two Flink jobs
will share an identical metric name, e.g.
<host>.jobmanager.Status.JVM.Memory.Heap.Committed

Is there any workaround or fix on the roadmap? Thanks!

Best
Lu


About metric name truncation

2021-03-25 Thread Lu Niu
Hi, Flink dev

https://issues.apache.org/jira/browse/FLINK-6898 truncates metric names to
fewer than 80 characters. We plan to relax this in our environment. I want to
ask here whether it will cause any side effects. Thank you!

Best
Lu


Zigzag shape in TM JVM used memory

2021-04-04 Thread Lu Niu
Hi, Flink dev

We observed that the TM JVM used-memory metric shows a zigzag shape across
lots of our applications, although these applications are quite different
in business logic. The upper bound is close to the max heap size. Is this
expected in a Flink application? Or does Flink internally
pre-allocate memory aggressively?

app1
[image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
app2
[image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
app3
[image: Screen Shot 2021-04-04 at 8.43.53 PM.png]

Best
Lu


Re: Zigzag shape in TM JVM used memory

2021-04-05 Thread Lu Niu
Hi,

we need to update our email system then :) . Here are the links:

https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing

https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing

https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing

All are DataStream job.

Best
Lu

On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:

>
> Hi Lu,
>
> The images seem not to be shown due to the mail server limitation;
> could you upload them somewhere and paste the links here?
>
> Logically, I think the zigzag is usually due to some small objects getting
> created and collected soon after in the heap. Are you running a SQL job or a
> DataStream job?
>
> Best,
> Yun
>
> --
> Sender:Lu Niu
> Date:2021/04/05 12:06:24
> Recipient:dev@flink.apache.org
> Theme:Zigzag shape in TM JVM used memory
>
> Hi, Flink dev
>
> We observed that the TM JVM used memory metric shows zigzag shape among
> lots of our applications, although these applications are quite different
> in business logic. The upper bound is close to the max heap size. Is this
> expected in flink application? Or does flink internally
> aggressively pre-allocate memory?
>
> app1
> [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> app2
> [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> app3
> [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
>
> Best
> Lu
>
>


Automatic backpressure detection

2021-04-05 Thread Lu Niu
Hi, Flink dev

Lately, we want to develop some tools to:
1. Show backpressured operators without manual steps
2. Provide suggestions to mitigate backpressure after checking data skew,
external service RPC, etc.
3. Show backpressure history

Could anyone share their experience with such tooling?
Also, I notice backpressure monitoring and detection are mentioned in
multiple places. Could someone help explain how these connect to each
other? Maybe some of them are outdated? Thanks!

1. The official doc introduces monitoring back pressure through web UI.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
2. In https://flink.apache.org/2019/07/23/flink-network-stack-2.html, it
says outPoolUsage, inPoolUsage metrics can be used to determine back
pressure.
3. The latest Flink version introduces a metric called "isBackPressured", but
I didn't find related documentation on its usage.

Best
Lu


Re: Automatic backpressure detection

2021-04-06 Thread Lu Niu
Hi, Piotr

Thanks for replying!

We don't have a plan to upgrade to 1.13 in the short term. We are using Flink
1.11 and I notice there is a metric called isBackPressured. Is that enough
to solve 1? If not, would backporting the patches for
backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond
work? And do you have an estimate of how difficult that would be?


Best
Lu



On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski  wrote:

> Hi,
>
> Lately we overhauled the backpressure detection [1] and a screenshot
> preview of those efforts is attached here [2]. I encourage you to check the
> 1.13 RC0 build and how the current mechanism works for you [3]. To support
> those WebUI changes we have added a couple of new metrics:
> backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond.
>
> 1. I believe that solves 1.
> 2. This still requires a bit of manual investigation. Once you locate
> backpressuring task, you can check the detail subtask stats to check if all
> parallel instances are uniformly backpressured/busy or not. If you would
> like to add a hint "it looks like you have a data skew in Task XYZ ", that
> I believe could be added to the WebUI.
> 3. The tricky part is how to display this kind of information. Currently I
> would recommend just export/report
> backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond
> metrics for every task to an external system and  display them for example
> in Grafana.
>
> The blog post you are referencing is quite outdated, especially with those
> new changes from 1.13. I'm hoping to write a new one pretty soon.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-14712
> [2]
>
> https://issues.apache.org/jira/browse/FLINK-14814?focusedCommentId=17256926&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17256926
> [3]
>
> http://mail-archives.apache.org/mod_mbox/flink-user/202104.mbox/%3c1d2412ce-d4d0-ed50-6181-1b610e16d...@apache.org%3E
>
> pon., 5 kwi 2021 o 23:20 Lu Niu  napisał(a):
>
> > Hi, Flink dev
> >
> > Lately, we want to develop some tools to:
> > 1. show backpressure operator without manual operation
> > 2. Provide suggestions to mitigate back pressure after checking data
> skew,
> > external service RPC etc.
> > 3. Show back pressure history
> >
> > Could anyone share their experience with such tooling?
> > Also, I notice backpressure monitoring and detection is mentioned across
> > multiple places. Could someone help to explain how these connect to each
> > other? Maybe some of them are outdated? Thanks!
> >
> > 1. The official doc introduces monitoring back pressure through web UI.
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
> > 2. In https://flink.apache.org/2019/07/23/flink-network-stack-2.html, it
> > says outPoolUsage, inPoolUsage metrics can be used to determine back
> > pressure.
> > 3. Latest flink version introduces metrics called “isBackPressured" But I
> > didn't find related documentation on usage.
> >
> > Best
> > Lu
> >
>


Re: Zigzag shape in TM JVM used memory

2021-04-07 Thread Lu Niu
Hi, Piotr

Thanks for replying. I asked this because such a pattern might imply memory
oversubscription. For example, I tuned down the heap of one app from
2.63 GB to 367 MB and the job still runs fine:
before:
https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing

after:
https://drive.google.com/file/d/1wNTHBT8aSJaAmL1rVY8jUkdp-G5znnMN/view?usp=sharing


What's the best practice for tuning Flink job memory?

1. What's a good starting point users should try first? (For concreteness, a
sketch of the main knobs I'm referring to is below.)
2. How to make progress? E.g., Flink application Foo currently encounters the
error "OOM: java heap space". Where to move next? Simply bump up
taskmanager.memory, or just increase the heap?
3. What's the final state? The job running fine and ensuring XYZ headroom in
each memory component?
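
For context, these are the knobs I currently touch, shown through the typed
options (a minimal sketch, assuming the Flink 1.10+ memory model; the sizes are
made up and would normally live in flink-conf.yaml rather than in code):

```
// Illustrative only -- usually set via flink-conf.yaml, not programmatically.
Configuration conf = new Configuration();
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("4g")); // coarse, top-level knob
conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("2g"));     // fine-grained heap
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1g"));  // RocksDB / batch operators
```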

Best
Lu

On Tue, Apr 6, 2021 at 12:26 AM Piotr Nowojski  wrote:

> Hi,
>
> this should be posted on the user mailing list not the dev.
>
> Apart from that, this looks like normal/standard behaviour of JVM, and has
> very little to do with Flink. Garbage Collector (GC) is kicking in when
> memory usage is approaching some threshold:
> https://www.google.com/search?q=jvm+heap+memory+usage&tbm=isch
>
> Piotrek
>
>
> pon., 5 kwi 2021 o 22:54 Lu Niu  napisał(a):
>
> > Hi,
> >
> > we need to update our email system then :) . Here are the links:
> >
> >
> >
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing
> >
> >
> >
> https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing
> >
> >
> >
> https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing
> >
> > All are DataStream job.
> >
> > Best
> > Lu
> >
> > On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:
> >
> > >
> > > Hi Lu,
> > >
> > > The image seems not be able to shown due to the mail server limitation,
> > > could you upload it somewhere and paste the link here ?
> > >
> > > Logically, I think zigzag usually due to there are some small object
> get
> > > created and eliminated soon in the heap. Are you running a SQL job or a
> > > DataStream job ?
> > >
> > > Best,
> > > Yun
> > >
> > > --
> > > Sender:Lu Niu
> > > Date:2021/04/05 12:06:24
> > > Recipient:dev@flink.apache.org
> > > Theme:Zigzag shape in TM JVM used memory
> > >
> > > Hi, Flink dev
> > >
> > > We observed that the TM JVM used memory metric shows zigzag shape among
> > > lots of our applications, although these applications are quite
> different
> > > in business logic. The upper bound is close to the max heap size. Is
> this
> > > expected in flink application? Or does flink internally
> > > aggressively pre-allocate memory?
> > >
> > > app1
> > > [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> > > app2
> > > [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> > > app3
> > > [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
> > >
> > > Best
> > > Lu
> > >
> > >
> >
>


Re: Automatic backpressure detection

2021-04-12 Thread Lu Niu
Hi, Piotr

Thanks for your detailed reply! It is mentioned here that we cannot observe
backpressure generated from the AsyncOperator in the Flink UI in 1.9.1. Is it
fixed in the latest version? Thank you!
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Function-Not-Generating-Backpressure-td26766.html

Best
Lu

On Tue, Apr 6, 2021 at 11:14 PM Piotr Nowojski  wrote:

> Hi,
>
> Yes, you can use `isBackPressured` to monitor a task's back-pressure.
> However keep in mind:
> a) You are going to miss some nice way to visualize this information, which
> is present in 1.13's WebUI.
> b) `isBackPressured` is a sampling based metric. If your job has varying
> load, for example all windows firing at the same processing time, every
> couple of seconds, causing intermittent back-pressure, this metric will
> show it randomly as `true` or `false`.
> c) `isBackPressured` is slightly less accurate compared to
> `backPressuredTimeMsPerSecond`. There are some corner cases when for a
> brief amount of time it can return `true`, while a task is still running,
> while the time based metrics work in a different much more accurate way.
>
> About back porting the patches, if you want to create a custom Flink build
> it should be do-able. There will be some conflicts for sure, so you will
> need to understand Flink's code.
>
> Best,
> Piotrek
>
> śr., 7 kwi 2021 o 02:32 Lu Niu  napisał(a):
>
> > Hi, Piotr
> >
> > Thanks for replying!
> >
> > We don't have a plan to upgrade to 1.13 in short term. We are using flink
> > 1.11 and I notice there is a metric called isBackpressured. Is that
> enough
> > to solve 1? If not, would backporting patches regarding
> > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond
> > work? And do you have an estimate of how difficult it is?
> >
> >
> > Best
> > Lu
> >
> >
> >
> > On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Lately we overhauled the backpressure detection [1] and a screenshot
> > > preview of those efforts is attached here [2]. I encourage you to check
> > the
> > > 1.13 RC0 build and how the current mechanism works for you [3]. To
> > support
> > > those WebUI changes we have added a couple of new metrics:
> > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > idleTimeMsPerSecond.
> > >
> > > 1. I believe that solves 1.
> > > 2. This still requires a bit of manual investigation. Once you locate
> > > backpressuring task, you can check the detail subtask stats to check if
> > all
> > > parallel instances are uniformly backpressured/busy or not. If you
> would
> > > like to add a hint "it looks like you have a data skew in Task XYZ ",
> > that
> > > I believe could be added to the WebUI.
> > > 3. The tricky part is how to display this kind of information.
> Currently
> > I
> > > would recommend just export/report
> > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> idleTimeMsPerSecond
> > > metrics for every task to an external system and  display them for
> > example
> > > in Graphana.
> > >
> > > The blog post you are referencing is quite outdated, especially with
> > those
> > > new changes from 1.13. I'm hoping to write a new one pretty soon.
> > >
> > > Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-14712
> > > [2]
> > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-14814?focusedCommentId=17256926&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17256926
> > > [3]
> > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-user/202104.mbox/%3c1d2412ce-d4d0-ed50-6181-1b610e16d...@apache.org%3E
> > >
> > > pon., 5 kwi 2021 o 23:20 Lu Niu  napisał(a):
> > >
> > > > Hi, Flink dev
> > > >
> > > > Lately, we want to develop some tools to:
> > > > 1. show backpressure operator without manual operation
> > > > 2. Provide suggestions to mitigate back pressure after checking data
> > > skew,
> > > > external service RPC etc.
> > > > 3. Show back pressure history
> > > >
> > > > Could anyone share their experience with such tooling?
> > > > Also, I notice backpressure monitoring and detection is mentioned
> > across
> > > > multiple places. Could someone help to explain how these connect to
> > each
> > > > other? Maybe some of them are outdated? Thanks!
> > > >
> > > > 1. The official doc introduces monitoring back pressure through web
> UI.
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
> > > > 2. In https://flink.apache.org/2019/07/23/flink-network-stack-2.html
> ,
> > it
> > > > says outPoolUsage, inPoolUsage metrics can be used to determine back
> > > > pressure.
> > > > 3. Latest flink version introduces metrics called “isBackPressured"
> > But I
> > > > didn't find related documentation on usage.
> > > >
> > > > Best
> > > > Lu
> > > >
> > >
> >
>


Re: Automatic backpressure detection

2021-04-13 Thread Lu Niu
Cool. Thanks!

Best
Lu

On Mon, Apr 12, 2021 at 11:27 PM Piotr Nowojski 
wrote:

> Hi,
>
> Yes. Back-pressure from AsyncOperator should be correctly reported via
> isBackPressured, backPressuredMsPerSecond metrics and by extension in the
> WebUI from 1.13.
>
> Piotre
>
> pon., 12 kwi 2021 o 23:17 Lu Niu  napisał(a):
>
> > Hi, Piotr
> >
> > Thanks for your detailed reply! It is mentioned here we cannot observe
> > backpressure generated from  AsyncOperator in Flink UI in 1.9.1. Is it
> > fixed in the latest version? Thank you!
> >
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Function-Not-Generating-Backpressure-td26766.html
> >
> > Best
> > Lu
> >
> > On Tue, Apr 6, 2021 at 11:14 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Yes, you can use `isBackPressured` to monitor a task's back-pressure.
> > > However keep in mind:
> > > a) You are going to miss some nice way to visualize this information,
> > which
> > > is present in 1.13's WebUI.
> > > b) `isBackPressured` is a sampling based metric. If your job has
> varying
> > > load, for example all windows firing at the same processing time, every
> > > couple of seconds, causing intermittent back-pressure, this metric will
> > > show it randomly as `true` or `false`.
> > > c) `isBackPressured` is slightly less accurate compared to
> > > `backPressuredTimeMsPerSecond`. There are some corner cases when for a
> > > brief amount of time it can return `true`, while a task is still
> running,
> > > while the time based metrics work in a different much more accurate
> way.
> > >
> > > About back porting the patches, if you want to create a custom Flink
> > build
> > > it should be do-able. There will be some conflicts for sure, so you
> will
> > > need to understand Flink's code.
> > >
> > > Best,
> > > Piotrek
> > >
> > > śr., 7 kwi 2021 o 02:32 Lu Niu  napisał(a):
> > >
> > > > Hi, Piotr
> > > >
> > > > Thanks for replying!
> > > >
> > > > We don't have a plan to upgrade to 1.13 in short term. We are using
> > flink
> > > > 1.11 and I notice there is a metric called isBackpressured. Is that
> > > enough
> > > > to solve 1? If not, would backporting patches regarding
> > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > idleTimeMsPerSecond
> > > > work? And do you have an estimate of how difficult it is?
> > > >
> > > >
> > > > Best
> > > > Lu
> > > >
> > > >
> > > >
> > > > On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski  >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Lately we overhauled the backpressure detection [1] and a
> screenshot
> > > > > preview of those efforts is attached here [2]. I encourage you to
> > check
> > > > the
> > > > > 1.13 RC0 build and how the current mechanism works for you [3]. To
> > > > support
> > > > > those WebUI changes we have added a couple of new metrics:
> > > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > > idleTimeMsPerSecond.
> > > > >
> > > > > 1. I believe that solves 1.
> > > > > 2. This still requires a bit of manual investigation. Once you
> locate
> > > > > backpressuring task, you can check the detail subtask stats to
> check
> > if
> > > > all
> > > > > parallel instances are uniformly backpressured/busy or not. If you
> > > would
> > > > > like to add a hint "it looks like you have a data skew in Task XYZ
> ",
> > > > that
> > > > > I believe could be added to the WebUI.
> > > > > 3. The tricky part is how to display this kind of information.
> > > Currently
> > > > I
> > > > > would recommend just export/report
> > > > > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and
> > > idleTimeMsPerSecond
> > > > > metrics for every task to an external system and  display them for
> > > > example
> > > > > in Graphana.
> > > > >
> > > > > The blog post you are referencing is quite outdated, especially
> with
> > > > those
> > > > > n

Add jobId and JobName variable to JobManager metrics in per-job deployment mode

2021-04-13 Thread Lu Niu
Hi, Flink dev

Could you share your thoughts about
https://issues.apache.org/jira/browse/FLINK-22164 ?

context:
We expose all Flink metrics to an external system for monitoring and
alerting. However, JobManager metrics only have one variable, <host>,
which is not enough to pinpoint one job when jobs are deployed to YARN. If
a Flink job runs in per-job mode, which ensures one job per cluster, we can
add jobId and jobName to JobManager metrics.


Best
Lu


Flink TM Heartbeat Timeout

2021-06-10 Thread Lu Niu
Hi, Flink User

Several of our applications get heartbeat timeouts occasionally; there is no
GC issue and no OOM:
```
- realtime conversion event filter (49/120)
(16e3d3e7608eed0a30e0508eb52065fd) switched from RUNNING to FAILED on
container_e05_1599158866703_129001_01_000111 @
xenon-pii-prod-001-20191210-data-slave-prod-0a01bbdd.ec2.pin220.com
(dataPort=39013). java.util.concurrent.TimeoutException: Heartbeat of
TaskManager with id container_e05_1599158866703_129001_01_000111 timed out.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
at
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
akka.actor.ActorCell.invoke(ActorCell.scala:561) at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
akka.dispatch.Mailbox.run(Mailbox.scala:225) at
akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
Shall we try increasing `heartbeat.timeout`? Any side effects? E.g., would
that lead to slower detection when a TM container is killed by YARN?

We use Flink 1.11.
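
For reference, these are the two knobs involved (defaults in 1.11: 10s
interval, 50s timeout). A minimal sketch below; the values are purely
illustrative, and in our YARN setup they would normally be set in
flink-conf.yaml (heartbeat.interval / heartbeat.timeout) rather than in code:

```
// Illustrative only -- raising the timeout delays dead-TM detection accordingly.
Configuration conf = new Configuration();
conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 10_000L); // default 10s
conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 120_000L); // raised from the 50s default
```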

Best
Lu
Pinterest, Inc.


flink 1.9 Restore from a checkpoint taken in 1.11

2020-11-30 Thread Lu Niu
Hi, Flink dev

Is it supported for a Flink job on version 1.9 to restore from a
checkpoint taken by the same job on 1.11? The context is that we are
migrating to version 1.11 and need a backup plan for emergency fallback.
We did a test and it throws this error:
```
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[Internal server error., (JobManagerRunner.java:152)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:387)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalArgumentException: Cannot restore savepoint
version 3.
at
org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers.getSerializer(SavepointSerializers.java:80)
at
org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:106)
at
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:143)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1099)
at
org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237)
at
org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196)
at
org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176)
at
org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:266)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:146)
... 10 more

End of exception on server side>]
```
So it seems not. I just want to confirm that with the community.


Best
Lu


Re: Job Recovery Time on TM Lost

2021-06-30 Thread Lu Niu
Thanks Gen! cc flink-dev to collect more inputs.

Best
Lu

On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:

> I'm also wondering here.
>
> In my opinion, it's because the JM can not confirm whether the TM is lost
> or it's a temporary network trouble and will recover soon, since I can see
> in the log that akka has got a Connection refused but JM still sends a
> heartbeat request to the lost TM until it reaches heartbeat timeout. But
> I'm not sure if it's indeed designed like this.
>
> I would really appreciate it if anyone who knows more details could
> answer. Thanks.
>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks TIll and Yang for help! Also Thanks Till for a quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found Phase 1 still takes about 30s and Phase 2 dropped to
1s (because there is no need for container allocation).

Some logs:
```
```


On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:

> A quick addition, I think with FLINK-23202 it should now also be possible
> to improve the heartbeat mechanism in the general case. We can leverage the
> unreachability exception thrown if a remote target is no longer reachable
> to mark an heartbeat target as no longer reachable [1]. This can then be
> considered as if the heartbeat timeout has been triggered. That way we
> should detect lost TaskExecutors as fast as our heartbeat interval is.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>
>> Since you are deploying Flink workloads on Yarn, the Flink
>> ResourceManager should get the container
>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
>> is 8 seconds by default.
>> And Flink ResourceManager will release the dead TaskManager container
>> once received the completion event.
>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>
>>
>> I think most of the time cost in Phase 1 might be cancelling the tasks on
>> the dead TaskManagers.
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>>
>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>> the primary means to detect dead TaskManagers. This means that Flink will
>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>> the cancellation happens fast (e.g. by having configured a low
>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>> TaskManager until it is marked as dead and its slots are released (unless
>>> the ResourceManager does not get a signal from the underlying resource
>>> management system that a container/pod has died). One way to improve the
>>> situation is to introduce logic which can react to a ConnectionException
>>> and then black lists or releases a TaskManager, for example. This is
>>> currently not implemented in Flink, though.
>>>
>>> Concerning the cancellation operation: Flink currently does not listen
>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>> primary means to fail the future result of a rpc which could not be sent.
>>> This is also an improvement we should add to Flink's RpcService. I've
>>> created a JIRA issue for this problem [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>>
>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>>>
>>>>> I'm also wondering here.
>>>>>
>>>>> In my opinion, it's because the JM can not confirm whether the TM is
>>>>> lost or it's a temporary network trouble and will recover soon, since I 
>>>>> can
>>>>> see in the log that akka has got a Connection refused but JM still sends a
>>>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>>>>> I'm not sure if it's indeed designed like this.
>>>>>
>>>>> I would really appreciate it if anyone who knows more details could
>>>>> answer. Thanks.
>>>>>
>>>>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks TIll and Yang for help! Also Thanks Till for a quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found Phase 1 still takes about 30s and Phase 2 dropped to
1s (because there is no need for container allocation). Why does Phase 1 still
take 30s even though no TM is lost?

Related logs:
```
2021-06-30 00:55:07,463 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
RESTARTING.
2021-06-30 00:55:37,596 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
RUNNING.
2021-06-30 00:55:38,678 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
all tasks switch from CREATED to RUNNING)
```
Best
Lu


On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found phase 1 still takes about 30s and Phase 2 dropped to
> 1s (because no need for container allocation).
>
> Some logs:
> ```
> ```
>
>
> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:
>
>> A quick addition, I think with FLINK-23202 it should now also be possible
>> to improve the heartbeat mechanism in the general case. We can leverage the
>> unreachability exception thrown if a remote target is no longer reachable
>> to mark an heartbeat target as no longer reachable [1]. This can then be
>> considered as if the heartbeat timeout has been triggered. That way we
>> should detect lost TaskExecutors as fast as our heartbeat interval is.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>
>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> ResourceManager should get the container
>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>> which is 8 seconds by default.
>>> And Flink ResourceManager will release the dead TaskManager container
>>> once received the completion event.
>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>
>>>
>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>> on the dead TaskManagers.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>>>
>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>>> the cancellation happens fast (e.g. by having configured a low
>>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>>> TaskManager until it is marked as dead and its slots are released (unless
>>>> the ResourceManager does not get a signal from the underlying resource
>>>> management system that a container/pod has died). One way to improve the
>>>> situation is to introduce logic which can react to a ConnectionException
>>>> and then black lists or releases a TaskManager, for example. This is
>>>> currently not implemented in Flink, though.
>>>>
>>>> Concerning the cancellation operation: Flink currently doe

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Another side question: shall we add a metric to cover the complete restarting
time (phase 1 + phase 2)? The current metric jm.restartingTime only covers
phase 1. Thanks!

Best
Lu

On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found phase 1 still takes about 30s and Phase 2 dropped to
> 1s (because no need for container allocation). Why phase 1 still takes 30s
> even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> RESTARTING.
> 2021-06-30 00:55:37,596 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
> RUNNING.
> 2021-06-30 00:55:38,678 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
> all tasks switch from CREATED to RUNNING)
> ```
> Best
> Lu
>
>
> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>
>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException(
>>   "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and Phase 2 dropped
>> to 1s (because no need for container allocation).
>>
>> Some logs:
>> ```
>> ```
>>
>>
>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>> wrote:
>>
>>> A quick addition, I think with FLINK-23202 it should now also be
>>> possible to improve the heartbeat mechanism in the general case. We can
>>> leverage the unreachability exception thrown if a remote target is no
>>> longer reachable to mark an heartbeat target as no longer reachable [1].
>>> This can then be considered as if the heartbeat timeout has been triggered.
>>> That way we should detect lost TaskExecutors as fast as our heartbeat
>>> interval is.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>>
>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>> ResourceManager should get the container
>>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>>> which is 8 seconds by default.
>>>> And Flink ResourceManager will release the dead TaskManager container
>>>> once received the completion event.
>>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>
>>>>
>>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>>> on the dead TaskManagers.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>>
>>>> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>>>>
>>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>>>> the cancellation happens fast (e.g. by having co

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Lu Niu
d
>> by TM
>> >>>> or JM. So maybe it's not that worthy to introduce extra
>> configurations for
>> >>>> fault tolerance of heartbeat, unless we also introduce some retry
>> >>>> strategies for netty connections.
>> >>>>
>> >>>>
>> >>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>> >>>> wrote:
>> >>>>
>> >>>>> Could you share the full logs with us for the second experiment,
>> Lu? I
>> >>>>> cannot tell from the top of my head why it should take 30s unless
>> you have
>> >>>>> configured a restart delay of 30s.
>> >>>>>
>> >>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>> >>>>>
>> >>>>> I've now implemented FLINK-23209 [1] but it somehow has the problem
>> >>>>> that in a flakey environment you might not want to mark a
>> TaskExecutor dead
>> >>>>> on the first connection loss. Maybe this is something we need to
>> make
>> >>>>> configurable (e.g. introducing a threshold which admittedly is
>> similar to
>> >>>>> the heartbeat timeout) so that the user can configure it for her
>> >>>>> environment. On the upside, if you mark the TaskExecutor dead on
>> the first
>> >>>>> connection loss (assuming you have a stable network environment),
>> then it
>> >>>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>> >>>>>
>> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Till
>> >>>>>
>> >>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>> >>>>>
>> >>>>>> Thanks for sharing, Till and Yang.
>> >>>>>>
>> >>>>>> @Lu
>> >>>>>> Sorry but I don't know how to explain the new test with the log.
>> >>>>>> Let's wait for others' reply.
>> >>>>>>
>> >>>>>> @Till
>> >>>>>> It would be nice if JIRAs could be fixed. Thanks again for
>> proposing
>> >>>>>> them.
>> >>>>>>
>> >>>>>> In addition, I was tracking an issue that RM keeps allocating and
>> >>>>>> freeing slots after a TM lost until its heartbeat timeout, when I
>> found the
>> >>>>>> recovery costing as long as heartbeat timeout. That should be a
>> minor bug
>> >>>>>> introduced by declarative resource management. I have created a
>> JIRA about
>> >>>>>> the problem [1] and  we can discuss it there if necessary.
>> >>>>>>
>> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>> >>>>>>
>> >>>>>> Lu Niu  于2021年7月2日周五 上午3:13写道:
>> >>>>>>
>> >>>>>>> Another side question, Shall we add metric to cover the complete
>> >>>>>>> restarting time (phase 1 + phase 2)? Current metric
>> jm.restartingTime only
>> >>>>>>> covers phase 1. Thanks!
>> >>>>>>>
>> >>>>>>> Best
>> >>>>>>> Lu
>> >>>>>>>
>> >>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>> >>>>>>>
>> >>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>> >>>>>>>>
>> >>>>>>>> I did another test yesterday. In this test, I intentionally throw
>> >>>>>>>> exception from the source operator:
>> >>>>>>>> ```
>> >>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> >>>>>>>> && errorFrenquecyInMin > 0
>> >>>>>>>> && System.currentTimeMillis() - lastStartTime >=
>> >>>>>>>> errorFrenquecyInMin * 60 * 1000) {
>> >>>>>>>>   lastStartTime = System.currentTimeMillis();
>> >>>>>>>>   throw new RuntimeException(
>> >>>

[jira] [Created] (FLINK-33804) Add Option to disable showing metrics in JobManager UI

2023-12-12 Thread Lu Niu (Jira)
Lu Niu created FLINK-33804:
--

 Summary: Add Option to disable showing metrics in JobManager UI
 Key: FLINK-33804
 URL: https://issues.apache.org/jira/browse/FLINK-33804
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Lu Niu


Flink allows users to view metrics in the JobManager UI. However, there are 2 
problems we found:
 # The JobManager is required to aggregate metrics from all task managers. When 
the metric cardinality is quite high, this process can trigger a JobManager 
full GC and slow response times.
 # Flink jobs in prod usually have their own dashboards to view metrics, so this 
feature is sometimes not useful.

In light of this, we propose to add an option to disable this feature.
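
For illustration, a minimal sketch of what such an option could look like. The key 
name "web.show-metrics" and its defaults are assumptions made for this sketch, not 
part of the proposal:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Illustrative only: a hypothetical switch the web UI could consult before aggregating metrics. */
public class JobManagerUiMetricOptions {

    public static final ConfigOption<Boolean> WEB_SHOW_METRICS =
            ConfigOptions.key("web.show-metrics")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether the JobManager aggregates and serves task metrics to the web UI. "
                                    + "Disabling this avoids aggregation for jobs with very high metric cardinality.");
}
{code}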



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33806) Async IO Allows Custom Action after Final Retry Failure

2023-12-12 Thread Lu Niu (Jira)
Lu Niu created FLINK-33806:
--

 Summary:  Async IO Allows Custom Action after Final Retry Failure
 Key: FLINK-33806
 URL: https://issues.apache.org/jira/browse/FLINK-33806
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Lu Niu


In the Async IO retry support, if all retries fail, the record is dropped without 
any further action. However, there are use cases requiring an action after the 
final retry failure occurs, e.g., logging the input or writing the input data to 
external storage. To address this, we propose to add a new API to 
AsyncRetryStrategy and make changes accordingly.
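
As a rough illustration of the kind of hook this implies (the interface name, method 
name and generics below are assumptions for this sketch, not the actual API change 
being proposed):
{code:java}
import java.io.Serializable;

/**
 * Illustrative only: a hypothetical callback an extended AsyncRetryStrategy could accept,
 * invoked exactly once after the final retry has failed.
 */
public interface RetryExhaustionHandler<IN> extends Serializable {

    /** E.g. log the input, emit it to a side output, or push it to a dead-letter queue. */
    void onRetriesExhausted(IN input, Throwable lastFailure);
}
{code}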



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-16640) Expose listStatus latency in flink filesystem

2020-03-17 Thread Lu Niu (Jira)
Lu Niu created FLINK-16640:
--

 Summary: Expose listStatus latency in flink filesystem
 Key: FLINK-16640
 URL: https://issues.apache.org/jira/browse/FLINK-16640
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: Lu Niu


listStatus can potentially take a long time and slow down the workflow. Exposing 
this latency as a metric will help developers debug.
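
For reference, a minimal standalone sketch of measuring that latency from the outside 
with Flink's FileSystem API (the directory argument is a placeholder); wiring it into 
a proper histogram inside the file system classes is the actual ask:
{code:java}
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ListStatusLatencyProbe {

    public static void main(String[] args) throws Exception {
        Path dir = new Path(args.length > 0 ? args[0] : "file:///tmp"); // e.g. a checkpoint directory
        FileSystem fs = FileSystem.get(dir.toUri());                    // resolve the FileSystem for the scheme

        long start = System.nanoTime();
        FileStatus[] children = fs.listStatus(dir);                     // the call whose latency is of interest
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.printf("listStatus(%s): %d entries in %d ms%n",
                dir, children == null ? 0 : children.length, elapsedMs);
    }
}
{code}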



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16931) Large _metadata file leads to JobManager not responding when restarting

2020-04-01 Thread Lu Niu (Jira)
Lu Niu created FLINK-16931:
--

 Summary: Large _metadata file leads to JobManager not responding 
when restarting
 Key: FLINK-16931
 URL: https://issues.apache.org/jira/browse/FLINK-16931
 Project: Flink
  Issue Type: Bug
Reporter: Lu Niu


When the _metadata file is big, the JobManager can never recover from the 
checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> restart. 
Here is the related log: 
2020-04-01 17:08:25,689 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - 
Recovering checkpoints from ZooKeeper.
2020-04-01 17:08:25,698 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 
3 checkpoints in ZooKeeper.
2020-04-01 17:08:25,698 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying 
to fetch 3 checkpoints from storage.
2020-04-01 17:08:25,698 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying 
to retrieve checkpoint 50.
2020-04-01 17:08:48,589 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying 
to retrieve checkpoint 51.
2020-04-01 17:09:12,775 INFO  org.apache.flink.yarn.YarnResourceManager 
- The heartbeat of JobManager with id 
02500708baf0bb976891c391afd3d7d5 timed out.
Digging into the code, it looks like ExecutionGraph::restart runs in the JobMaster 
main thread and finally calls 
ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads the 
file from DFS. The main thread is basically blocked for a while because of this. 
One possible solution is to make the downloading part async. More things might need 
to be considered, as the original change tries to make it 
single-threaded. [https://github.com/apache/flink/pull/7568]
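
As a rough sketch of the "make the download async" idea, with hypothetical stand-ins 
for the checkpoint store and executors (this is not Flink's actual recovery code):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class AsyncCheckpointFetchSketch {

    /** Hypothetical stand-in for the store that downloads checkpoint metadata from DFS. */
    interface CompletedCheckpointStore {
        byte[] retrieveCompletedCheckpoint(long checkpointId);
    }

    static CompletableFuture<byte[]> retrieveAsync(
            CompletedCheckpointStore store,
            long checkpointId,
            Executor ioExecutor,
            Executor mainThreadExecutor) {
        return CompletableFuture
                // run the (potentially slow) DFS download on a dedicated I/O pool ...
                .supplyAsync(() -> store.retrieveCompletedCheckpoint(checkpointId), ioExecutor)
                // ... and only hop back onto the main thread to hand over the result
                .thenApplyAsync(metadata -> metadata, mainThreadExecutor);
    }
}
{code}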



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21263) Job hangs under backpressure

2021-02-03 Thread Lu Niu (Jira)
Lu Niu created FLINK-21263:
--

 Summary: Job hangs under backpressure
 Key: FLINK-21263
 URL: https://issues.apache.org/jira/browse/FLINK-21263
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.0
Reporter: Lu Niu
 Attachments: source_graph.svg, source_js1, source_js2, source_js3

We have a Flink job that runs fine for a few days but then suddenly hangs and never 
recovers. Once we relaunch the job, it runs fine again. We detected that the job has 
backpressure, but in all other cases backpressure only leads to slower consumption; 
what is weird here is that the job made no progress at all. The symptoms look 
similar to FLINK-20618.

 

About the job:
1. Reads from Kafka and writes to Kafka

2. version 1.11

3. enabled unaligned checkpoint

 

Symptoms:
 # All source/sink throughput drops to 0.
 # All checkpoints fail immediately after triggering.
 # Backpressure shows "high" from the source to two downstream operators. 
 # Flame graph shows all subtask threads are waiting.
 # Source jstack shows the Source thread is BLOCKED, as below.

{code:java}
Source: impression-reader -> impression-filter -> impression-data-conversion 
(1/60)
Stack Trace is:
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0003a3e71330> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135)
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425)
- locked <0x0006a485dab0> (a java.lang.Object)
at 
org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter.emitWatermark(SourceContextWatermarkOutputAdapter.java:37)
at 
org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.updateCombinedWatermark(WatermarkOutputMultiplexer.java:167)
at 
org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.onPeriodicEmit(WatermarkOutputMultiplexer.java:136)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$PeriodicWatermarkEmitter.onProcessingTime(AbstractFetcher.java:574)
- locked <0x0006a485dab0> (a java.lang.Object)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$590/1066788035.run(Unknown
 Source)
at 
org.apache.flink.streaming.runt

[jira] [Created] (FLINK-22162) Make Max Operator name Length Configurable

2021-04-08 Thread Lu Niu (Jira)
Lu Niu created FLINK-22162:
--

 Summary: Make Max Operator name Length Configurable
 Key: FLINK-22162
 URL: https://issues.apache.org/jira/browse/FLINK-22162
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Lu Niu


MaxOperatorNameLength is hardcoded to 80. Users might want to tune this parameter so 
that, after exposing metrics to an external metrics system, they can better query the 
metrics data by name. 
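
A standalone illustration of the effect (not Flink's actual code): names longer than 
the hard-coded limit are cut off in the metric identifier, so external queries by the 
full operator name no longer match.
{code:java}
public class OperatorNameTruncation {

    private static final int MAX_OPERATOR_NAME_LENGTH = 80; // the hard-coded limit the ticket wants configurable

    static String metricOperatorName(String operatorName) {
        return operatorName.length() <= MAX_OPERATOR_NAME_LENGTH
                ? operatorName
                : operatorName.substring(0, MAX_OPERATOR_NAME_LENGTH);
    }

    public static void main(String[] args) {
        String name = "Source: impression-reader -> impression-filter -> impression-data-conversion -> output";
        System.out.println(metricOperatorName(name)); // only the first 80 characters survive
    }
}
{code}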



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22164) Add jobId and JobName variable to JobManager metrics in per-job deployment mode

2021-04-08 Thread Lu Niu (Jira)
Lu Niu created FLINK-22164:
--

 Summary: Add jobId and JobName variable to JobManager metrics in 
per-job deployment mode
 Key: FLINK-22164
 URL: https://issues.apache.org/jira/browse/FLINK-22164
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Lu Niu


We expose all Flink metrics to an external system for monitoring and alerting. 
However, JobManager metrics only have one variable, which is not 
enough to target one job when the job is deployed to YARN. If the Flink job runs in 
per-job mode, which ensures one job per cluster, we can add jobId and JobName to 
JobManager metrics. 
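
Illustrative only: what the desired JobManager scope format could look like if the 
<job_id> and <job_name> variables were resolvable for JobManager-level metrics in 
per-job deployments. Today these variables are not supported at this scope; adding 
them is the ask of this ticket.
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;

public class JobManagerScopeSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The <job_name>/<job_id> variables below are the proposal, not current behaviour.
        conf.setString(MetricOptions.SCOPE_NAMING_JM, "<host>.jobmanager.<job_name>.<job_id>");
        System.out.println(conf.getString(MetricOptions.SCOPE_NAMING_JM));
    }
}
{code}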



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22326) Job containing Iterate Operator always fails on Checkpoint

2021-04-16 Thread Lu Niu (Jira)
Lu Niu created FLINK-22326:
--

 Summary: Job containing Iterate Operator always fails on Checkpoint 
 Key: FLINK-22326
 URL: https://issues.apache.org/jira/browse/FLINK-22326
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.11.1
Reporter: Lu Niu
 Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 
2021-04-16 at 12.43.38 PM.png

A job that contains an Iterate operator will always fail on checkpoint.

How to reproduce: 

[https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]

This is based on 
[https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java,]
 with a few lines of difference (see the sketch after this list):
1. Make maxWaitTime large enough when creating the IterativeStream.

2. Send no output back to the Iterative Source.
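
A minimal sketch of those two modifications (not the linked gist itself):
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateCheckpointRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000); // checkpoints are expected to keep failing per the report

        DataStream<Long> input = env.generateSequence(0, Long.MAX_VALUE);

        // 1. maxWaitTime made very large when creating the IterativeStream
        IterativeStream<Long> iteration = input.iterate(Long.MAX_VALUE);

        // 2. the feedback edge never carries any records back to the iteration source
        DataStream<Long> feedback = iteration.filter(value -> false);
        iteration.closeWith(feedback);

        iteration.print();

        env.execute("iterate-checkpoint-repro");
    }
}
{code}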

 

Result:

The same code is able to checkpoint in 1.9.1

!image-2021-04-16-12-45-23-624.png!

 

but always fails on checkpoint in 1.11

!image-2021-04-16-12-41-35-002.png!

 

It seems 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19985) job went into zombie state after ZK session timeout

2020-11-04 Thread Lu Niu (Jira)
Lu Niu created FLINK-19985:
--

 Summary: job went into zombie state after ZK session timeout
 Key: FLINK-19985
 URL: https://issues.apache.org/jira/browse/FLINK-19985
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1
Reporter: Lu Niu


Recently we had an issue where one Flink job went into a zombie state after a ZK 
session timeout. What happened seemingly was:
1. The ZK session timed out (JobManager HA is enabled) and reconnected; the 
JobManager is no longer the leader now:
{code:java}
org.apache.flink.util.FlinkException: JobManager is no longer the 
leader.org.apache.flink.util.FlinkException: JobManager is no longer the 
leader. at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391)
 at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377)
 at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
 at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) 
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374)
 at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
 at 
org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$1.stateChanged(LeaderLatch.java:82)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:259)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:255)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
 at 
org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.processEvents(ConnectionStateManager.java:253)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.access$000(ConnectionStateManager.java:43)
 at 
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$1.call(ConnectionStateManager.java:111)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
{code}

2. For some reason, the JobManager failed to register itself as the leader, 
leading the whole job into SUSPENDED and then CANCELED mode. The exception was: 
{code:java}
2020-11-04 18:42:43,523 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Unhandled 
exception.
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
mismatch: Ignoring message LocalFencedMessage(826fbfb893e4dbce095f5f1b1d75426b, 
LocalRpcInvocation(requestJob(JobID, Time))) because the fencing token 
826fbfb893e4dbce095f5f1b1d75426b did not match the expected fencing token 
b42ecc60ee45cf65209ab2c2da88473a.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:81)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123

[jira] [Created] (FLINK-17089) Checkpoint fail because RocksDBException: Error While opening a file for sequentially reading

2020-04-10 Thread Lu Niu (Jira)
Lu Niu created FLINK-17089:
--

 Summary: Checkpoint fail because RocksDBException: Error While 
opening a file for sequentially reading
 Key: FLINK-17089
 URL: https://issues.apache.org/jira/browse/FLINK-17089
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Lu Niu


We use the incremental RocksDB state backend. The Flink job checkpoint throws the 
following exception after running for about 20 hours:
{code:java}
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/foo/bar/usercache/xxx/appcache/application_1584397637704_9072/flink-io-4e2294f0-7e9b-4102-b079-1089f23c47aa/job_d781983f4967703b0480c7943e8100af_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__27_60__uuid_dee7e33b-9bce-42f3-909a-f6fa4ab52d8c/db/MANIFEST-06:
 No such file or directory at org.rocksdb.Checkpoint.createCheckpoint(Native 
Method)   at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)  
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.takeDBNativeCheckpoint(RocksIncrementalSnapshotStrategy.java:249)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:160)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:126)
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:439)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
   ... 17 more
{code}
This failure happens consistently until the job restarts.

Some findings:

The JobManager log shows that each time the error came from a different subtask:
{code:java}
// grep jobManager log on appcache/application_1584397637704_9622
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-c42b6665-0170-4dc9-9933-8abd78812fd5/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__5_60__uuid_fa8124e4-1678-4555-a90a-8eec4d974a22/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-a8dfe34d-909e-4aea-8d20-c89199b20856/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__4_60__uuid_12fc9764-418e-4802-800e-3623e385743f/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme1n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-e98c35d7-586a-4edb-9eba-99c6fd823540/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__9_60__uuid_f52a3f02-aa12-4285-b594-b94e1b0f8ba7/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-a2887f93-1c75-48b1-8b67-72acdc69ce1b/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__2_60__uuid_6a8267eb-aa04-48a3-b82f-7b5b9f21c8e0/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme2n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-27e797c3-de39-4140-84e8-b94e640154cc/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__1_60__uuid_fde8b198-32d8-4e0c-a412-f316a4fe1e3e/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme1n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-e98c35d7-586a-4edb-9eba-99c6fd823540/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__9_60__uuid_f52a3f02-aa12-4285-b594-b94e1b0f8ba7/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme2n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-7be6a975-c0cd-4083-a1c3-b47e4c8fbb1b/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__13_60__uuid_d779fe65-181f-40d2-b32e-e17a023c128d/db/MANIFEST-06:
 No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially 
reading: 
/data/nvme1n1/nm-local-dir/usercache

[jira] [Created] (FLINK-17364) Support StreamingFileSink in PrestoS3FileSystem

2020-04-23 Thread Lu Niu (Jira)
Lu Niu created FLINK-17364:
--

 Summary: Support StreamingFileSink in PrestoS3FileSystem
 Key: FLINK-17364
 URL: https://issues.apache.org/jira/browse/FLINK-17364
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Lu Niu


For S3, the StreamingFileSink currently supports only the Hadoop-based 
FileSystem implementation, not the implementation based on Presto. At the same 
time, Presto is the recommended file system for checkpointing. Implementing 
StreamingFileSink support in PrestoS3FileSystem helps fill the gap and enables 
users to use PrestoS3FileSystem for all access to S3. 
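
For context, a minimal sketch of the usage this would enable (the bucket path is a 
placeholder). Today pointing the sink at the Presto scheme fails because the 
StreamingFileSink needs a RecoverableWriter, which the Presto-based S3 filesystem 
does not provide:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class PrestoStreamingFileSinkSketch {

    static void attachSink(DataStream<String> events) {
        StreamingFileSink<String> sink =
                StreamingFileSink
                        .forRowFormat(
                                new Path("s3p://my-bucket/output"),          // "s3p://" is the Presto scheme
                                new SimpleStringEncoder<String>("UTF-8"))
                        .build();

        events.addSink(sink);
    }
}
{code}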



--
This message was sent by Atlassian Jira
(v8.3.4#803005)