Hi,

What Flink version are you using?

Generally speaking Flink might not the best if you have records fan out, this 
may significantly increase checkpointing time. 

However you might want to first identify what’s causing long GC times. If there 
are long GC pause, this should be the first thing to fix.

Piotrek

> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
> 
> Hi all again - following up on this I think I've identified my problem as 
> being something else, but would appreciate if anyone can offer advice.
> 
> After running my stream from sometime, I see that my garbage collector for 
> old generation starts to take a very long time:
> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
> here the purple line is young generation time, this is ever increasing, but 
> grows slowly, while the blue is old generation.
> This in itself is not a problem, but as soon as the next checkpoint is 
> triggered after this happens you see the following:
> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
> It looks like the checkpoint hits a cap, but this is only because the 
> checkpoints start to timeout and fail (these are the alignment time per 
> operator)
> 
> I do notice that my state is growing quite larger over time, but I don't have 
> a good understanding of what would cause this to happen with the JVM old 
> generation metric, which appears to be the leading metric before a problem is 
> noticed. Other metrics such as network buffers also show that at the 
> checkpoint time things start to go haywire and the situation never recovers.
> 
> Thanks
> 
> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com 
> <mailto:pad...@gmail.com>> wrote:
> Hi all,
> 
> I'm trying to process many records, and I have an expensive operation I'm 
> trying to optimize. Simplified it is something like:
> 
> Data: (key1, count, time)
> 
> Source -> Map(x -> (x, newKeyList(x.key1))
>             -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time))
>             -> Keyby(_.key1).TublingWindow().apply..
>             -> Sink
> 
> In the Map -> Flatmap, what is happening is that each key is mapping to a set 
> of keys, and then this is set as the new key. This effectively increase the 
> size of the stream by 16x
> 
> What I am trying to figure out is how to set the parallelism of my operators. 
> I see in some comments that people suggest your source, sink and aggregation 
> should have different parallelism, but I'm not clear on exactly why, or what 
> this means for CPU utilization. 
> (see for example 
> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
>  
> <https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly>)
> 
> Also, it isn't clear to me the best way to handle this increase in data 
> within the stream itself.
> 
> Thanks

Reply via email to