Re: Adaptive load balancing
Hi Krishnan,

Thanks for raising this interesting scenario! It reminds me of a pending improvement: adaptive load balancing for the rebalance partitioner. Since rebalance mode can emit data to any node without placement constraints, the data could be emitted adaptively based on the current backlog of each partition, which roughly reflects the load of the consumers.

For your keyBy case, I guess the requirement is not only load balancing of the processing but also consistency with the preloaded cache. Do you think it would be possible to implement a custom partitioner that controls the keyBy distribution based on the pre-defined cache distribution across nodes?

Best,
Zhijiang

--
From: Navneeth Krishnan
Send Time: Sep 23, 2020 (Wed) 02:21
To: user
Subject: Adaptive load balancing

Hi All,

We are currently using Flink in production and use keyBy for a CPU-intensive computation. There is a cache lookup for a set of keys, and since keyBy cannot guarantee the data is sent to a single node, we are basically replicating the cache on all nodes. This is causing memory problems for us, and we would like to explore some options to mitigate the current limitations. Is there a way to group a set of keys and send them to a set of nodes so that we don't have to replicate the cache data on all nodes? Has someone tried implementing hashing with adaptive load balancing, so that if a node is busy processing, the data can be routed effectively to other nodes that are free? Any suggestions are greatly appreciated.

Thanks
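Zhijiang's custom-partitioner suggestion can be sketched outside of Flink as a plain key-to-subtask mapping. The sketch below is a minimal illustration only: the cacheGroups table, the class name, and the shard layout are invented for this example. In a real job the same logic would live inside a Partitioner implementation passed to DataStream.partitionCustom(...).

```java
import java.util.Map;

public class CacheAffinityPartitioner {
    // Hypothetical table: cache shard id -> subtasks that have preloaded that shard.
    private final Map<Integer, int[]> cacheGroups;
    private final int numShards;

    public CacheAffinityPartitioner(Map<Integer, int[]> cacheGroups, int numShards) {
        this.cacheGroups = cacheGroups;
        this.numShards = numShards;
    }

    /** Route a key to one of the subtasks that already hold its cache shard. */
    public int partition(String key) {
        int shard = Math.floorMod(key.hashCode(), numShards);
        int[] candidates = cacheGroups.get(shard);
        // Pick deterministically within the group so the same key always lands on
        // the same subtask; a load-aware variant could instead pick the least busy one.
        return candidates[Math.floorMod(key.hashCode(), candidates.length)];
    }

    public static void main(String[] args) {
        // Two cache shards, each preloaded on two of four subtasks.
        Map<Integer, int[]> groups = Map.of(0, new int[]{0, 1}, 1, new int[]{2, 3});
        CacheAffinityPartitioner p = new CacheAffinityPartitioner(groups, 2);
        System.out.println(p.partition("user-42")); // stable subtask for this key
        System.out.println(p.partition("user-42") == p.partition("user-42")); // true
    }
}
```

This avoids replicating the cache on every node: each shard is held by a small group of subtasks, and every key is routed only into its shard's group.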
Re: Re: [ANNOUNCE] New PMC member: Dian Fu
Congrats, Dian!

--
From: Yun Gao
Send Time: Aug 27, 2020 (Thu) 17:44
To: dev; Dian Fu; user; user-zh
Subject: Re: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian!

Best,
Yun

--
Sender: Marta Paes Moreira
Date: 2020/08/27 17:42:34
Recipient: Yuan Mei
Cc: Xingbo Huang; jincheng sun; dev; Dian Fu; user; user-zh
Subject: Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!

On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei wrote: Congrats!

On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang wrote: Congratulations Dian! Best, Xingbo

jincheng sun wrote on Thu, Aug 27, 2020, 5:24 PM:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on the PyFlink component, working on various important features such as the Python UDF and Pandas integration. He keeps checking and voting for our releases, has successfully produced two releases (1.9.3 & 1.11.1) as release manager, and is currently working as release manager to push forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC member!

Best,
Jincheng (on behalf of the Flink PMC)
Re: [ANNOUNCE] Apache Flink 1.10.2 released
Congrats, thanks for the release manager work Zhu Zhu and everyone involved in! Best, Zhijiang -- From:liupengcheng Send Time:2020年8月26日(星期三) 19:37 To:dev ; Xingbo Huang Cc:Guowei Ma ; user-zh ; Yangze Guo ; Dian Fu ; Zhu Zhu ; user Subject:Re: [ANNOUNCE] Apache Flink 1.10.2 released Thanks ZhuZhu for managing this release and everyone who contributed to this. Best, Pengcheng 在 2020/8/26 下午7:06,“Congxian Qiu” 写入: Thanks ZhuZhu for managing this release and everyone else who contributed to this release! Best, Congxian Xingbo Huang 于2020年8月26日周三 下午1:53写道: > Thanks Zhu for the great work and everyone who contributed to this release! > > Best, > Xingbo > > Guowei Ma 于2020年8月26日周三 下午12:43写道: > >> Hi, >> >> Thanks a lot for being the release manager Zhu Zhu! >> Thanks everyone contributed to this! >> >> Best, >> Guowei >> >> >> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang wrote: >> >>> Thanks for Zhu's work to manage this release and everyone who >>> contributed to this! >>> >>> Best, >>> Yun Tang >>> >>> From: Yangze Guo >>> Sent: Tuesday, August 25, 2020 14:47 >>> To: Dian Fu >>> Cc: Zhu Zhu ; dev ; user < >>> user@flink.apache.org>; user-zh >>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released >>> >>> Thanks a lot for being the release manager Zhu Zhu! >>> Congrats to all others who have contributed to the release! >>> >>> Best, >>> Yangze Guo >>> >>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu wrote: >>> > >>> > Thanks ZhuZhu for managing this release and everyone else who >>> contributed to this release! >>> > >>> > Regards, >>> > Dian >>> > >>> > 在 2020年8月25日,下午2:22,Till Rohrmann 写道: >>> > >>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to >>> all others who have contributed to the release! 
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink 1.10 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
Re: [ANNOUNCE] Apache Flink 1.11.1 released
Thanks for being the release manager and the efficient work, Dian! Best, Zhijiang -- From:Konstantin Knauf Send Time:2020年7月22日(星期三) 19:55 To:Till Rohrmann Cc:dev ; Yangze Guo ; Dian Fu ; user ; user-zh Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released Thank you for managing the quick follow up release. I think this was very important for Table & SQL users. On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann wrote: Thanks for being the release manager for the 1.11.1 release, Dian. Thanks a lot to everyone who contributed to this release. Cheers, Till On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng wrote: Thanks Dian for the great work and thanks to everyone who makes this release possible! Best, Hequn On Wed, Jul 22, 2020 at 4:40 PM Jark Wu wrote: > Congratulations! Thanks Dian for the great work and to be the release > manager! > > Best, > Jark > > On Wed, 22 Jul 2020 at 15:45, Yangze Guo wrote: > > > Congrats! > > > > Thanks Dian Fu for being release manager, and everyone involved! > > > > Best, > > Yangze Guo > > > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong > wrote: > > > > > > Congratulations! Thanks Dian for the great work! > > > > > > Best, > > > Wei > > > > > > > 在 2020年7月22日,15:09,Leonard Xu 写道: > > > > > > > > Congratulations! > > > > > > > > Thanks Dian Fu for the great work as release manager, and thanks > > everyone involved! > > > > > > > > Best > > > > Leonard Xu > > > > > > > >> 在 2020年7月22日,14:52,Dian Fu 写道: > > > >> > > > >> The Apache Flink community is very happy to announce the release of > > Apache Flink 1.11.1, which is the first bugfix release for the Apache > Flink > > 1.11 series. > > > >> > > > >> Apache Flink(r) is an open-source stream processing framework for > > distributed, high-performing, always-available, and accurate data > streaming > > applications. 
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the improvements for this bugfix release:
> > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > >>
> > > >> The full release notes are available in Jira:
> > > >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink community who made this release possible!
> > > >>
> > > >> Regards,
> > > >> Dian

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk
[ANNOUNCE] Apache Flink 1.11.0 released
The Apache Flink community is very happy to announce the release of Apache Flink 1.11.0, which is the latest major release.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364

We would like to thank all contributors of the Apache Flink community who made this release possible!

Cheers,
Piotr & Zhijiang
Re: Unaligned Checkpoint and Exactly Once
From an implementation and logic-complexity perspective, AT_LEAST_ONCE is somewhat simpler than EXACTLY_ONCE without unaligned checkpoints, since it can always process data without blocking any channels.

--
From: Lu Weizheng
Send Time: Jun 22, 2020 (Mon) 10:53
To: Zhijiang; user@flink.apache.org
Subject: Re: Unaligned Checkpoint and Exactly Once

Thank you Zhijiang! The second question about the config is just because I found a method in InputProcessorUtil. I guess AT_LEAST_ONCE mode is a simpler way to handle the checkpoint barrier?

private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}

From: Zhijiang
Sent: Jun 22, 2020 10:41
To: Lu Weizheng; user@flink.apache.org
Subject: Re: Unaligned Checkpoint and Exactly Once

Hi Weizheng,

The unaligned checkpoint (UC) only supports exactly-once mode in Flink 1.11, except for savepoints. Savepoints are typically used in job rescaling scenarios, and we plan to support them in a future release. UC can satisfy the exactly-once semantics as promised.

Regarding the config issue, I am not sure I get your point here. The first config describes whether the currently set mode (actually only exactly-once) enables UC or not, and the second config sets the mode itself (exactly-once or at-least-once). I guess you are suggesting merging them into the first config form. But they seem to be two different dimensions of configuring checkpoints: one is the semantic guarantee of data processing, and the other is which of two mechanisms is used to realize one of those semantics (exactly-once).

Best,
Zhijiang

--
From: Lu Weizheng
Send Time: Jun 22, 2020 (Mon) 07:20
To: user@flink.apache.org
Subject: Unaligned Checkpoint and Exactly Once

Hi there,

The new feature in Flink 1.11 will provide the Unaligned Checkpoint, which means an operator subtask does not need to wait for all the checkpoint barriers and will not block some channels. As the checkpoint barrier is the key mechanism for the exactly-once guarantee, I am not sure whether Unaligned Checkpoint can still achieve exactly-once or only at-least-once?

FLIP-76: Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoint will probably be enabled by default for exactly once.

What's more, regarding the following two configs:

Config 1: env.getCheckpointConfig().enableUnalignedCheckpoints();
Config 2: checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Does Config 2 use an even simpler way of checkpointing than Unaligned Checkpoint?

Hope for replies!
Weizheng
Re: Unaligned Checkpoint and Exactly Once
Hi Weizheng,

The unaligned checkpoint (UC) only supports exactly-once mode in Flink 1.11, except for savepoints. Savepoints are typically used in job rescaling scenarios, and we plan to support them in a future release. UC can satisfy the exactly-once semantics as promised.

Regarding the config issue, I am not sure I get your point here. The first config describes whether the currently set mode (actually only exactly-once) enables UC or not, and the second config sets the mode itself (exactly-once or at-least-once). I guess you are suggesting merging them into the first config form. But they seem to be two different dimensions of configuring checkpoints: one is the semantic guarantee of data processing, and the other is which of two mechanisms is used to realize one of those semantics (exactly-once).

Best,
Zhijiang

--
From: Lu Weizheng
Send Time: Jun 22, 2020 (Mon) 07:20
To: user@flink.apache.org
Subject: Unaligned Checkpoint and Exactly Once

Hi there,

The new feature in Flink 1.11 will provide the Unaligned Checkpoint, which means an operator subtask does not need to wait for all the checkpoint barriers and will not block some channels. As the checkpoint barrier is the key mechanism for the exactly-once guarantee, I am not sure whether Unaligned Checkpoint can still achieve exactly-once or only at-least-once?

FLIP-76: Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoint will probably be enabled by default for exactly once.

What's more, regarding the following two configs:

Config 1: env.getCheckpointConfig().enableUnalignedCheckpoints();
Config 2: checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Does Config 2 use an even simpler way of checkpointing than Unaligned Checkpoint?

Hope for replies!
Weizheng
Re: [ANNOUNCE] Yu Li is now part of the Flink PMC
Congratulations Yu! Well deserved!

Best,
Zhijiang

--
From: Dian Fu
Send Time: Jun 17, 2020 (Wed) 10:48
To: dev
Cc: Haibo Sun; user; user-zh
Subject: Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

Congrats Yu!

Regards,
Dian

> On Jun 17, 2020, 10:35 AM, Jark Wu wrote:
>
> Congratulations Yu! Well deserved!
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun wrote:
>
>> Congratulations Yu!
>>
>> Best,
>> Haibo
>>
>> At 2020-06-17 09:15:02, "jincheng sun" wrote:
>>> Hi all,
>>>
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now part of the Apache Flink Project Management Committee (PMC).
>>>
>>> Yu Li has been very active on Flink's state-backend component, working on various improvements, for example the RocksDB memory management for 1.10. He keeps checking and voting for our releases, and has also successfully produced two releases (1.10.0 & 1.10.1) as release manager.
>>>
>>> Congratulations & welcome, Yu Li!
>>>
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
Re: Blocked requesting MemorySegment when Segments are available.
Hi David,

I want to clarify two things first, based on the info you provided below.

1. If all the tasks are running on the same TaskManager, there is no credit-based flow control. The downstream operator consumes the upstream's data directly in memory; no network shuffle is needed.

2. Even if the TaskManager has available buffers, that does not mean an individual task has available buffers on its input or output side. E.g., the output side of the "enrich-events" operator has 10 buffers at maximum. After these buffers are exhausted, the operator is blocked regardless of the buffers available at the TaskManager level.

Considering your case, could you double check whether buffers accumulate in the output ("outputQueueLength" metric) of the "enrich-events" operator, and whether the "numRecordsIn"/"numBytesIn" metrics of the "Test Function" operator are more than 0? I want to rule out a buffer leak on the upstream side and a missing partition request on the downstream side. Then we can further analyze whether the input-availability notification on the downstream side has a bug that makes it stuck forever.

Best,
Zhijiang

--
From: David Maddison
Send Time: Jun 9, 2020 (Tue) 19:28
To: user
Subject: Blocked requesting MemorySegment when Segments are available.

Hi,

I keep seeing the following situation where a task is blocked getting a MemorySegment from the pool but the TaskManager is reporting that it has lots of MemorySegments available. I'm completely stumped as to how to debug this or what to look at next, so any hints/help/advice would be greatly appreciated!
/David/

The situation is as follows (Flink 1.10.0): I have two operations; the first one, "enrich-events", is stuck forever attempting to get a memory segment to send to the downstream operator "Test function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xd2206000> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
All the operator tasks are running on the same TaskManager, and the TaskManager reports that it has 6,517 memory segments available, so it's confusing why the task would be blocked getting a memory segment.

Memory Segments:
  Available: 6,517
  Total:     6,553

Even more confusing is that the downstream task appears to be waiting for data, and therefore I would assume that credit-based flow control isn't causing the backpressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0xc91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(Mailbox
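The mismatch David describes, a blocked task while the TaskManager still reports thousands of free segments, follows from the per-gate buffer budget Zhijiang mentions. A back-of-the-envelope sketch: the 2-exclusive-plus-8-floating figures below are the Flink 1.10 defaults for taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.floating-buffers-per-gate, and the formula is a simplification for illustration, not the actual LocalBufferPool sizing code.

```java
public class BufferBudget {
    // Flink 1.10 defaults (assumed here): exclusive buffers per channel and
    // floating buffers shared by the whole gate.
    static final int EXCLUSIVE_PER_CHANNEL = 2;
    static final int FLOATING_PER_GATE = 8;

    /** Rough per-gate maximum: exclusive buffers for every channel plus the floating pool. */
    static int maxBuffersPerGate(int numChannels) {
        return numChannels * EXCLUSIVE_PER_CHANNEL + FLOATING_PER_GATE;
    }

    public static void main(String[] args) {
        // A parallelism-1 pipeline has one channel per gate: 1 * 2 + 8 = 10,
        // matching the "10 buffers in maximum" figure in the thread above.
        System.out.println(maxBuffersPerGate(1));   // 10
        System.out.println(maxBuffersPerGate(100)); // 208
    }
}
```

So a single operator's local pool can be fully exhausted (blocking the writer) long before the TaskManager-wide segment pool runs low.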
Re: Flink 1.10 memory and backpressure
Sorry for missing the document link.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/back_pressure.html

--
From: Zhijiang
Send Time: Jun 11, 2020 (Thu) 11:32
To: Steven Nelson; user
Subject: Re: Flink 1.10 memory and backpressure

Regarding monitoring backpressure, you can refer to the document [1].

As for debugging the backpressure, one option is to trace the jstack of the window task thread that causes the backpressure (it almost always has the maximum in-queue buffers). After tracing the jstack repeatedly, you may find which execution (e.g. state access) costs much, and then you can probably find the bottleneck.

Besides that, in release 1.11 the unaligned checkpoint is introduced, mainly to resolve the checkpoint issue in the case of backpressure. You may want to pay attention to this feature and give it a try for your case.

Best,
Zhijiang

--
From: Steven Nelson
Send Time: Jun 11, 2020 (Thu) 04:35
To: user
Subject: Flink 1.10 memory and backpressure

We are working with a process and having some problems with backpressure. The backpressure seems to be caused by a simple Window operation, which causes our checkpoints to fail. What would be the recommendations for debugging the backpressure?
Re: Flink 1.10 memory and backpressure
Regarding monitoring backpressure, you can refer to the document [1].

As for debugging the backpressure, one option is to trace the jstack of the window task thread that causes the backpressure (it almost always has the maximum in-queue buffers). After tracing the jstack repeatedly, you may find which execution (e.g. state access) costs much, and then you can probably find the bottleneck.

Besides that, in release 1.11 the unaligned checkpoint is introduced, mainly to resolve the checkpoint issue in the case of backpressure. You may want to pay attention to this feature and give it a try for your case.

Best,
Zhijiang

--
From: Steven Nelson
Send Time: Jun 11, 2020 (Thu) 04:35
To: user
Subject: Flink 1.10 memory and backpressure

We are working with a process and having some problems with backpressure. The backpressure seems to be caused by a simple Window operation, which causes our checkpoints to fail. What would be the recommendations for debugging the backpressure?
Re: Single task backpressure problem with Credit-based Flow Control
Hi Weihua,

From the info below, this matches the expectation under credit-based flow control. I guess one of the Sink parallel subtasks causes the backpressure, so you see that there are no available credits on the Sink side and the outPoolUsage of Map is almost 100%. This really reflects the credit-based state in the case of backpressure.

If you want to analyze the root cause of the backpressure, you can trace the task stack of the respective Sink subtask to find which operation costs much; then you can increase the parallelism or improve the UDF (if it has a bottleneck).

In addition, I am not sure why you chose rescale to shuffle data among operators. The default forward mode gives really good performance if you set the same parallelism for the operators.

Best,
Zhijiang

--
From: Weihua Hu
Send Time: May 24, 2020 (Sun) 18:32
To: user
Subject: Single task backpressure problem with Credit-based Flow Control

Hi, all

I ran into a weird single-task backpressure problem.

Job info:
DAG: Source (1000) -> Map (2000) -> Sink (1000), linked via rescale.
Flink version: 1.9.0

There is no related info in the JobManager/TaskManager logs. Through metrics, I see that Map (242)'s outPoolUsage is full, but its downstream Sink (121)'s inPoolUsage is 0. After dumping the memory and analyzing it, I found:

Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.

This is not consistent with my understanding of Flink's network transmission mechanism. Can someone help me? Thanks a lot.

Best,
Weihua Hu
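The two metrics Weihua observed (unannouncedCredit = 0 on the receiver, numCreditsAvailable = 0 on the sender) describe one consistent state: the receiver has no freed buffers to announce as credit, so the sender has nothing to spend and stays blocked. A toy model of that credit loop, in plain Java rather than Flink's actual classes (the names are simplified for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model of credit-based flow control: the sender may only emit as many
 * buffers as the receiver has announced credits for.
 */
public class CreditFlowModel {
    int numCreditsAvailable = 0; // sender-side view of usable credits
    int unannouncedCredit = 0;   // receiver-side credit not yet announced
    final Deque<String> inflight = new ArrayDeque<>();

    /** Receiver frees an input buffer and accumulates credit to announce. */
    void freeReceiverBuffer() { unannouncedCredit++; }

    /** Receiver announces its accumulated credit to the sender. */
    void announceCredits() {
        numCreditsAvailable += unannouncedCredit;
        unannouncedCredit = 0;
    }

    /** Sender emits one buffer if it holds a credit; returns whether it could send. */
    boolean trySend(String buffer) {
        if (numCreditsAvailable == 0) return false; // backpressure: sender blocks here
        numCreditsAvailable--;
        inflight.add(buffer);
        return true;
    }

    public static void main(String[] args) {
        CreditFlowModel m = new CreditFlowModel();
        System.out.println(m.trySend("b1")); // false: no credits, sender is backpressured
        m.freeReceiverBuffer();
        m.announceCredits();
        System.out.println(m.trySend("b1")); // true: one credit was announced
        // With both counters stuck at 0 and no buffers ever freed downstream,
        // trySend stays false forever: the state reported in this thread.
    }
}
```

In other words, both counters being 0 is not a flow-control bug by itself; it is what the protocol looks like when the downstream side never frees (or never announces) a buffer, which points the investigation at the slow Sink subtask.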
Re: [ANNOUNCE] Apache Flink 1.10.1 released
Thanks Yu for the release manager and everyone involved in. Best, Zhijiang -- From:Arvid Heise Send Time:2020年5月18日(星期一) 23:17 To:Yangze Guo Cc:dev ; Apache Announce List ; user ; Yu Li ; user-zh Subject:Re: [ANNOUNCE] Apache Flink 1.10.1 released Thank you very much! On Mon, May 18, 2020 at 8:28 AM Yangze Guo wrote: Thanks Yu for the great job. Congrats everyone who made this release possible. Best, Yangze Guo On Mon, May 18, 2020 at 10:57 AM Leonard Xu wrote: > > > Thanks Yu for being the release manager, and everyone else who made this > possible. > > Best, > Leonard Xu > > 在 2020年5月18日,10:43,Zhu Zhu 写道: > > Thanks Yu for being the release manager. Thanks everyone who made this > release possible! > > Thanks, > Zhu Zhu > > Benchao Li 于2020年5月15日周五 下午7:51写道: >> >> Thanks Yu for the great work, and everyone else who made this possible. >> >> Dian Fu 于2020年5月15日周五 下午6:55写道: >>> >>> Thanks Yu for managing this release and everyone else who made this >>> release possible. Good work! >>> >>> Regards, >>> Dian >>> >>> 在 2020年5月15日,下午6:26,Till Rohrmann 写道: >>> >>> Thanks Yu for being our release manager and everyone else who made the >>> release possible! >>> >>> Cheers, >>> Till >>> >>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu >>> wrote: >>>> >>>> Thanks a lot for the release and your great job, Yu! >>>> Also thanks to everyone who made this release possible! >>>> >>>> Best, >>>> Congxian >>>> >>>> >>>> Yu Li 于2020年5月14日周四 上午1:59写道: >>>>> >>>>> The Apache Flink community is very happy to announce the release of >>>>> Apache Flink 1.10.1, which is the first bugfix release for the Apache >>>>> Flink 1.10 series. >>>>> >>>>> Apache Flink(r) is an open-source stream processing framework for >>>>> distributed, high-performing, always-available, and accurate data >>>>> streaming applications. 
>>>>> >>>>> The release is available for download at: >>>>> https://flink.apache.org/downloads.html >>>>> >>>>> Please check out the release blog post for an overview of the >>>>> improvements for this bugfix release: >>>>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html >>>>> >>>>> The full release notes are available in Jira: >>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891 >>>>> >>>>> We would like to thank all contributors of the Apache Flink community >>>>> who made this release possible! >>>>> >>>>> Regards, >>>>> Yu >>> >>> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: libenc...@gmail.com; libenc...@pku.edu.cn > > -- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
Re: Flink Streaming Job Tuning help
Hi Kumar, I can give some general ideas for further analysis. > We are finding that flink lags seriously behind when we introduce the keyBy > (presumably because of shuffle across the network) The `keyBy` breaks operator chaining, so it can bring a noticeable performance cost in practice. If your previous setup without keyBy made use of the chaining mechanism, the follow-up operator could consume records emitted by the preceding operator directly, without the buffer serialization -> network shuffle -> buffer deserialization steps; this matters especially since your record size of 10K is rather large. If the keyBy is necessary in your case, you can further check for the current bottleneck. E.g., is there backpressure, which you can monitor from the web UI? If so, which task is the bottleneck causing the backpressure? You can trace it via the network-related metrics. Is there data skew in your case, i.e. do some tasks process many more records than others? If so, increasing the parallelism may balance the load. Best, Zhijiang -- From:Senthil Kumar Send Time: Wed., May 13, 2020, 00:49 To:user@flink.apache.org Subject:Re: Flink Streaming Job Tuning help I forgot to mention, we are consuming said records from AWS kinesis and writing out to S3. From: Senthil Kumar Date: Tuesday, May 12, 2020 at 10:47 AM To: "user@flink.apache.org" Subject: Flink Streaming Job Tuning help Hello Flink Community! We have a fairly intensive flink streaming application, processing 8-9 million records a minute, with each record being 10k. One of our steps is a keyBy operation. We are finding that flink lags seriously behind when we introduce the keyBy (presumably because of shuffle across the network). We are trying to tune it ourselves (size of nodes, memory, network buffers etc), but before we spend way too much time on this; would it be better to hire some “flink tuning expert” to get us through? If so what resources are recommended on this list? Cheers Kumar
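One quick way to check the data-skew hypothesis from the reply above is to bucket a sample of keys the way a hash-based keyBy would and compare per-task counts. The sketch below is a deliberate simplification — it uses `hashCode()` modulo parallelism, whereas Flink actually murmur-hashes keys into key groups — but it is usually enough to spot a hot key offline.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough offline check for data skew: bucket keys roughly the way a
// hash-based keyBy would (simplified to hashCode modulo parallelism;
// Flink really uses murmur hashing over key groups) and compare counts.
public class SkewCheck {
    public static Map<Integer, Long> countsPerTask(Iterable<String> keys, int parallelism) {
        Map<Integer, Long> counts = new HashMap<>();
        for (String key : keys) {
            int task = Math.abs(key.hashCode() % parallelism);
            counts.merge(task, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "cold-a", "cold-b");
        // If one task receives most records, increasing parallelism alone
        // will not help much: a single hot key always lands on one task.
        System.out.println(countsPerTask(keys, 4));
    }
}
```

If the counts are roughly uniform, the bottleneck is more likely the serialization/network cost of the 10k records than skew.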
Re: [ANNOUNCE] Apache Flink 1.9.3 released
Thanks Dian for the release work and thanks everyone involved. Best, Zhijiang -- From:Till Rohrmann Send Time:2020 Apr. 27 (Mon.) 15:13 To:Jingsong Li Cc:dev ; Leonard Xu ; Benchao Li ; Konstantin Knauf ; jincheng sun ; Hequn Cheng ; Dian Fu ; user ; user-zh ; Apache Announce List Subject:Re: [ANNOUNCE] Apache Flink 1.9.3 released Thanks Dian for being our release manager and thanks to everyone who helped making this release possible. Cheers, Till On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li wrote: > Thanks Dian for managing this release! > > Best, > Jingsong Lee > > On Sun, Apr 26, 2020 at 7:17 PM Jark Wu wrote: > >> Thanks Dian for being the release manager and thanks all who make this >> possible. >> >> Best, >> Jark >> >> On Sun, 26 Apr 2020 at 18:06, Leonard Xu wrote: >> >> > Thanks Dian for the release and being the release manager ! >> > >> > Best, >> > Leonard Xu >> > >> > >> > 在 2020年4月26日,17:58,Benchao Li 写道: >> > >> > Thanks Dian for the effort, and all who make this release possible. >> Great >> > work! >> > >> > Konstantin Knauf 于2020年4月26日周日 下午5:21写道: >> > >> >> Thanks for managing this release! >> >> >> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun > > >> >> wrote: >> >> >> >>> Thanks for your great job, Dian! >> >>> >> >>> Best, >> >>> Jincheng >> >>> >> >>> >> >>> Hequn Cheng 于2020年4月25日周六 下午8:30写道: >> >>> >> >>>> @Dian, thanks a lot for the release and for being the release >> manager. >> >>>> Also thanks to everyone who made this release possible! >> >>>> >> >>>> Best, >> >>>> Hequn >> >>>> >> >>>> On Sat, Apr 25, 2020 at 7:57 PM Dian Fu wrote: >> >>>> >> >>>>> Hi everyone, >> >>>>> >> >>>>> The Apache Flink community is very happy to announce the release of >> >>>>> Apache Flink 1.9.3, which is the third bugfix release for the >> Apache Flink >> >>>>> 1.9 series. 
>> >>>>> >> >>>>> Apache Flink(r) is an open-source stream processing framework for >> >>>>> distributed, high-performing, always-available, and accurate data >> streaming >> >>>>> applications. >> >>>>> >> >>>>> The release is available for download at: >> >>>>> https://flink.apache.org/downloads.html >> >>>>> >> >>>>> Please check out the release blog post for an overview of the >> >>>>> improvements for this bugfix release: >> >>>>> https://flink.apache.org/news/2020/04/24/release-1.9.3.html >> >>>>> >> >>>>> The full release notes are available in Jira: >> >>>>> https://issues.apache.org/jira/projects/FLINK/versions/12346867 >> >>>>> >> >>>>> We would like to thank all contributors of the Apache Flink >> community >> >>>>> who made this release possible! >> >>>>> Also great thanks to @Jincheng for helping finalize this release. >> >>>>> >> >>>>> Regards, >> >>>>> Dian >> >>>>> >> >>>> >> >> >> >> -- >> >> Konstantin Knauf | Head of Product >> >> +49 160 91394525 >> >> >> >> Follow us @VervericaData Ververica <https://www.ververica.com/> >> >> >> >> -- >> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> >> Conference >> >> Stream Processing | Event Driven | Real Time >> >> -- >> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> >> -- >> >> Ververica GmbH >> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >> >> (Tony) Cheng >> >> >> > >> > >> > -- >> > >> > Benchao Li >> > School of Electronics Engineering and Computer Science, Peking >> University >> > Tel:+86-15650713730 >> > Email: libenc...@gmail.com; libenc...@pku.edu.cn >> > >> > >> > >> > > > -- > Best, Jingsong Lee >
Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released
Great work! Thanks Gordon for the continuous efforts on enhancing Stateful Functions and for the efficient release! I hope Stateful Functions becomes more and more popular among users. Best, Zhijiang -- From:Yun Tang Send Time:2020 Apr. 9 (Thu.) 00:17 To:Till Rohrmann ; dev Cc:Oytun Tez ; user Subject:Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released Excited to see the stateful functions release! Thanks for the great work of release manager Gordon and everyone who contributed to this. Best Yun Tang From: Till Rohrmann Sent: Wednesday, April 8, 2020 14:30 To: dev Cc: Oytun Tez ; user Subject: Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released Great news! Thanks a lot for being our release manager Gordon and to everyone who helped with the release. Cheers, Till On Wed, Apr 8, 2020 at 3:57 AM Congxian Qiu wrote: Thanks a lot for the release and your great job, Gordon! Also thanks to everyone who made this release possible! Best, Congxian Oytun Tez wrote on Wed, Apr 8, 2020 at 2:55 AM: > I should also add, I couldn't agree more with this sentence in the release > article: "state access/updates and messaging need to be integrated." > > This is something we strictly enforce in our Flink case, where we do not > refer to anything external for storage, use Flink as our DB. > > > > -- > > Oytun Tez > M O T A W O R D | CTO & Co-Founder > oy...@motaword.com > > <https://www.motaword.com/blog> > > > On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez > wrote: > >> Great news! Thank you all. >> >> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira >> wrote: >> >>> Thank you for managing the release, Gordon — you did a tremendous job! >>> And to everyone else who worked on pushing it through. >>> >>> Really excited about the new use cases that StateFun 2.0 unlocks for >>> Flink users and beyond! 
>>> >>> >>> Marta >>> >>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng >>> mailto:he...@apache.org>> wrote: >>> >>>> Thanks a lot for the release and your great job, Gordon! >>>> Also thanks to everyone who made this release possible! >>>> >>>> Best, >>>> Hequn >>>> >>>> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai >>>> mailto:tzuli...@apache.org>> >>>> wrote: >>>> >>>>> The Apache Flink community is very happy to announce the release of >>>>> Apache Flink Stateful Functions 2.0.0. >>>>> >>>>> Stateful Functions is an API that simplifies building distributed >>>>> stateful applications. >>>>> It's based on functions with persistent state that can interact >>>>> dynamically with strong consistency guarantees. >>>>> >>>>> Please check out the release blog post for an overview of the release: >>>>> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html >>>>> >>>>> The release is available for download at: >>>>> https://flink.apache.org/downloads.html >>>>> >>>>> Maven artifacts for Stateful Functions can be found at: >>>>> https://search.maven.org/search?q=g:org.apache.flink%20statefun >>>>> >>>>> Python SDK for Stateful Functions published to the PyPI index can be >>>>> found at: >>>>> https://pypi.org/project/apache-flink-statefun/ >>>>> >>>>> Official Docker image for building Stateful Functions applications is >>>>> currently being published to Docker Hub. >>>>> Dockerfiles for this release can be found at: >>>>> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0 >>>>> Progress for creating the Docker Hub repository can be tracked at: >>>>> https://github.com/docker-library/official-images/pull/7749 >>>>> >>>>> The full release notes are available in Jira: >>>>> >>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346878 >>>>> >>>>> We would like to thank all contributors of the Apache Flink community >>>>> who made this release possible! 
>>>>> >>>>> Cheers, >>>>> Gordon >>>>> >>>> -- >> -- >> >> [image: MotaWord] >> Oytun Tez >> M O T A W O R D | CTO & Co-Founder >> oy...@motaword.com<mailto:oy...@motaword.com> >> >> <https://www.motaword.com/blog> >> >
Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)
Thanks for your continuous efforts in growing the Flink ecosystem, Jeff! Glad to see this progress. I hope more users try it out in practice. Best, Zhijiang -- From:Dian Fu Send Time:2020 Mar. 31 (Tue.) 10:15 To:Jeff Zhang Cc:user ; dev Subject:Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released) Hi Jeff, Thanks for the great work and for sharing it with the community! Very impressive, and I will try it out. Regards, Dian On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote: This is great news Jeff! Thanks a lot for sharing it with the community. Looking forward to trying Flink on Zeppelin out :-) Cheers, Till On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang wrote: Hi Folks, I am very excited to announce that the integration work of flink on apache zeppelin notebook is completed. You can now run flink jobs via the datastream api, table api, sql, and pyflink in an apache zeppelin notebook. Download it here (http://zeppelin.apache.org/download.html). Here are some highlights of this work: 1. Support for 3 execution modes: local, remote, yarn 2. Support for multiple languages in one flink session: scala, python, sql 3. Support for the hive connector (reading from hive and writing to hive) 4. Dependency management 5. UDF support (scala, pyflink) 6. Support for both batch sql and streaming sql For more details and usage instructions, you can refer to the following 4 blogs: 1) Get started https://link.medium.com/oppqD6dIg5 2) Batch https://link.medium.com/3qumbwRIg5 3) Streaming https://link.medium.com/RBHa2lTIg5 4) Advanced usage https://link.medium.com/CAekyoXIg5 Welcome to use flink on zeppelin and give feedback and comments. -- Best Regards Jeff Zhang
Re: End to End Latency Tracking in flink
Hi Lu, Besides Congxian's replies, you can also find some further explanations at https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking. Best, Zhijiang -- From:Congxian Qiu Send Time:2020 Mar. 28 (Sat.) 11:49 To:Lu Niu Cc:user Subject:Re: End to End Latency Tracking in flink Hi, As far as I know, the latency-tracking feature is meant for debugging purposes; you can use it to debug and disable it when running the job in production. From my side, using $current_processing - $event_time is okay, but keep this in mind: the event time may not be the time the record was ingested into Flink. Best, Congxian Lu Niu wrote on Sat, Mar 28, 2020 at 6:25 AM: Hi, I am looking for end-to-end latency monitoring of a flink job. Based on my study, I have two options: 1. flink provides a latency tracking feature. However, the documentation says it cannot show the actual latency of the business logic, as the markers bypass all operators. https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking Also, the feature can significantly impact performance, so I assume it's not for production usage. What do users use latency tracking for? It sounds like only backpressure could affect the latency. 2. I found another stackoverflow question on this: https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application . The answer suggests exposing (current processing time - event time) after the source and before the sink for end-to-end latency monitoring. Is this a good solution? If not, what's the official solution for end-to-end latency tracking? Thank you! Best Lu
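The approach from the StackOverflow answer referenced above boils down to a one-line computation. The sketch below shows just that computation in plain Java; in a real job it would live in a map function near the sink, updating a registered histogram metric (the function and class names here are illustrative, not a Flink API).

```java
// Sketch of the suggested approach: measure end-to-end latency as
// (current processing time - event time) just before the sink.
public class E2ELatency {
    /** Latency of one record in milliseconds, clamped at 0 in case of clock drift. */
    public static long latencyMillis(long eventTimeMillis, long processingTimeMillis) {
        return Math.max(0L, processingTimeMillis - eventTimeMillis);
    }

    public static void main(String[] args) {
        long eventTime = System.currentTimeMillis() - 250;  // record created ~250 ms ago
        long latency = latencyMillis(eventTime, System.currentTimeMillis());
        System.out.println("end-to-end latency ~ " + latency + " ms");
        // Caveat from Congxian's reply: the event time may be stamped by the
        // producer, not at Flink ingestion, so cross-machine clock skew
        // shows up directly in this number.
    }
}
```

Unlike the built-in latency markers, this measures the actual records flowing through the business logic, at the cost of trusting the producer's clock.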
Re: FlinkCEP - Detect absence of a certain event
Hi Humberto, I guess Fuji is familiar with Flink CEP and can answer your question; I have cc'ed him. Best, Zhijiang -- From:Humberto Rodriguez Avila Send Time:2020 Mar. 18 (Wed.) 17:31 To:user Subject:FlinkCEP - Detect absence of a certain event In the documentation of FlinkCEP, I found that I can enforce that a particular event doesn't occur between two other events using notFollowedBy or notNext. However, I was wondering if I could detect the absence of a certain event after a time X. For example, if an event A is not followed by another event A within 10 seconds, fire an alert or do something. Would it be possible to define a FlinkCEP pattern to capture that situation? Thanks in advance, Humberto
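Outside of FlinkCEP, the "event A not followed by another A within 10 seconds" requirement is commonly solved with per-event timers (e.g. a KeyedProcessFunction that registers an event-time timer and fires an alert if no follow-up arrives). The sketch below captures only the core logic as a pure function over sorted timestamps — it is an illustration of the idea, not a FlinkCEP pattern or Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the "absence within X seconds" idea: given sorted timestamps of
// events of type A, report every event that was NOT followed by another A
// within the timeout. In a streaming job, the equivalent is a per-event
// timer that fires unless a follow-up event cancels it first.
public class AbsenceDetector {
    public static List<Long> eventsWithoutFollowUp(List<Long> timestamps, long timeoutMillis) {
        List<Long> alerts = new ArrayList<>();
        for (int i = 0; i < timestamps.size(); i++) {
            boolean followed = i + 1 < timestamps.size()
                    && timestamps.get(i + 1) - timestamps.get(i) <= timeoutMillis;
            if (!followed) {
                // Note: the last event always alerts here, since nothing can
                // follow it yet; a streaming timer handles that case naturally.
                alerts.add(timestamps.get(i));
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(0L, 5_000L, 30_000L);
        // With a 10 s timeout: 5000 -> 30000 is a 25 s gap, and 30000 has no
        // successor, so both 5000 and 30000 trigger an "absence" alert.
        System.out.println(eventsWithoutFollowUp(ts, 10_000L));
    }
}
```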
Re: How do I get the outPoolUsage value inside my own stream operator?
Hi Felipe, I checked the code path, and the outPoolUsage metric is under the following layers: TaskMetricGroup -> TaskIOMetricGroup -> "buffers" group -> "outPoolUsage". It seems that you missed the `TaskIOMetricGroup` in the samples below. You can get it from the TaskMetricGroup. Hope it solves your problem. Best, Zhijiang -- From:Felipe Gutierrez Send Time:2020 Mar. 17 (Tue.) 17:50 To:user Subject:Re: How do I get the outPoolUsage value inside my own stream operator? Hi, just for the record. I am collecting the values of outPoolUsage using the piece of code below inside my stream operator OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup) this.getMetricGroup(); TaskMetricGroup taskMetricGroup = operatorMetricGroup.parent(); MetricGroup metricGroup = taskMetricGroup.getGroup("buffers"); Gauge gauge = (Gauge) metricGroup.getMetric("outPoolUsage"); if (gauge != null && gauge.getValue() != null) { float outPoolUsage = gauge.getValue().floatValue(); this.outPoolUsageHistogram.update((long) (outPoolUsage * 100)); } -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, Mar 16, 2020 at 5:17 PM Felipe Gutierrez wrote: > > Hi community, > > I have built my own operator (not a UDF) and I want to collect the > metrics of "outPoolUsage" inside it. How do I do it, assuming that I > have to make some modifications in the source code? > > I know that the Gauge comes from > flink-runtime/org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.java. > Inside of my operator MyAbstractUdfStreamOperator I can get the > "MetricGroup metricGroup = this.getMetricGroup()". > Then I implemented the "Gauge gauge = (Gauge) > metricGroup.getMetric("outPoolUsage");" but it returns null all the > time. Even when I click on the Backpressure UI Interface. > > Thanks, > Felipe > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com
Re: Help me understand this Exception
I agree with Gordon's explanation below! Besides that, you could also check the job master's log, which probably shows the specific exception that caused this failure. I am wondering whether it is worth improving ExceptionInChainedOperatorException to also include the message of the wrapped real exception, so users could get the root cause directly instead of only the current message "Could not forward element to next operator". Best, Zhijiang -- From:Tzu-Li (Gordon) Tai Send Time:2020 Mar. 18 (Wed.) 00:01 To:aj Cc:user Subject:Re: Help me understand this Exception Hi, The exception stack you posted simply means that the next operator in the chain failed to process the output watermark. There should be another exception, which would explain why some operator was closed / failed and eventually led to the above exception. That would provide more insight into exactly why your job is failing. Cheers, Gordon On Tue, Mar 17, 2020 at 11:27 PM aj wrote: Hi, I am running a streaming job that generates watermarks like this: public static class SessionAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(GenericRecord record, long previousElementTimestamp) { long timestamp = (long) record.get("event_ts"); LOGGER.info("timestamp", timestamp); return timestamp; } @Override public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) { // simply emit a watermark with every event LOGGER.info("extractedTimestamp ", extractedTimestamp); return new Watermark(extractedTimestamp); } } Please help me understand what this exception means: java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216) at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at 
com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137) at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWat
Re: Backpressure and 99th percentile latency
Thanks for the feedback, Felipe! Regarding your concern below: > Although I think it is better to use outPoolUsage and inPoolUsage according > to [1]. However, in your opinion is it better (faster to see) to use > inputQueueLength and > outputQueueLength or outPoolUsage and inPoolUsage to monitor a consequence of > backpressure? I mean, is there a faster way to show that the latency > increased due to > backpressure? Maybe if I create my own metric on my own operator or udf? The blog [1] already gives a great explanation of the network stack for users in general, and I agree with it on this issue. In particular, I can provide some further notes for your understanding. 1. It is not easy for users to get the precise total amount of input & output buffers, so from the queue-length metrics alone we cannot tell whether the input & output buffers are exhausted and backpressure has happened. In contrast, we can easily tell that inputPoolUsage & outputPoolUsage should both reach 100% once backpressure happens. 2. The inputPoolUsage has a different semantic from release 1.9 on. Before 1.9 this metric only measured the usage of floating buffers, but from 1.9 it also covers the usage of exclusive buffers. That means from 1.9 you might see inputPoolUsage far from 100% when backpressure happens, especially in the data-skew case, but inputFloatingBufferUsage should be 100% instead. 3. The latency marker provided by the flink framework is emitted to one random channel (non-broadcast) each time because of performance concerns. So it is hard to say whether it is measuring a heavy-load channel or a lightweight channel over a short interval, especially in a data-skew scenario. 4. In theory the latency should increase along with the trend of the increasing queue lengths. All of them should be proportional and show the same trend in most cases. Best, Zhijiang -- From:Felipe Gutierrez Send Time:2020 Mar. 7 (Sat.) 
18:49 To:Arvid Heise Cc:Zhijiang ; user Subject:Re: Backpressure and 99th percentile latency Hi, I implemented my own histogram metric on my operator to measure the latency. The latency is following the throughput at the same pace now. The figures are attached. Best, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Fri, Mar 6, 2020 at 9:38 AM Felipe Gutierrez wrote: > > Thanks for the clarified answer @Zhijiang, I am gonna monitor > inputQueueLength and outputQueueLength to check some relation with > backpressure. Although I think it is better to use outPoolUsage and > inPoolUsage according to [1]. > However, in your opinion is it better (faster to see) to use > inputQueueLength and outputQueueLength or outPoolUsage and inPoolUsage > to monitor a consequence of backpressure? I mean, is there a faster > way to show that the latency increased due to backpressure? Maybe if I > create my own metric on my own operator or udf? > > Thanks @Arvid. In the end I want to be able to hold SLAs. For me, the > SLA would be the minimum latency. If I understood correctly, in the > time that I started to have backpressure the latency track metrics are > not a very precise indication of how much backpressure my application > is suffering. It just indicates that there is backpressure. > What would you say that is more less precise metric to tune the > throughput in order to not have backpressure. Something like, if I > have 50,000 milliseconds of latency and the normal latency is 150 > milliseconds, the throughput has to decrease by a factor of 50,000/150 > times. > > Just a note. I am not changing the throughput of the sources yet. I am > changing the size of the window without restart the job. But I guess > they have the same meaning for this question. 
> > [1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html > > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > > On Fri, Mar 6, 2020 at 8:17 AM Arvid Heise wrote: > > > > Hi Felipe, > > > > latency under backpressure has to be carefully interpreted. Latency's > > semantics actually require that the data source is read in a timely manner; > > that is, there is no bottleneck in your pipeline where data is piling up. > > > > Thus, to measure latency in experiments you must ensure that the current > > throughput is below the maximum throughput, for example by gradually > > increasing the throughput with a generating source or through some > > throttles on the external source. Until you reach the maximum throughput, > > latencies semantics is exactly
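Felipe's custom operator-level histogram can be sketched in a dependency-free way. In Flink one would register a `Histogram` on the operator's metric group; the window size and nearest-rank quantile below are illustrative choices, not necessarily what Felipe used:

```python
from collections import deque

class SlidingHistogram:
    """A minimal model of a per-operator latency histogram, assuming
    latencies are recorded in milliseconds on each processed record."""

    def __init__(self, window_size=10_000):
        self.samples = deque(maxlen=window_size)  # keep only recent samples

    def update(self, latency_ms):
        self.samples.append(latency_ms)

    def quantile(self, q):
        # nearest-rank quantile over the current window
        ordered = sorted(self.samples)
        idx = min(int(q * len(ordered)), len(ordered) - 1)
        return ordered[idx]

hist = SlidingHistogram()
for latency in [10, 12, 11, 13, 500]:  # one straggler record
    hist.update(latency)
print(hist.quantile(0.99))  # 500 -- the straggler dominates the p99
```

Unlike the framework's random-channel latency markers, such a metric samples every record the operator sees, which is why it tracks the throughput much more closely.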
Re: Backpressure and 99th percentile latency
Hi Felipe, Let me try to answer your questions below. > I understand that I am tracking latency every 10 seconds for each physical > instance operator. Is that right? Generally right. The latency marker is emitted from the source and flows through all the intermediate operators until the sink. This interval controls the emitting frequency at the source. > The backpressure goes away but the 99th percentile latency is still the same. > Why? Does it have no relation with each other? The latency might be influenced by the buffer flush timeout, network transport and load, etc. In the case of backpressure, a large amount of in-flight data is accumulated on the network wire, so the latency marker queues up waiting for network transport, which can bring obvious delay. The latency marker may not even be emitted from the source in time because no buffers are temporarily available. After the backpressure goes away, that does not mean there are no accumulated buffers left on the network wire, just that they no longer reach the degree of backpressure. So the latency marker still needs to queue behind the accumulated buffers on the wire, and it might take some time to completely digest the previously accumulated buffers before the latency relaxes. I guess that might be your case. You can monitor the metrics "inputQueueLength" and "outputQueueLength" to confirm the status. Anyway, the answer is yes, it is related to backpressure, but there might be some delay before you see the changes clearly. > In the end I left the experiment for more than 2 hours running and only after > about 1.5 hours the 99th percentile latency got down to milliseconds. Is that > normal? I guess it is normal, as mentioned above. Once there are no accumulated buffers left in the network stack and no backpressure at all, it should go down to milliseconds. Best, Zhijiang -- From:Felipe Gutierrez Send Time:2020 Mar. 6 (Fri.) 05:04 To:user Subject:Backpressure and 99th percentile latency Hi, I am a bit confused about the topic of tracking latency in Flink [1]. 
It says that if I use latency tracking I am measuring Flink's network stack, but application code latencies can also influence it. For instance, I am using metrics.latency.granularity: operator (default) and setLatencyTrackingInterval(1). I understand that I am tracking latency every 10 seconds for each physical instance operator. Is that right? In my application, I am tracking the latency of all aggregators. When I have a high workload and I can see backpressure from the Flink UI, the 99th percentile latency is 13, 25, 21, and 25 seconds. Then I set my aggregator to have a larger window. The backpressure goes away but the 99th percentile latency is still the same. Why? Does it have no relation with each other? In the end I left the experiment for more than 2 hours running and only after about 1.5 hours the 99th percentile latency got down to milliseconds. Is that normal? Please see the figure attached. [1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com
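Zhijiang's point that the latency only relaxes once the accumulated buffers are digested can be sketched with simple queue-drain arithmetic. All numbers below are made up purely to illustrate how a drain can stretch to roughly the ~1.5 hours Felipe observed:

```python
def drain_time_s(backlog_records, service_rate, arrival_rate):
    """Rough time to digest a backlog of accumulated in-flight data once
    backpressure stops, assuming constant rates (illustrative model)."""
    assert service_rate > arrival_rate, "queue never drains otherwise"
    return backlog_records / (service_rate - arrival_rate)

# e.g. 54M records backlogged, draining only 10k records/s faster
# than new records keep arriving
print(drain_time_s(54_000_000, 110_000, 100_000) / 3600)  # 1.5 (hours)
```

The takeaway matches Zhijiang's explanation: even a modest backlog takes a long time to drain when the spare processing capacity over the arrival rate is small, so high percentile latencies linger well after the backpressure indicator clears.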
Re: Question: Determining Total Recovery Time
Hi Morgan, Your idea is great and I am also interested in it. I think it is valuable for some users to estimate the maximum throughput capacity based on certain metrics or models. But I am not quite sure whether it is feasible to do that based on existing metrics; at least there exist some limitations, as Arvid mentioned. If I understand correctly, the maximum throughput you want to measure is based on source emitting, which means some data is still buffered in the middle of the topology and not processed yet. If so, we might refer to the metrics `inputQueueLength` and `inPoolUsage` together. Note that if `inPoolUsage` reaches 100%, it does not mean all the buffers are already filled with data; it just means all the available buffers are requested away. So `inputQueueLength` would be more precise for predicting the available capacity if we are aware of the total buffer amount. In general we can make use of these two together. We can find the largest value of the above metrics among all the topology tasks, which probably hints at the bottleneck in the whole view. Then we can estimate how many available buffers are left to absorb more source emitting throughput. But there is a limitation if all the metrics are `0` in a light-load situation, as I mentioned above; in that case we cannot estimate the saturation unless we increase the source emit rate. Wish good news sharing from you! Best, Zhijiang -- From:Arvid Heise Send Time:2020 Feb. 26 (Wed.) 22:29 To:Morgan Geldenhuys Cc:Timo Walther ; user Subject:Re: Question: Determining Total Recovery Time Hi Morgan, doing it in a very general way sure is challenging. I'd assume that your idea of using the buffer usage has some shortcomings (which I don't know), but I also think it's a good starting point. Have you checked the PoolUsage metrics? [1] You could use them to detect the bottleneck and then estimate the max capacity of the whole job. Btw, I'd be interested in the results. We have the idea of adjustable buffer sizes and the insights would help us. 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys wrote: Hi Arvid, Timo, Really appreciate the feedback. I have one final question on this topic and hope you don't mind me posing it to you directly. I posted the question earlier to the mailing list, but am looking at this more from an academic perspective as opposed to manually optimizing a specific job for a specific production environment. I do not know the Flink internals well enough to determine if I can accomplish what I am looking for. For an experiment, I need to work out the Total Recovery Time (TRT). I define this as the time it takes the system to "catch up" to the current timestamp, assuming event time processing, after a node failure. I would like to follow a heuristic approach which is: job+environment agnostic, does not involve load testing, does not involve modifying the job or Flink codebase, and relies solely on the metrics supplied. As far as I know (and correct me if I'm wrong): TRT = heartbeat.timeout + recovery time + time to reprocess uncheckpointed messages + lag to catch up to the current timestamp. In order to predict TRT, I need some kind of resource utilization model based on the current processing capacity and maximum processing limit; let me explain: Backpressure is essentially the point at which utilization has reached 100% for any particular streaming pipeline, and means that the application has reached the maximum number of messages that it can process per second. Let's consider an example: The system is running along perfectly fine under normal conditions, accessing external sources, and processing at an average of 100,000 messages/sec. Let's assume the maximum capacity is around 130,000 messages/sec before backpressure starts propagating back up the stream. Therefore, utilization is at 0.76 (100K/130K). 
Great, but at present we don't know that 130,000 is the limit without load testing. For this example, is there a way of finding this maximum capacity (and hence the utilization) without pushing the system to its limit, based solely on the average current throughput? Possibly by measuring the saturation of certain buffers between the operators? If this is possible, the unused utilization can be used to predict how fast a system would get back to the current timestamp. Again, it's a heuristic so it doesn't have to be extremely precise. Any hints would be greatly appreciated. Thank you very much! Regards, Morgan. On 21.02.20 14:44, Arvid Heise wrote: Hi Morgan, sorry for the late reply. In general, that should work. You need to ensure that the same task is processing the same record though. Local copy needs to be state or else the last message would be lost upon restart. Performance will take a hit but if that is significa
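Morgan's TRT definition can be put into a rough back-of-the-envelope model. This is a hypothetical sketch under constant-rate assumptions; `max_rate` is exactly the unknown capacity the question asks how to estimate, and none of these names are Flink metrics:

```python
def estimate_trt(heartbeat_timeout_s, recovery_time_s,
                 uncheckpointed_records, current_rate, max_rate):
    """Sketch of TRT = outage + reprocessing + catch-up lag,
    where catch-up speed is the headroom between max and current rate."""
    spare_rate = max_rate - current_rate      # headroom to catch up with
    assert spare_rate > 0, "no headroom: the job can never catch up"
    # time to reprocess the uncheckpointed backlog using the spare capacity
    reprocess_s = uncheckpointed_records / spare_rate
    # during the outage the source kept producing; that lag needs headroom too
    outage_s = heartbeat_timeout_s + recovery_time_s
    lag_s = (current_rate * outage_s) / spare_rate
    return outage_s + reprocess_s + lag_s

# utilization 100k/130k ~= 0.77 leaves 30k records/s of headroom
print(estimate_trt(50, 10, 300_000, 100_000, 130_000))  # 270.0 seconds
```

The model makes Zhijiang's caveat concrete: if `max_rate` cannot be estimated (all queue metrics at `0` under light load), `spare_rate` is unknown and the whole prediction collapses.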
Re: How JobManager and TaskManager find each other?
I guess you are indicating the data shuffle process among different task managers. When a task manager (TM) registers itself with the job manager (JM), it also carries the info of the IP address and data port that it listens on. During the process of scheduling tasks, the upstream TM's address info (IP, port) is included in the task deployment descriptor for the respective downstream tasks. Then the downstream tasks can connect to the remote upstream TM to request data. In short, the JM knows all the addresses of the TMs via registration, and these addresses are sent to the required peers during task scheduling and deployment. Best, Zhijiang -- From:KristoffSC Send Time:2020 Feb. 26 (Wed.) 19:39 To:user Subject:Re: How JobManager and TaskManager find each other? Thanks all for the answers. One more question though. In [1] we can see that task managers are talking with each other - sending data streams. How does each task manager know the address of the other task managers? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
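The registration and deployment flow Zhijiang describes can be modeled in a few lines. This is a toy sketch; the class and field names are illustrative, not Flink's actual data structures:

```python
# TMs register (address, data port) with the JM; the JM then embeds the
# upstream TM's address in each downstream task's deployment descriptor,
# so the downstream task knows where to fetch its input partitions from.
class JobManager:
    def __init__(self):
        self.tm_addresses = {}  # tm_id -> (ip, data_port)

    def register_task_manager(self, tm_id, ip, data_port):
        self.tm_addresses[tm_id] = (ip, data_port)

    def build_deployment_descriptor(self, downstream_task, upstream_tm_id):
        # the address travels inside the descriptor at scheduling time
        return {"task": downstream_task,
                "upstream_address": self.tm_addresses[upstream_tm_id]}

jm = JobManager()
jm.register_task_manager("tm-1", "10.0.0.5", 35021)
desc = jm.build_deployment_descriptor("map[0]", "tm-1")
print(desc["upstream_address"])  # ('10.0.0.5', 35021)
```

The key point the sketch captures is that TMs never discover each other directly: the JM is the single registry, and addresses only reach peers through deployment descriptors.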
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congrats Jingsong! Welcome on board! Best, Zhijiang -- From:Zhenghua Gao Send Time:2020 Feb. 21 (Fri.) 12:49 To:godfrey he Cc:dev ; user Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer Congrats Jingsong! Best Regards, Zhenghua Gao On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: Congrats Jingsong! Well deserved. Best, godfrey On Fri, Feb 21, 2020 at 11:49 AM, Jeff Zhang wrote: Congratulations, Jingsong! You deserve it. On Fri, Feb 21, 2020 at 11:43 AM, wenlong.lwl wrote: Congrats Jingsong! On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: > Congrats Jingsong! > > > On Feb 21, 2020, at 11:39 AM, Jark Wu wrote: > > > > Congratulations Jingsong! Well deserved. > > > > Best, > > Jark > > > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: > > > >> Congratulations! Jingsong > >> > >> > >> Best, > >> Dan Zou > >> > > -- Best Regards Jeff Zhang
Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id
BTW, FLIP-75 is targeting the user experience of the web UI. @Yadong Xie have we already considered unifying the ids in different parts in FLIP-75? Best, Zhijiang -- From:Zhijiang Send Time:2020 Feb. 14 (Fri.) 13:03 To:Jiayi Liao Cc:Yun Tang ; 杨东晓 ; user Subject:Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id Let's move the further discussion onto the jira page. I do not have much time recently to work on this. If you want to take it, I can assign it to you and help review the PR if I have time then, or I can find other possible people to work on it in the future. Best, Zhijiang -- From:Jiayi Liao Send Time:2020 Feb. 14 (Fri.) 12:39 To:Zhijiang Cc:Yun Tang ; 杨东晓 ; user Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task subtask id Hi Zhijiang, It did confuse us when we were trying to locate the unfinished subtask in the Checkpoint UI last time. I've created an issue[1] for this. @杨东晓 Do you have time to work on this? [1]. https://issues.apache.org/jira/browse/FLINK-16051 Best Regards, Jiayi Liao At 2020-02-14 10:14:27, "Zhijiang" wrote: If the id is not consistent in different parts, maybe it is worth creating a jira ticket to improve the user experience. If anyone wants to work on it, please ping me and I can give a hand. Best, Zhijiang -- From:Yun Tang Send Time:2020 Feb. 14 (Fri.) 10:52 To:杨东晓 ; user Subject:Re: The mapping relationship between Checkpoint subtask id and Task subtask id Hi Yes, you are right. Simply using checkpoint subtask_id - 1 will find the corresponding task subtask_id. Best Yun Tang From: 杨东晓 Sent: Friday, February 14, 2020 10:11 To: user Subject: The mapping relationship between Checkpoint subtask id and Task subtask id Hi, I'm trying to figure out the different end-to-end duration for each subtask id in a checkpoint. In the Flink web UI I noticed that the job task subtask id starts from 0 while the checkpoint subtask id starts from 1. 
How can I find out which checkpoint subtask id belongs to which job task subtask id? Will simply using checkpoint subtask id - 1 be OK? Thanks
Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id
Let's move the further discussion onto the jira page. I do not have much time recently to work on this. If you want to take it, I can assign it to you and help review the PR if I have time then, or I can find other possible people to work on it in the future. Best, Zhijiang -- From:Jiayi Liao Send Time:2020 Feb. 14 (Fri.) 12:39 To:Zhijiang Cc:Yun Tang ; 杨东晓 ; user Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task subtask id Hi Zhijiang, It did confuse us when we were trying to locate the unfinished subtask in the Checkpoint UI last time. I've created an issue[1] for this. @杨东晓 Do you have time to work on this? [1]. https://issues.apache.org/jira/browse/FLINK-16051 Best Regards, Jiayi Liao At 2020-02-14 10:14:27, "Zhijiang" wrote: If the id is not consistent in different parts, maybe it is worth creating a jira ticket to improve the user experience. If anyone wants to work on it, please ping me and I can give a hand. Best, Zhijiang -- From:Yun Tang Send Time:2020 Feb. 14 (Fri.) 10:52 To:杨东晓 ; user Subject:Re: The mapping relationship between Checkpoint subtask id and Task subtask id Hi Yes, you are right. Simply using checkpoint subtask_id - 1 will find the corresponding task subtask_id. Best Yun Tang From: 杨东晓 Sent: Friday, February 14, 2020 10:11 To: user Subject: The mapping relationship between Checkpoint subtask id and Task subtask id Hi, I'm trying to figure out the different end-to-end duration for each subtask id in a checkpoint. In the Flink web UI I noticed that the job task subtask id starts from 0 while the checkpoint subtask id starts from 1. How can I find out which checkpoint subtask id belongs to which job task subtask id? Will simply using checkpoint subtask id - 1 be OK? Thanks
Re: The mapping relationship between Checkpoint subtask id and Task subtask id
If the id is not consistent in different parts, maybe it is worth creating a jira ticket to improve the user experience. If anyone wants to work on it, please ping me and I can give a hand. Best, Zhijiang -- From:Yun Tang Send Time:2020 Feb. 14 (Fri.) 10:52 To:杨东晓 ; user Subject:Re: The mapping relationship between Checkpoint subtask id and Task subtask id Hi Yes, you are right. Simply using checkpoint subtask_id - 1 will find the corresponding task subtask_id. Best Yun Tang From: 杨东晓 Sent: Friday, February 14, 2020 10:11 To: user Subject: The mapping relationship between Checkpoint subtask id and Task subtask id Hi, I'm trying to figure out the different end-to-end duration for each subtask id in a checkpoint. In the Flink web UI I noticed that the job task subtask id starts from 0 while the checkpoint subtask id starts from 1. How can I find out which checkpoint subtask id belongs to which job task subtask id? Will simply using checkpoint subtask id - 1 be OK? Thanks
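For reference, the mapping discussed in this thread is a plain off-by-one offset:

```python
def checkpoint_to_task_subtask_id(checkpoint_subtask_id):
    """The checkpoint UI numbers subtasks from 1 while tasks number
    from 0, so the mapping is a simple shift by one."""
    return checkpoint_subtask_id - 1

print([checkpoint_to_task_subtask_id(i) for i in range(1, 5)])  # [0, 1, 2, 3]
```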
Re: Encountered error while consuming partitions
Thanks for reporting this issue, and I also agree with the analysis below. Actually we encountered the same issue several years ago and solved it also via the Netty idle handler. Let's track it via the ticket [1] as the next step. [1] https://issues.apache.org/jira/browse/FLINK-16030 Best, Zhijiang -- From:张光辉 Send Time:2020 Feb. 12 (Wed.) 22:19 To:Benchao Li Cc:刘建刚 ; user Subject:Re: Encountered error while consuming partitions Network can fail in many ways, sometimes pretty subtle (e.g. high ratio packet loss). The problem is that the long TCP connection between the Netty client and server is lost; the server then fails to send messages to the client and shuts down the channel. The Netty client does not know that the connection has been disconnected, so it keeps waiting. To detect whether a long TCP connection is alive, the Netty client and server have two options: TCP keepalives and heartbeats. The TCP keepalive interval is 2 hours by default. When the error occurs, if you continue to wait for 2 hours, the Netty client will trigger an exception and enter failover recovery. If you want to detect a dead connection more quickly, Netty provides IdleStateHandler, which uses a ping-pong mechanism: if the Netty client sends n consecutive ping messages and receives no pong message, it triggers an exception.
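The ping-pong detection that IdleStateHandler enables can be sketched language-agnostically. The threshold of 3 missed pongs below is illustrative; in Netty the handler fires idle events that an application handler turns into pings and, eventually, a channel close:

```python
# After n consecutive pings with no pong, the client assumes the
# connection is dead instead of waiting for the ~2h TCP keepalive.
class IdleDetector:
    def __init__(self, max_missed_pongs=3):
        self.max_missed_pongs = max_missed_pongs
        self.missed = 0

    def on_ping_sent(self):
        self.missed += 1
        if self.missed >= self.max_missed_pongs:
            raise ConnectionError("peer idle: assuming dead connection")

    def on_pong_received(self):
        self.missed = 0  # any sign of life resets the counter

d = IdleDetector()
d.on_ping_sent(); d.on_pong_received()   # healthy round trip resets counter
d.on_ping_sent(); d.on_ping_sent()
try:
    d.on_ping_sent()                     # third missed pong triggers failover
except ConnectionError as e:
    print(e)
```

This is exactly why application-level heartbeats beat TCP keepalives here: the failure is detected within a few ping intervals instead of the kernel's default two-hour window.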
Re: [ANNOUNCE] Apache Flink 1.10.0 released
Really great work and thanks to everyone involved, especially the release managers! Best, Zhijiang -- From:Kurt Young Send Time:2020 Feb. 13 (Thu.) 11:06 To:[None] Cc:user ; dev Subject:Re: [ANNOUNCE] Apache Flink 1.10.0 released Congratulations to everyone involved! Great thanks to Yu & Gary for being the release managers! Best, Kurt On Thu, Feb 13, 2020 at 10:06 AM Hequn Cheng wrote: Great thanks to Yu & Gary for being the release managers! Also thanks to everyone who made this release possible! Best, Hequn On Thu, Feb 13, 2020 at 9:54 AM Rong Rong wrote: Congratulations, a big thanks to the release managers for all the hard work!! -- Rong On Wed, Feb 12, 2020 at 5:52 PM Yang Wang wrote: Excellent work. Thanks Gary & Yu for being the release managers. Best, Yang On Thu, Feb 13, 2020 at 9:36 AM, Jeff Zhang wrote: Congratulations! Really appreciate your hard work. On Thu, Feb 13, 2020 at 9:29 AM, Yangze Guo wrote: Thanks, Gary & Yu. Congrats to everyone involved! Best, Yangze Guo On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li wrote: > > Congratulations! Great work. > > Best, > Jingsong Lee > > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu wrote: >> >> Great news! >> Thanks everyone involved ! >> Thanks Gary and Yu for being the release managers ! >> >> Best, >> Leonard Xu >> >> On Feb 12, 2020, at 23:02, Stephan Ewen wrote: >> >> Congrats to us all. >> >> A big piece of work, nicely done. >> >> Let's hope that this helps our users make their existing use cases easier >> and also opens up new use cases. >> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉 wrote: >>> >>> Great work. >>> >>> On Wed, Feb 12, 2020 at 10:11 PM, Congxian Qiu wrote: >>>> >>>> Great work. >>>> Thanks everyone involved. >>>> Thanks Gary and Yu for being the release managers >>>> >>>> >>>> Best, >>>> Congxian >>>> >>>> >>>> On Wed, Feb 12, 2020 at 9:46 PM, Jark Wu wrote: >>>>> >>>>> Congratulations to everyone involved! >>>>> Great thanks to Yu & Gary for being the release managers! 
>>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu wrote: >>>>>> >>>>>> Cheers! >>>>>> Thanks Gary and Yu for the great job as release managers. >>>>>> And thanks to everyone whose contribution makes the release possible! >>>>>> >>>>>> Thanks, >>>>>> Zhu Zhu >>>>>> >>>>>> On Wed, Feb 12, 2020 at 9:36 PM, Wyatt Chun wrote: >>>>>>> >>>>>>> Sounds great. Congrats & Thanks! >>>>>>> >>>>>>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li wrote: >>>>>>>> >>>>>>>> The Apache Flink community is very happy to announce the release of >>>>>>>> Apache Flink 1.10.0, which is the latest major release. >>>>>>>> >>>>>>>> Apache Flink® is an open-source stream processing framework for >>>>>>>> distributed, high-performing, always-available, and accurate data >>>>>>>> streaming applications. >>>>>>>> >>>>>>>> The release is available for download at: >>>>>>>> https://flink.apache.org/downloads.html >>>>>>>> >>>>>>>> Please check out the release blog post for an overview of the >>>>>>>> improvements for this new major release: >>>>>>>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html >>>>>>>> >>>>>>>> The full release notes are available in Jira: >>>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845 >>>>>>>> >>>>>>>> We would like to thank all contributors of the Apache Flink community >>>>>>>> who made this release possible! >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Gary & Yu >> >> > > > -- > Best, Jingsong Lee -- Best Regards Jeff Zhang
Re: How can I find out which key group belongs to which subtask
Only chained operators can avoid the record serialization cost, but chaining cannot be applied across a keyed stream. If you can deploy the downstream task with its upstream task in the same task manager, it avoids the network shuffle cost, which still gives performance benefits. As far as I know, @Till Rohrmann has implemented some enhancements in the scheduler layer to support such a requirement in release-1.10. You can have a try when the release candidate is ready. Best, Zhijiang -- From:杨东晓 Send Time:2020 Jan. 10 (Fri.) 02:10 To:Congxian Qiu Cc:user Subject:Re: How can I find out which key group belongs to which subtask Thanks Congxian! My purpose is not only to make data go into the same subtask, but into the specific subtask that lives on the same task manager as the upstream record. The key idea is to avoid shuffling between task managers. I think KeyGroupRangeAssignment.java explains a lot about how to get the key group and subtask context that can make that happen. Do you know if there is still serialization happening while data is transferred between operators in the same task manager? Thanks. On Thu, Jan 9, 2020 at 1:55 AM, Congxian Qiu wrote: Hi If you just want to make sure some key goes into the same subtask, does a custom key selector[1] help? For the key group and subtask information, you can refer to KeyGroupRangeAssignment[2] for more info, and for the max parallelism logic you can refer to the doc[3] [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism Best, Congxian On Thu, Jan 9, 2020 at 7:47 AM, 杨东晓 wrote: Hi , I'm trying to do some optimization around Flink's 'keyBy' process function. Is it possible to find out which key group a key belongs to, and essentially which subtask a key group belongs to? 
The motivation is that we want to force the data records from upstream to still go to a downstream subtask on the same task manager. This means that even if we use a keyed stream function, we still want no cross-JVM communication to happen at run time. And if we can achieve that, can we also avoid the expensive cost of record serialization, because data is only transferred within the same task manager JVM instance? Thanks.
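The assignment logic referenced in KeyGroupRangeAssignment can be sketched as follows. A plain modulo over an integer hash stands in for Flink's murmur hash of `key.hashCode()`, so the group a given key lands in will differ from a real job; the key-group-to-operator-index formula, however, is the documented one:

```python
# keyGroup = hash(key) % maxParallelism
# operatorIndex = keyGroup * parallelism // maxParallelism
def assign_to_key_group(key_hash, max_parallelism):
    # stand-in for murmurHash(key.hashCode()) % maxParallelism
    return key_hash % max_parallelism

def operator_index_for_key_group(key_group, parallelism, max_parallelism):
    return key_group * parallelism // max_parallelism

max_parallelism, parallelism = 128, 4
# every key group maps deterministically to one of the 4 subtasks
indexes = {operator_index_for_key_group(g, parallelism, max_parallelism)
           for g in range(max_parallelism)}
print(sorted(indexes))  # [0, 1, 2, 3]
```

Given these two functions, one can enumerate which key groups (and, via the hash, which candidate keys) land on a given subtask index, which is the core of what the question asks about.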
Re: How to verify if checkpoints are asynchronous or sync
The log way is simple for tracing, and you can also grep some keywords to find the messages you need instead of skimming through the whole of a large log. I am not quite sure what your specific motivation for doing this is. Besides the log way, you can also inspect the thread stacks to confirm whether it is happening, but maybe that is not very convenient. Another possible way is via the checkpoint metrics, which record the sync/async duration times; maybe that can also satisfy your requirements. Best, Zhijiang -- From:RKandoji Send Time:2020 Jan. 8 (Wed.) 10:23 To:William C Cc:user Subject:Re: How to verify if checkpoints are asynchronous or sync Thanks for the reply. I will check and enable debug logs specifically for the class that contains this log. But in general the logs are already too huge and I'm trying to suppress some of them, so I'm wondering if there is any other way? Thanks, RKandoji On Tue, Jan 7, 2020 at 7:50 PM William C wrote: Can you enable debug logging to check that? regards. on 2020/1/8 6:36, RKandoji wrote: > But I'm curious if there is a way to verify if the checkpoints are > happening asynchronously or synchronously. >
Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink
Hi Joe, Your requirement is effective exactly-once for an external sink. I think your option 2 with TwoPhaseCommitSinkFunction is the right way to go. Unfortunately I am not quite familiar with this part, so I cannot give you specific suggestions for using it, especially for your concern about storing checkpoint ids. After the holiday, some folks with rich experience in it can provide you more professional ideas, I guess. :) ATM you can refer to the simple implementation TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the complex one FlinkKafkaProducer for more insights. In addition, the StreamingFileSink also implements exactly-once for a sink; you might refer to it as well to get some insights if possible. Best, Zhijiang -- From:Joe Hansen Send Time:2019 Dec. 26 (Thu.) 01:42 To:user Subject:Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink Happy Holidays everyone! tl;dr: I need to aggregate movie rental information that is being stored in one DynamoDB table and store a running total of the aggregation in another table. How do I ensure exactly-once aggregation? I currently store movie rental information in a DynamoDB table named MovieRentals: {movie_title, rental_period_in_days, order_date, rent_amount} We have millions of movie rentals happening on any given day. Our web application needs to display the aggregated rental amount for any given movie title. I am planning to use Flink to aggregate rental amounts by movie_title on the MovieRentals DynamoDB stream and store the aggregated rental amounts in another DynamoDB table named RentalAmountsByMovie: {movie_title, total_rental_amount} How do I ensure that the RentalAmountsByMovie amounts are accurate, i.e. how do I prevent the results from any checkpoint from updating the RentalAmountsByMovie table records more than once? 1) Do I need to store checkpoint ids in the RentalAmountsByMovie table and do conditional updates to handle the scenario described above? 
2) I can possibly implement a TwoPhaseCommitSinkFunction that talks to DynamoDB. However, according to the Flink documentation the commit function can be called more than once and hence needs to be idempotent. So even this solution requires checkpoint ids to be stored on the target store. 3) Another pattern seems to be storing the time-window aggregation results in the RentalAmountsByMovie table, with the webapp computing the running total on the fly. I don't like this solution for its latency implications for the webapp. 4) Maybe I can use Flink's Queryable State feature. However, that feature seems to be in Beta: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html I imagine this is a very common aggregation use case. How do folks usually handle **updating aggregated results in Flink external sinks**? I appreciate any pointers. Happy to provide more details if needed. Thanks!
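The conditional-update idea from options 1 and 2 can be sketched like this. It is an in-memory stand-in for a DynamoDB conditional write; the table layout and attribute names are hypothetical:

```python
# Guard each aggregate row with the id of the last checkpoint applied to
# it, so a replayed commit after recovery becomes a no-op. In DynamoDB
# this guard would be expressed as a ConditionExpression on the update.
table = {}  # movie_title -> {"total": float, "last_ckpt": int}

def commit(movie_title, delta, checkpoint_id):
    row = table.setdefault(movie_title, {"total": 0.0, "last_ckpt": -1})
    if checkpoint_id <= row["last_ckpt"]:
        return False  # already applied: idempotent replay
    row["total"] += delta
    row["last_ckpt"] = checkpoint_id
    return True

commit("Heat", 9.99, checkpoint_id=7)
commit("Heat", 9.99, checkpoint_id=7)  # replayed commit is rejected
print(table["Heat"]["total"])  # 9.99
```

This is precisely the idempotence that TwoPhaseCommitSinkFunction's commit contract requires: the replayed second call changes nothing, so double invocation after recovery cannot double-count.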
Re: Rewind offset to a previous position and ensure certainty.
If I understood correctly, events from different Kafka partitions would be emitted by different source tasks with different watermark progress. The Flink framework aligns the different watermarks and only outputs the smallest watermark among them, so events from slow partitions would not be discarded, because the downstream operator only sees the watermark based on the slow partition at the moment. You can refer to [1] for some details. As for rewinding the partition offset to a previous position, I guess it only happens in the failure recovery case or when you manually restart the job. Either way, all the tasks in the topology are restarted and previously received watermarks are cleared, so events would not be discarded in this case either. Only if you could rewind some source tasks to previous positions while keeping the other downstream tasks running might you hit the issue you are concerned about, but Flink does not support such an operation at the moment. :) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html Best, Zhijiang -- From:邢瑞斌 Send Time:2019 Dec. 25 (Wed.) 20:27 To:user-zh ; user Subject:Rewind offset to a previous position and ensure certainty. Hi, I'm trying to use Kafka as an event store and I want to create several partitions to improve read/write throughput. Occasionally I need to rewind the offset to a previous position for recomputing. Since order isn't guaranteed among partitions in Kafka, does this mean that Flink won't produce the same results as before when rewinding, even if it uses event time? For example, the consumer for one partition progresses extremely fast and raises the watermark, so events from other partitions are discarded. Is there any way to prevent this from happening? Thanks in advance! Ruibin
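The watermark alignment Zhijiang describes can be sketched as follows: an operator's effective watermark is the minimum over the latest watermark seen per input channel, so a fast partition cannot advance event time past a slow one:

```python
# A toy model of per-channel watermark alignment (not Flink's actual
# classes): the operator's output watermark only advances when the
# slowest input channel advances.
class WatermarkAligner:
    def __init__(self, num_channels):
        self.channel_wm = [float("-inf")] * num_channels

    def on_watermark(self, channel, wm):
        # watermarks are monotonic per channel
        self.channel_wm[channel] = max(self.channel_wm[channel], wm)
        return min(self.channel_wm)  # the operator's effective watermark

a = WatermarkAligner(2)
a.on_watermark(0, 1000)          # fast partition races ahead
print(a.on_watermark(1, 200))    # 200 -- slow partition holds time back
```

This is why a racing consumer on one partition does not cause events from the other partitions to be treated as late: downstream time only moves as fast as the slowest input.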
Re: Flink task node shut it self off.
If you use the RocksDB state backend, it might consume extra native memory. Some resource frameworks like YARN will kill the container if the memory usage exceeds some threshold. You can double-check whether that applies in your case. -- From:John Smith Send Time:2019 Dec. 25 (Wed.) 03:40 To:Zhijiang Cc:user Subject:Re: Flink task node shut it self off. The shutdown happened after the massive IO wait. I don't use any state. Checkpoints are disk based... On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, wrote: Hi John, Thanks for the positive comments on Flink usage. No matter whether you use at-least-once or exactly-once for checkpoints, no message is lost during failure recovery. Unfortunately I cannot visit the logs you posted. Generally speaking, a longer checkpoint interval means replaying more source data after failure recovery. In my experience the 5 second checkpoint interval is too frequent, and you might increase it to 1 minute or so. You can also monitor how long the checkpoints take to finish in your application, and then adjust the interval accordingly. Concerning the node shutdown you mentioned, I am not quite sure whether it is related to your short checkpoint interval. Did you configure the heap state backend? The hs_err file really indicates that your job encountered a memory issue, so it would be better to somehow increase your task manager memory. But if you can analyze the dumped hs_err file with some profiler tool to check the memory usage, it might be more helpful for finding the root cause. Best, Zhijiang -- From:John Smith Send Time:2019 Dec. 21 (Sat.) 05:26 To:user Subject:Flink task node shut it self off. Hi, using Flink 1.8.0 1st off I must say Flink resiliency is very impressive, we lost a node and never lost one message by using checkpoints and Kafka. Thanks! The cluster is a self hosted cluster and we use our own zookeeper cluster. We have... 
3 zookeepers: 4 cpu, 8GB (each) 3 job nodes: 4 cpu, 8GB (each) 3 task nodes: 4 cpu, 8GB (each) The nodes also share GlusterFS for storing savepoints and checkpoints, GlusterFS is running on the same machines. Yesterday a node shut itself off we the following log messages... - Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0. - Stop job leader service. - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. - Shutting down TaskExecutorLocalStateStoresManager. - Shutting down BLOB cache - Shutting down BLOB cache - removed file cache directory /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000 - I/O manager removed spill file directory /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed - Shutting down the network environment and its components. Prior to the node shutting off we noticed massive IOWAIT of 140% and CPU load 1minute of 15. And we also got an hs_err file which sais we should increase the memory. I'm attaching the logs here: https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0 I wonder if my 5 second checkpointing is too much for gluster. Any thoughts?
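The tuning advice in this thread — measure how long checkpoints actually take, then size the interval from that — can be written down as a small heuristic (the headroom factor and the 1-minute floor are illustrative assumptions taken from the discussion, not Flink defaults):

```python
def suggested_checkpoint_interval_ms(observed_durations_ms, headroom_factor=5, floor_ms=60_000):
    """Pick an interval that leaves the job mostly doing real work:
    at least `headroom_factor` times the worst observed checkpoint
    duration, and never below a floor (1 minute here, as suggested
    in the thread)."""
    worst = max(observed_durations_ms)
    return max(worst * headroom_factor, floor_ms)

# Checkpoints to a slow filesystem observed at 8-20s: a 5s interval means
# checkpoints back up on each other; the heuristic suggests 100s instead.
assert suggested_checkpoint_interval_ms([8_000, 12_000, 20_000]) == 100_000
# Fast sub-second checkpoints still get the 1 minute floor.
assert suggested_checkpoint_interval_ms([400, 700]) == 60_000
```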
Re: Flink task node shut itself off.
Hi John, Thanks for the positive comments on Flink usage. No matter whether you use at-least-once or exactly-once checkpointing, it would never lose a message during failure recovery. Unfortunately I can not visit the logs you posted. Generally speaking, a longer checkpoint interval means replaying more source data after failure recovery. In my experience the 5 second checkpoint interval is too frequent, and you might increase it to 1 minute or so. You can also monitor how long checkpoints take to finish in your application, then adjust the interval accordingly. Concerning the node shutdown you mentioned, I am not quite sure whether it is relevant to your short checkpoint interval. Are you configured to use the heap state backend? The hs_err file really indicates that your job encountered a memory issue, so it is better to somehow increase your task manager memory. But if you can analyze the dumped hs_err file via some profiler tool to check the memory usage, it might be more helpful for finding the root cause. Best, Zhijiang -- From:John Smith Send Time:2019 Dec. 21 (Sat.) 05:26 To:user Subject:Flink task node shut itself off. Hi, using Flink 1.8.0. 1st off I must say Flink resiliency is very impressive; we lost a node and never lost one message by using checkpoints and Kafka. Thanks! The cluster is a self hosted cluster and we use our own zookeeper cluster. We have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints; GlusterFS is running on the same machines. Yesterday a node shut itself off with the following log messages:
- Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.
Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute CPU load of 15. And we also got an hs_err file which says we should increase the memory. I'm attaching the logs here: https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0 I wonder if my 5 second checkpointing is too much for gluster. Any thoughts?
Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
The hint about mmap usage below is really helpful for locating this problem. I forgot about this biggest change for batch jobs in release-1.9. The blocking subpartition type option can be set to `file` as Piotr suggested, to behave similarly to before. I think it can solve your problem. -- From:Hailu, Andreas Send Time:2019 Nov. 21 (Thu.) 23:37 To:Piotr Nowojski Cc:Zhijiang ; user@flink.apache.org Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Thanks, Piotr. We’ll rerun our apps today with this and get back to you. // ah From: Piotr Nowojski On Behalf Of Piotr Nowojski Sent: Thursday, November 21, 2019 10:14 AM To: Hailu, Andreas [Engineering] Cc: Zhijiang ; user@flink.apache.org Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Hi, I would suspect this: https://issues.apache.org/jira/browse/FLINK-12070 to be the source of the problems. There seems to be a hidden configuration option that avoids using memory mapped files: taskmanager.network.bounded-blocking-subpartition-type: file Could you test if it helps? Piotrek On 21 Nov 2019, at 15:22, Hailu, Andreas wrote: Hi Zhijiang, I looked into the container logs for the failure, and didn’t see any specific OutOfMemory errors before it was killed. I ran the application using the same config this morning on 1.6.4, and it went through successfully. I took a snapshot of the memory usage from the dashboard and can send it to you for reference if you like. What stands out to me as suspicious is that on 1.9.1, the application is using nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its runtime and succeeds. The JVM heap memory itself never exceeds its capacity, peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes around mapped memory. // ah From: Zhijiang Sent: Wednesday, November 20, 2019 11:32 PM To: Hailu, Andreas [Engineering] ; user@flink.apache.org Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi Andreas, You are running a batch job, so there should be no native memory used by the RocksDB state backend. Then I guess either heap memory or direct memory is overused. The managed heap memory is mainly used by batch operators, and direct memory is used by the network shuffle. Can you further check whether there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before it was killed? If the used memory exceeds the JVM configuration, it should throw that error. Then we can further narrow down the scope. I can not remember the memory-related changes for managed memory or the network stack, especially since they span several releases. Best, Zhijiang -- From:Hailu, Andreas Send Time:2019 Nov. 21 (Thu.) 01:03 To:user@flink.apache.org Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Going through the release notes today - we tried fiddling with the taskmanager.memory.fraction option, going as low as 0.1, unfortunately with no success. It still leads to the container running beyond physical memory limits. // ah From: Hailu, Andreas [Engineering] Sent: Tuesday, November 19, 2019 6:01 PM To: 'user@flink.apache.org' Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Hi, We’re in the middle of testing the upgrade of our data processing flows from Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 1.6.4 now fail on 1.9.1 with the same application resources and input data size. It seems that there have been some changes around how the data is sorted prior to being fed to the CoGroup operator - this is the error that we encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset | Merge | NONE)' caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    ... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost
Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi Andreas, You are running a batch job, so there should be no native memory used by the RocksDB state backend. Then I guess either heap memory or direct memory is overused. The managed heap memory is mainly used by batch operators, and direct memory is used by the network shuffle. Can you further check whether there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before it was killed? If the used memory exceeds the JVM configuration, it should throw that error. Then we can further narrow down the scope. I can not remember the memory-related changes for managed memory or the network stack, especially since they span several releases. Best, Zhijiang -- From:Hailu, Andreas Send Time:2019 Nov. 21 (Thu.) 01:03 To:user@flink.apache.org Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Going through the release notes today - we tried fiddling with the taskmanager.memory.fraction option, going as low as 0.1, unfortunately with no success. It still leads to the container running beyond physical memory limits. // ah From: Hailu, Andreas [Engineering] Sent: Tuesday, November 19, 2019 6:01 PM To: 'user@flink.apache.org' Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1? Hi, We’re in the middle of testing the upgrade of our data processing flows from Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 1.6.4 now fail on 1.9.1 with the same application resources and input data size. It seems that there have been some changes around how the data is sorted prior to being fed to the CoGroup operator - this is the error that we encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset | Merge | NONE)' caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    ... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)
I drilled further down into the YARN app logs, and I found that the container was running out of physical memory:
2019-11-19 12:49:23,068 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e42_1574076744505_9444_01_04 because: Container [pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory used; 13.9 GB of 25.2 GB virtual memory used. Killing container.
This is what leads to my suspicion, as this resource configuration worked just fine on 1.6.4. I’m working on getting heap dumps of these applications to try to get a better understanding of what’s causing the blowup in the physical memory required, but it would be helpful if anyone knew what relevant changes have been made between these versions, or where else I could look. There are some features in 1.9 that we’d like to use in our flows, so getting this sorted out, no pun intended, is inhibiting us from doing so. Best, Andreas Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices
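One way to reason about the symptom in this thread — the container killed at its physical limit while the JVM heap stays within bounds — is simple resident-memory accounting: memory-mapped blocking partitions count against the YARN container limit even though they never raise an OutOfMemoryError. A sketch using the numbers reported above (the direct-memory and overhead figures are assumptions for illustration, not measurements from the thread):

```python
def container_fits(limit_gb, heap_gb, direct_gb, mapped_gb, overhead_gb=0.5):
    """YARN kills on total resident memory, not JVM heap alone, so
    mapped files (e.g. mmapped blocking partitions) count against the
    container even when the heap never overflows."""
    total = heap_gb + direct_gb + mapped_gb + overhead_gb
    return total <= limit_gb, total

# 1.6.4-style run: no mapped memory, fits comfortably in a 12 GB container.
ok, total = container_fits(limit_gb=12, heap_gb=6.65, direct_gb=1.0, mapped_gb=0.0)
assert ok
# 1.9.1-style run: ~6 GB of mapped files pushes the same job over the limit.
ok, total = container_fits(limit_gb=12, heap_gb=6.65, direct_gb=1.0, mapped_gb=6.0)
assert not ok
```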
Re: How can I get the backpressure signals inside my function or operator?
You can refer to this document [1] for the REST API details. Actually the backpressure URI is "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is easy to get the jobid and vertexid. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html Best, Zhijiang -- From:Felipe Gutierrez Send Time:2019 Nov. 7 (Thu.) 00:06 To:Chesnay Schepler Cc:Zhijiang ; user Subject:Re: How can I get the backpressure signals inside my function or operator? If I can trigger the sample via the REST API it is good for a POC. Then I can read from any in-memory storage using a separate thread within the operator. But what is the REST API that gives me the backpressure ratio value? Thanks -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler wrote: I don't think there is a truly sane way to do this. I could envision a separate application triggering samples via the REST API, writing the results into Kafka which your operator can read. This is probably the most reasonable solution I can come up with. Any attempt at accessing the TaskExecutor or metrics from within the operator is inadvisable; you'd be encroaching into truly hacky territory. You could also do your own backpressure sampling within your operator (a separate thread within the operator executing the same sampling logic), but I don't know how easy it would be to re-use Flink code. On 06/11/2019 13:40, Felipe Gutierrez wrote: Does anyone know which metric I can rely on to know if a given operator is activating the backpressure? Or how can I call the same java object that the Flink UI calls to give me the backpressure ratio? Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez wrote: Hi Zhijiang, thanks for your reply. Yes, you understood correctly.
The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength" in the operator might be because of the way the Flink runtime architecture was designed. But I was wondering what kind of signal I can get. I guess I could get some backpressure message, because backpressure works by slowing down the upstream operators. For example, I can see the ratio per sub-task on the web interface [1], i.e. for the physical operators. Is there any message flowing backward that I can get? Is there anything that lets me avoid relying on some external storage? [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Tue, Nov 5, 2019 at 12:23 PM Zhijiang wrote: Hi Felipe, That is an interesting idea to control the upstream's output based on the downstream's input. If I understood correctly, the preAggregate operator would trigger a flush of its output while the reduce operator is idle/hungry. In contrast, the preAggregate would continue aggregating data in the case of backpressure. I think this requirement is valid, but unfortunately I guess you can not get the backpressure signal at the operator level. AFAIK only the upper task level can get the input/output state to decide whether to process or not. If you want to get the reduce task's metric `Shuffle.Netty.Input.Buffers.inputQueueLength` on the preAggregate side, you might rely on some external metric reporter to query it, if possible. Best, Zhijiang -- From:Felipe Gutierrez Send Time:2019 Nov. 5 (Tue.) 16:58 To:user Subject:How can I get the backpressure signals inside my function or operator? Hi all, let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> sink" job and the reducer is sending backpressure signals to the preAggregate, map and source operators. How do I get those signals inside my operator's implementation? I guess it is not possible inside the function.
But if I have my own operator implemented (preAggregate), can I get those backpressure signals? I want to get the "Shuffle.Netty.Input.Buffers.inputQueueLength" metric [1] in my preAggregate operator in order to decide when to stop the pre-aggregation and flush tuples, and when to keep pre-aggregating. It is something like the credit-based control in the network stack [2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service [2] https://www.youtube.com/watch?v=AbqatHF3tZI Thanks! Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com
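One option raised in this thread is sampling backpressure externally via the REST API; the endpoint mentioned is /jobs/:jobid/vertices/:vertexid/backpressure. A minimal sketch of assembling those URLs for a separate polling application (the host, port, and ids are made up; job and vertex ids would be discovered via the /jobs endpoints first):

```python
BASE = "http://jobmanager:8081"  # assumed JobManager REST address

def jobs_url():
    # List running jobs first, then look up the job's vertices.
    return f"{BASE}/jobs"

def job_url(job_id):
    # The job detail response includes the vertex ids of its operators.
    return f"{BASE}/jobs/{job_id}"

def backpressure_url(job_id, vertex_id):
    # The endpoint quoted in the thread:
    # /jobs/:jobid/vertices/:vertexid/backpressure
    return f"{BASE}/jobs/{job_id}/vertices/{vertex_id}/backpressure"

# Hypothetical ids, purely for illustration.
url = backpressure_url("ca00dc1b", "5f0c6a")
assert url == "http://jobmanager:8081/jobs/ca00dc1b/vertices/5f0c6a/backpressure"
```

A separate application could poll this endpoint and publish the per-subtask ratios to a store (e.g. Kafka, as Chesnay suggests) that the operator reads from its own thread.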
Re: What metrics can I see the root cause of "Buffer pool is destroyed" message?
Hi Felipe, "Buffer pool is destroyed" is mainly caused by canceling the task. That means some other task failed, which triggers the job master to cancel all tasks in the topology. So if you want to find the root cause, it is best to check the job master log for the first failure, which triggered the subsequent cancel operations. In addition, which Flink version are you using? Best, Zhijiang -- From:Felipe Gutierrez Send Time:2019 Nov. 6 (Wed.) 19:12 To:user Subject:What metrics can I see the root cause of "Buffer pool is destroyed" message? Hi community, Looking at the code [1] it seems that it is related to not having availableMemorySegments anymore. I am looking at several metrics, but they haven't helped me understand where I can measure the root cause of this error message.
- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not seem to give me a related cause.
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength: I see my reducer operator always with a queue length equal to 4. The pre-aggregate task sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength show my source task at 100% several times. But my error message comes from the pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond DOES show that the pre-aggregate task is consuming a lot. But with which metric can I relate this, to know in advance how much is a lot?
[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265 Thanks for your suggestions, and here is my stack trace:
java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
-- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com
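The explanation in Zhijiang's reply can be modeled in a few lines: the pool is torn down as part of task cancellation, so a later buffer request fails with exactly this message — it signals cancellation in progress, not buffer exhaustion. A plain Python model mirroring the shape of LocalBufferPool, not its actual code:

```python
class LocalBufferPoolModel:
    """Toy model: segments are handed out until the pool is destroyed
    during task cancellation, after which every request fails."""

    def __init__(self, segments):
        self.available = segments
        self.destroyed = False

    def request_segment(self):
        if self.destroyed:
            # The state the stack trace reports: the request does not mean
            # "out of buffers", it means "this task is being cancelled".
            raise RuntimeError("Buffer pool is destroyed.")
        if self.available == 0:
            return None  # a real caller would block until a segment is recycled
        self.available -= 1
        return object()

    def lazy_destroy(self):
        # Called during cancel: after this, any emit on the task fails.
        self.destroyed = True


pool = LocalBufferPoolModel(segments=1)
assert pool.request_segment() is not None
pool.lazy_destroy()
try:
    pool.request_segment()
    raise AssertionError("expected the request to fail")
except RuntimeError as e:
    assert "destroyed" in str(e)
```

This is why the advice in the reply is to search the job master log for the first failure rather than staring at buffer metrics: the destroyed pool is a consequence of cancellation, not its cause.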
Re: How can I get the backpressure signals inside my function or operator?
Hi Felipe, That is an interesting idea to control the upstream's output based on the downstream's input. If I understood correctly, the preAggregate operator would trigger a flush of its output while the reduce operator is idle/hungry. In contrast, the preAggregate would continue aggregating data in the case of backpressure. I think this requirement is valid, but unfortunately I guess you can not get the backpressure signal at the operator level. AFAIK only the upper task level can get the input/output state to decide whether to process or not. If you want to get the reduce task's metric `Shuffle.Netty.Input.Buffers.inputQueueLength` on the preAggregate side, you might rely on some external metric reporter to query it, if possible. Best, Zhijiang -- From:Felipe Gutierrez Send Time:2019 Nov. 5 (Tue.) 16:58 To:user Subject:How can I get the backpressure signals inside my function or operator? Hi all, let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> sink" job and the reducer is sending backpressure signals to the preAggregate, map and source operators. How do I get those signals inside my operator's implementation? I guess it is not possible inside the function. But if I have my own operator implemented (preAggregate), can I get those backpressure signals? I want to get the "Shuffle.Netty.Input.Buffers.inputQueueLength" metric [1] in my preAggregate operator in order to decide when to stop the pre-aggregation and flush tuples, and when to keep pre-aggregating. It is something like the credit-based control in the network stack [2]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service [2] https://www.youtube.com/watch?v=AbqatHF3tZI Thanks! Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com
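The scheme discussed in this thread — flush the pre-aggregate while the reducer is hungry, keep aggregating under backpressure — can be prototyped outside Flink with a hypothetical downstream queue-length signal (obtaining that signal at the operator level is exactly the open problem of the thread; the threshold and cap below are illustrative):

```python
class AdaptivePreAggregator:
    """Accumulates (key, value) pairs and flushes based on an external
    signal approximating the reducer's input queue length."""

    def __init__(self, max_keys=1000, hungry_threshold=1):
        self.buffer = {}
        self.max_keys = max_keys              # bound memory and latency
        self.hungry_threshold = hungry_threshold
        self.flushed = []                     # stands in for emitting downstream

    def add(self, key, value, downstream_queue_length):
        self.buffer[key] = self.buffer.get(key, 0) + value
        # Flush early when downstream is nearly idle; otherwise hold
        # until the buffer itself fills up.
        if downstream_queue_length <= self.hungry_threshold or len(self.buffer) >= self.max_keys:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed.append(dict(self.buffer))
            self.buffer.clear()


agg = AdaptivePreAggregator(max_keys=3)
agg.add("a", 1, downstream_queue_length=4)  # backpressured: keep aggregating
agg.add("a", 1, downstream_queue_length=4)
assert agg.flushed == [] and agg.buffer == {"a": 2}
agg.add("b", 5, downstream_queue_length=0)  # reducer hungry: flush now
assert agg.flushed == [{"a": 2, "b": 5}] and agg.buffer == {}
```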
Re: [ANNOUNCE] Zili Chen becomes a Flink committer
Congratulations Zili! -- From:Becket Qin Send Time: Thu, Sep 12, 2019 03:43 To:Paul Lam Cc:Rong Rong ; dev ; user Subject:Re: [ANNOUNCE] Zili Chen becomes a Flink committer Congrats, Zili! On Thu, Sep 12, 2019 at 9:39 AM Paul Lam wrote: Congratulations Zili! Best, Paul Lam On Sep 12, 2019, at 09:34, Rong Rong wrote: Congratulations Zili! -- Rong On Wed, Sep 11, 2019 at 6:26 PM Hequn Cheng wrote: Congratulations! Best, Hequn On Thu, Sep 12, 2019 at 9:24 AM Jark Wu wrote: Congratulations Zili! Best, Jark On Wed, 11 Sep 2019 at 23:06, wrote: > Congratulations, Zili. > Best, > Xingcan > *From:* SHI Xiaogang > *Sent:* Wednesday, September 11, 2019 7:43 AM > *To:* Guowei Ma > *Cc:* Fabian Hueske ; Biao Liu ; Oytun Tez ; bupt_ljy ; dev <d...@flink.apache.org>; user ; Till Rohrmann <trohrm...@apache.org> > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer > Congratulations! > Regards, > Xiaogang > Guowei Ma wrote on Wed, Sep 11, 2019 at 7:07 PM: > Congratulations Zili ! > Best, > Guowei > Fabian Hueske wrote on Wed, Sep 11, 2019 at 7:02 PM: > Congrats Zili Chen :-) > Cheers, Fabian > On Wed, Sep 11, 2019 at 12:48 PM, Biao Liu wrote: > Congrats Zili! > Thanks, > Biao /'bɪ.aʊ/ > On Wed, 11 Sep 2019 at 18:43, Oytun Tez wrote: > Congratulations! > --- > Oytun Tez > *M O T A W O R D* > *The World's Fastest Human Translation Platform.* > oy...@motaword.com — www.motaword.com > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy wrote: > Congratulations! > Best, > Jiayi Liao > Original Message > *Sender:* Till Rohrmann > *Recipient:* dev; user > *Date:* Wednesday, Sep 11, 2019 17:22 > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer > Hi everyone, > I'm very happy to announce that Zili Chen (some of you might also know him as Tison Kun) accepted the offer of the Flink PMC to become a committer of the Flink project.
> Zili Chen has been an active community member for almost 16 months now. He helped push the FLIP-6 effort over the finish line, ported a lot of legacy code tests, removed a good part of the legacy code, contributed numerous fixes, is involved in the refactoring of Flink's client API, drives the refactoring of Flink's HighAvailabilityServices and much more. Zili Chen also helped the community through PR reviews, reporting Flink issues, answering user mails and being very active on the dev mailing list. > Congratulations Zili Chen! > Best, Till > (on behalf of the Flink PMC)
Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer
Congratulations Andrey, great work and well deserved! Best, Zhijiang -- From:Till Rohrmann Send Time: Wed, Aug 14, 2019 15:26 To:dev ; user Subject:[ANNOUNCE] Andrey Zagrebin becomes a Flink committer Hi everyone, I'm very happy to announce that Andrey Zagrebin accepted the offer of the Flink PMC to become a committer of the Flink project. Andrey has been an active community member for more than 15 months. He has helped shape numerous features such as State TTL, the FRocksDB release, the Shuffle service abstraction, FLIP-1, result partition management and various fixes/improvements. He's also frequently helping out on the user@f.a.o mailing lists. Congratulations Andrey! Best, Till (on behalf of the Flink PMC)
Re: Will broadcast stream affect performance because of the absence of operator chaining?
Hi Paul, In theory a broadcast operator can not be chained, because chaining is only feasible for one-to-one modes like forward, not for the all-to-all mode. If chained, the next operator can process the raw record emitted by the head operator directly. If not, the emitted record must be serialized into a buffer which is consumed by the downstream operator, possibly over the network. So the chained way has the best performance in theory compared to non-chained. In your case, if you can not bypass the requirement for broadcast, then you have to accept the non-chained way and test whether the real performance is acceptable to you. If the performance does not reach your requirements, we can further consider other improvements. Best, Zhijiang -- From:Piotr Nowojski Send Time: Tue, Aug 6, 2019 14:55 To:黄兆鹏 Cc:user Subject:Re: Will broadcast stream affect performance because of the absence of operator chaining? Hi, No, I think you are right, I forgot about the broadcasting requirement. Piotrek On 6 Aug 2019, at 13:11, 黄兆鹏 wrote: Hi Piotrek, I previously considered your first advice (use a union record type), but I found that the schema would only be sent to one subtask of the operator (for example, operatorA), and the other subtasks of the operator would not be aware of it. Is there anything I have missed in this case? Thank you! -- Original -- From: "Piotr Nowojski"; Date: Tue, Aug 6, 2019 06:57 PM To: "黄兆鹏"; Cc: "user"; Subject: Re: Will broadcast stream affect performance because of the absence of operator chaining? Hi, Have you measured the performance impact of breaking the operator chain? This is a current limitation of Flink chaining: if an operator has two inputs, it can not be chained to something else (only one-input operators are chained together). There are plans to address this issue in the future.
As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record OR Schema (not Record AND Schema), and upstream operators (operatorA) could just ignore/forward the Schema. You wouldn’t need to send the schema with every record.
- another (ugly) solution is to implement the BroadcastStream input outside of Flink, but then you might have issues with checkpointing/watermarking and it just makes many things more complicated.
Piotrek
On 6 Aug 2019, at 10:50, 黄兆鹏 wrote: Hi Piotrek, Thanks for your reply. My broadcast stream just listens to changes of the schema, and it's very infrequent and very lightweight. In fact there are two ways to solve my problem. The first one is a broadcast stream that listens to changes of the schema and broadcasts them to every operator that will handle the data, just as I posted originally:

DataStream:       OperatorA -> OperatorB -> OperatorC
                      ^            ^            ^
                      |            |            |
BroadcastStream ------+------------+------------+

The second approach is to have an operator that joins my data and schema together and sends them to the downstream operators:

DataStream:       MergeSchemaOperator -> OperatorA -> OperatorB -> OperatorC
                          ^
                          |
BroadcastStream ----------+

The benefit of the first approach is that the flink job does not have to transfer the schema with the real data records among operators, because the schema will be broadcasted to each operator. But the disadvantage of the first approach is that it breaks the operator chain, so operators may not be executed in the same slot, giving worse performance. The second approach does not have the problem of the first one, but each message will carry its schema info among operators, which costs about 2x for serialization and deserialization between operators. Is there a better workaround where all the operators notice the schema change while not breaking the operator chaining? Thanks!
-- Original -- From: "Piotr Nowojski"; Date: Tue, Aug 6, 2019 04:23 PM To: "黄兆鹏"; Cc: "user"; Subject: Re: Will broadcast stream affect performance because of the absence of operator chaining? Hi, Broadcasting will break an operator chain. However my best guess is that the Kafka source will still be a performance bottleneck in your job. Also, network exchanges add measurable overhead only if your records are very lightweight and easy to process (for example, if you are using RocksDB then you can just ignore network costs). Either way, you can just try this out. Pre-populate your Kafka topic with some significant number of messages, run both jobs, compare the throughput and decide based on those results whether this is ok for you or not. Piotrek
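Piotr's first workaround — a union record type where upstream operators do their work on data records but forward schema events untouched, so the chain stays intact and every downstream operator sees the update — could look roughly like this (hypothetical types sketched in Python; the real job's classes and the Flink wiring are omitted):

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Record:
    payload: str


@dataclass
class SchemaUpdate:
    version: int


# The union type flowing through the whole (unbroken) chain.
Event = Union[Record, SchemaUpdate]


class ForwardingOperator:
    """Plays the role of operatorA: processes Records, but forwards
    SchemaUpdates downstream untouched so every operator in the chain
    eventually observes the new schema."""

    def __init__(self):
        self.schema_version = 0
        self.emitted: List[Event] = []

    def process(self, event: Event):
        if isinstance(event, SchemaUpdate):
            self.schema_version = event.version  # note it, then forward as-is
            self.emitted.append(event)
        else:
            self.emitted.append(Record(event.payload.upper()))  # the real work


op = ForwardingOperator()
op.process(Record("a"))
op.process(SchemaUpdate(version=2))
op.process(Record("b"))
assert op.schema_version == 2
assert [type(e).__name__ for e in op.emitted] == ["Record", "SchemaUpdate", "Record"]
```

Since schema changes are infrequent and lightweight here, the serialization cost of the occasional SchemaUpdate event is negligible compared to attaching the schema to every record.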
Re: [ANNOUNCE] Rong Rong becomes a Flink committer
Congratulations Rong! Best, Zhijiang -- From:Kurt Young Send Time:2019年7月11日(星期四) 22:54 To:Kostas Kloudas Cc:Jark Wu ; Fabian Hueske ; dev ; user Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer Congratulations Rong! Best, Kurt On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas wrote: Congratulations Rong! On Thu, Jul 11, 2019 at 4:40 PM Jark Wu wrote: Congratulations Rong Rong! Welcome on board! On Thu, 11 Jul 2019 at 22:25, Fabian Hueske wrote: Hi everyone, I'm very happy to announce that Rong Rong accepted the offer of the Flink PMC to become a committer of the Flink project. Rong has been contributing to Flink for many years, mainly working on SQL and Yarn security features. He's also frequently helping out on the user@f.a.o mailing lists. Congratulations Rong! Best, Fabian (on behalf of the Flink PMC)
Re: Maybe a flink bug. Job keeps in FAILING state
Thanks for opening this ticket; I will watch it. Flink does not handle the OOM issue specially. I remember we discussed a similar issue before, but I forget the conclusion, or whether there were other concerns about it. I am not sure whether it is worth fixing atm; maybe Till or Chesnay could give a final decision. Best, Zhijiang -- From:Joshua Fan Send Time:2019年6月25日(星期二) 11:10 To:zhijiang Cc:Chesnay Schepler ; user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state Hi Zhijiang, Thank you for your analysis; I agree with it. The solution may be to let the TM exit, as you mentioned, when any type of OOM occurs, because Flink has no control over a TM once an OOM occurs. I filed a JIRA before, https://issues.apache.org/jira/browse/FLINK-12889. I don't know whether it is worth fixing. Thank you all. Yours sincerely Joshua On Fri, Jun 21, 2019 at 5:32 PM zhijiang wrote: Thanks for the reminder, @Chesnay Schepler. I just looked through the related logs. Actually, all five "Source: ServiceLog" tasks are not in a terminal state from the JM's view. The relevant process is as follows: 1. The checkpoint in the task causes an OOM issue, which calls `Task#failExternally` as a result; we can see the log "Attempting to fail task externally" in the TM. 2. The source task transitions from RUNNING to FAILED and then starts a canceler thread for canceling the task; we can see the log "Triggering cancellation of task" in the TM. 3. When the JM starts to cancel the source tasks, the RPC call `Task#cancelExecution` finds the task already in FAILED state from step 2; we can see the log "Attempting to cancel task" in the TM. In the end, all five source tasks are not in terminal states in the JM log. I guess step 2 might not have created the canceler thread successfully: the root failover was caused by an OOM while creating a native thread in step 1, so creating the canceler thread may have failed as well in this OOM case, which is unstable.
If so, the source task would not be interrupted at all and thus would not report to the JM, although its state had already changed to FAILED. The other vertex tasks do not trigger `Task#failExternally` in step 1 and only receive the cancel RPC from the JM in step 3. I guess that at that point, later than the source failure, the canceler thread could be created successfully after some GCs, so these tasks could be canceled and reported to the JM side. I think the key problem is that under the OOM case some behaviors are not within expectations, which can bring problems. Maybe we should handle the OOM error in an extreme way, like making the TM exit, to solve the potential issue. Best, Zhijiang -- From:Chesnay Schepler Send Time:2019年6月21日(星期五) 16:34 To:zhijiang ; Joshua Fan Cc:user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state The logs are attached to the initial mail. Echoing my thoughts from earlier: from the logs it looks as if the TM never even submits the terminal-state RPC calls for several tasks to the JM. On 21/06/2019 10:30, zhijiang wrote: Hi Joshua, If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in CANCELED state on the TM side but in CANCELING state on the JM side, it might indicate that the terminal-state RPC was not received by the JM. I am not sure whether the OOM could cause this issue, resulting in unexpected behavior. In addition, you mentioned these tasks are still active after the OOM and after being asked to cancel, so I am not sure what the specific period is for your attached TM stack. It would help if you could provide the corresponding TM log and JM log; from the TM log it is easy to check the tasks' final state. Best, Zhijiang -- From:Joshua Fan Send Time:2019年6月20日(星期四) 11:55 To:zhijiang Cc:user ; Till Rohrmann ; Chesnay Schepler Subject:Re: Maybe a flink bug.
Job keeps in FAILING state zhijiang, I did not capture the job UI. The topology is in FAILING state, but the persistentbolt subtasks, as can be seen in the picture attached in the first mail, were all canceled; the parsebolt subtasks, as described before, had one subtask FAILED and the other subtasks CANCELED; but the source subtasks had one subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state. The subtask status described above is from the JM view, but in the TM view all of the source subtasks were FAILED; I do not know why the JM was not notified about this. As all of the failed statuses were triggered by an OOM because the subtasks could not create native threads when checkpointing, I also dumped the stack of the JVM; it shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are still active after throwing an OOM and being asked to cancel. I attached the jstack file in this email.
Re: Maybe a flink bug. Job keeps in FAILING state
Thanks for the reminder, @Chesnay Schepler. I just looked through the related logs. Actually, all five "Source: ServiceLog" tasks are not in a terminal state from the JM's view. The relevant process is as follows: 1. The checkpoint in the task causes an OOM issue, which calls `Task#failExternally` as a result; we can see the log "Attempting to fail task externally" in the TM. 2. The source task transitions from RUNNING to FAILED and then starts a canceler thread for canceling the task; we can see the log "Triggering cancellation of task" in the TM. 3. When the JM starts to cancel the source tasks, the RPC call `Task#cancelExecution` finds the task already in FAILED state from step 2; we can see the log "Attempting to cancel task" in the TM. In the end, all five source tasks are not in terminal states in the JM log. I guess step 2 might not have created the canceler thread successfully: the root failover was caused by an OOM while creating a native thread in step 1, so creating the canceler thread may have failed as well in this OOM case, which is unstable. If so, the source task would not be interrupted at all and thus would not report to the JM, although its state had already changed to FAILED. The other vertex tasks do not trigger `Task#failExternally` in step 1 and only receive the cancel RPC from the JM in step 3. I guess that at that point, later than the source failure, the canceler thread could be created successfully after some GCs, so these tasks could be canceled and reported to the JM side. I think the key problem is that under the OOM case some behaviors are not within expectations, which can bring problems. Maybe we should handle the OOM error in an extreme way, like making the TM exit, to solve the potential issue. Best, Zhijiang -- From:Chesnay Schepler Send Time:2019年6月21日(星期五) 16:34 To:zhijiang ; Joshua Fan Cc:user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state The logs are attached to the initial mail.
Echoing my thoughts from earlier: from the logs it looks as if the TM never even submits the terminal-state RPC calls for several tasks to the JM. On 21/06/2019 10:30, zhijiang wrote: Hi Joshua, If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in CANCELED state on the TM side but in CANCELING state on the JM side, it might indicate that the terminal-state RPC was not received by the JM. I am not sure whether the OOM could cause this issue, resulting in unexpected behavior. In addition, you mentioned these tasks are still active after the OOM and after being asked to cancel, so I am not sure what the specific period is for your attached TM stack. It would help if you could provide the corresponding TM log and JM log; from the TM log it is easy to check the tasks' final state. Best, Zhijiang -- From:Joshua Fan Send Time:2019年6月20日(星期四) 11:55 To:zhijiang Cc:user ; Till Rohrmann ; Chesnay Schepler Subject:Re: Maybe a flink bug. Job keeps in FAILING state zhijiang, I did not capture the job UI. The topology is in FAILING state, but the persistentbolt subtasks, as can be seen in the picture attached in the first mail, were all canceled; the parsebolt subtasks, as described before, had one subtask FAILED and the other subtasks CANCELED; but the source subtasks had one subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state. The subtask status described above is from the JM view, but in the TM view all of the source subtasks were FAILED; I do not know why the JM was not notified about this. As all of the failed statuses were triggered by an OOM because the subtasks could not create native threads when checkpointing, I also dumped the stack of the JVM; it shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are still active after throwing an OOM and being asked to cancel. I attached the jstack file in this email.
Yours sincerely Joshua On Wed, Jun 19, 2019 at 4:40 PM zhijiang wrote: As long as one task is in CANCELING state, the job status might still be in a canceling state. @Joshua, can you confirm that all of the tasks in the topology were already in a terminal state such as FAILED or CANCELED? Best, Zhijiang -- From:Chesnay Schepler Send Time:2019年6月19日(星期三) 16:32 To:Joshua Fan ; user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state @Till, have you seen something like this before? Despite all source tasks reaching a terminal state on the TM (FAILED), it does not send updates to the JM for all of them, but only for a single one. On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and
> persist.
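On zhijiang's suggestion of "making the TM exit" on OOM: Flink does expose a related switch, if I recall correctly named `taskmanager.jvm-exit-on-oom`, which kills the TaskManager process on an OutOfMemoryError so the resource manager can restart it cleanly instead of leaving tasks in limbo. Treat the exact option name and its availability as something to verify against the documentation of your Flink version; a sketch:

```yaml
# flink-conf.yaml (sketch -- verify the option name against your Flink
# version's docs): terminate the TaskManager JVM immediately on
# OutOfMemoryError instead of attempting to recover in-process, so
# failure handling stays predictable.
taskmanager.jvm-exit-on-oom: true
```

This trades a whole-TM restart for the kind of half-failed, unreported task states discussed in this thread.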
Re: Maybe a flink bug. Job keeps in FAILING state
Hi Joshua, If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in CANCELED state on the TM side but in CANCELING state on the JM side, it might indicate that the terminal-state RPC was not received by the JM. I am not sure whether the OOM could cause this issue, resulting in unexpected behavior. In addition, you mentioned these tasks are still active after the OOM and after being asked to cancel, so I am not sure what the specific period is for your attached TM stack. It would help if you could provide the corresponding TM log and JM log; from the TM log it is easy to check the tasks' final state. Best, Zhijiang -- From:Joshua Fan Send Time:2019年6月20日(星期四) 11:55 To:zhijiang Cc:user ; Till Rohrmann ; Chesnay Schepler Subject:Re: Maybe a flink bug. Job keeps in FAILING state zhijiang, I did not capture the job UI. The topology is in FAILING state, but the persistentbolt subtasks, as can be seen in the picture attached in the first mail, were all canceled; the parsebolt subtasks, as described before, had one subtask FAILED and the other subtasks CANCELED; but the source subtasks had one subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) CANCELING, not in a terminal state. The subtask status described above is from the JM view, but in the TM view all of the source subtasks were FAILED; I do not know why the JM was not notified about this. As all of the failed statuses were triggered by an OOM because the subtasks could not create native threads when checkpointing, I also dumped the stack of the JVM; it shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are still active after throwing an OOM and being asked to cancel. I attached the jstack file in this email. Yours sincerely Joshua On Wed, Jun 19, 2019 at 4:40 PM zhijiang wrote: As long as one task is in CANCELING state, the job status might still be in a canceling state.
@Joshua, can you confirm that all of the tasks in the topology were already in a terminal state such as FAILED or CANCELED? Best, Zhijiang -- From:Chesnay Schepler Send Time:2019年6月19日(星期三) 16:32 To:Joshua Fan ; user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state @Till, have you seen something like this before? Despite all source tasks reaching a terminal state on the TM (FAILED), it does not send updates to the JM for all of them, but only for a single one. On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and persist.
> Occasionally, 5 subtasks of the source encounter exceptions and turn to
> FAILED; at the same time, one subtask of the parser runs into an
> exception and turns to FAILED too. The jobmaster gets a message about
> the parser's failure. The jobmaster then tries to cancel all the
> subtasks; most of the subtasks of the three operators turn to CANCELED
> except the 5 subtasks of the source, because the state of those 5 was
> already FAILED before the jobmaster tried to cancel them. Then the
> jobmaster cannot reach a final state but stays in FAILING state,
> meanwhile the subtasks of the source stay in CANCELING state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM
> with 10 slots.
>
> The attached files contain a JM log, a TM log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua
Re: Maybe a flink bug. Job keeps in FAILING state
As long as one task is in CANCELING state, the job status might still be in a canceling state. @Joshua, can you confirm that all of the tasks in the topology were already in a terminal state such as FAILED or CANCELED? Best, Zhijiang -- From:Chesnay Schepler Send Time:2019年6月19日(星期三) 16:32 To:Joshua Fan ; user ; Till Rohrmann Subject:Re: Maybe a flink bug. Job keeps in FAILING state @Till, have you seen something like this before? Despite all source tasks reaching a terminal state on the TM (FAILED), it does not send updates to the JM for all of them, but only for a single one. On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and persist.
> Occasionally, 5 subtasks of the source encounter exceptions and turn to
> FAILED; at the same time, one subtask of the parser runs into an
> exception and turns to FAILED too. The jobmaster gets a message about
> the parser's failure. The jobmaster then tries to cancel all the
> subtasks; most of the subtasks of the three operators turn to CANCELED
> except the 5 subtasks of the source, because the state of those 5 was
> already FAILED before the jobmaster tried to cancel them. Then the
> jobmaster cannot reach a final state but stays in FAILING state,
> meanwhile the subtasks of the source stay in CANCELING state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM
> with 10 slots.
>
> The attached files contain a JM log, a TM log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua
Re: A little doubt about the blog A Deep To Flink's NetworkStack
Hi Aitozi, The current connection sharing is only suitable for the same jobVertex between TaskManagers, via ConnectionIndex. I once tried to remove this limit in ticket [1] but we have not reached agreement on it. You could watch this JIRA if interested. [1] https://issues.apache.org/jira/browse/FLINK-10462 Best, Zhijiang -- From:aitozi Send Time:2019年6月16日(星期日) 22:19 To:user Subject:A little doubt about the blog A Deep To Flink's NetworkStack Hi, community, I read the blog "A Deep-Dive into Flink's Network Stack" <https://flink.apache.org/2019/06/05/flink-network-stack.html>. In the chapter *Physical Transport*, it says: "if different subtasks of the same task are scheduled onto the same TaskManager, their network connections towards the same TaskManagers will be multiplexed and share a single TCP channel for reduced resource usage." But when I check the code, I think each TaskManager has a NettyServer and a NettyClient which work with a channel. So I think the TCP channel is shared between remote TaskManager connections no matter whether different subtasks of the same task are scheduled onto the same TaskManager or not. Please correct me if my thoughts were wrong. Thanks Aitozi -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: java.io.FileNotFoundException in implementing exactly once
For the exactly-once mode before Flink 1.5, a temp dir is needed for spilling the buffered data during checkpoint barrier alignment. The temp dir is configured via `io.tmp.dirs` and the default value is `java.io.tmpdir`. In your case, your temp dir under `/tmp/` has some problems (it might have been deleted), and you could double-check this dir for the issue. In addition, I suggest upgrading the Flink version because flink-1.3.3 is too old. After upgrading to flink-1.5 or above, you do not need to consider this issue, because the exactly-once mode no longer spills data to disk. Best, Zhijiang -- From:syed Send Time:2019年6月12日(星期三) 10:16 To:user Subject:java.io.FileNotFoundException in implementing exactly once Hi; I am trying to run the standard WordCount application under the exactly-once and at-least-once processing guarantees, respectively. I successfully ran the at-least-once guarantee; however, while running with the exactly-once guarantee, I face the following exception:

*Root exception:*
java.io.FileNotFoundException: /tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer (No such file or directory)
    at java.io.RandomAccessFile.open0(Native Method)
    at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

*Keyed Aggregation ->
Sink: Unnamed (1/1)*
java.io.FileNotFoundException: /tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer (No such file or directory)
    at java.io.RandomAccessFile.open0(Native Method)
    at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
    at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

I am using Apache Kafka to keep the data source available for checkpoints, and I am using Flink 1.3.3. I face the same exception whether I explicitly use exactly-once [/setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)/] or only use [/enableCheckpointing(1000)/]. Kind regards; Syed -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
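As a sketch of zhijiang's suggested check, the spilling directory can be pointed at a stable location via `io.tmp.dirs` in flink-conf.yaml rather than relying on the `java.io.tmpdir` default (the path below is illustrative, not a recommendation):

```yaml
# flink-conf.yaml (sketch): direct Flink's temporary/spill files to a
# directory that is not subject to OS cleanup of /tmp. The path is
# illustrative; multiple directories can be given separated by ':' or ','.
io.tmp.dirs: /data/flink/tmp
```

This addresses the "file disappeared mid-checkpoint" failure mode; upgrading past Flink 1.5, as suggested above, removes the spilling behavior entirely.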
Re: Apache Flink - Disabling system metrics and collecting only specific metrics
Hi Mans, AFAIK, we have no switch to disable the general system metrics, which are useful for monitoring status and performance tuning. Only some advanced system metrics can be configured to be enabled or not, and the default config is always disabled, so you do not need to concern yourself with them. Maybe you could implement a custom MetricReporter and then only concentrate on your required application metrics in the method `MetricReporter#notifyOfAddedMetric` to expose them in your backend. Best, Zhijiang -- From:M Singh Send Time:2019年6月12日(星期三) 00:30 To:User Subject:Apache Flink - Disabling system metrics and collecting only specific metrics Hi: I am working on an application and need to collect application metrics. I would like to use Flink's metrics framework for my application metrics. From Flink's documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics), it looks like Flink collects system metrics by default, but I don't need those. Is there any way to configure metrics to 1. disable system metrics collection, 2. collect only specific metrics? If there is any documentation/configuration that I might have missed, please let me know. Thanks Mans
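In Flink the hook zhijiang mentions is `MetricReporter#notifyOfAddedMetric(Metric, String, MetricGroup)`; the sketch below is deliberately Flink-free and only illustrates the filtering idea. The `FilteringReporter` class and its `myapp.` prefix are made up for illustration; in a real reporter the same check would sit inside `notifyOfAddedMetric`.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the filtering idea: the reporter is notified of
// every metric (system and application alike) but records only the ones
// whose names carry an application-defined prefix. In a real job this
// logic would live in MetricReporter#notifyOfAddedMetric.
public class FilteringReporter {
    private final String appPrefix;                 // e.g. "myapp." (illustrative)
    private final List<String> tracked = new ArrayList<>();

    public FilteringReporter(String appPrefix) {
        this.appPrefix = appPrefix;
    }

    // Called once per registered metric; system metrics simply fall through.
    public void notifyOfAddedMetric(String metricName) {
        if (metricName.startsWith(appPrefix)) {
            tracked.add(metricName);
        }
    }

    public List<String> trackedMetrics() {
        return tracked;
    }

    public static void main(String[] args) {
        FilteringReporter r = new FilteringReporter("myapp.");
        r.notifyOfAddedMetric("numBytesOut");        // system metric: ignored
        r.notifyOfAddedMetric("myapp.ordersSeen");   // application metric: kept
        System.out.println(r.trackedMetrics());      // [myapp.ordersSeen]
    }
}
```

System metrics are still *collected* by Flink under this approach; the reporter merely declines to forward them, which matches zhijiang's point that there is no switch to disable collection itself.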
Re: [DISCUSS] Deprecate previous Python APIs
It is reasonable, as Stephan explained. +1 from my side! -- From:Jeff Zhang Send Time:2019年6月11日(星期二) 22:11 To:Stephan Ewen Cc:user ; dev Subject:Re: [DISCUSS] Deprecate previous Python APIs +1 Stephan Ewen 于2019年6月11日周二 下午9:30写道:
> Hi all!
>
> I would suggest deprecating the existing Python APIs for the DataSet and
> DataStream API with the 1.9 release.
>
> Background is that there is a new Python API under development.
> The new Python API is initially against the Table API. Flink 1.9 will
> support Table API programs without UDFs, 1.10 is planned to support UDFs.
> Future versions will also support the DataStream API.
>
> In the long term, Flink should have one Python API for the DataStream and
> Table APIs. We should not maintain multiple different implementations and
> confuse users that way.
> Given that the existing Python APIs are a bit limited and not under active
> development, I would suggest deprecating them in favor of the new API.
>
> Best,
> Stephan
>
-- Best Regards Jeff Zhang
Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
The JIRA is https://issues.apache.org/jira/browse/FLINK-12544 and you can find the PR link in it. -- From:Erai, Rahul Send Time:2019年6月4日(星期二) 18:19 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski ; "Narayanaswamy, Krishna" Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. Can you point us to the JIRA for your fix? Regards, -Rahul From: zhijiang Sent: Tuesday, June 4, 2019 1:26 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] ; Erai, Rahul [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Yes, it is the same case as multiple slots in a TM. The source task and the co-group task are still in the same TM in this case. I think you might have enabled slot sharing, so they are still running in the same slot in one TM. BTW, the previous deadlock issue is already fixed on my side and is waiting for review atm. You could pick the code in the PR to verify the results if you like. The next release, 1.8.1, might include this fix as well. Best, Zhijiang -- From:Erai, Rahul Send Time:2019年6月4日(星期二) 15:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski ; "Narayanaswamy, Krishna" Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hello Zhijiang, We have been seeing deadlocks with single-slot TMs as well. Attaching the thread dump as requested. It looks similar to what we had with multi-slot TMs.
Thanks, Rahul From: zhijiang Sent: Wednesday, May 22, 2019 7:56 AM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already used a single slot per TaskManager, right? If it is the case of a single slot per TaskManager, you could attach the jstack when it occurs next time; otherwise it is not needed. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月22日(星期三) 00:49 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Zhijiang, I couldn’t get the jstack due to some constraints this time around. Will try and get it when it occurs next. But from the looks of the console/logs, it appears to be the same as the 2-slot case: DataSource finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck at DEPLOYING). Thanks, Krishna. From: zhijiang Sent: Tuesday, May 21, 2019 7:38 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Krishna, Could you show me or attach the jstack for the single-slot case? Or is it the same jstack as before?
Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月21日(星期二) 19:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We started to run jobs using the single-slot task managers, which seemed to be ok for the past couple of days, but this morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out? Thanks, Krishna. From: Narayanaswamy, Krishna [Tech] Sent: Friday, May 17, 2019 4:20 PM To: 'zhijiang' ; Aljoscha Krettek ; Piotr Nowojski Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. We will try these deadlock use cases with a single-slot approach to see how they go. Will await the fix to start using more slots on the single TM. Thanks, Krishna. From: zhijiang Sent: Friday, May 17, 2019 4:05 PM To: Aljoscha Krettek ;
Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
Yes, it is the same case as multiple slots in a TM. The source task and the co-group task are still in the same TM in this case. I think you might have enabled slot sharing, so they are still running in the same slot in one TM. BTW, the previous deadlock issue is already fixed on my side and is waiting for review atm. You could pick the code in the PR to verify the results if you like. The next release, 1.8.1, might include this fix as well. Best, Zhijiang -- From:Erai, Rahul Send Time:2019年6月4日(星期二) 15:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski ; "Narayanaswamy, Krishna" Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hello Zhijiang, We have been seeing deadlocks with single-slot TMs as well. Attaching the thread dump as requested. It looks similar to what we had with multi-slot TMs. Thanks, Rahul From: zhijiang Sent: Wednesday, May 22, 2019 7:56 AM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already used a single slot per TaskManager, right? If it is the case of a single slot per TaskManager, you could attach the jstack when it occurs next time; otherwise it is not needed. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月22日(星期三) 00:49 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Zhijiang, I couldn’t get the jstack due to some constraints this time around. Will try and get it when it occurs next.
But from the looks of the console/logs, it appears to be the same as the 2-slot cases: DataSource finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck at DEPLOYING). Thanks, Krishna. From: zhijiang Sent: Tuesday, May 21, 2019 7:38 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Krishna, Could you show me or attach the jstack for the single-slot case? Or is it the same jstack as before? Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月21日(星期二) 19:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We started to run jobs using the single-slot task managers, which seemed to be ok for the past couple of days, but this morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out? Thanks, Krishna. From: Narayanaswamy, Krishna [Tech] Sent: Friday, May 17, 2019 4:20 PM To: 'zhijiang' ; Aljoscha Krettek ; Piotr Nowojski Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. We will try these deadlock use cases with a single-slot approach to see how they go. Will await the fix to start using more slots on the single TM. Thanks, Krishna.
From: zhijiang Sent: Friday, May 17, 2019 4:05 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks I have already analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause a deadlock in SpillableSubpartition, but this issue is caused in a different place. When the source task is trying to release subpartition memory and meanwhile another CoGroup task is submitted and triggers the source task to release its memory, it might cause a deadlock. I will create a JIRA ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to have only one slot per TM; then one task never triggers another task to release memory in the same TM. Or you could increase the network buffer setting to work around it, but I am not sure this works for your case because it depends on the total data size the source produces.
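The two workarounds zhijiang describes map roughly to the following configuration sketch. The fraction value is purely illustrative, and option names should be checked against the documentation of your Flink version (this thread is on the 1.6.x line):

```yaml
# flink-conf.yaml (sketch) -- workarounds for the blocking-partition deadlock:
# with one slot per TaskManager, no task can trigger another task in the
# same TM to release memory, which is the deadlock trigger described above.
taskmanager.numberOfTaskSlots: 1
# and/or give the network stack more buffer memory so releases are rarer
# (value illustrative; effectiveness depends on the source's data volume):
taskmanager.network.memory.fraction: 0.2
```

As the message notes, the buffer-based workaround is not guaranteed, since whether spilling pressure arises depends on the total data size the source produces.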
Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already used a single slot per TaskManager, right? If it is the case of a single slot per TaskManager, you could attach the jstack when it occurs next time; otherwise it is not needed. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月22日(星期三) 00:49 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Zhijiang, I couldn’t get the jstack due to some constraints this time around. Will try and get it when it occurs next. But from the looks of the console/logs, it appears to be the same as the 2-slot cases: DataSource finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck at DEPLOYING). Thanks, Krishna. From: zhijiang Sent: Tuesday, May 21, 2019 7:38 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi Krishna, Could you show me or attach the jstack for the single-slot case? Or is it the same jstack as before? Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:2019年5月21日(星期二) 19:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We started to run jobs using the single-slot task managers, which seemed to be ok for the past couple of days, but this morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out? Thanks, Krishna.
From: Narayanaswamy, Krishna [Tech] Sent: Friday, May 17, 2019 4:20 PM To: 'zhijiang' ; Aljoscha Krettek ; Piotr Nowojski Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. We will try these deadlock use cases with a single-slot approach to see how they go. Will await the fix to start using more slots on the single TM. Thanks, Krishna. From: zhijiang Sent: Friday, May 17, 2019 4:05 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause deadlock in SpillableSubpartition, but this issue is triggered from a different place. When the source task is trying to release subpartition memory while another CoGroup task is submitted and triggers the source task to release its memory, a deadlock might occur. I will create a JIRA ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM, so one task can never trigger another task to release memory in the same TM. Or you could increase the network buffer setting as a workaround, but I am not sure this works for your case because it depends on the total data size the source produces.
Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Friday, May 17, 2019 17:37 To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below - Java stack information for the threads listed above: === "CoGroup (2/2)": at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) - waiting to lock <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "CoGroup (1/2)": at org.apache.fli
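For readers hitting the same deadlock, the two workarounds Zhijiang describes above could look roughly like this in flink-conf.yaml. This is only an illustrative sketch; the values are placeholders, not tuned recommendations:

```yaml
# Workaround 1: a single slot per TaskManager, so no task can trigger
# another task in the same TM to release memory.
taskmanager.numberOfTaskSlots: 1

# Workaround 2 (alternative): enlarge the network buffer pool so that
# releaseMemory() is triggered less often. Whether this helps depends on
# the total data size the source produces.
taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 1gb
taskmanager.network.memory.fraction: 0.2
```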
Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
Hi Krishna, Could you show me or attach the jstack for the single slot case? Or is it the same jstack as before? Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Tuesday, May 21, 2019 19:50 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We started to run jobs using single-slot task managers, which seemed to be OK for the past couple of days, but this morning we started seeing these deadlocks even with 1 slot. Is there something else we could try out? Thanks, Krishna. From: Narayanaswamy, Krishna [Tech] Sent: Friday, May 17, 2019 4:20 PM To: 'zhijiang' ; Aljoscha Krettek ; Piotr Nowojski Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. We will try these deadlock use cases with a single-slot approach to see how they go. Will await the fix to start using more slots on the single TM. Thanks, Krishna. From: zhijiang Sent: Friday, May 17, 2019 4:05 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause deadlock in SpillableSubpartition, but this issue is triggered from a different place. When the source task is trying to release subpartition memory while another CoGroup task is submitted and triggers the source task to release its memory, a deadlock might occur. I will create a JIRA ticket for this issue and think about how to solve it soon.
Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM, so one task can never trigger another task to release memory in the same TM. Or you could increase the network buffer setting as a workaround, but I am not sure this works for your case because it depends on the total data size the source produces. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Friday, May 17, 2019 17:37 To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below - Java stack information for the threads listed above: === "CoGroup (2/2)": at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) - waiting to lock <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "CoGroup (1/2)": at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apa
Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
I have already created the JIRA [1] for it, and you could monitor it for progress. In addition, the SpillableSubpartition will be abandoned from Flink 1.9, and Stephan has already implemented a new BoundedBlockingSubpartition to replace it. Of course we will still provide support for the existing bugs in previous Flink versions. [1] https://issues.apache.org/jira/browse/FLINK-12544 Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Friday, May 17, 2019 19:00 To:zhijiang ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Thanks Zhijiang. We will try these deadlock use cases with a single-slot approach to see how they go. Will await the fix to start using more slots on the single TM. Thanks, Krishna. From: zhijiang Sent: Friday, May 17, 2019 4:05 PM To: Aljoscha Krettek ; Piotr Nowojski ; Narayanaswamy, Krishna [Tech] Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina [Tech] ; Erai, Rahul [Tech] Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause deadlock in SpillableSubpartition, but this issue is triggered from a different place. When the source task is trying to release subpartition memory while another CoGroup task is submitted and triggers the source task to release its memory, a deadlock might occur. I will create a JIRA ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM, so one task can never trigger another task to release memory in the same TM.
Or you could increase the network buffer setting as a workaround, but I am not sure this works for your case because it depends on the total data size the source produces. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Friday, May 17, 2019 17:37 To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below - Java stack information for the threads listed above: === "CoGroup (2/2)": at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) - waiting to lock <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "CoGroup (1/2)": at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(Networ
Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause deadlock in SpillableSubpartition, but this issue is triggered from a different place. When the source task is trying to release subpartition memory while another CoGroup task is submitted and triggers the source task to release its memory, a deadlock might occur. I will create a JIRA ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM, so one task can never trigger another task to release memory in the same TM. Or you could increase the network buffer setting as a workaround, but I am not sure this works for your case because it depends on the total data size the source produces. Best, Zhijiang -- From:Narayanaswamy, Krishna Send Time:Friday, May 17, 2019 17:37 To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek ; Piotr Nowojski Cc:Nico Kruber ; user@flink.apache.org ; "Chan, Regina" ; "Erai, Rahul" Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simpler scenarios as well.
Deadlock dump below - Java stack information for the threads listed above:
===
"CoGroup (2/2)":
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource (1/1)":
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at org.apache.flink.
Re: Netty channel closed at AKKA gated status
Hi Wenrui, I think you could trace the log of the node manager, which contains the lifecycle of this task executor. Maybe this task executor was killed by the node manager because of memory overuse. Best, Zhijiang -- From:Wenrui Meng Send Time:Saturday, April 20, 2019 09:48 To:zhijiang Cc:Biao Liu ; user ; tzulitai Subject:Re: Netty channel closed at AKKA gated status Attached the last lines of the lost task manager's log. Can anyone help take a look? Thanks, Wenrui On Fri, Apr 19, 2019 at 6:32 PM Wenrui Meng wrote: Looked at a few of the same instances. The lost task manager was indeed not active anymore, since no log for that task manager was printed after the connection-issue timestamp. I guess that task manager somehow died silently, without any exception or termination-related information logged. I double-checked the lost task manager host; the GC, CPU, memory, network, and disk I/O all look good without any spike. Is there any other possibility that the task manager can be terminated? We run our jobs in the YARN cluster. On Mon, Apr 15, 2019 at 10:47 PM zhijiang wrote: Hi Wenrui, You might further check whether there exists a network connection issue between the job master and the target task executor if you confirm the target task executor is still alive. Best, Zhijiang -- From:Biao Liu Send Time:Tuesday, April 16, 2019 10:14 To:Wenrui Meng Cc:zhijiang ; user ; tzulitai Subject:Re: Netty channel closed at AKKA gated status Hi Wenrui, If a task manager is killed (kill -9), it would have no chance to log anything. If the task manager exits due to a connection timeout, there would be something in the log file. So it is probably killed by another user or the operating system. Please check the log of the operating system. BTW, I don't think "DEBUG log level" would help. On Tue, Apr 16, 2019 at 9:16 AM Wenrui Meng wrote: There is no exception or any warning in the task manager `'athena592-phx2/10.80.118.166:44177'` log. In addition, the host was not shut down either in the cluster monitor dashboard.
It probably requires turning on DEBUG logging to get more useful information. If the task manager got killed, I assume there would be a terminating log entry in the task manager log. If not, I don't know how to figure out whether it's due to the task manager getting killed or just a connection timeout. On Sun, Apr 14, 2019 at 7:22 PM zhijiang wrote: Hi Wenrui, I think the akka gated issue and the inactive netty channel are both caused by some task manager exiting/being killed. You should double check the status and reason of this task manager `'athena592-phx2/10.80.118.166:44177'`. Best, Zhijiang -- From:Wenrui Meng Send Time:Saturday, April 13, 2019 01:01 To:user Cc:tzulitai Subject:Netty channel closed at AKKA gated status We encountered the netty channel inactive issue while AKKA gated that task manager. I'm wondering whether the channel closed because of the AKKA gated status, since all messages to the taskManager will be dropped at that moment, which might cause a netty channel exception. If so, shall we have coordination between AKKA and Netty? The gated status is not intended to fail the system. Here is the stack trace for the exception: 2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3758 (3788228399 bytes in 5967 ms). 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms.
Reason: [Disassociated] 2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - id (14/96) (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED. org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChann
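Zhijiang's suggestion above to trace the NodeManager log can be sketched as a small scan for the container-kill patterns YARN typically emits. The sample log lines and exact patterns below are illustrative; the real wording can vary between Hadoop versions:

```python
import re

# Illustrative NodeManager log excerpt; real logs live under the YARN log directory.
SAMPLE_LOG = """\
2019-04-12 12:49:10 INFO  ContainerImpl: Container container_e12_0001_01_000042 transitioned from RUNNING to KILLING
2019-04-12 12:49:11 WARN  ContainersMonitorImpl: Container [pid=4711,containerID=container_e12_0001_01_000042] is running beyond physical memory limits. Killing container.
"""

# Patterns that usually indicate the NodeManager killed a task executor.
KILL_PATTERNS = [
    r"running beyond (physical|virtual) memory limits",
    r"Killing container",
    r"transitioned from RUNNING to KILLING",
]

def find_kill_events(log_text):
    """Return the log lines that match any of the kill patterns."""
    hits = []
    for line in log_text.splitlines():
        if any(re.search(p, line) for p in KILL_PATTERNS):
            hits.append(line)
    return hits

for line in find_kill_events(SAMPLE_LOG):
    print(line)
```

If such lines appear around the connection-failure timestamp, the "silent death" was most likely a NodeManager kill rather than a netty/AKKA problem.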
Re: Netty channel closed at AKKA gated status
Hi Wenrui, You might further check whether there exists a network connection issue between the job master and the target task executor if you confirm the target task executor is still alive. Best, Zhijiang -- From:Biao Liu Send Time:Tuesday, April 16, 2019 10:14 To:Wenrui Meng Cc:zhijiang ; user ; tzulitai Subject:Re: Netty channel closed at AKKA gated status Hi Wenrui, If a task manager is killed (kill -9), it would have no chance to log anything. If the task manager exits due to a connection timeout, there would be something in the log file. So it is probably killed by another user or the operating system. Please check the log of the operating system. BTW, I don't think "DEBUG log level" would help. On Tue, Apr 16, 2019 at 9:16 AM Wenrui Meng wrote: There is no exception or any warning in the task manager `'athena592-phx2/10.80.118.166:44177'` log. In addition, the host was not shut down either in the cluster monitor dashboard. It probably requires turning on DEBUG logging to get more useful information. If the task manager got killed, I assume there would be a terminating log entry in the task manager log. If not, I don't know how to figure out whether it's due to the task manager getting killed or just a connection timeout. On Sun, Apr 14, 2019 at 7:22 PM zhijiang wrote: Hi Wenrui, I think the akka gated issue and the inactive netty channel are both caused by some task manager exiting/being killed. You should double check the status and reason of this task manager `'athena592-phx2/10.80.118.166:44177'`. Best, Zhijiang -- From:Wenrui Meng Send Time:Saturday, April 13, 2019 01:01 To:user Cc:tzulitai Subject:Netty channel closed at AKKA gated status We encountered the netty channel inactive issue while AKKA gated that task manager. I'm wondering whether the channel closed because of the AKKA gated status, since all messages to the taskManager will be dropped at that moment, which might cause a netty channel exception. If so, shall we have coordination between AKKA and Netty? The gated status is not intended to fail the system.
Here is the stack trace for the exception: 2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3758 (3788228399 bytes in 5967 ms). 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - id (14/96) (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED. org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUn
Re: Can back pressure data be gathered by Flink metric system?
Hi Henry, Thanks for the explanation. I am not sure whether it is feasible on your side to monitor the backpressure via the RESTful API provided by Flink. Some experience from my side to share: we once monitored backpressure via the metrics of outqueue length/usage on the producer side and inqueue length/usage on the consumer side. Although it is not very accurate sometimes, it can provide some hints of backpressure, because the outqueue and inqueue should be filled with buffers between producer and consumer when backpressure occurs. Best, Zhijiang -- From:徐涛 Send Time:Tuesday, April 16, 2019 09:33 To:zhijiang Cc:user Subject:Re: Can back pressure data be gathered by Flink metric system? Hi Zhijiang, Because I want to know the current backpressure status and its trend in the Flink job. Like other metrics such as latency, I can monitor it and show it in a graph by getting data from the metric system. Right now, using the metric system to get the backpressure data is the simplest way I can think of. Best Henry On Apr 15, 2019, at 10:34 AM, zhijiang wrote: Hi Henry, The backpressure tracking is not implemented in the metric framework; you could check the details via [1]. I am not sure why your requirement is to show backpressure in metrics. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html Best, Zhijiang -- From:徐涛 Send Time:Monday, April 15, 2019 10:19 To:user Subject:Can back pressure data be gathered by Flink metric system? Hi Experts, From the Flink metric system page (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics), I do not find any info about back pressure. I want to get the backpressure data and plot it, but I do not know how to get it via metrics. Can anybody help me with it? Thanks a lot. Best Henry
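Zhijiang's hint about watching outqueue/inqueue usage can be turned into a simple threshold check. The buffer-pool usage metrics he refers to exist in Flink's network metrics, but the threshold and classification logic below are only an illustrative sketch, not anything Flink ships:

```python
def backpressure_hint(out_pool_usage, in_pool_usage, threshold=0.9):
    """Rough backpressure classification from buffer-pool usage values (0.0-1.0).

    When backpressure occurs, the producer's output queue and the consumer's
    input queue both tend to be completely filled with buffers.
    """
    if out_pool_usage >= threshold and in_pool_usage >= threshold:
        return "likely-backpressured"
    if out_pool_usage >= threshold:
        return "downstream-slow"
    return "ok"

# Both queues nearly full: a strong hint of backpressure.
print(backpressure_hint(0.95, 0.97))
# Both queues mostly empty: no hint of backpressure.
print(backpressure_hint(0.2, 0.1))
```

As Zhijiang notes, this is a heuristic rather than an accurate measurement, but it is easy to feed into an existing metrics/alerting pipeline.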
Re: Can back pressure data be gathered by Flink metric system?
Hi Henry, The backpressure tracking is not implemented in the metric framework; you could check the details via [1]. I am not sure why your requirement is to show backpressure in metrics. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html Best, Zhijiang -- From:徐涛 Send Time:Monday, April 15, 2019 10:19 To:user Subject:Can back pressure data be gathered by Flink metric system? Hi Experts, From the Flink metric system page (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics), I do not find any info about back pressure. I want to get the backpressure data and plot it, but I do not know how to get it via metrics. Can anybody help me with it? Thanks a lot. Best Henry
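The back pressure monitoring page linked above is backed by a REST endpoint, so its output can be polled and plotted even though it is not part of the metric framework. A minimal parser for a response shaped like the documented one; the endpoint path and JSON field names here are assumptions based on the Flink 1.8 monitoring docs (`/jobs/:jobid/vertices/:vertexid/backpressure`), and the sample payload is invented:

```python
import json

# Example response shaped like the backpressure REST endpoint's output
# (field names assumed; verify against your Flink version's REST API docs).
SAMPLE_RESPONSE = """
{"status": "ok",
 "backpressure-level": "high",
 "subtasks": [
   {"subtask": 0, "backpressure-level": "high", "ratio": 0.72},
   {"subtask": 1, "backpressure-level": "low",  "ratio": 0.04}
 ]}
"""

def max_backpressure_ratio(response_text):
    """Extract the worst per-subtask backpressure ratio, suitable for plotting."""
    doc = json.loads(response_text)
    return max(sub["ratio"] for sub in doc["subtasks"])

print(max_backpressure_ratio(SAMPLE_RESPONSE))
```

Polling this value on an interval and pushing it to the same store as the other metrics would give Henry the trend graph he asked for.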
Re: Retain metrics counters across task restarts
Hi Peter, The lifecycle of these metrics is coupled with the lifecycle of the task, so the metrics are re-initialized after the task is restarted. One possible option is to store the required metric values in state, so the metric values can be restored from the state backend after the task is restarted. Best, Zhijiang -- From:Peter Zende Send Time:Sunday, April 14, 2019 00:25 To:user Subject:Retain metrics counters across task restarts Hi all, We're exposing metrics from our Flink (v1.7.1) pipeline to Prometheus, e.g. the total number of processed records. This works fine until any of the tasks is restarted within this YARN application. Then the counter is reset and starts incrementing values from 0. How can we retain such counters through the entire lifetime of the YARN application, similarly to Hadoop counters? Thanks Peter
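Zhijiang's suggestion boils down to: snapshot the counter's value into checkpointed state, and on restore initialize the metric from that value instead of zero. In Flink this would be done in Java with operator state; the class below is only a language-neutral simulation of that restore logic, with all names invented for illustration:

```python
class RestorableCounter:
    """Counter whose value survives a simulated task restart by being
    snapshotted into 'state' (standing in for Flink's state backend)."""

    def __init__(self, restored_state=None):
        # On restore, initialize the counter from the checkpointed value;
        # on a fresh start, begin at zero.
        self.count = restored_state if restored_state is not None else 0

    def inc(self, n=1):
        self.count += n

    def snapshot(self):
        # Called on checkpoint: persist the current value.
        return self.count

# Simulated lifecycle: run, checkpoint, restart, restore, keep counting.
c = RestorableCounter()
c.inc(41)
state = c.snapshot()                                  # checkpoint
restarted = RestorableCounter(restored_state=state)   # task restart + restore
restarted.inc()
print(restarted.count)
```

In a real job, the snapshot/restore hooks would be the state-backend callbacks of a rich function, and the restored value would seed the registered metric counter.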
Re: Netty channel closed at AKKA gated status
Hi Wenrui, I think the akka gated issue and the inactive netty channel are both caused by some task manager exiting/being killed. You should double check the status and reason of this task manager `'athena592-phx2/10.80.118.166:44177'`. Best, Zhijiang -- From:Wenrui Meng Send Time:Saturday, April 13, 2019 01:01 To:user Cc:tzulitai Subject:Netty channel closed at AKKA gated status We encountered the netty channel inactive issue while AKKA gated that task manager. I'm wondering whether the channel closed because of the AKKA gated status, since all messages to the taskManager will be dropped at that moment, which might cause a netty channel exception. If so, shall we have coordination between AKKA and Netty? The gated status is not intended to fail the system. Here is the stack trace for the exception: 2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3758 (3788228399 bytes in 5967 ms). 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - id (14/96) (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED. org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'athena592-phx2/10.80.118.166:44177'.
This might indicate that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:748)
Re: Question regarding "Insufficient number of network buffers"
Hi Allen, There are two ways to set network buffers. The old way via `taskmanager.network.numberOfBuffers` is deprecated. The new way is via three parameters: min, max and fraction. The specific formula is Math.min(network.memory.max, Math.max(network.memory.min, network.memory.fraction * jvmMemory)). If both are set, only the new way takes effect. You can adjust these three parameters accordingly. Also, you could check the task manager log by searching for " MB for network buffer pool (number of memory segments: " to confirm whether your setting is working as expected. Best, Zhijiang -- From:Xiangfeng Zhu Send Time:2019-04-12 (Fri) 08:03 To:user Subject:Question regarding "Insufficient number of network buffers" Hello, My name is Allen, and I'm currently researching different distributed execution engines. I wanted to run some benchmarks on Flink with a 10-node cluster (each node has 64 vCPUs and 376GB memory). I ran the program with parallelism 320 and got an error message: "Caused by: java.io.IOException: Insufficient number of network buffers: required 320, but only 128 available. The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'." Currently, I set the following parameters:
jobmanager.heap.size: 102400m
taskmanager.memory.size: 102400m
taskmanager.numberOfTaskSlots: 32
taskmanager.network.memory.min: 102400m
taskmanager.network.memory.max: 102400m
taskmanager.network.memory.fraction: 0.5
(For the last three fields, I've also tried setting taskmanager.network.numberOfBuffers: 40960 directly) Could you please give me some advice on how to fix it? Thank you so much! Best, Allen
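The clamp formula above is easy to misread without the closing parenthesis, so here is a small self-contained sketch of it. The class and method names are made up for illustration (this is not Flink code), and the 32 KB segment size and sample memory figures are illustrative defaults:

```java
// Illustrative sketch of the network-memory clamp formula quoted above.
// NetworkBufferCalc is a hypothetical name, not a Flink class.
public class NetworkBufferCalc {
    // network memory = min(max, max(min, fraction * jvmMemory))
    static long networkMemoryBytes(long jvmMemory, double fraction, long min, long max) {
        return Math.min(max, Math.max(min, (long) (fraction * jvmMemory)));
    }

    public static void main(String[] args) {
        long jvm = 4L << 30;           // assume a 4 GiB JVM
        long min = 64L << 20;          // taskmanager.network.memory.min = 64 MB
        long max = 1L << 30;           // taskmanager.network.memory.max = 1 GB
        double fraction = 0.1;         // taskmanager.network.memory.fraction
        long netBytes = networkMemoryBytes(jvm, fraction, min, max);
        long segmentSize = 32 * 1024;  // assumed 32 KB memory segments
        System.out.println(netBytes + " bytes -> " + (netBytes / segmentSize) + " buffers");
    }
}
```

Note that with Allen's settings (min = max = 102400m) the clamp collapses to a fixed 100 GB, so the fraction no longer matters.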
Re: [ANNOUNCE] Apache Flink 1.8.0 released
Cool! Finally the FLINK 1.8.0 release is here. Thanks to Aljoscha for this excellent work, and to the other contributors for their efforts. We will continue working hard towards FLINK 1.9.0. Best, Zhijiang -- From:vino yang Send Time:2019-04-10 (Wed) 17:33 To:d...@flink.apache.org Cc:Aljoscha Krettek ; user ; announce Subject:Re: [ANNOUNCE] Apache Flink 1.8.0 released Great news! Thanks Aljoscha for being the release manager and thanks to all the contributors! Best, Vino Driesprong, Fokko wrote on Wed, Apr 10, 2019 at 4:54 PM: Great news! Great effort by the community to make this happen. Thanks all! Cheers, Fokko On Wed, 10 Apr 2019 at 10:50, Shaoxuan Wang wrote: > Thanks Aljoscha and all others who made contributions to FLINK 1.8.0. > Looking forward to FLINK 1.9.0. > > Regards, > Shaoxuan > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek > wrote: > > > The Apache Flink community is very happy to announce the release of > Apache > > Flink 1.8.0, which is the next major release. > > > > Apache Flink® is an open-source stream processing framework for > > distributed, high-performing, always-available, and accurate data > streaming > > applications. > > > > The release is available for download at: > > https://flink.apache.org/downloads.html > > > > Please check out the release blog post for an overview of the > improvements > > for this release: > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html > > > > The full release notes are available in Jira: > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274 > > > > We would like to thank all contributors of the Apache Flink community who > > made this release possible! > > > > Regards, > > Aljoscha >
Re: Flink credit based flow control
Hi Brian, Actually I also thought of adding the metrics you mentioned after contributing the credit-based flow control. It should help performance tuning sometimes. If you want to add this metric, you could trace the related process in `ResultSubpartition`. When the backlog is increased while adding a `BufferConsumer` to the queue, you can check the current credits for this subpartition. Another possible way: when the subpartition view changes from available to unavailable because of no credits, we could add some metrics there. But even when we find the sender is blocked because of no credits, we still need to distinguish two conditions. If the sender is backpressured, then this condition of no credits is expected. Maybe you do not need an extra metric to trace your problem. You can check whether the high-latency network is actually causing backpressure. Also, what flush timeout did you configure? You could also trace the current metrics of outqueue usage|length and inqueue usage|length to find something. Best, Zhijiang -- From:Brian Ramprasad Send Time:2019-03-12 (Tue) 03:47 To:user Subject:Flink credit based flow control Hi, I am trying to use the most recent version of Flink over a high latency network and I am trying to measure how long a sender may wait for credits before it can send buffers to the receiver. Does anyone know in which function/class I can measure, at the sender side, the time spent waiting to receive the incoming credit announcements? Thanks Brian R
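To make the idea concrete, here is a minimal self-contained toy of credit-based sending with a "time blocked waiting for credits" counter, the metric Brian asks about. This is NOT Flink's `ResultSubpartition` code; every name here is invented for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy credit-based sender (illustrative only, not Flink internals):
// send() blocks while no credits are left, and the blocked time is
// accumulated so it can be exposed as a metric.
public class CreditedSender {
    private int credits;
    private final AtomicLong blockedNanos = new AtomicLong();

    public CreditedSender(int initialCredits) { this.credits = initialCredits; }

    // Called when the receiver announces more credits.
    public synchronized void addCredits(int n) {
        credits += n;
        notifyAll();
    }

    // Sends one buffer; records how long we waited for a credit.
    public synchronized void send(String buffer) throws InterruptedException {
        long start = System.nanoTime();
        while (credits == 0) {
            wait(100);
        }
        blockedNanos.addAndGet(System.nanoTime() - start);
        credits--;
        // ... the actual network write would happen here ...
    }

    public long blockedMillis() { return blockedNanos.get() / 1_000_000; }

    public static void main(String[] args) throws InterruptedException {
        CreditedSender sender = new CreditedSender(2);
        // Simulate a slow receiver that announces one more credit after 50 ms.
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            sender.addCredits(1);
        }).start();
        sender.send("a");
        sender.send("b");
        sender.send("c"); // blocks until the extra credit arrives
        System.out.println("blocked for ~" + sender.blockedMillis() + " ms");
    }
}
```

In real Flink the equivalent hook would live where the subpartition view flips between available and unavailable, as described in the reply above.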
Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud, I think I understand your special use case based on your further explanation. As you said, it is easy for the source to fill the network buffers with the whole set of file names, because an emitted file name is so small and the flatmap/sink processing is slow. Then when a checkpoint is triggered, the barrier is behind the whole set of file names, which means the sink cannot receive the barrier until it has read and written all the corresponding files. So the proper solution in your case is to control the emit rate on the source side based on the sink's catch-up progress, in order to avoid many files being queued in front of barriers. This is the right way to try, and I hope your solution with the 2 parameters works. Best, Zhijiang -- From:LINZ, Arnaud Send Time:2019-03-02 (Sat) 16:45 To:zhijiang ; user Subject:RE: Checkpoints and catch-up burst (heavy back pressure) Hello, When I think about it, I figure out that a barrier for the source is the whole set of files, and therefore the checkpoint will never complete until the sink has caught up. The simplest way to deal with it without refactoring is to add 2 parameters to the source: a file number threshold detecting the catch-up mode, and a max-files-per-sec limitation when this occurs, slightly lower than the natural catch-up rate. Original message From: "LINZ, Arnaud" Date: Fri, Mar 01, 2019 2:04 PM +0100 To: zhijiang , user Subject: RE: Checkpoints and catch-up burst (heavy back pressure) Hi, I think I should go into more details to explain my use case. I have one non-parallel source (parallelism = 1) that lists binary files in an HDFS directory. The DataSet emitted by the source is a data set of file names, not file content. These filenames are rebalanced and sent to workers (parallelism = 15) that use a flatmapper that opens the file, reads it, decodes it, and sends records (forward mode) to the sinks (with a few 1-to-1 mappings in-between). 
So the flatmap operation is a time-consuming one, as the files are more than 200 MB each; the flatmapper will emit millions of records to the sink for one source record (a filename). The rebalancing, occurring at the file-name level, does not use much I/O, and I cannot use one-to-one mode at that point if I want some parallelism, since I have only one source. I did not put file decoding directly in the sources because I have no good way to distribute files to sources without a controller (the input directory is unique, filenames are random and cannot be "attributed" to one particular source instance easily). Alternatively, I could have used a dispatcher daemon, separate from the streaming app, that distributes files to various directories, each directory being associated with one Flink source instance, and put the file reading & decoding directly in the source, but that seemed more complex to code and operate than the filename source. Would it have been better from the checkpointing perspective? About the ungraceful source sleep(), is there a way, programmatically, to know the "load" of the app, or to determine if checkpointing takes too much time, so that I can do it only on purpose? Thanks, Arnaud From: zhijiang Sent: Friday, March 1, 2019 04:59 To: user ; LINZ, Arnaud Subject: Re: Checkpoints and catch-up burst (heavy back pressure) Hi Arnaud, Thanks for the further feedback! For option 1: even 40 min not being enough indicates that it might take still more time to finish a checkpoint in your case. I also experienced scenarios where catching up on data took several hours to finish one checkpoint. If the current checkpoint expires because of the timeout, the next triggered checkpoint might still fail by timeout. So it seems better to wait until the current checkpoint finishes rather than let it expire, unless we cannot bear such a long time for some reason, such as worrying that a failover would have to restore more data during this time. 
For option 2: the default network settings should make sense. Lower values might cause performance regression, and higher values would increase the in-flight buffers and delay checkpoints more seriously. For option 3: if the resources are limited, it still will not work on your side. Sleeping some time in the source, as you mentioned, is an option and might work in your case, although it seems not a graceful way. I think there is no data skew in your case causing the backpressure, because you used the rebalance mode as mentioned. Another option might be the forward mode, which would be better than rebalance mode if possible in your case. Because the source and the downstream task are one-to-one in forward mode, the total in-flight buffers are 2+2+8 for one single downstream task before the barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) for one single downstream task (`a` is the parallelism of the source vertex), because it is an all-to-all connection
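As a rough illustration of the two-parameter throttle Arnaud describes, here is a hypothetical sketch. The class name, the plain `Consumer` callback, and the `Thread.sleep` rate cap are all illustrative choices, not a real Flink `SourceFunction`:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a throttled file-name source: when the backlog of
// pending files exceeds a threshold ("catch-up mode"), cap the emit rate so
// checkpoint barriers are not buried behind a huge queue of file names.
public class ThrottledFileNameSource {
    private final int catchUpThreshold;   // pending-file count that signals catch-up mode
    private final double maxFilesPerSec;  // emit cap while catching up

    public ThrottledFileNameSource(int catchUpThreshold, double maxFilesPerSec) {
        this.catchUpThreshold = catchUpThreshold;
        this.maxFilesPerSec = maxFilesPerSec;
    }

    public void emitAll(List<String> pendingFiles, Consumer<String> out)
            throws InterruptedException {
        boolean catchUp = pendingFiles.size() > catchUpThreshold;
        long pauseMs = catchUp ? (long) (1000.0 / maxFilesPerSec) : 0;
        for (String file : pendingFiles) {
            out.accept(file);
            if (pauseMs > 0) {
                Thread.sleep(pauseMs); // crude rate cap; keeps barriers from lagging far behind
            }
        }
    }
}
```

The cap should sit slightly below the natural catch-up rate, as Arnaud notes, so the sink (and the barrier) can keep up.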
Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud, Thanks for the further feedback! For option 1: even 40 min not being enough indicates that it might take still more time to finish a checkpoint in your case. I also experienced scenarios where catching up on data took several hours to finish one checkpoint. If the current checkpoint expires because of the timeout, the next triggered checkpoint might still fail by timeout. So it seems better to wait until the current checkpoint finishes rather than let it expire, unless we cannot bear such a long time for some reason, such as worrying that a failover would have to restore more data during this time. For option 2: the default network settings should make sense. Lower values might cause performance regression, and higher values would increase the in-flight buffers and delay checkpoints more seriously. For option 3: if the resources are limited, it still will not work on your side. Sleeping some time in the source, as you mentioned, is an option and might work in your case, although it seems not a graceful way. I think there is no data skew in your case causing the backpressure, because you used the rebalance mode as mentioned. Another option might be the forward mode, which would be better than rebalance mode if possible in your case. Because the source and the downstream task are one-to-one in forward mode, the total in-flight buffers are 2+2+8 for one single downstream task before the barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) for one single downstream task (`a` is the parallelism of the source vertex), because it is an all-to-all connection. The barrier alignment takes more time in rebalance mode than in forward mode. Best, Zhijiang -- From:LINZ, Arnaud Send Time:2019-03-01 (Fri) 00:46 To:zhijiang ; user Subject:RE: Checkpoints and catch-up burst (heavy back pressure) Update: Option 1 does not work. It still fails at the end of the timeout, no matter its value. 
Should I implement a "bandwidth" management system by using artificial Thread.sleep in the source depending on the back pressure? From: LINZ, Arnaud Sent: Thursday, February 28, 2019 15:47 To: 'zhijiang' ; user Subject: RE: Checkpoints and catch-up burst (heavy back pressure) Hi Zhijiang, Thanks for your feedback. I'll try option 1; the timeout is 4 min for now, I'll switch it to 40 min and will let you know. Setting it higher than 40 min does not make much sense, since after 40 min the pending output is already quite large. Option 3 won't work; I already take too many resources, and as my source is more or less an HDFS directory listing, it will always be far faster than any mapper that reads the files and emits records based on their content, or any sink that stores the transformed data, unless I put "sleeps" in it (but is that really a good idea?) Option 2: taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.buffers-per-gate are currently unset in my configuration (so at their defaults of 2 and 8), but for this streaming app I have very few exchanges between nodes (just a rebalance after the source that emits file names; everything else is local to the node). Should I adjust their values nonetheless? To higher or lower values? Best, Arnaud From: zhijiang Sent: Thursday, February 28, 2019 10:58 To: user ; LINZ, Arnaud Subject: Re: Checkpoints and catch-up burst (heavy back pressure) Hi Arnaud, I think there are two key points. First, the checkpoint barrier might be emitted with delay from the source under high backpressure because of the synchronizing lock. Second, the barrier has to be queued behind the in-flight data buffers, so the downstream task has to process all the buffers before the barrier to trigger the checkpoint, and this takes some time under back pressure. There are three ways to work around this: 1. Increase the checkpoint timeout to avoid it expiring in a short time. 2. 
Decrease the network buffer settings to reduce the amount of in-flight buffers ahead of the barrier; you can check the config options "taskmanager.network.memory.buffers-per-channel" and "taskmanager.network.memory.buffers-per-gate". 3. Adjust the parallelism, e.g. increase it for the sink vertex, in order to process the source data faster and avoid backpressure to some extent. You could check which way is suitable for your scenario and give it a try. Best, Zhijiang -- From:LINZ, Arnaud Send Time:2019-02-28 (Thu) 17:28 To:user Subject:Checkpoints and catch-up burst (heavy back pressure) Hello, I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is "exactly once". Everything is fine in the "normal" course, as the sink is faster than the source; but when we stop the application for a while and then restart it, we have a catch-up burst to get all the messages emitted in the meanwhile. During this burst, the s
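The buffer counts quoted in the reply above can be sketched as plain arithmetic, assuming the default 2 exclusive buffers per channel (on both output and input side) and 8 floating buffers per gate; `InflightBuffers` is an illustrative name, not a Flink class:

```java
// Upper bound on in-flight buffers queued ahead of a barrier for ONE
// downstream task, per the formulas in the mail above (defaults: 2 buffers
// per channel on each side, 8 floating buffers per gate).
public class InflightBuffers {
    static int forwardMode() {
        return 2 + 2 + 8; // one channel: sender side + receiver side + floating
    }

    static int rebalanceMode(int sourceParallelism) {
        // all-to-all: one channel per upstream source subtask
        return sourceParallelism * 2 + sourceParallelism * 2 + 8;
    }

    public static void main(String[] args) {
        System.out.println("forward:   " + forwardMode());     // 12
        System.out.println("rebalance: " + rebalanceMode(15)); // 68 for Arnaud's 15 sources
    }
}
```

With Arnaud's parallelism of 15, a barrier may sit behind up to 68 buffers per downstream task in rebalance mode versus 12 in forward mode, which is why alignment takes longer.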
Re: Flink performance drops when async checkpoint is slow
Hi Paul, Thanks for your feedback. If the at-least-once mode still causes the problem, we can confirm it is not caused by the blocking behavior in exactly-once mode mentioned before. With at-least-once, the task continues processing the buffers that follow the barriers during alignment. But with exactly-once, the task blocks the channel after reading a barrier during alignment. It is difficult to confirm based on the absolute state size, which is also related to the network buffer settings and the parallelism. I think your problem is that once checkpointing is active, backpressure also appears sometimes, which results in low performance and CPU usage. Maybe we can analyze which vertex and subtask cause the backpressure, and then trace their jstack to check which operations slow them down. The source vertex is blocked in `requestBufferBuilder` once backpressured. But I am not sure whether that is caused by the middle vertex or the last sink vertex. If the middle vertex is also blocked requesting a buffer, as you mentioned in your first email, then the backpressure should be caused by the last sink vertex. But then the jstack of the last sink task should not be in `getNextBuffer`, or you did not trace all the parallel sink tasks. My suggestion is first to check which vertex causes the backpressure (middle or last sink vertex?), and then trace the jstack of the right parallel task in that vertex; you can select the task with the largest input queue length. I think we might find something by seeing which operation delays the task and causes the backpressure, and this operation might involve HDFS. :) Best, Zhijiang -- From:Paul Lam Send Time:2019-02-28 (Thu) 19:17 To:zhijiang Cc:user Subject:Re: Flink performance drops when async checkpoint is slow Hi Zhijiang, Thanks a lot for your reasoning! 
I tried to set the checkpoint mode to at-least-once as you suggested, but unluckily the problem remains the same :( IMHO, if it were caused by barrier alignment, the state size (mainly buffers during alignment) would be big, right? But actually it's not, so we didn't think that way before. Best, Paul Lam On Feb 28, 2019, at 16:12, zhijiang wrote: Hi Paul, I am not sure whether the task thread is involved in some work during snapshotting states for FsStateBackend. But I have another experience which might also cause your problem. From your descriptions below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent`, which means the middle task does not have any outputs or the middle operator does not process records. The backpressure is high between the source and the middle task, which results in blocking the source task in `requestBufferBuilder`. Based on the above two points, I guess the middle task is waiting for barriers from some source tasks. For the input channels which have already received their barriers, the middle task would not process the following data buffers and just caches them, so it would backpressure the corresponding sources based on credit-based flow control. For the input channels without barriers, if there are also no data buffers, then the middle task would not have any outputs. So one hint is to trace why some source task emits its barrier late. In order to double check the above analysis, you can change the checkpoint mode from `exactly-once` to `at-least-once`; if the CPU usage and task TPS do not drop for a period as before, I think we can confirm the above analysis. :) Best, Zhijiang -- From:Paul Lam Send Time:2019-02-28 (Thu) 15:17 To:user Subject:Flink performance drops when async checkpoint is slow Hi, I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some transformations and aggregates, and writes to two Kafka topics respectively. 
Meanwhile, there’s a custom source that pulls configurations for the transformations periodically. The generic job graph is as below. <屏幕快照 2019-02-25 11.24.54.png> The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is unstable, and sometimes the HDFS client reports slow read and slow waitForAckedSeqno during checkpoints. When that happens, the Flink job’s consume rate drops significantly, and some taskmanagers’ CPU usage drops from about 140% to 1%; all the task threads on those taskmanagers are blocked. This situation lasts from seconds to a minute. We started a parallel job with everything the same except checkpointing disabled, and it runs very steadily. But I think as the checkpointing is async, it should not affect the task threads. There is some additional information that we observed: - When the performance drops, jstack shows that the Kafka source and the task right after it are blocked at requesting a memory buffer (with back pressure close to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`. - The dashboard shows that the buffer during ali
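The exactly-once vs at-least-once distinction discussed in this thread can be illustrated with a toy alignment model (not Flink code; all names are invented): once a channel has delivered its barrier, an exactly-once task parks that channel's buffers until every channel's barrier has arrived, while at-least-once keeps processing them:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy model of barrier alignment (illustrative only, not Flink internals).
public class BarrierAlignment {
    static final String BARRIER = "<barrier>";
    private final boolean exactlyOnce;
    private final boolean[] barrierSeen;
    private final Deque<String> parked = new ArrayDeque<>();
    private int processed = 0;

    BarrierAlignment(int channels, boolean exactlyOnce) {
        this.barrierSeen = new boolean[channels];
        this.exactlyOnce = exactlyOnce;
    }

    void onElement(int channel, String element) {
        if (BARRIER.equals(element)) {
            barrierSeen[channel] = true;
            if (allSeen()) { // checkpoint can trigger; release parked buffers
                while (!parked.isEmpty()) { process(parked.poll()); }
                Arrays.fill(barrierSeen, false);
            }
        } else if (exactlyOnce && barrierSeen[channel]) {
            parked.add(element); // channel blocked during alignment -> backpressure upstream
        } else {
            process(element);    // at-least-once keeps processing
        }
    }

    private boolean allSeen() {
        for (boolean b : barrierSeen) if (!b) return false;
        return true;
    }

    private void process(String e) { processed++; }

    int processedCount() { return processed; }
}
```

In the exactly-once case the parked buffers are exactly what backpressures the upstream senders via the credit-based flow control described above; that is why switching to at-least-once was a useful diagnostic, and why its failure pointed away from alignment.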
Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud, I think there are two key points. First, the checkpoint barrier might be emitted with delay from the source under high backpressure because of the synchronizing lock. Second, the barrier has to be queued behind the in-flight data buffers, so the downstream task has to process all the buffers before the barrier to trigger the checkpoint, and this takes some time under back pressure. There are three ways to work around this: 1. Increase the checkpoint timeout to avoid it expiring in a short time. 2. Decrease the network buffer settings to reduce the amount of in-flight buffers ahead of the barrier; you can check the config options "taskmanager.network.memory.buffers-per-channel" and "taskmanager.network.memory.buffers-per-gate". 3. Adjust the parallelism, e.g. increase it for the sink vertex, in order to process the source data faster and avoid backpressure to some extent. You could check which way is suitable for your scenario and give it a try. Best, Zhijiang -- From:LINZ, Arnaud Send Time:2019-02-28 (Thu) 17:28 To:user Subject:Checkpoints and catch-up burst (heavy back pressure) Hello, I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is "exactly once". Everything is fine in the "normal" course, as the sink is faster than the source; but when we stop the application for a while and then restart it, we have a catch-up burst to get all the messages emitted in the meanwhile. During this burst, the source is faster than the sink, and all checkpoints fail (time out) until the source has totally caught up. This is annoying because the sink does not "commit" the data before a successful checkpoint is made, so the app releases all the "catch-up" data as one atomic block that can be huge if the streaming app was stopped for a while, adding unwanted stress to all the following Hive treatments that use the data provided in micro batches and to the Hadoop cluster. How should I handle the situation? 
Is there something special to do to get checkpoints even during heavy load? The problem does not seem to be new, but I was unable to find any practical solution in the documentation. Best regards, Arnaud
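For workarounds 1 and 2 from the reply above, the configuration knobs look roughly like this. The values are illustrative, and the buffer key names are quoted as written in the thread; double-check the exact names against your Flink version's configuration page:

```yaml
# flink-conf.yaml (illustrative values only)
# Workaround 2: fewer in-flight buffers queued ahead of the barrier.
taskmanager.network.memory.buffers-per-channel: 2   # default 2; lowering trades throughput for faster alignment
taskmanager.network.memory.buffers-per-gate: 8      # default 8 (floating buffers)
```

Workaround 1 (a longer checkpoint timeout) is set on the job itself via its checkpoint configuration rather than in flink-conf.yaml.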
Re: Flink performance drops when async checkpoint is slow
Hi Paul, I am not sure whether the task thread is involved in some work during snapshotting states for FsStateBackend. But I have another experience which might also cause your problem. From your descriptions below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent`, which means the middle task does not have any outputs or the middle operator does not process records. The backpressure is high between the source and the middle task, which results in blocking the source task in `requestBufferBuilder`. Based on the above two points, I guess the middle task is waiting for barriers from some source tasks. For the input channels which have already received their barriers, the middle task would not process the following data buffers and just caches them, so it would backpressure the corresponding sources based on credit-based flow control. For the input channels without barriers, if there are also no data buffers, then the middle task would not have any outputs. So one hint is to trace why some source task emits its barrier late. In order to double check the above analysis, you can change the checkpoint mode from `exactly-once` to `at-least-once`; if the CPU usage and task TPS do not drop for a period as before, I think we can confirm the above analysis. :) Best, Zhijiang -- From:Paul Lam Send Time:2019-02-28 (Thu) 15:17 To:user Subject:Flink performance drops when async checkpoint is slow Hi, I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some transformations and aggregates, and writes to two Kafka topics respectively. Meanwhile, there’s a custom source that pulls configurations for the transformations periodically. The generic job graph is as below. The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is unstable, and sometimes the HDFS client reports slow read and slow waitForAckedSeqno during checkpoints. 
When that happens, the Flink job’s consume rate drops significantly, and some taskmanagers’ CPU usage drops from about 140% to 1%; all the task threads on those taskmanagers are blocked. This situation lasts from seconds to a minute. We started a parallel job with everything the same except checkpointing disabled, and it runs very steadily. But I think as the checkpointing is async, it should not affect the task threads. There is some additional information that we observed: - When the performance drops, jstack shows that the Kafka source and the task right after it are blocked at requesting a memory buffer (with back pressure close to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`. - The dashboard shows that the buffer during alignment is less than 10 MB, even when back pressure is high. We’ve been struggling with this problem for weeks, and any help is appreciated. Thanks a lot! Best, Paul Lam
Re: Confusion in Heartbeat configurations
Hi sohimankotia, In order not to rely strongly on the akka implementation, since FLIP-6 Flink implements its own heartbeat mechanism to monitor the health of the TaskExecutor, JobMaster and ResourceManager components. So you can see two sets of heartbeat settings: one is for the akka-internal implementation, prefixed with `akka`, and the other is Flink's own implementation. Best, Zhijiang -- From:sohimankotia Send Time:2019-02-18 (Mon) 14:40 To:user Subject:Confusion in Heartbeat configurations Hi, In the https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html link there are two heartbeat configs mentioned: akka.watch.heartbeat.interval akka.watch.heartbeat.pause vs heartbeat.interval heartbeat.timeout Can you please explain what exactly the difference between them is, and which component of the job execution graph they impact? Thanks -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
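Side by side, the two families look like this in flink-conf.yaml. The defaults shown are from the Flink 1.7 configuration page linked above; verify them for your version:

```yaml
# flink-conf.yaml — the two heartbeat families side by side (Flink 1.7 defaults)

# Flink's own heartbeat between TaskExecutor / JobMaster / ResourceManager (FLIP-6)
heartbeat.interval: 10000   # ms between heartbeat requests
heartbeat.timeout: 50000    # ms until a component is considered unreachable

# Akka's internal DeathWatch heartbeat (legacy akka-level monitoring)
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s
```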
Re: [DISCUSS] Adding a mid-term roadmap to the Flink website
Thanks Stephan for this proposal; I totally agree with it. It is very necessary to summarize the overall features/directions the community is working on or planning. Although I check the mailing list almost every day, it still seems difficult to trace everything. In addition, I think this whole roadmap picture can also help expose the relationships among different items, and even avoid similar/duplicated thoughts or work. Just one small suggestion: if we could add the existing links (jira/discussion/FLIP/google doc) for each listed item, then it would be easy to keep track of the items of interest and follow their progress. Best, Zhijiang -- From:Jeff Zhang Send Time:2019-02-14 (Thu) 18:03 To:Stephan Ewen Cc:dev ; user ; jincheng sun ; Shuyi Chen ; Rong Rong Subject:Re: [DISCUSS] Adding a mid-term roadmap to the Flink website Hi Stephan, Thanks for this proposal. It is a good idea to track the roadmap. One suggestion is that it might be better to put it into a wiki page first, because it is easier to update the roadmap on the wiki compared to the flink web site. And I guess we may need to update the roadmap very often at the beginning, as there are so many discussions and proposals in the community recently. We can move it to the flink web site later when we feel it can be nailed down. Stephan Ewen wrote on Thu, Feb 14, 2019 at 5:44 PM: > Thanks Jincheng and Rong Rong! > > I am not deciding a roadmap and making a call on what features should be > developed or not. I was only collecting broader issues that are already > happening or have an active FLIP/design discussion plus committer support. > > Do we have that for the suggested issues as well? If yes, we can add them > (can you point me to the issue/mail-thread); if not, let's try and move the > discussion forward and add them to the roadmap overview then. > > Best, > Stephan > > > On Wed, Feb 13, 2019 at 6:47 PM Rong Rong wrote: > >> Thanks Stephan for the great proposal. 
>> >> This would not only be beneficial for new users but also for contributors >> to keep track of all upcoming features. >> >> I think that better window operator support can also be separately grouped >> into its own category, as it affects both future DataStream API and batch >> stream unification. >> can we also include: >> - OVER aggregate for DataStream API separately as @jincheng suggested. >> - Improving sliding window operator [1] >> >> One more additional suggestion, can we also include a more extendable >> security module [2,3] @shuyi and I are currently working on? >> This will significantly improve the usability for Flink in corporate >> environments where proprietary or 3rd-party security integration is needed. >> >> Thanks, >> Rong >> >> >> [1] >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html >> [2] >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html >> [3] >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html >> >> >> >> >> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun >> wrote: >> >>> Very excited and thank you for launching such a great discussion, >>> Stephan ! >>> >>> Here only a little suggestion that in the Batch Streaming Unification >>> section, do we need to add an item: >>> >>> - Same window operators on bounded/unbounded Table API and DataStream >>> API >>> (currently OVER window only exists in SQL/TableAPI, DataStream API does >>> not yet support) >>> >>> Best, >>> Jincheng >>> >>> Stephan Ewen wrote on Wed, Feb 13, 2019 at 7:21 PM: >>> >>>> Hi all! >>>> >>>> Recently several contributors, committers, and users asked about making >>>> it more visible in which way the project is currently going. 
>>>> >>>> Users and developers can track the direction by following the >>>> discussion threads and JIRA, but due to the mass of discussions and open >>>> issues, it is very hard to get a good overall picture. >>>> Especially for new users and contributors, it is very hard to get a >>>> quick overview of the project direction. >>>> >>>> To fix this, I suggest adding a brief roadmap summary to the homepage. >>>> It is a bit of a commitment to keep that roadmap up to date, but I think >>>> the benefit for users justifies that. >>>> The Apach
Re: [ANNOUNCE] New Flink PMC member Thomas Weise
Congrats Thomas! Best, Zhijiang -- From:Kostas Kloudas Send Time:2019-02-12 (Tue) 22:46 To:Jark Wu Cc:Hequn Cheng ; Stefan Richter ; user Subject:Re: [ANNOUNCE] New Flink PMC member Thomas Weise Congratulations Thomas! Best, Kostas On Tue, Feb 12, 2019 at 12:39 PM Jark Wu wrote: Congrats Thomas! On Tue, 12 Feb 2019 at 18:58, Hequn Cheng wrote: Congrats Thomas! Best, Hequn On Tue, Feb 12, 2019 at 6:53 PM Stefan Richter wrote: Congrats Thomas! Best, Stefan On 12.02.2019 at 11:20, Stephen Connolly wrote: Congratulations to Thomas. I see that this is not his first time in the PMC rodeo... also somebody needs to update LDAP as he's not on https://people.apache.org/phonebook.html?pmc=flink yet! -stephenc On Tue, 12 Feb 2019 at 09:59, Fabian Hueske wrote: Hi everyone, On behalf of the Flink PMC I am happy to announce Thomas Weise as a new member of the Apache Flink PMC. Thomas is a long time contributor and member of our community. He is starting and participating in lots of discussions on our mailing lists, working on topics that are of joint interest of Flink and Beam, and giving talks on Flink at many events. Please join me in welcoming and congratulating Thomas! Best, Fabian -- Kostas Kloudas | Software Engineer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
Re: ConnectTimeoutException when createPartitionRequestClient
Hi Wenrui, I suspect another issue might be causing the connection failure. You can check whether the netty server has already bound and is listening on its port before the client requests the connection. If some time-consuming process during TM startup delays the netty server start, the server may not be ready when the client requests the connection, which can cause a connection timeout or failure. From your description, the problem seems to exist in only some TMs; when you decrease the total parallelism, the problem TM might not be hit, so the issue does not occur. The default number of netty threads and the default timeout should be fine for normal cases. Best, Zhijiang -- From: Wenrui Meng Send Time: January 9, 2019 (Wed) 18:18 To: Till Rohrmann Cc: user ; Konstantin Subject: Re: ConnectTimeoutException when createPartitionRequestClient Hi Till, This job is not on AthenaX but on a special Uber version of Flink. I tried to ping the connected host from the connecting host, and it seems very stable. For the connection timeout, I do set it to 20 min, but it still reports the timeout after 2 minutes. Could you let me know how you test the timeout setting locally? Thanks, Wenrui On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann wrote: Hi Wenrui, the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX? Cheers, Till On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng wrote: Hi Till, Thanks for your reply. Our cluster is a YARN cluster. I found that if we decrease the total parallelism the timeout issue can be avoided, but we do need that amount of task managers to process data. In addition, once I increase the netty server threads to 128, the error changes to the following. It seems the cause is different. Could you help take a look? 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED. 
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84) at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost. 
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424
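As a quick diagnostic for the scenario Zhijiang describes, i.e. the netty server not yet listening when the client connects, one can probe the task manager's data port with a plain TCP connect. This is only a sketch: the host and port in `main` are taken from the stack trace above and stand in for your own task manager's address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal reachability probe: attempts a plain TCP connect to the task
// manager's data port with a short timeout, mimicking the first step the
// netty client performs when creating a partition request connection.
public class PortProbe {
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host/port copied from the stack trace in this thread.
        System.out.println(isReachable("athena464-sjc1", 39466, 2000));
    }
}
```

Running this from the connecting TM right after the remote TM starts would show whether the data port is accepting connections in time.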
Re: Buffer stats when Back Pressure is high
Hi Gagan, Which Flink version do you use? And have you checked buffers.inputQueueLength for all the parallel instances of B (connected with A)? There may be a scenario in which only one parallel instance of B has its input queue full of buffers, which back-pressures A, while the input queues of the other parallel instances of B are empty. Best, Zhijiang -- From: Gagan Agrawal Send Time: January 7, 2019 (Mon) 12:06 To: user Subject: Buffer stats when Back Pressure is high Hi, I want to understand whether any of the buffer stats help in debugging / validating that a downstream operator is performing slowly when back pressure is high. Say I have operators A -> B and A shows high back pressure, which indicates something is wrong or not performing well on the B side, slowing down operator A. However, when I look at buffers.inputQueueLength for operator B, it's 0. My understanding is that when B is processing slowly, its input buffer will be full of incoming messages, which ultimately blocks/slows down the upstream operator A. However, this doesn't seem to be happening in my case. Can someone throw some light on how the different buffer stats (e.g. buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) should look when the downstream operator is performing slowly? Gagan
Re: buffer pool is destroyed
Hi Shuang, Normally the exception you mentioned is not the root cause of the failover; it is mainly caused by the cancel process making the task exit. You can further check whether there are other failures in the job master log to find the root cause. Best, Zhijiang -- From: Chan, Shuang Send Time: December 21, 2018 (Fri) 11:12 To: user@flink.apache.org Subject: buffer pool is destroyed Hi Flink community, I have a custom source that emits a user-defined data type, BaseEvent. The following code works fine when BaseEvent is not a POJO. But when I changed it to a POJO by adding a default constructor, I'm getting a "Buffer Pool is destroyed" runtime exception on the collect method. DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance)); DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream .filter(e -> e instanceof OrderEvent) .map(e -> (OrderEvent) e) .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3) .keyBy(e -> e.f0) .timeWindow(Time.seconds(5)) .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2)) .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4); Any idea? Shuang == Please access the attached hyperlink for an important electronic communications disclaimer: http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html ==
Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error.
It may be worked around by increasing the task manager memory size. Whether recovery fails depends on several issues: whether there was a successful checkpoint before, whether the states are available, and what the failover strategy is. Best, Zhijiang -- From: Flink Developer Send Time: November 26, 2018 (Mon) 16:37 To: Flink Developer Cc: zhijiang ; user ; Gagan Agrawal Subject: Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error. Also, after the Flink job has failed from the above error, the Flink job is unable to recover from the previous checkpoint. Is this the expected behavior? How can the job be recovered successfully from this? ‐‐‐ Original Message ‐‐‐ On Monday, November 26, 2018 12:35 AM, Flink Developer wrote: I am also experiencing the error message "Container is running beyond physical memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 40 slots for each task manager. The memory assigned during Flink cluster creation is 1024MB per task manager. The checkpoint is using RocksDB and the checkpoint size is very small (10MB). Is the simple solution to increase the task manager memory size? I will try from 1024MB to 4096MB per task manager. ‐‐‐ Original Message ‐‐‐ On Sunday, November 25, 2018 7:58 PM, zhijiang wrote: I think it is probably related to RocksDB memory usage if you have not seen an OutOfMemory issue before. There is already a JIRA ticket [1] for fixing this issue, and you can watch it for updates. :) [1] https://issues.apache.org/jira/browse/FLINK-10884 Best, Zhijiang -- From: Gagan Agrawal Send Time: November 24, 2018 (Sat) 14:14 To: user Subject: Flink job failing due to "Container is running beyond physical memory limits" error. Hi, I am running a Flink job on YARN where it ran fine so far (4-5 days) and it has now started failing with the following errors. 
2018-11-24 03:46:21,029 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1542008917197_0038_01_06 because: Container [pid=18380,containerID=container_1542008917197_0038_01_06] is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 5.0 GB of 15 GB virtual memory used. Killing container. This is a simple job where we read 2 Avro streams from Kafka, apply a custom UDF after creating a keyed stream from the union of those 2 streams, and write the output back to Kafka. The UDF internally uses map state with the RocksDB backend. Currently the checkpoint size is around 300 GB and we are running this with 10 task managers with 3 GB memory each. I have also set "containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The Flink version is 1.6.2. Here is the flink command: ./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar I want to understand the typical reasons for this issue. Also, why would Flink consume more memory than allocated, as JVM memory is fixed and will not grow beyond the max heap? Can this be related to RocksDB, which may be consuming memory outside the heap and hence exceeding the defined limits? I didn't see this issue when the checkpoint size was small (<50 GB), but ever since it reached 300 GB, this issue occurs frequently. I can try increasing memory, but I am still interested in knowing the typical reasons for this error if the JVM heap memory cannot grow beyond the defined limit. Gagan
Re: Flink job failing due to "Container is running beyond physical memory limits" error.
I think it is probably related to RocksDB memory usage if you have not seen an OutOfMemory issue before. There is already a JIRA ticket [1] for fixing this issue, and you can watch it for updates. :) [1] https://issues.apache.org/jira/browse/FLINK-10884 Best, Zhijiang -- From: Gagan Agrawal Send Time: November 24, 2018 (Sat) 14:14 To: user Subject: Flink job failing due to "Container is running beyond physical memory limits" error. Hi, I am running a Flink job on YARN where it ran fine so far (4-5 days) and it has now started failing with the following errors. 2018-11-24 03:46:21,029 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1542008917197_0038_01_06 because: Container [pid=18380,containerID=container_1542008917197_0038_01_06] is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 5.0 GB of 15 GB virtual memory used. Killing container. This is a simple job where we read 2 Avro streams from Kafka, apply a custom UDF after creating a keyed stream from the union of those 2 streams, and write the output back to Kafka. The UDF internally uses map state with the RocksDB backend. Currently the checkpoint size is around 300 GB and we are running this with 10 task managers with 3 GB memory each. I have also set "containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The Flink version is 1.6.2. Here is the flink command: ./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar I want to understand the typical reasons for this issue. Also, why would Flink consume more memory than allocated, as JVM memory is fixed and will not grow beyond the max heap? Can this be related to RocksDB, which may be consuming memory outside the heap and hence exceeding the defined limits? I didn't see this issue when the checkpoint size was small (<50 GB), but ever since it reached 300 GB, this issue occurs frequently. 
I can try increasing memory, but I am still interested in knowing the typical reasons for this error if the JVM heap memory cannot grow beyond the defined limit. Gagan
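To illustrate the memory arithmetic behind the "containerized.heap-cutoff-ratio: 0.5" setting mentioned in this thread, here is a simplified sketch of how the YARN container budget splits into JVM heap and off-heap room for RocksDB, netty buffers, and JVM overhead. It is an approximation: the real Flink computation also applies a minimum cutoff (containerized.heap-cutoff-min), which is omitted here.

```java
public class ContainerMemoryBudget {
    // Simplified model of containerized.heap-cutoff-ratio: the cutoff fraction
    // of the container is reserved for off-heap use (RocksDB, netty, JVM
    // overhead) and only the remainder becomes JVM heap. The real computation
    // also enforces containerized.heap-cutoff-min; omitted for clarity.
    public static long heapMb(long containerMb, double cutoffRatio) {
        return (long) (containerMb * (1.0 - cutoffRatio));
    }

    public static long offHeapMb(long containerMb, double cutoffRatio) {
        return containerMb - heapMb(containerMb, cutoffRatio);
    }

    public static void main(String[] args) {
        // The job in this thread: -ytm 3072 with heap-cutoff-ratio 0.5.
        System.out.println("heap:     " + heapMb(3072, 0.5) + " MB");
        System.out.println("off-heap: " + offHeapMb(3072, 0.5) + " MB");
    }
}
```

With a 3072 MB container and ratio 0.5, only half the container is heap; if RocksDB's native usage grows past the other half, YARN kills the container even though the JVM heap never overflows.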
Re: OutOfMemoryError while doing join operation in flink
Hi Akshay, Sorry, I have not thought of a proper way to handle a single large record across distributed task managers in Flink. But I can give some hints on adjusting the related memory settings to work around the OOM issue. A large fraction of the memory in the task manager is managed by Flink for efficiency, and this memory lives persistently in the JVM and is not recycled by GC. You can check the parameter "taskmanager.memory.fraction" for this; the default value is 0.7 if you have not changed it, which means 7GB * 0.7 is used by the framework. I am not sure which Flink version you used. If I remember correctly, before release-1.5 the network buffers also used heap memory by default, so you should also subtract that part from the total task manager memory. Not counting the network buffers used by the framework, only 7GB * 0.3 is left as temporary memory for other parts. The temporary buffer in the serializer doubles its current size whenever it cannot hold the record, which means one serializer may need 2GB of overhead memory for your 1GB record. You have 2 slots per task manager running two tasks, so the total overhead memory may be almost 4GB. So you can decrease "taskmanager.memory.fraction" to a lower fraction, increase the total task manager memory to cover this overhead, or set one slot per task manager. Best, Zhijiang -- From: Akshay Mendole Send Time: November 23, 2018 (Fri) 02:54 To: trohrmann Cc: zhijiang ; user ; Shreesha Madogaran Subject: Re: OutOfMemoryError while doing join operation in flink Hi, Thanks for your reply. I tried running a simple "group by" on just one dataset where a few keys occur repeatedly (on the order of millions) and did not include any joins. I wanted to see if this issue is specific to join, but as I expected, I ran into the same issue. I am giving 7GB to each task manager with 2 slots per task manager. 
From what I have understood so far, in such cases individual records somewhere in the pipeline become so large that they should be handled in a distributed manner instead of by a simple data structure in a single JVM. I am guessing there is no way to do this in Flink today. Could you please confirm this? Thanks, Akshay On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann wrote: Hi Akshay, Flink currently does not support automatically distributing hot keys across different JVMs. What you can do is adapt the parallelism/number of partitions manually if you find that one partition contains a lot of hot keys. This might mitigate the problem by partitioning the hot keys into different partitions. Apart from that, the problem seems to be, as Zhijiang indicated, that your join result is quite large. One record is 1 GB large. Try to decrease it or give more memory to your TMs. Cheers, Till On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole wrote: Hi Zhijiang, Thanks for the quick reply. My concern is more about how Flink performs joins of two skewed datasets. Pig and Spark seem to support joins of skewed datasets. The record size that you mention in your reply arises after the join operation takes place, and in my use case it is definitely going to be too huge to fit in a JVM task manager task slot. We want to know if there is a way in Flink to handle such skewed keys by distributing their values across different JVMs. Let me know if you need more clarity on the issue. Thanks, Akshay On Thu, Nov 22, 2018 at 2:38 PM zhijiang wrote: Hi Akshay, You encountered an existing issue where serializing large records causes OOM. Previously, every subpartition created a separate serializer, and each serializer maintained an internal byte array for storing intermediate serialization results. The key point is that these internal byte arrays are not managed by the framework, and their size grows dynamically with the record size. 
If your job has many subpartitions with large records, it may well cause an OOM issue. I have already improved this issue to some extent by sharing only one serializer across all subpartitions [1], which means we have at most one byte array of overhead. This improvement is covered in release-1.7. Currently the best option may be to reduce your record size if possible, or you can increase the heap size of the task manager container. [1] https://issues.apache.org/jira/browse/FLINK-9913 Best, Zhijiang -- From: Akshay Mendole Send Time: November 22, 2018 (Thu) 13:43 To: user Subject: OutOfMemoryError while doing join operation in flink Hi, We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, joins them and dumps them back to HDFS. The data set R1 is skewed, in the sense that it has a few keys with a lot of records. When we co
Re: OutOfMemoryError while doing join operation in flink
Hi Akshay, You encountered an existing issue where serializing large records causes OOM. Previously, every subpartition created a separate serializer, and each serializer maintained an internal byte array for storing intermediate serialization results. The key point is that these internal byte arrays are not managed by the framework, and their size grows dynamically with the record size. If your job has many subpartitions with large records, it may well cause an OOM issue. I have already improved this issue to some extent by sharing only one serializer across all subpartitions [1], which means we have at most one byte array of overhead. This improvement is covered in release-1.7. Currently the best option may be to reduce your record size if possible, or you can increase the heap size of the task manager container. [1] https://issues.apache.org/jira/browse/FLINK-9913 Best, Zhijiang -- From: Akshay Mendole Send Time: November 22, 2018 (Thu) 13:43 To: user Subject: OutOfMemoryError while doing join operation in flink Hi, We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, joins them and dumps them back to HDFS. The data set R1 is skewed, in the sense that it has a few keys with a lot of records. When we converted the Pig pipeline to Apache Beam and ran it using Flink on a production YARN cluster, we got the following error: 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at CoGBK/GBK) (25/100) java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. 
Serialized size (> 1136656562 bytes) exceeds JVM heap space at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140) at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85) at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111) at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to serialize element. 
Serialized size (> 1136656562 bytes) exceeds JVM heap space at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323) at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149) at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48) at java.io.DataOutputStream.write(DataOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286) at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351) at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170) at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50) at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136) at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71) at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58) at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32) at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98) at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60) at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136) at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71) at org.apache.beam.sdk.coders.KvCoder.encode(KvCod
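The OOM above originates in DataOutputSerializer.resize growing its internal buffer. A minimal sketch of the grow-by-doubling strategy Zhijiang describes (an illustration, not Flink's actual code) shows why a record just over 1 GB can transiently demand roughly 2 GB of heap:

```java
public class DoublingBuffer {
    // Sketch of the grow-by-doubling strategy used by serializer output
    // buffers: capacity doubles until it can hold the record, so the peak
    // allocation can be almost twice the record size.
    public static long capacityFor(long recordBytes, long initialCapacity) {
        long capacity = initialCapacity;
        while (capacity < recordBytes) {
            capacity *= 2;
        }
        return capacity;
    }

    public static void main(String[] args) {
        // The record size from the exception in this thread (~1.06 GiB).
        long largeRecord = 1_136_656_562L;
        // Starting from a small buffer, doubling lands just above 2 GiB.
        System.out.println(capacityFor(largeRecord, 128));
    }
}
```

This is why Zhijiang's estimate of "2GB overhead for your 1GB record" holds: the last doubling step jumps from 1 GiB to 2 GiB, and with one serializer per slot the overhead multiplies.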
Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool
The stack below indicates there are no available buffers for the source outputs, including watermarks and normal records, so the source is blocked requesting a buffer from the LocalBufferPool. The checkpoint process is also affected by this blocking request. The root question is why the queued output buffers are not consumed by the downstream tasks. I think you can check which downstream task's input queue usage reaches 100%, then jstack the corresponding downstream tasks, which may be stuck in some operation and causing back pressure. Best, Zhijiang -- From: Yan Zhou [FDS Science] Send Time: October 23, 2018 (Tue) 02:29 To: user@flink.apache.org Subject: Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool I am using Flink 1.5.3. From: Yan Zhou [FDS Science] Sent: Monday, October 22, 2018 11:26 To: user@flink.apache.org Subject: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool Hi, My application suddenly got stuck and completely stopped moving forward after running for a few days. No exceptions are found. From the thread dump, I can see that the operator threads and checkpoint threads deadlock on the LocalBufferPool: the LocalBufferPool is not able to request memory and keeps the lock. Please see the thread dump at the bottom. The job uses RocksDB as the state backend. From the heap dump and the web UI, there is plenty of memory in the JVM and there is no GC problem. Checkpoints were good until the problem occurred: 2018-10-19 04:41:23,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4347 @ 1539891683667 for job 1. 2018-10-19 04:41:45,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4347 for job 1 (1019729450 bytes in 13600 ms). 2018-10-19 04:46:45,089 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4348 @ 1539892005069 for job 1. 2018-10-19 04:56:45,089 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 4348 of job 1 expired before completing. 
This happened at midnight and the traffic was relatively low. Even if there was a spike that caused back pressure, to my understanding the events should be processed eventually and the network buffers would become available after that. What might be the cause of it? Best Yan "Time Trigger for Source: Custom Source -> Flat Map -> Flat Map -> Timestamps/Watermarks -> (from: ...s#363 daemon prio=5 os_prio=0 tid=0x7ff187944000 nid=0x8f76 in Object.wait() [0x7ff12fda9000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) - locked <0x0006dadeeac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:117) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:87) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736) at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668) at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736) at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:603) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668) at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:77) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeSer
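The TIMED_WAITING frame in LocalBufferPool.requestMemorySegment can be illustrated with a minimal bounded buffer pool (a toy sketch, not Flink's actual implementation): producers block on the pool monitor until a consumer recycles a buffer, which is exactly why an unconsumed downstream queue stalls both the source and its checkpoints.

```java
import java.util.ArrayDeque;

// Minimal sketch of a bounded buffer pool: requestBuffer() blocks on the pool
// monitor when all buffers are handed out -- the TIMED_WAITING state visible
// in the thread dump -- and progress resumes only when a downstream consumer
// recycles a buffer.
public class SimpleBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    public SimpleBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    public synchronized byte[] requestBuffer() {
        while (available.isEmpty()) {
            try {
                wait(); // producer stalls here under back pressure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // give up if interrupted
            }
        }
        return available.poll();
    }

    public synchronized void recycle(byte[] buffer) {
        available.add(buffer);
        notifyAll(); // wake a blocked producer
    }
}
```

If nothing ever calls recycle(), every thread entering requestBuffer() waits forever: the "deadlock" in this thread is really a starvation caused by the downstream side never returning buffers.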
Re: Need help to understand memory consumption
The operators in stream jobs do not use the managed memory, which as you said is only for batch jobs. I guess the initial feedback was about batch jobs, judging from the description? -- From: Paul Lam Send Time: October 17, 2018 (Wed) 14:35 To: Zhijiang(wangzhijiang999) Cc: jpreisner ; user Subject: Re: Need help to understand memory consumption Hi Zhijiang, Does the memory management apply to streaming jobs as well? A previous post [1] said that it can only be used in the batch API, but I might have missed some updates on that. Thank you! [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 Best, Paul Lam On October 17, 2018, 13:39, Zhijiang(wangzhijiang999) wrote: Hi Julien, Flink manages the default 70% fraction of free memory in the TaskManager for caching data efficiently, just as mentioned in this article: "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". Once allocated, these managed memory segments stay referenced by the MemoryManager, so they remain resident in the old region of the JVM and are not recycled by GC. This way we can avoid the cost of repeatedly creating and recycling objects. The default parameter "taskmanager.memory.preallocate" is false, which means the managed memory is not allocated while starting the TaskManager. When the job is running, the related tasks request this managed memory and then you see the memory consumption become high. When the job is cancelled, the managed memory is released back to the MemoryManager but not recycled by GC, so you see no change in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocation via taskmanager.memory.preallocate=false. Best, Zhijiang -- From: Paul Lam Send Time: October 17, 2018 (Wed) 12:31 To: jpreisner Cc: user Subject: Re: Need help to understand memory consumption Hi Julien, AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC to release the memory. 
Best, Paul Lam > On October 12, 2018, 14:29, jpreis...@free.fr wrote: > > Hi, > > My use case is : > - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1 > TaskManager) > - I run N jobs per day. N may vary (one day : N=20, another day : N=50, > ...). All jobs are the same. They connect to Kafka topics and have two DB2 > connectors. > - Depending on a special event, a job can self-restart via the command : > bin/flink cancel > - At the end of the day, I cancel all jobs > - Each VM is configured with 16Gb RAM > - Allocated memory configured for one taskmanager is 10Gb > > After several days, the memory saturates (we exceed 14Gb of used memory). > > I read the following posts but I did not succeed in understanding my problem : > - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html > - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser > > I did some tests on a machine (outside the cluster) with the top command and > this is what I concluded (please see attached file - Flink_memory.PNG) : > - When a job is started and running, it consumes memory > - When a job is cancelled, a large part of the memory is still used > - When another job is started and running (after cancelling the previous > job), even more memory is consumed > - When I restart the jobmanager and taskmanager, memory returns to normal > > Why is the memory not released when a job is cancelled? > > I added another attachment that represents the graph of a job - Graph.PNG. > If it can be useful, we use MapFunction, FlatMapFunction, FilterFunction, > triggers and windows, ... > > Thanks in advance, > Julien
Re: Need help to understand memory consumption
Hi Julien, Flink manages the default 70% fraction of free memory in the TaskManager for caching data efficiently, just as mentioned in this article: "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". Once allocated, these managed memory segments stay referenced by the MemoryManager, so they remain resident in the old region of the JVM and are not recycled by GC. This way we can avoid the cost of repeatedly creating and recycling objects. The default parameter "taskmanager.memory.preallocate" is false, which means the managed memory is not allocated while starting the TaskManager. When the job is running, the related tasks request this managed memory and then you see the memory consumption become high. When the job is cancelled, the managed memory is released back to the MemoryManager but not recycled by GC, so you see no change in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocation via taskmanager.memory.preallocate=false. Best, Zhijiang -- From: Paul Lam Send Time: October 17, 2018 (Wed) 12:31 To: jpreisner Cc: user Subject: Re: Need help to understand memory consumption Hi Julien, AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC to release the memory. Best, Paul Lam > On October 12, 2018, 14:29, jpreis...@free.fr wrote: > > Hi, > > My use case is : > - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1 > TaskManager) > - I run N jobs per day. N may vary (one day : N=20, another day : N=50, > ...). All jobs are the same. They connect to Kafka topics and have two DB2 > connectors. > - Depending on a special event, a job can self-restart via the command : > bin/flink cancel > - At the end of the day, I cancel all jobs > - Each VM is configured with 16Gb RAM > - Allocated memory configured for one taskmanager is 10Gb > > After several days, the memory saturates (we exceed 14Gb of used memory). 
> > I read the following posts but I did not succeed in understanding my problem : > - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html > - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser > > I did some tests on a machine (outside the cluster) with the top command and > this is what I concluded (please see attached file - Flink_memory.PNG) : > - When a job is started and running, it consumes memory > - When a job is cancelled, a large part of the memory is still used > - When another job is started and running (after cancelling the previous > job), even more memory is consumed > - When I restart the jobmanager and taskmanager, memory returns to normal > > Why is the memory not released when a job is cancelled? > > I added another attachment that represents the graph of a job - Graph.PNG. > If it can be useful, we use MapFunction, FlatMapFunction, FilterFunction, > triggers and windows, ... > > Thanks in advance, > Julien
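The pooling behaviour Zhijiang describes can be sketched with a toy model (hypothetical classes and names, not Flink's actual implementation): segments released by a cancelled job go back into the MemoryManager's pool and stay referenced, so the process footprint does not shrink until the TaskManager itself is restarted.

```python
# Toy model (NOT Flink code) of why cancelling a job does not lower the
# TaskManager's memory footprint: released segments return to the pool and
# remain referenced, so the garbage collector never reclaims them.

class MemoryManager:
    SEGMENT_SIZE = 32 * 1024  # bytes per segment (illustrative)

    def __init__(self, num_segments, preallocate=False):
        self.num_segments = num_segments
        self.pool = []
        # preallocate=True mimics taskmanager.memory.preallocate=true:
        # all segments are allocated up front at TaskManager start.
        if preallocate:
            self.pool = [bytearray(self.SEGMENT_SIZE) for _ in range(num_segments)]
        self.allocated = num_segments if preallocate else 0

    def request(self, n):
        out = []
        for _ in range(n):
            if self.pool:
                out.append(self.pool.pop())              # reuse a pooled segment
            elif self.allocated < self.num_segments:
                out.append(bytearray(self.SEGMENT_SIZE))  # lazy allocation
                self.allocated += 1
            else:
                raise RuntimeError("out of managed memory")
        return out

    def release(self, segments):
        # Segments are returned to the pool, not dropped: they remain
        # referenced, so they cannot be garbage collected.
        self.pool.extend(segments)

mm = MemoryManager(num_segments=4, preallocate=False)
segs = mm.request(3)   # job running: footprint grows lazily
mm.release(segs)       # job cancelled: footprint unchanged, segments pooled
print(mm.allocated, len(mm.pool))
```

With preallocate=False the footprint only grows on demand, which matches Julien's observation that a freshly restarted TaskManager starts with low memory consumption.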
Re: What are channels mapped to?
The channels are mapped to subpartition indexes, each of which is consumed by a specific parallel instance of the downstream task. For example, if the reduce task has a parallelism of three, every map task generates three subpartitions. If a record is hashed to the first channel, it will be consumed by the first reduce task. Best, Zhijiang -- From:Chris Miller Send Time:2018-10-11 (Thu) 16:54 To:user Subject:What are channels mapped to? Hi, in the OutputEmitter, the output channel can be selected in different manners, e.g. OutputEmitter#hashPartitionDefault(). What are the channels mapped to? Do they map to an IP address or port? Thanks. Chris
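As a rough illustration (a sketch of the idea, not Flink's exact hashing code), channel selection in hash partitioning boils down to the key's hash modulo the downstream parallelism; the resulting index identifies a subpartition, not an IP address or port:

```python
# Sketch of hash partitioning as done by OutputEmitter#hashPartitionDefault():
# the selected "channel" is the index of a subpartition, which corresponds 1:1
# to a parallel subtask of the downstream operator.

def select_channel(key, num_channels):
    # Python's hash() stands in for the key's hashCode(); modulo picks the
    # subpartition index in [0, num_channels).
    return hash(key) % num_channels

num_reducers = 3  # downstream parallelism: each map task writes 3 subpartitions
records = ["apple", "banana", "cherry", "apple"]
for r in records:
    ch = select_channel(r, num_reducers)
    # All records with the same key land in the same subpartition and are
    # therefore consumed by the same reduce subtask.
    print(r, "->", ch)
```

The guarantee is deterministic routing per key, not any notion of network location: which TaskManager hosts the consuming subtask is decided separately by scheduling.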
Re: Small checkpoint data takes too much time
The checkpoint duration includes both barrier alignment and the state snapshot. Every task has to receive the barriers from all of its input channels before it triggers the state snapshot. I guess the barrier alignment may take a long time in your case; it is especially critical during backpressure. You can check the "checkpointAlignmentTime" metric for confirmation. Best, Zhijiang -- From:徐涛 Send Time:2018-10-10 (Wed) 13:13 To:user Subject:Small checkpoint data takes too much time Hi, I recently encountered a problem in production. I found that checkpoints take too much time, although this doesn't affect the job execution. I am using the FsStateBackend, writing the data to an HDFS checkpointDataUri, with asynchronous snapshots. I print the metrics "lastCheckpointDuration" and "lastCheckpointSize": the "lastCheckpointSize" is about 80KB, but the "lastCheckpointDuration" is about 160s! Because the checkpoint data is small, I think it should not take that long. I do not know why, or which conditions may influence the checkpoint time. Has anyone encountered such a problem? Thanks a lot. Best, Henry
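A toy calculation (illustrative numbers and function names, not Flink internals) shows why a tiny snapshot can still produce a long checkpoint duration: the task cannot snapshot until the barrier has arrived on every input channel, so the alignment phase is dominated by the slowest channel.

```python
# Toy model of barrier alignment: alignment lasts from the first barrier
# arrival until the last one, because the task must see the barrier on ALL
# input channels before triggering its (fast, small) state snapshot.

def alignment_time(barrier_arrival_times):
    """Seconds spent aligning: last barrier arrival minus first."""
    return max(barrier_arrival_times) - min(barrier_arrival_times)

# Barrier arrival times (seconds) on 3 input channels; the third channel is
# back-pressured, so its barrier is delivered very late.
arrivals = [0.5, 1.0, 158.0]
snapshot_time = 2.0  # writing ~80 KB of state is quick

total = alignment_time(arrivals) + snapshot_time
print(total)  # 157.5 s of alignment + 2.0 s of snapshotting
```

This matches the symptom in the thread: lastCheckpointSize is small, yet lastCheckpointDuration is ~160 s, and checkpointAlignmentTime would reveal where the time goes.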
Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
This deadlock does exist in special scenarios. Until the bug is fixed, we can work around it by not deploying the map and sink tasks in the same TaskManager. Krishna, do these two tasks share a slot? If so, you can disable slot sharing for this job. Alternatively, I guess setting ExecutionMode#PIPELINED_FORCED, so that no blocking result partitions are generated, can avoid this issue temporarily. Best, Zhijiang -- From:Piotr Nowojski Send Time:2018-10-04 (Thu) 21:54 To:Aljoscha Krettek Cc:"Narayanaswamy, Krishna" ; Nico Kruber ; user@flink.apache.org Subject:Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks Hi, Thanks for reporting the problem. This bug was previously unknown to us. I have created a jira ticket for this bug: https://issues.apache.org/jira/browse/FLINK-10491 Unfortunately I'm not familiar with running batch jobs in Flink, so I don't know if there is some hot fix or anything that can at least mitigate/decrease the probability of the bug for you until we fix it properly. Piotrek On 4 Oct 2018, at 13:55, Aljoscha Krettek wrote: Hi, this looks like a potential Flink bug. Looping in Nico and Piotr who have looked into that in the past. Could you please comment on that? Best, Aljoscha On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna wrote: Hi, I am trying to run one large single job graph which has > 10k tasks. The form of the graph is something like DataSource -> Filter -> Map [...multiple] Sink1 Sink2 I am using a parallelism of 10 with 1 slot per task manager and a memory allocation of 32G per TM. The JM is running with 8G. Everything starts up and runs fine, with close to 6-7k tasks completing (this is variable and is mostly the source/filter/map portions), and then the graph just hangs. I managed to connect to the task managers and get a thread dump just in time, and found the following deadlock on one of the TMs which apparently seems to be holding up everything else. 
Please could someone take a look and advise if there is something I could do or try out to fix this. Marked below are the 2 isolated thread stacks marking the deadlock -
Thread-1 "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA waiting for monitor entry
waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
- locked <0x2dfd> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
- locked <0x2da5> (a java.lang.Object)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
Thread-2 "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor entry
java.lang.Thread.State: BLOCKED
blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release lock on <0x2dfd> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
at org.apache.flink.run
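The two stacks above form a classic lock-ordering deadlock: the sink thread holds lock <0x2dfd> and waits for <0x2dfb>, while the map thread holds <0x2dfb> and waits for <0x2dfd>. A generic illustration (plain threading code, not Flink's) of why imposing one global lock acquisition order removes the cycle:

```python
# Generic illustration (NOT Flink code) of the deadlock pattern in the thread
# dump: thread A holds lock1 and wants lock2 while thread B holds lock2 and
# wants lock1. The deadlock-free version below has BOTH threads acquire the
# locks in the same fixed order, so no cycle can form in the wait-for graph.
import threading

lock_a = threading.Lock()   # think: the sink's buffer queue <0x2dfd>
lock_b = threading.Lock()   # think: the map task's buffer queue <0x2dfb>

def do_work(first, second, log, name):
    # Deadlock-free: every caller passes the locks in the same global order.
    with first:
        with second:
            log.append(name)

log = []
t1 = threading.Thread(target=do_work, args=(lock_a, lock_b, log, "sink"))
t2 = threading.Thread(target=do_work, args=(lock_a, lock_b, log, "map"))
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(log))  # both threads finish: no deadlock
```

The workarounds in the thread (disabling slot sharing, or PIPELINED_FORCED) sidestep the problem by ensuring the two contending components never end up waiting on each other in the same process.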
Re: [DISCUSS] Dropping flink-storm?
I very much agree with dropping it. +1 -- From:Jeff Carter Send Time:2018-09-29 (Sat) 10:18 To:dev Cc:chesnay ; Till Rohrmann ; user Subject:Re: [DISCUSS] Dropping flink-storm? +1 to drop it. On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng wrote: > Hi, > > +1 to drop it. It seems that few people use it. > > Best, Hequn > > On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler > wrote: > > > I'm very much in favor of dropping it. > > > > Flink has been continually growing in terms of features, and IMO we've > > reached the point where we should cull some of the more obscure ones. > > flink-storm, while interesting from a theoretical standpoint, offers too > > little value. > > > > Note that the bolt/spout wrapper parts of the project are still compatible; > > it's only topologies that aren't working. > > > > IMO compatibility layers only add value if they ease the migration to > > Flink APIs. > > * bolt/spout wrappers do this, but they will continue to work even if we > > drop it > > * topologies don't do this, so I'm not interested in them. > > > > On 28.09.2018 15:22, Till Rohrmann wrote: > > > Hi everyone, > > > > > > I would like to discuss how to proceed with Flink's Storm > > > compatibility layer flink-storm. > > > > > > While working on removing Flink's legacy mode, I noticed that some > > > parts of flink-storm rely on the legacy Flink client. In fact, at the > > > moment flink-storm does not work together with Flink's new distributed > > > architecture. > > > > > > I'm also wondering how many people are actually using Flink's Storm > > > compatibility layer and whether it would be worth porting it. > > > > > > I see two options how to proceed: > > > > > > 1) Commit to maintain flink-storm and port it to Flink's new architecture > > > 2) Drop flink-storm > > > > > > I doubt that we can contribute it to Apache Bahir [1], because once we > > > remove the legacy mode, this module will no longer work with all newer > > > Flink versions. 
> > > > > > Therefore, I would like to hear your opinion on this and in particular > > > if you are using or planning to use flink-storm in the future. > > > > > > [1] https://github.com/apache/bahir-flink > > > > > > Cheers, > > > Till > > > > > > >
Re: InpoolUsage & InpoolBuffers inconsistency
Hi, The inpoolQueueLength indicates how many buffers have been received and queued. But if the queued elements are events (like barriers), they are not counted in the inpoolUsage. So in your case the two metrics may legitimately differ. If you observed autoRead=false on the downstream side, it means the inpoolUsage may have already reached 100% and there are no available buffers left to receive data from the upstream side. But as long as the upstream still has available buffers for its output, the upstream will not be blocked in this case. BTW, if autoRead=false and the inpoolUsage reaches 100%, there may be many buffers queued in front of the barrier, so the checkpoint may expire as you said. Best, Zhijiang -- From:aitozi Send Time:2018-09-18 (Tue) 12:59 To:user Subject:Re: InpoolUsage & InpoolBuffers inconsistency My doubt comes from debugging a checkpoint expiration problem. I encountered checkpoint expiration with no backpressure shown in the web UI. After adding many logs, I found that the barrier was sent downstream, but the downstream may have been set to autoRead=false, blocking consumption of the barrier. However, the data temporarily queued in the input channel does not cause upstream backpressure. I think this situation could be monitored by checking the inpoolUsage metric: when it is 1, there may be a problem. But when I checked inpoolUsage and inpoolQueueLength, I found them inconsistent. The inpoolUsage is calculated as bestEffortGetUsedBuffer / allbuffers; does this lead to the mismatch? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
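Zhijiang's explanation of the mismatch can be sketched with a toy metric calculation (hypothetical function and values, not the actual metric code): the queue length counts every queued element, while the usage ratio counts only data buffers, so a queue containing events shows a higher length than the usage suggests.

```python
# Toy model (NOT Flink's metric code) of why inpoolQueueLength and inpoolUsage
# can disagree: the queue length counts all queued elements (data buffers AND
# events such as checkpoint barriers), while the usage ratio is computed from
# data buffers only.

def in_pool_metrics(queue, total_buffers):
    data_buffers = sum(1 for item in queue if item == "buffer")
    usage = data_buffers / total_buffers  # events are excluded from usage
    return len(queue), usage

# 3 elements queued, but only 1 of them is a data buffer; pool has 8 buffers
queue = ["barrier", "buffer", "barrier"]
length, usage = in_pool_metrics(queue, total_buffers=8)
print(length, usage)  # queue length 3, usage only 1/8
```

So a non-zero queue length with low usage is not necessarily an error; it can simply mean events are sitting in the queue.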
Re: Flink application down due to RpcTimeout exception
Hi, 1. This rpc timeout occurs while the JobMaster is deploying a task into the TaskExecutor: the rpc thread in the TaskExecutor does not respond to the deployment message within 10 seconds. There are many possible causes, such as a network problem between TaskExecutor and JobMaster, or other time-consuming operations in the TaskExecutor, so the root cause may be a bit complicated to trace. First you can check when the TaskExecutor receives this message, then when it responds to it; you may also need to check what the rpc thread is doing in between. 2. You can increase the default value of the rpc timeout parameter (akka.ask.timeout) as a temporary workaround. Best, Zhijiang -- From:徐涛 Send Time:2018-09-13 (Thu) 14:10 To:user Subject:Flink application down due to RpcTimeout exception Hi All, I'm running Flink 1.6 on YARN. After the program ran for a day, it failed on YARN, and the error log is as follows. It seems to be due to a timeout error, but I have the following questions: 1. In which step did the Flink components fail to communicate? Which two components are involved? 2. How can I solve this problem? Thanks a lot!! 
java.lang.Exception: Cannot deploy task LeftOuterJoin(where: (=(id, article_id)), join: (id, created_time, article_score, PU, article_id, CU, CN)) -> select: (id, created_time, article_score, PU, CU, CN) (2/2) (d403002a7accc5133cf89a386ddc1dfb) - TaskManager (container_1532509321420_463249_01_02 @ sh-bs-3-i1-hadoop-17-225 (dataPort=10459)) not responding after a rpcTimeout of 1 ms
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:601) ~[flink-runtime_2.11-1.6.0.jar:1.6.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[na:1.8.0_65]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[na:1.8.0_65]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[na:1.8.0_65]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_65]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_65]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@sh-bs-3-i1-hadoop-17-225:24213/user/taskmanager_0#-1762816591]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) ~[scala-library-2.11.8.jar:na]
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) ~[scala-library-2.11.8.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) ~[scala-library-2.11.8.jar:na]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) ~[akka-actor_2.11-2.4.20.jar:na]
... 1 common frames omitted
Best, Henry
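The failing "ask" pattern behind the AskTimeoutException can be modelled with a plain future and a deadline (a generic sketch, not Akka or Flink code): the caller waits for a reply with a timeout and fails if the callee does not answer in time, which is why raising akka.ask.timeout can work around a slow but otherwise healthy TaskExecutor.

```python
# Generic sketch (NOT Akka/Flink code) of ask-with-timeout semantics: a reply
# future is awaited with a deadline; a short deadline produces a timeout even
# though the callee would eventually answer, while a longer one succeeds.
import concurrent.futures
import time

def slow_task_executor():
    time.sleep(0.2)  # e.g. the TaskExecutor's rpc thread is busy for a while
    return "task deployed"

with concurrent.futures.ThreadPoolExecutor() as pool:
    reply = pool.submit(slow_task_executor)
    try:
        # Deadline too short for the busy callee: analogous to the default
        # akka.ask.timeout being exceeded during deployment.
        result = reply.result(timeout=0.05)
    except concurrent.futures.TimeoutError:
        result = "AskTimeoutException"
    # Retrying with a larger deadline succeeds, analogous to increasing
    # akka.ask.timeout as Zhijiang suggests.
    retry = pool.submit(slow_task_executor)
    result2 = retry.result(timeout=1.0)
print(result, result2)
```

This only buys time; if the rpc thread is persistently stuck, a larger timeout merely delays the failure, which is why tracing what that thread is doing remains the real fix.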