AsyncFunction vs Async Sink
Hi, Flink dev and users

If I want to write asynchronously to an external service, which API should I use, AsyncFunction or Async Sink? My understanding after checking the code is:

1. Both APIs guarantee at-least-once writes to the external service, as both internally store in-flight requests in the checkpoint.
2. Async Sink provides a request-batching feature. This can also be implemented with Map + AsyncFunction: the Map function groups requests into batches and passes them to the AsyncFunction. The batching implementation can refer to AbstractMapBundleOperator if you don't want to use state.
3. Async Sink supports retrying failed requests. AsyncFunction also supports retries in the latest Flink version.
4. Async Sink supports rate limiting; AsyncFunction doesn't.
5. AsyncFunction can be used to implement read-update-write; Async Sink cannot.

Best
Lu
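To make point 2 concrete, here is a Flink-independent sketch of the map-side batching idea (the names below are ours, not a Flink or AbstractMapBundleOperator API): a function groups elements into fixed-size batches, and each batch would then be handed to an `AsyncFunction<List<T>, ...>` that issues one request per batch.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: the batching half of "Map + AsyncFunction", with no
// Flink dependency. In a real job this grouping would live in a Map/
// ProcessFunction upstream of the AsyncFunction.
public class BatchSketch {

    // Group incoming elements into fixed-size batches; the last batch may
    // be smaller than batchSize.
    public static <T> List<List<T>> batches(List<T> elements, int batchSize) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T e : elements) {
            current.add(e);
            if (current.size() == batchSize) {
                out.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // flush the partial tail batch
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(batches(List.of("a", "b", "c", "d", "e"), 2));
        // [[a, b], [c, d], [e]]
    }
}
```

Note that doing this without state (as the AbstractMapBundleOperator reference suggests) means an in-flight partial batch can be lost on failure unless the AsyncFunction's checkpointing of in-flight requests covers it.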
Re: AsyncFunction vs Async Sink
Thanks, Hong! I understand that if the use case is simply writing something to an external service, Async Sink is a good option that provides features like batching, state management and rate limiting. I have some follow-up questions:

1. Is there any problem with using AsyncFunction for such a use case? We can simply drop the output and use Unordered mode.
2. For AsyncFunction and Async Sink, does it make sense for both to share the same underlying implementation, so that features like batching and rate limiting benefit both?

Best
Lu

On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong wrote: > Hi Lu, > > Thanks for your question. See below for my understanding. > > I would recommend using the Async Sink if you are writing to the external > service as the final output of your job graph, and if you don't have the > ordered requirement that updates to the external system must be done before > updates to some other external system within the same job graph. (More > explained later). > > The abstraction of the Async Sink is a sink, meaning it is a terminal > operator in the job graph. The abstraction is intended to simplify the > writing of a sink - meaning the base implementation will handle batching, > state management and rate limiting. You only need to provide the client and > request structure to be used to interact with the external service. This > makes writing and maintaining the sink easier (if you simply want to write > to a destination with at least once processing). > > The AsyncFunction, as I understand it, is more used for data enrichment, > and is not a terminal operator in the job graph. This means the return > value from the external service will continue to be passed on down the > Flink Job graph. This is useful for data enrichment using the external > service, or if we want to ensure the system being called in the > AsyncFunction is updated BEFORE any data is written to the sinks further > down the job graph.
> > For example: > > Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink > > We can be sure that the updates to DynamoDB for a particular record > happens before the record is written to the Kinesis Sink. > > > Hope the above clarifies your question! > > Regards, > Hong
When does backpressure matter
For example, suppose a Flink job reads from Kafka, does some processing, and writes back to Kafka. Do we need to take any action when the job's Kafka consumer lag is low or 0 but some tasks show constant backpressure? Should we increase the parallelism or do some network tuning so that backpressure stays at 0? If so, wouldn't that lead to resource overprovisioning? Or do we only need to act when Kafka lag keeps increasing while backpressure is happening at the same time? Best Lu
status of FLINK-32476
Hi, Flink dev What's the current status of FLINK-32476 (https://issues.apache.org/jira/browse/FLINK-32476)? I see this feature has been deprioritized. We are interested in it and willing to work with the community on it if no one is actively working on it. Best Lu
Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse
Hi, Is this still under active development? I notice https://issues.apache.org/jira/browse/FLINK-32476 is labeled as deprioritized. If this is the case, would it be acceptable for us to take on the task? Best Lu On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler wrote: > Hi Dong, > > Sorry for not seeing this initially. I did have one question about the > description of the issue in the FLIP: > > > However, in cases where the upstream and downstream operators do not > store or access references to the input or output records, this deep-copy > overhead becomes unnecessary > > I was interested in getting clarification as to what you meant by “or > access references…”, to see if it covered this situation: > > StreamX —forward--> operator1 > StreamX —forward--> operator2 > > If operator1 modifies the record, and object re-use is enabled, then > operator2 will see the modified version, right? > > Thanks, > > — Ken > > > On Jul 2, 2023, at 7:24 PM, Xuannan Su wrote: > > > > Hi all, > > > > Dong(cc'ed) and I are opening this thread to discuss our proposal to > > add operator attribute to allow operator to specify support for > > object-reuse [1]. > > > > Currently, the default configuration for pipeline.object-reuse is set > > to false to avoid data corruption, which can result in suboptimal > > performance. We propose adding APIs that operators can utilize to > > inform the Flink runtime whether it is safe to reuse the emitted > > records. This enhancement would enable Flink to maximize its > > performance using the default configuration. > > > > Please refer to the FLIP document for more details about the proposed > > design and implementation. We welcome any feedback and opinions on > > this proposal. > > > > Best regards, > > > > Dong and Xuannan > > > > [1] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749 > > -- > Ken Krugler > http://www.scaleunlimited.com > Custom big data solutions > Flink & Pinot > > > >
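Ken's question about forwarding the same stream to two operators can be illustrated outside Flink: with object reuse enabled and no deep copy, both consumers receive a reference to the same record instance, so a mutation in one is visible to the other. A toy sketch in plain Java (no Flink APIs; all names are ours):

```java
// Toy model of the object-reuse hazard: two "operators" consume the same
// record instance. Without a defensive deep copy, operator1's in-place
// mutation leaks into what operator2 observes.
public class ObjectReuseSketch {

    static class Record {
        String value;
        Record(String value) { this.value = value; }
    }

    // operator1 mutates the record in place
    static void operator1(Record r) { r.value = r.value.toUpperCase(); }

    // operator2 only reads, but sees whatever state the record is in
    static String operator2(Record r) { return r.value; }

    public static void main(String[] args) {
        Record shared = new Record("event"); // one instance, two consumers
        operator1(shared);
        System.out.println(operator2(shared)); // EVENT, not event
    }
}
```

This is exactly why `pipeline.object-reuse` defaults to false, and why FLIP-329's per-operator attribute (declaring that an operator does not hold or mutate references) would let the runtime skip the copy safely.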
Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse
Thank you Dong and Xuannan! Yes. We can take on this task. Any help during bootstrapping would be greatly appreciated! I realize there is already a voting thread "[VOTE] FLIP-329: Add operator attribute to specify support for object-reuse". What else do we need? Best Lu On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su wrote: > Hi Lu, > > I believe this feature is very useful. However, I currently lack the > capacity to work on it in the near future. I think it would be great > if you could take on the task. I am willing to offer assistance if > there are any questions about the FLIP, or to review the PR if needed. > > Please let me know if you are interested in taking over this task. And > also think that we should start the voting thread if no future > comments on this FLIP. > > Best, > Xuannan > > On Fri, Jan 5, 2024 at 2:23 PM Dong Lin wrote: > > > > Hi Lu, > > > > I am not actively working on Flink and this JIRA recently. If Xuannan > does not plan to work on this anytime soon, I personally think it will be > great if you can help work on this FLIP. Maybe we can start the voting > thread if there is no further comment on this FLIP. > > > > Xuannan, what do you think? > > > > Thanks, > > Dong
Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse
Sounds good. Is the requirement just to start a new voting email thread? What else is needed? What are the passing criteria? Best Lu On Sun, Jan 7, 2024 at 5:41 PM Xuannan Su wrote: > Hi Lu, > > The voting thread has been open for a long time. We may want to start > a new voting thread. WDYT? > > Best, > Xuannan
About JobManager metrics scope
Hi, Flink Dev First of all, Happy New Year! I have a question about JM monitoring. According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html, metrics.scope.jm only has one variable, <host>, which seems not enough for the YARN deployment mode: 1. The metric doesn't contain identifiers like the application id or job id. 2. Since multiple Flink jobs can run in a single YARN cluster, multiple JMs can run on a single host, inside different containers. Under the current scope, metrics belonging to two Flink jobs will share an identical metric name, e.g. <host>.jobmanager.Status.JVM.Memory.Heap.Committed. Is there any workaround, or a fix on the roadmap? Thanks! Best Lu
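For context, the scope formats in question live in flink-conf.yaml. A sketch of what is (and is not) configurable here; the `scope.variables.additional` reporter option is an assumption to verify against your Flink version, since it only appeared in later releases:

```yaml
# Default JobManager scope: <host> is the only available variable, so two
# JMs on the same host (different YARN containers) produce identical names.
metrics.scope.jm: <host>.jobmanager

# Job-scoped JM metrics do carry a job identifier...
metrics.scope.jm.job: <host>.jobmanager.<job_name>

# ...but JVM metrics such as Status.JVM.Memory.Heap.Committed use the JM
# scope above. In later Flink releases a reporter can attach extra
# variables (hypothetical reporter name "my_reporter"):
# metrics.reporter.my_reporter.scope.variables.additional: yarnAppId:application_1234_0001
```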
About metric name truncation
Hi, Flink dev https://issues.apache.org/jira/browse/FLINK-6898 truncates metric names to 80 characters. We plan to relax this limit in our environment. I want to ask here whether that will cause any side effects. Thank you! Best Lu
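To illustrate why the truncation can matter: if two metric names differ only after the cap, they collide once truncated. A toy sketch (the 80-character cap is taken from the ticket; where exactly Flink applies it, and to which name components, is not shown here):

```java
// Illustration only: the effect of an FLINK-6898-style length cap on long
// metric names. The real mechanism lives in Flink's metric registry and
// reporters; this just shows how distinct names can collide.
public class TruncationSketch {

    static final int MAX_LEN = 80; // assumed cap, per FLINK-6898

    static String truncate(String metricName) {
        return metricName.length() <= MAX_LEN
                ? metricName
                : metricName.substring(0, MAX_LEN);
    }

    public static void main(String[] args) {
        String base = "taskmanager.job.operator.".concat("x".repeat(70));
        String a = truncate(base + ".numRecordsIn");
        String b = truncate(base + ".numRecordsOut");
        // Both names share the same first 80 chars, so they collide:
        System.out.println(a.equals(b)); // true
    }
}
```

Relaxing the limit avoids such collisions, at the cost of longer names, which some downstream metric systems may themselves reject or truncate.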
Zigzag shape in TM JVM used memory
Hi, Flink dev We observed that the TM JVM used-memory metric shows a zigzag shape across many of our applications, although these applications are quite different in business logic. The upper bound is close to the max heap size. Is this expected in a Flink application? Or does Flink internally pre-allocate memory aggressively? app1 [image: Screen Shot 2021-04-04 at 8.46.45 PM.png] app2 [image: Screen Shot 2021-04-04 at 8.45.35 PM.png] app3 [image: Screen Shot 2021-04-04 at 8.43.53 PM.png] Best Lu
Re: Zigzag shape in TM JVM used memory
Hi, we need to update our email system then :) . Here are the links: https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing All are DataStream jobs. Best Lu On Sun, Apr 4, 2021 at 9:17 PM Yun Gao wrote: > > Hi Lu, > > The image seems not be able to shown due to the mail server limitation, > could you upload it somewhere and paste the link here ? > > Logically, I think zigzag usually due to there are some small object get > created and eliminated soon in the heap. Are you running a SQL job or a > DataStream job ? > > Best, > Yun
Automatic backpressure detection
Hi, Flink dev Lately, we want to develop some tooling to: 1. Show backpressured operators without manual investigation 2. Provide suggestions to mitigate backpressure after checking data skew, external service RPCs, etc. 3. Show backpressure history Could anyone share their experience with such tooling? Also, I notice backpressure monitoring and detection is mentioned in multiple places. Could someone help explain how these connect to each other? Maybe some of them are outdated? Thanks! 1. The official doc introduces monitoring backpressure through the web UI: https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html 2. https://flink.apache.org/2019/07/23/flink-network-stack-2.html says the outPoolUsage and inPoolUsage metrics can be used to determine backpressure. 3. The latest Flink version introduces a metric called "isBackPressured", but I didn't find related documentation on its usage. Best Lu
Re: Automatic backpressure detection
Hi, Piotr Thanks for replying! We don't have a plan to upgrade to 1.13 in the short term. We are using Flink 1.11, and I notice there is a metric called isBackPressured. Is that enough to solve 1? If not, would backporting the patches for backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond work? And do you have an estimate of how difficult that is? Best Lu On Tue, Apr 6, 2021 at 12:18 AM Piotr Nowojski wrote: > Hi, > > Lately we overhauled the backpressure detection [1] and a screenshot > preview of those efforts is attached here [2]. I encourage you to check the > 1.13 RC0 build and how the current mechanism works for you [3]. To support > those WebUI changes we have added a couple of new metrics: > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond. > > 1. I believe that solves 1. > 2. This still requires a bit of manual investigation. Once you locate > backpressuring task, you can check the detail subtask stats to check if all > parallel instances are uniformly backpressured/busy or not. If you would > like to add a hint "it looks like you have a data skew in Task XYZ ", that > I believe could be added to the WebUI. > 3. The tricky part is how to display this kind of information. Currently I > would recommend just export/report > backPressuredTimeMsPerSecond, busyTimeMsPerSecond and idleTimeMsPerSecond > metrics for every task to an external system and display them for example > in Graphana. > > The blog post you are referencing is quite outdated, especially with those > new changes from 1.13. I'm hoping to write a new one pretty soon.
> > Piotrek > > [1] https://issues.apache.org/jira/browse/FLINK-14712 > [2] > > https://issues.apache.org/jira/browse/FLINK-14814?focusedCommentId=17256926&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17256926 > [3] > > http://mail-archives.apache.org/mod_mbox/flink-user/202104.mbox/%3c1d2412ce-d4d0-ed50-6181-1b610e16d...@apache.org%3E
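Piotr's third point, exporting the metrics to an external system, amounts to configuring a metrics reporter in flink-conf.yaml. A minimal sketch using the bundled Prometheus reporter (the reporter name `prom` and the port are placeholders; the `class` style shown matches the Flink 1.11-1.13 era, while newer releases use `factory.class`):

```yaml
# Expose all metrics, including backPressuredTimeMsPerSecond,
# busyTimeMsPerSecond and idleTimeMsPerSecond (Flink >= 1.13), over HTTP
# for Prometheus to scrape; Grafana can then chart them per task.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
```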
Re: Zigzag shape in TM JVM used memory
Hi, Piotr Thanks for replying. I asked because such a pattern might imply memory oversubscription. For example, I tuned the heap of one app down from 2.63 GB to 367 MB and the job still runs fine: before: https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing after: https://drive.google.com/file/d/1wNTHBT8aSJaAmL1rVY8jUkdp-G5znnMN/view?usp=sharing What's the best practice for tuning Flink job memory? 1. What's a good starting point users should try first? 2. How to make progress? E.g., Flink application Foo hits "OOM: Java heap space". Where to go next: simply bump up taskmanager.memory, or just increase the heap? 3. What's the end state? The job running fine while ensuring some headroom in each memory component? Best Lu On Tue, Apr 6, 2021 at 12:26 AM Piotr Nowojski wrote: > Hi, > > this should be posted on the user mailing list not the dev. > > Apart from that, this looks like normal/standard behaviour of JVM, and has > very little to do with Flink. Garbage Collector (GC) is kicking in when > memory usage is approaching some threshold: > https://www.google.com/search?q=jvm+heap+memory+usage&tbm=isch > > Piotrek
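On the tuning questions above: since Flink 1.10 the TaskManager memory model is configured through a small set of flink-conf.yaml keys. A common starting point, sketched here with illustrative sizes (not recommendations), is to fix only the total process size and touch individual components only when a concrete error points at one:

```yaml
# Start here: total TaskManager process memory; heap, managed, network and
# JVM overhead are derived from this.
taskmanager.memory.process.size: 2048m

# Adjust these only in response to a concrete symptom:
# "OutOfMemoryError: Java heap space" -> grow the task heap explicitly
# taskmanager.memory.task.heap.size: 1024m
# RocksDB state backend / batch operators draw from managed memory
# taskmanager.memory.managed.fraction: 0.4
```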
Re: Automatic backpressure detection
Hi, Piotr Thanks for your detailed reply! It is mentioned here that we cannot observe backpressure generated by the AsyncOperator in the Flink UI in 1.9.1. Is it fixed in the latest version? Thank you! http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Function-Not-Generating-Backpressure-td26766.html Best Lu On Tue, Apr 6, 2021 at 11:14 PM Piotr Nowojski wrote: > Hi, > > Yes, you can use `isBackPressured` to monitor a task's back-pressure. > However keep in mind: > a) You are going to miss some nice way to visualize this information, which > is present in 1.13's WebUI. > b) `isBackPressured` is a sampling based metric. If your job has varying > load, for example all windows firing at the same processing time, every > couple of seconds, causing intermittent back-pressure, this metric will > show it randomly as `true` or `false`. > c) `isBackPressured` is slightly less accurate compared to > `backPressuredTimeMsPerSecond`. There are some corner cases when for a > brief amount of time it can return `true`, while a task is still running, > while the time based metrics work in a different much more accurate way. > > About back porting the patches, if you want to create a custom Flink build > it should be do-able. There will be some conflicts for sure, so you will > need to understand Flink's code. > > Best, > Piotrek
Re: Automatic backpressure detection
Cool. Thanks! Best Lu On Mon, Apr 12, 2021 at 11:27 PM Piotr Nowojski wrote: > Hi, > > Yes. Back-pressure from AsyncOperator should be correctly reported via > isBackPressured, backPressuredMsPerSecond metrics and by extension in the > WebUI from 1.13. > > Piotrek
Add jobId and JobName variable to JobManager metrics in per-job deployment mode
Hi, Flink dev Could you share your thoughts about https://issues.apache.org/jira/browse/FLINK-22164 ? Context: We expose all Flink metrics to an external system for monitoring and alerting. However, JobManager metrics only have one variable, which is not enough to target a single job when jobs are deployed to YARN. If a Flink job runs in per-job mode, which ensures one job per cluster, we can add jobId and JobName to JobManager metrics. Best Lu
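For context, the JobManager metric scope is controlled by `metrics.scope.jm`, whose default only exposes the `<host>` variable — exactly the limitation described above. A sketch of the current default and of what the proposal would allow (the commented pattern is hypothetical, not supported today):

```yaml
# Current default: <host> is the only variable available to JobManager metrics.
metrics.scope.jm: <host>.jobmanager
# Hypothetical pattern if FLINK-22164 lands (per-job mode only):
# metrics.scope.jm: <host>.<job_name>.jobmanager
```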
Flink TM Heartbeat Timeout
Hi, Flink User Several of our applications occasionally hit heartbeat timeouts. There is no GC pressure and no OOM:
```
- realtime conversion event filter (49/120) (16e3d3e7608eed0a30e0508eb52065fd) switched from RUNNING to FAILED on container_e05_1599158866703_129001_01_000111 @ xenon-pii-prod-001-20191210-data-slave-prod-0a01bbdd.ec2.pin220.com (dataPort=39013).
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e05_1599158866703_129001_01_000111 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
Shall we try increasing `heartbeat.timeout`? Any side effects? E.g., would that lead to slower detection when a TM container is killed by YARN? We use Flink 1.11. Best Lu Pinterest, Inc.
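For reference, these are the relevant flink-conf.yaml knobs and their Flink 1.11 defaults (milliseconds):

```yaml
heartbeat.interval: 10000   # how often heartbeats are sent (default)
heartbeat.timeout: 50000    # how long before a TM is declared dead (default)
```

Raising heartbeat.timeout does delay heartbeat-based detection of a dead TM by up to the new timeout; on YARN, the container-completion notification (NM -> RM -> Flink RM) is an independent and often faster signal, as discussed later in this thread.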
flink 1.9 Restore from a checkpoint taken in 1.11
Hi, Flink dev Is it supported for a Flink job on version 1.9 to restore from a checkpoint taken from the same job running 1.11? The context is that we are migrating to version 1.11 and we need a backup plan for emergency fallback. We did a test and it throws this error:
```
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
    (JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:387)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalArgumentException: Cannot restore savepoint version 3.
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers.getSerializer(SavepointSerializers.java:80)
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:106)
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:143)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1099)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
    at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more
End of exception on server side>]
```
So it seems not. I just want to confirm that with the community. Best Lu
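The failure is consistent with the metadata version gate: each release only registers serializers for the savepoint/checkpoint metadata versions it knows about, and 1.9 predates version 3. A simplified, hypothetical illustration of that check — this is NOT Flink's actual code, just a sketch of why an older runtime rejects newer metadata outright:

```java
import java.util.Map;

public class SavepointVersionCheck {
    // Assumption for illustration: a 1.9-era runtime only knows metadata
    // versions 1 and 2, so looking up version 3 fails with the reported error.
    static final Map<Integer, String> SERIALIZERS_1_9 =
            Map.of(1, "SavepointV1Serializer", 2, "SavepointV2Serializer");

    static String getSerializer(int version) {
        String s = SERIALIZERS_1_9.get(version);
        if (s == null) {
            // Mirrors the message seen in the log above.
            throw new IllegalArgumentException(
                    "Cannot restore savepoint version " + version + ".");
        }
        return s;
    }
}
```

Forward compatibility (old runtime reading new metadata) is therefore not something a custom build can easily patch around; planning the fallback as "keep a checkpoint taken with 1.9" is safer.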
Re: Job Recovery Time on TM Lost
Thanks Gen! cc flink-dev to collect more inputs. Best Lu On Wed, Jun 30, 2021 at 12:55 AM Gen Luo wrote: > I'm also wondering here. > > In my opinion, it's because the JM can not confirm whether the TM is lost > or it's a temporary network trouble and will recover soon, since I can see > in the log that akka has got a Connection refused but JM still sends a > heartbeat request to the lost TM until it reaches heartbeat timeout. But > I'm not sure if it's indeed designed like this. > > I would really appreciate it if anyone who knows more details could > answer. Thanks. >
Re: Job Recovery Time on TM Lost
Thanks TIll and Yang for help! Also Thanks Till for a quick fix! I did another test yesterday. In this test, I intentionally throw exception from the source operator: ``` if (runtimeContext.getIndexOfThisSubtask() == 1 && errorFrenquecyInMin > 0 && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) { lastStartTime = System.currentTimeMillis(); throw new RuntimeException( "Trigger expected exception at: " + lastStartTime); } ``` In this case, I found phase 1 still takes about 30s and Phase 2 dropped to 1s (because no need for container allocation). Some logs: ``` ``` On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann wrote: > A quick addition, I think with FLINK-23202 it should now also be possible > to improve the heartbeat mechanism in the general case. We can leverage the > unreachability exception thrown if a remote target is no longer reachable > to mark an heartbeat target as no longer reachable [1]. This can then be > considered as if the heartbeat timeout has been triggered. That way we > should detect lost TaskExecutors as fast as our heartbeat interval is. > > [1] https://issues.apache.org/jira/browse/FLINK-23209 > > Cheers, > Till > > On Thu, Jul 1, 2021 at 1:46 PM Yang Wang wrote: > >> Since you are deploying Flink workloads on Yarn, the Flink >> ResourceManager should get the container >> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which >> is 8 seconds by default. >> And Flink ResourceManager will release the dead TaskManager container >> once received the completion event. >> As a result, Flink will not deploy tasks onto the dead TaskManagers. >> >> >> I think most of the time cost in Phase 1 might be cancelling the tasks on >> the dead TaskManagers. >> >> >> Best, >> Yang >> >> >> Till Rohrmann 于2021年7月1日周四 下午4:49写道: >> >>> The analysis of Gen is correct. Flink currently uses its heartbeat as >>> the primary means to detect dead TaskManagers. 
This means that Flink will >>> take at least `heartbeat.timeout` time before the system recovers. Even if >>> the cancellation happens fast (e.g. by having configured a low >>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead >>> TaskManager until it is marked as dead and its slots are released (unless >>> the ResourceManager does not get a signal from the underlying resource >>> management system that a container/pod has died). One way to improve the >>> situation is to introduce logic which can react to a ConnectionException >>> and then black lists or releases a TaskManager, for example. This is >>> currently not implemented in Flink, though. >>> >>> Concerning the cancellation operation: Flink currently does not listen >>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the >>> primary means to fail the future result of a rpc which could not be sent. >>> This is also an improvement we should add to Flink's RpcService. I've >>> created a JIRA issue for this problem [1]. >>> >>> [1] https://issues.apache.org/jira/browse/FLINK-23202 >>> >>> Cheers, >>> Till >>> >>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu wrote: >>> >>>> Thanks Gen! cc flink-dev to collect more inputs. >>>> >>>> Best >>>> Lu >>>> >>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo wrote: >>>> >>>>> I'm also wondering here. >>>>> >>>>> In my opinion, it's because the JM can not confirm whether the TM is >>>>> lost or it's a temporary network trouble and will recover soon, since I >>>>> can >>>>> see in the log that akka has got a Connection refused but JM still sends a >>>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But >>>>> I'm not sure if it's indeed designed like this. >>>>> >>>>> I would really appreciate it if anyone who knows more details could >>>>> answer. Thanks. >>>>> >>>>
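On the fixed ~30 s phase 1 observed even when no TM is lost: one hedged explanation is a configured restart delay. In flink-conf.yaml that would look like the following (values are illustrative, not taken from the job in question):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 30 s
```

If such a delay is set, the RESTARTING -> RUNNING gap will be at least the delay, regardless of how quickly the failure itself is detected.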
Re: Job Recovery Time on TM Lost
Thanks Till and Yang for your help! Also thanks Till for the quick fix! I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
    && errorFrenquecyInMin > 0
    && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found phase 1 still takes about 30s and phase 2 dropped to 1s (because there is no need for container allocation). Why does phase 1 still take 30s even though no TM is lost? Related logs:
```
2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ... java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - (time when all tasks switch from CREATED to RUNNING)
```
Best Lu On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote: > Thanks Till and Yang for help! Also Thanks Till for a quick fix! > > I did another test yesterday. 
In this test, I intentionally throw > exception from the source operator: > ``` > if (runtimeContext.getIndexOfThisSubtask() == 1 > && errorFrenquecyInMin > 0 > && System.currentTimeMillis() - lastStartTime >= > errorFrenquecyInMin * 60 * 1000) { > lastStartTime = System.currentTimeMillis(); > throw new RuntimeException( > "Trigger expected exception at: " + lastStartTime); > } > ``` > In this case, I found phase 1 still takes about 30s and Phase 2 dropped to > 1s (because no need for container allocation). > > Some logs: > ``` > ``` > > > On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann wrote: > >> A quick addition, I think with FLINK-23202 it should now also be possible >> to improve the heartbeat mechanism in the general case. We can leverage the >> unreachability exception thrown if a remote target is no longer reachable >> to mark an heartbeat target as no longer reachable [1]. This can then be >> considered as if the heartbeat timeout has been triggered. That way we >> should detect lost TaskExecutors as fast as our heartbeat interval is. >> >> [1] https://issues.apache.org/jira/browse/FLINK-23209 >> >> Cheers, >> Till >> >> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang wrote: >> >>> Since you are deploying Flink workloads on Yarn, the Flink >>> ResourceManager should get the container >>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, >>> which is 8 seconds by default. >>> And Flink ResourceManager will release the dead TaskManager container >>> once received the completion event. >>> As a result, Flink will not deploy tasks onto the dead TaskManagers. >>> >>> >>> I think most of the time cost in Phase 1 might be cancelling the tasks >>> on the dead TaskManagers. >>> >>> >>> Best, >>> Yang >>> >>> >>> Till Rohrmann 于2021年7月1日周四 下午4:49写道: >>> >>>> The analysis of Gen is correct. Flink currently uses its heartbeat as >>>> the primary means to detect dead TaskManagers. 
This means that Flink will >>>> take at least `heartbeat.timeout` time before the system recovers. Even if >>>> the cancellation happens fast (e.g. by having configured a low >>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead >>>> TaskManager until it is marked as dead and its slots are released (unless >>>> the ResourceManager does not get a signal from the underlying resource >>>> management system that a container/pod has died). One way to improve the >>>> situation is to introduce logic which can react to a ConnectionException >>>> and then black lists or releases a TaskManager, for example. This is >>>> currently not implemented in Flink, though. >>>> >>>> Concerning the cancellation operation: Flink currently doe
Re: Job Recovery Time on TM Lost
Another side question, Shall we add metric to cover the complete restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only covers phase 1. Thanks! Best Lu On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote: > Thanks TIll and Yang for help! Also Thanks Till for a quick fix! > > I did another test yesterday. In this test, I intentionally throw > exception from the source operator: > ``` > if (runtimeContext.getIndexOfThisSubtask() == 1 > && errorFrenquecyInMin > 0 > && System.currentTimeMillis() - lastStartTime >= > errorFrenquecyInMin * 60 * 1000) { > lastStartTime = System.currentTimeMillis(); > throw new RuntimeException( > "Trigger expected exception at: " + lastStartTime); > } > ``` > In this case, I found phase 1 still takes about 30s and Phase 2 dropped to > 1s (because no need for container allocation). Why phase 1 still takes 30s > even though no TM is lost? > > Related logs: > ``` > 2021-06-30 00:55:07,463 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: > USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ... > java.lang.RuntimeException: Trigger expected exception at: 1625014507446 > 2021-06-30 00:55:07,509 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job > NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging > (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to > RESTARTING. > 2021-06-30 00:55:37,596 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job > NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging > (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to > RUNNING. > 2021-06-30 00:55:38,678 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph(time when > all tasks switch from CREATED to RUNNING) > ``` > Best > Lu > > > On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote: > >> Thanks TIll and Yang for help! Also Thanks Till for a quick fix! >> >> I did another test yesterday. 
In this test, I intentionally throw >> exception from the source operator: >> ``` >> if (runtimeContext.getIndexOfThisSubtask() == 1 >> && errorFrenquecyInMin > 0 >> && System.currentTimeMillis() - lastStartTime >= >> errorFrenquecyInMin * 60 * 1000) { >> lastStartTime = System.currentTimeMillis(); >> throw new RuntimeException( >> "Trigger expected exception at: " + lastStartTime); >> } >> ``` >> In this case, I found phase 1 still takes about 30s and Phase 2 dropped >> to 1s (because no need for container allocation). >> >> Some logs: >> ``` >> ``` >> >> >> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann >> wrote: >> >>> A quick addition, I think with FLINK-23202 it should now also be >>> possible to improve the heartbeat mechanism in the general case. We can >>> leverage the unreachability exception thrown if a remote target is no >>> longer reachable to mark an heartbeat target as no longer reachable [1]. >>> This can then be considered as if the heartbeat timeout has been triggered. >>> That way we should detect lost TaskExecutors as fast as our heartbeat >>> interval is. >>> >>> [1] https://issues.apache.org/jira/browse/FLINK-23209 >>> >>> Cheers, >>> Till >>> >>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang wrote: >>> >>>> Since you are deploying Flink workloads on Yarn, the Flink >>>> ResourceManager should get the container >>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, >>>> which is 8 seconds by default. >>>> And Flink ResourceManager will release the dead TaskManager container >>>> once received the completion event. >>>> As a result, Flink will not deploy tasks onto the dead TaskManagers. >>>> >>>> >>>> I think most of the time cost in Phase 1 might be cancelling the tasks >>>> on the dead TaskManagers. >>>> >>>> >>>> Best, >>>> Yang >>>> >>>> >>>> Till Rohrmann 于2021年7月1日周四 下午4:49写道: >>>> >>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as >>>>> the primary means to detect dead TaskManagers. 
This means that Flink will >>>>> take at least `heartbeat.timeout` time before the system recovers. Even if >>>>> the cancellation happens fast (e.g. by having co
Re: Job Recovery Time on TM Lost
d >> by TM >> >>>> or JM. So maybe it's not that worthy to introduce extra >> configurations for >> >>>> fault tolerance of heartbeat, unless we also introduce some retry >> >>>> strategies for netty connections. >> >>>> >> >>>> >> >>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann >> >>>> wrote: >> >>>> >> >>>>> Could you share the full logs with us for the second experiment, >> Lu? I >> >>>>> cannot tell from the top of my head why it should take 30s unless >> you have >> >>>>> configured a restart delay of 30s. >> >>>>> >> >>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen. >> >>>>> >> >>>>> I've now implemented FLINK-23209 [1] but it somehow has the problem >> >>>>> that in a flakey environment you might not want to mark a >> TaskExecutor dead >> >>>>> on the first connection loss. Maybe this is something we need to >> make >> >>>>> configurable (e.g. introducing a threshold which admittedly is >> similar to >> >>>>> the heartbeat timeout) so that the user can configure it for her >> >>>>> environment. On the upside, if you mark the TaskExecutor dead on >> the first >> >>>>> connection loss (assuming you have a stable network environment), >> then it >> >>>>> can now detect lost TaskExecutors as fast as the heartbeat interval. >> >>>>> >> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209 >> >>>>> >> >>>>> Cheers, >> >>>>> Till >> >>>>> >> >>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo wrote: >> >>>>> >> >>>>>> Thanks for sharing, Till and Yang. >> >>>>>> >> >>>>>> @Lu >> >>>>>> Sorry but I don't know how to explain the new test with the log. >> >>>>>> Let's wait for others' reply. >> >>>>>> >> >>>>>> @Till >> >>>>>> It would be nice if JIRAs could be fixed. Thanks again for >> proposing >> >>>>>> them. >> >>>>>> >> >>>>>> In addition, I was tracking an issue that RM keeps allocating and >> >>>>>> freeing slots after a TM lost until its heartbeat timeout, when I >> found the >> >>>>>> recovery costing as long as heartbeat timeout. 
That should be a >> minor bug >> >>>>>> introduced by declarative resource management. I have created a >> JIRA about >> >>>>>> the problem [1] and we can discuss it there if necessary. >> >>>>>> >> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216 >> >>>>>> >> >>>>>> Lu Niu 于2021年7月2日周五 上午3:13写道: >> >>>>>> >> >>>>>>> Another side question, Shall we add metric to cover the complete >> >>>>>>> restarting time (phase 1 + phase 2)? Current metric >> jm.restartingTime only >> >>>>>>> covers phase 1. Thanks! >> >>>>>>> >> >>>>>>> Best >> >>>>>>> Lu >> >>>>>>> >> >>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote: >> >>>>>>> >> >>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix! >> >>>>>>>> >> >>>>>>>> I did another test yesterday. In this test, I intentionally throw >> >>>>>>>> exception from the source operator: >> >>>>>>>> ``` >> >>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1 >> >>>>>>>> && errorFrenquecyInMin > 0 >> >>>>>>>> && System.currentTimeMillis() - lastStartTime >= >> >>>>>>>> errorFrenquecyInMin * 60 * 1000) { >> >>>>>>>> lastStartTime = System.currentTimeMillis(); >> >>>>>>>> throw new RuntimeException( >> >>>
[jira] [Created] (FLINK-33804) Add Option to disable showing metrics in JobManager UI
Lu Niu created FLINK-33804: -- Summary: Add Option to disable showing metrics in JobManager UI Key: FLINK-33804 URL: https://issues.apache.org/jira/browse/FLINK-33804 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Lu Niu Flink allows users to view metrics in the JobManager UI. However, there are 2 problems we found: 1. The JobManager is required to aggregate metrics from all task managers. When the metric cardinality is quite high, this process can trigger a JobManager full GC and slow response times. 2. Flink use cases in prod usually have their own dashboards to view metrics, so this feature is sometimes not useful. In light of this, we propose to add an option to disable this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33806) Async IO Allows Custom Action after Final Retry Failure
Lu Niu created FLINK-33806: -- Summary: Async IO Allows Custom Action after Final Retry Failure Key: FLINK-33806 URL: https://issues.apache.org/jira/browse/FLINK-33806 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Lu Niu In Async IO Retry Support, if all retries fail, the record is dropped without any further action. However, there are use cases requiring an action after the final retry failure occurs, e.g., logging the input or writing the input data to external storage. To address this, we propose to add a new API to the AsyncRetryStrategy and make changes accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
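To make the proposal concrete, here is a self-contained sketch of the intended behavior. This is NOT Flink's actual AsyncRetryStrategy API; the class and the onRetryExhausted hook are hypothetical stand-ins showing where a user-supplied action would fire after the final retry fails:

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

public class RetryWithExhaustedHook<IN> {
    private final int maxAttempts;
    // Proposed hook (hypothetical): invoked with the input once all
    // retries are exhausted, e.g. to log it or write it to a dead-letter store.
    private final Consumer<IN> onRetryExhausted;

    public RetryWithExhaustedHook(int maxAttempts, Consumer<IN> onRetryExhausted) {
        this.maxAttempts = maxAttempts;
        this.onRetryExhausted = onRetryExhausted;
    }

    /** Returns true if some attempt succeeded; otherwise fires the hook and returns false. */
    public boolean run(IN input, Predicate<IN> attempt) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.test(input)) {
                return true;
            }
        }
        // Final retry failed: instead of silently dropping the record,
        // hand it to the user-supplied action.
        onRetryExhausted.accept(input);
        return false;
    }
}
```

In Flink itself the hook would naturally live next to the existing retry predicate so the async operator can call it when retries are exhausted.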
[jira] [Created] (FLINK-16640) Expose listStatus latency in flink filesystem
Lu Niu created FLINK-16640: -- Summary: Expose listStatus latency in flink filesystem Key: FLINK-16640 URL: https://issues.apache.org/jira/browse/FLINK-16640 Project: Flink Issue Type: Improvement Components: FileSystems Reporter: Lu Niu listStatus could potentially take a long time and slow down the workflow. Exposing this metric will help developers debug more effectively. -- This message was sent by Atlassian Jira (v8.3.4#803005)
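A minimal sketch of the idea (all names here are assumptions; in Flink the recorded value would feed a Histogram on a metric group rather than a plain field): time each listStatus-style call and record its latency.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class ListStatusTimer {
    // Stand-in for a metrics Histogram: keeps only the last observed latency
    // so the sketch stays self-contained.
    static final AtomicLong lastLatencyNanos = new AtomicLong();

    static <T> T timed(Supplier<T> listStatusCall) {
        long start = System.nanoTime();
        try {
            return listStatusCall.get();   // the potentially slow filesystem call
        } finally {
            lastLatencyNanos.set(System.nanoTime() - start);
        }
    }
}
```

A wrapper like this around the FileSystem call site is all that is needed; the interesting part of the ticket is wiring the recorded value into the metric system.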
[jira] [Created] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart
Lu Niu created FLINK-16931: -- Summary: Large _metadata file lead to JobManager not responding when restart Key: FLINK-16931 URL: https://issues.apache.org/jira/browse/FLINK-16931 Project: Flink Issue Type: Bug Reporter: Lu Niu When the _metadata file is big, the JobManager can never recover from the checkpoint. It falls into a loop: fetch checkpoint -> JM timeout -> restart. Here are the related logs:
2020-04-01 17:08:25,689 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 3 checkpoints in ZooKeeper.
2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 3 checkpoints from storage.
2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 50.
2020-04-01 17:08:48,589 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 51.
2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
Digging into the code, it looks like ExecutionGraph::restart runs in the JobMaster main thread and finally calls ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads the file from DFS. The main thread is basically blocked for a while because of this. One possible solution is to make the downloading part async. More things might need to be considered, as the original change tries to make it single-threaded. [https://github.com/apache/flink/pull/7568] -- This message was sent by Atlassian Jira (v8.3.4#803005)
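The suggested fix can be sketched with plain java.util.concurrent: run the blocking DFS download on a dedicated I/O executor and hand the main thread only a future, so RPC/heartbeat handling stays responsive. Names here are illustrative, not Flink's actual classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class AsyncCheckpointFetch {
    // The blocking download runs on ioExecutor; the caller (conceptually the
    // JobMaster main thread) only attaches callbacks to the returned future.
    static <T> CompletableFuture<T> retrieveAsync(Supplier<T> downloadFromDfs,
                                                  ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(downloadFromDfs, ioExecutor);
    }
}
```

The caveat noted in the ticket still applies: the checkpoint store was deliberately made single-threaded, so completion of the future would need to be marshalled back onto the main thread rather than mutating shared state from the I/O pool.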
[jira] [Created] (FLINK-21263) Job hangs under backpressure
Lu Niu created FLINK-21263: -- Summary: Job hangs under backpressure Key: FLINK-21263 URL: https://issues.apache.org/jira/browse/FLINK-21263 Project: Flink Issue Type: Bug Affects Versions: 1.11.0 Reporter: Lu Niu Attachments: source_graph.svg, source_js1, source_js2, source_js3 We have a Flink job that runs fine for a few days but suddenly hangs and can never recover. Once we relaunch the job, it runs fine. We detected that the job has backpressure, but in all other cases backpressure would only lead to slower consumption; what is weird here is that the job made no progress at all. The symptoms look similar to FLINK-20618. About the job: 1. Reads from Kafka and writes to Kafka 2. version 1.11 3. enabled unaligned checkpoint Symptoms: 1. All source/sink throughput drops to 0 2. All checkpoints fail immediately after triggering. 3. Backpressure shows "high" from source to two downstream operators. 4. Flamegraph shows all subtask threads are waiting 5. Source jstack shows the Source thread is BLOCKED, as below. 
{code:java} Source: impression-reader -> impression-filter -> impression-data-conversion (1/60) Stack Trace is: java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0003a3e71330> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266) at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570) at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638) at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638) at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) - locked <0x0006a485dab0> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter.emitWatermark(SourceContextWatermarkOutputAdapter.java:37) at org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.updateCombinedWatermark(WatermarkOutputMultiplexer.java:167) at org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.onPeriodicEmit(WatermarkOutputMultiplexer.java:136) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$PeriodicWatermarkEmitter.onProcessingTime(AbstractFetcher.java:574) - locked <0x0006a485dab0> (a java.lang.Object) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$590/1066788035.run(Unknown Source) at org.apache.flink.streaming.runt
[jira] [Created] (FLINK-22162) Make Max Operator name Length Configurable
Lu Niu created FLINK-22162: -- Summary: Make Max Operator name Length Configurable Key: FLINK-22162 URL: https://issues.apache.org/jira/browse/FLINK-22162 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Lu Niu MaxOperatorNameLength is hardcoded to 80. Users might want to tune this parameter so that, after exposing metrics to an external metrics system, they can better query the metrics data by name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
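The change itself is small; a sketch of the truncation with the limit as a parameter instead of a hardcoded constant (illustrative code, not Flink's actual implementation):

```java
public class OperatorNameTruncation {
    // What is currently hardcoded in Flink; the ticket proposes making it a config option.
    static final int DEFAULT_MAX = 80;

    static String truncate(String name, int maxLength) {
        // Keep the full name when it fits, otherwise cut at the configured limit.
        return name.length() <= maxLength ? name : name.substring(0, maxLength);
    }
}
```

With a larger configured limit, metric names derived from long operator names would no longer collide or lose their distinguishing suffixes in the external metrics system.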
[jira] [Created] (FLINK-22164) Add jobId and JobName variable to JobManager metrics in per-job deployment mode
Lu Niu created FLINK-22164: -- Summary: Add jobId and JobName variable to JobManager metrics in per-job deployment mode Key: FLINK-22164 URL: https://issues.apache.org/jira/browse/FLINK-22164 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Lu Niu We expose all Flink metrics to an external system for monitoring and alerting. However, JobManager metrics only have one variable, which is not enough to target a single job when jobs are deployed to YARN. If a Flink job runs in per-job mode, which ensures one job per cluster, we can add jobId and JobName to JobManager metrics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22326) Job contains Iterate Operator always fails on Checkpoint
Lu Niu created FLINK-22326: -- Summary: Job contains Iterate Operator always fails on Checkpoint Key: FLINK-22326 URL: https://issues.apache.org/jira/browse/FLINK-22326 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.11.1 Reporter: Lu Niu Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 2021-04-16 at 12.43.38 PM.png A job containing an Iterate Operator will always fail on checkpoint. How to reproduce: [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785] This is based on [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java] with a few lines of difference: 1. Make maxWaitTime large enough when creating the IterativeStream 2. No output back to the Iterative Source Result: The same code is able to checkpoint in 1.9.1 !image-2021-04-16-12-45-23-624.png! but always fails on checkpoint in 1.11 !image-2021-04-16-12-41-35-002.png! It seems -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19985) job went into zombie state after ZK session timeout
Lu Niu created FLINK-19985:
-------------------------------

             Summary: job went into zombie state after ZK session timeout
                 Key: FLINK-19985
                 URL: https://issues.apache.org/jira/browse/FLINK-19985
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.1
            Reporter: Lu Niu

Recently we had an issue where one Flink job went into a zombie state after a ZK session timeout. What seemingly happened was:

1. ZK session timeout (JobManager HA is enabled) and reconnect; the JobManager is no longer the leader now:
{code:java}
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374)
	at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
	at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$1.stateChanged(LeaderLatch.java:82)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:259)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:255)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
	at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.processEvents(ConnectionStateManager.java:253)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.access$000(ConnectionStateManager.java:43)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$1.call(ConnectionStateManager.java:111)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

2. For some reason, the JobManager failed to register itself as the leader, leading the whole job into SUSPENDED and then CANCELED mode. The exception was:
{code:java}
2020-11-04 18:42:43,523 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Unhandled exception.
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(826fbfb893e4dbce095f5f1b1d75426b, LocalRpcInvocation(requestJob(JobID, Time))) because the fencing token 826fbfb893e4dbce095f5f1b1d75426b did not match the expected fencing token b42ecc60ee45cf65209ab2c2da88473a.
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:81)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123
{code}
[jira] [Created] (FLINK-17089) Checkpoint fail because RocksDBException: Error While opening a file for sequentially reading
Lu Niu created FLINK-17089:
-------------------------------

             Summary: Checkpoint fails because RocksDBException: Error while opening a file for sequentially reading
                 Key: FLINK-17089
                 URL: https://issues.apache.org/jira/browse/FLINK-17089
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
            Reporter: Lu Niu

We use the incremental RocksDB state backend. The Flink job checkpoint throws the following exception after running for about 20 hours:
{code:java}
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/foo/bar/usercache/xxx/appcache/application_1584397637704_9072/flink-io-4e2294f0-7e9b-4102-b079-1089f23c47aa/job_d781983f4967703b0480c7943e8100af_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__27_60__uuid_dee7e33b-9bce-42f3-909a-f6fa4ab52d8c/db/MANIFEST-06: No such file or directory
	at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
	at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.takeDBNativeCheckpoint(RocksIncrementalSnapshotStrategy.java:249)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:160)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:126)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:439)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
	... 17 more
{code}
This failure consistently happens until the job restarts.
Some findings: the JobManager log shows that each time the error came from a different subtask:
{code:java}
// grep jobManager log on appcache/application_1584397637704_9622
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-c42b6665-0170-4dc9-9933-8abd78812fd5/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__5_60__uuid_fa8124e4-1678-4555-a90a-8eec4d974a22/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-a8dfe34d-909e-4aea-8d20-c89199b20856/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__4_60__uuid_12fc9764-418e-4802-800e-3623e385743f/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme1n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-e98c35d7-586a-4edb-9eba-99c6fd823540/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__9_60__uuid_f52a3f02-aa12-4285-b594-b94e1b0f8ba7/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme3n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-a2887f93-1c75-48b1-8b67-72acdc69ce1b/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__2_60__uuid_6a8267eb-aa04-48a3-b82f-7b5b9f21c8e0/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme2n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-27e797c3-de39-4140-84e8-b94e640154cc/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__1_60__uuid_fde8b198-32d8-4e0c-a412-f316a4fe1e3e/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme1n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-e98c35d7-586a-4edb-9eba-99c6fd823540/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__9_60__uuid_f52a3f02-aa12-4285-b594-b94e1b0f8ba7/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme2n1/nm-local-dir/usercache/dkapoor/appcache/application_1584397637704_9622/flink-io-7be6a975-c0cd-4083-a1c3-b47e4c8fbb1b/job_03a4b302f44a8d9f5b31693a80bde30c_op_KeyedProcessOperator_b9daf26d7397cd4b00184cc833054139__13_60__uuid_d779fe65-181f-40d2-b32e-e17a023c128d/db/MANIFEST-06: No such file or directory
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /data/nvme1n1/nm-local-dir/usercache
{code}
[jira] [Created] (FLINK-17364) Support StreamingFileSink in PrestoS3FileSystem
Lu Niu created FLINK-17364:
-------------------------------

             Summary: Support StreamingFileSink in PrestoS3FileSystem
                 Key: FLINK-17364
                 URL: https://issues.apache.org/jira/browse/FLINK-17364
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
            Reporter: Lu Niu

For S3, the StreamingFileSink currently supports only the Hadoop-based FileSystem implementation, not the implementation based on Presto. At the same time, Presto is the recommended file system for checkpointing. Implementing StreamingFileSink support in PrestoS3FileSystem fills the gap and enables users to use PrestoS3FileSystem for all access to S3.
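For context, Flink distinguishes the two S3 implementations by URI scheme: s3a:// for the Hadoop-based one and s3p:// for the Presto-based one. A sketch of the split configuration this limitation currently forces (the bucket name is a placeholder):

```yaml
# Presto-based S3 is recommended for checkpointing:
state.checkpoints.dir: s3p://my-bucket/checkpoints
# But StreamingFileSink output paths must use the Hadoop-based
# implementation, i.e. the s3a:// scheme, e.g. s3a://my-bucket/output.
# With PrestoS3FileSystem support in the sink, both could use s3p://.
```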