Hi Piotr,

Thank you very much for your response.
I will try the new feature of Flink 1.5 when it is released.

But I am not sure that minimising buffer sizes will work in all scenarios.
If I understand correctly, these settings affect the whole Flink instance.

We might have a flow like this:

Source: Read file paths --> Unpack and parse files --> Analyse parsed data --> …

So there will be a very small amount of data at the first step but quite a 
lot of parsed data later.
Changing the buffer sizes globally will probably affect the throughput of 
the later steps, as you wrote.
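
For concreteness, a minimal sketch of such a flow in the DataStream API could 
look like the following (the input file and the unpack/parse/analyse bodies 
are placeholders, not our real code):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ArchiveFlowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env
            // Step 1: tiny records, each one is just a file path.
            .readTextFile("/tmp/archive-paths.txt")
            // Step 2: each path is expensive to process (unpack, parse)
            // and fans out into many parsed records.
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String path, Collector<String> out) {
                    // Placeholder: unpack the archive at `path` and emit
                    // one record per parsed entry.
                    out.collect("parsed:" + path);
                }
            })
            // Step 3: analyse the (now much larger) stream of parsed data.
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String record) {
                    return "analysed:" + record;
                }
            })
            .print();

        env.execute("Archive flow sketch");
    }
}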


> On 23 May 2018, at 14:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely, 
> you will be able to limit the “in-flight” data by controlling the number of 
> credits assigned per channel/input gate. Even without any configuration, 
> Flink 1.5.0 will buffer less data out of the box, thus mitigating the problem.
> 
> There are some tweaks that you could use to make 1.4.x work better. With 
> small records that require heavy processing, generally speaking, you do not 
> need huge buffer sizes to keep maximum throughput. You can try to reduce 
> both the buffer pool and the memory segment size:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
> • taskmanager.network.memory.fraction: Fraction of JVM memory to use for 
>   network buffers (DEFAULT: 0.1)
> • taskmanager.network.memory.min: Minimum memory size for network buffers 
>   in bytes (DEFAULT: 64 MB)
> • taskmanager.network.memory.max: Maximum memory size for network buffers 
>   in bytes (DEFAULT: 1 GB)
> • taskmanager.memory.segment-size: Size of memory buffers used by the 
>   memory manager and the network stack in bytes (DEFAULT: 32768 = 32 KiB)
> 
> Reducing those values will reduce the amount of in-flight data that can be 
> caught between checkpoints. Keep in mind that smaller values can lead to 
> lower throughput, but as I said, with a small number of heavy-processing 
> records this is not an issue. As an extreme example, if your records are, 
> let's say, 8 bytes each and require 1 hour to process, there is almost no 
> need for any buffering.
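> 
> For illustration only, trying smaller values in a local test could look 
> like this (the numbers below are arbitrary examples, not recommendations; 
> on a cluster the same keys would go into flink-conf.yaml):
> 
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> Configuration conf = new Configuration();
> // Example values only; tune them against your own throughput measurements.
> conf.setFloat("taskmanager.network.memory.fraction", 0.05f);
> conf.setLong("taskmanager.network.memory.min", 16L << 20);    // 16 MB
> conf.setLong("taskmanager.network.memory.max", 128L << 20);   // 128 MB
> conf.setInteger("taskmanager.memory.segment-size", 8 * 1024); // 8 KiB
> 
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(4, conf);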
> 
> Piotrek
> 
>> On 23 May 2018, at 12:58, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi Andrei,
>> 
>> With the current version of Flink, there is no general solution to this 
>> problem.
>> The upcoming version 1.5.0 of Flink adds a feature called credit-based 
>> flow control, which might help here.
>> 
>> I'm adding @Piotr, who knows more about the details of this new feature, 
>> to this thread.
>> 
>> Best, Fabian
>> 
>> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski <and...@shumanski.com>:
>> Hi,
>> 
>> 
>> Right now it is a Kafka source, but I had the same issue when reading 
>> data from the local FS.
>> 
>> It looks like a common problem for many (all?) sources.
>> When the incoming data is very small (paths to large archives) but each 
>> entry requires significant time to process (unpack, parse, etc.), Flink 
>> detects the back pressure with a delay and too much data becomes part of 
>> the first transaction.
>> 
>> 
>> 
>> -- 
>> Best regards,
>> Andrei Shumanski
>> 
>> 
>> 
>> On Fri, May 18, 2018 at 11:44 AM, makeyang <riverbuild...@hotmail.com> wrote:
>> Andrei Shumanski:
>>     which source are you using?
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> 
>> 
> 
