Hi Piotr,

Thank you very much for your response. I will try the new feature of Flink 1.5 when it is released.
But I am not sure that minimising buffer sizes will work in all scenarios. If I understand correctly, these settings affect the whole Flink instance. We might have a flow like this: Source: read file paths --> unpack and parse files --> analyse parsed data --> …. So there will be a very small amount of data in the first step but quite a lot of parsed data later. Changing buffer sizes globally will probably affect the throughput of the later steps, as you wrote.

> On 23 May 2018, at 14:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely,
> you will be able to limit the "in flight" data by controlling the number of
> assigned credits per channel/input gate. Even without any configuring, Flink
> 1.5.0 will out of the box buffer less data, thus mitigating the problem.
>
> There are some tweaks that you could use to make 1.4.x work better. With
> small records that require heavy processing, generally speaking you do not
> need huge buffer sizes to keep max throughput. You can try to both reduce
> the buffer pool and reduce the memory segment sizes:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
>
> • taskmanager.network.memory.fraction: Fraction of JVM memory to use
>   for network buffers (DEFAULT: 0.1),
> • taskmanager.network.memory.min: Minimum memory size for network
>   buffers in bytes (DEFAULT: 64 MB),
> • taskmanager.network.memory.max: Maximum memory size for network
>   buffers in bytes (DEFAULT: 1 GB), and
> • taskmanager.memory.segment-size: Size of memory buffers used by the
>   memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
>
> Reducing those values will reduce the amount of in-flight data that will be
> caught between checkpoints.
> But keep in mind that smaller values can lead to
> smaller throughput; but as I said, with a small number of heavy-processing
> records this is not an issue. In an extreme example, if your records are, let's
> say, 8 bytes each and require 1 hour to process, there is almost no need for
> any buffering.
>
> Piotrek
>
>> On 23 May 2018, at 12:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Andrei,
>>
>> With the current version of Flink, there is no general solution to this
>> problem.
>> The upcoming version 1.5.0 of Flink adds a feature called credit-based flow
>> control which might help here.
>>
>> I'm adding @Piotr to this thread, who knows more about the details of this
>> new feature.
>>
>> Best, Fabian
>>
>> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski <and...@shumanski.com>:
>> Hi,
>>
>> Right now it is a Kafka source, but I had the same issue when reading data
>> from the local FS.
>>
>> It looks like a common problem for many (all?) sources.
>> When incoming data is very small (paths to large archives) but each entry
>> requires significant time to process (unpack, parse, etc.), Flink detects
>> the back pressure with delay and too much data becomes part of the first
>> transaction.
>>
>> --
>> Best regards,
>> Andrei Shumanski
>>
>> On Fri, May 18, 2018 at 11:44 AM, makeyang <riverbuild...@hotmail.com> wrote:
>> Andrei Shumanski:
>> which source are u using?
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
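For anyone landing on this thread later: the 1.4.x tweak Piotrek describes above could look roughly like the following flink-conf.yaml fragment. The four keys are the ones quoted from the Flink 1.4 docs; the concrete values below are only illustrative assumptions for a "small records, heavy processing" job, not tested recommendations, and they apply to the whole TaskManager (which is exactly the global-effect caveat discussed above).

```yaml
# flink-conf.yaml -- sketch of a reduced network buffer configuration
# (illustrative values; measure throughput before adopting)

# Smaller fraction of JVM memory reserved for network buffers (default: 0.1)
taskmanager.network.memory.fraction: 0.05

# Lower bound for the network buffer pool, in bytes (default: 64 MB)
taskmanager.network.memory.min: 16777216

# Upper bound for the network buffer pool, in bytes (default: 1 GB)
taskmanager.network.memory.max: 67108864

# Smaller memory segments => less data buffered per channel (default: 32768)
taskmanager.memory.segment-size: 8192
```

As a rough back-of-the-envelope check: the in-flight data per channel is bounded by (buffers per channel) x (segment size), so shrinking segments from 32 KiB to 8 KiB cuts that bound by about 4x between checkpoints, at the cost of more per-buffer overhead on high-volume edges.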