You can connect to the TaskManagers with a tool such as jvisualvm to observe 
where the objects are created. It doesn't sound normal that there are millions 
of these objects if only a couple thousand elements come in.
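
A back-of-the-envelope check makes the mismatch concrete. The sketch below assumes that each incoming message produces at most one element at the window operator (the CoProcess function may emit more, which I can't tell from the thread); the class and method names are only illustrative. It compares the number of elements one TaskManager can receive during a 5-second window at 1.5k msgs/sec with the growth in InternalTimer instances between the T1 and T2 histograms quoted below.

```java
public class WindowCountEstimate {

    /** Upper bound on elements that can arrive during one window span. */
    static long maxElementsPerWindow(long msgsPerSecond, long windowSeconds) {
        return msgsPerSecond * windowSeconds;
    }

    public static void main(String[] args) {
        // 1.5k msgs/sec per TaskManager and 5-second tumbling windows (from the thread).
        long perTaskManager = maxElementsPerWindow(1_500, 5);

        // InternalTimer instance counts from the quoted histograms: T2 minus T1.
        long observedGrowth = 25_756_748L - 570_832L;

        System.out.println(perTaskManager);                  // 7500
        System.out.println(observedGrowth);                  // 25185916
        System.out.println(observedGrowth / perTaskManager); // 3358
    }
}
```

Under that assumption the growth is more than three thousand times what the input rate alone would explain, which is why profiling the allocation sites is the next step.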

> On 25. Jan 2018, at 14:59, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Aljoscha (in CC), do you have an idea about this issue?
> 
> Thanks,
> Fabian
> 
> 2018-01-24 7:06 GMT+01:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
> Thanks Fabian, but at 1.5k messages per second per TM, several million 
> InternalTimer & TimeWindow objects are created within a period of 5 seconds. 
> Is there a way to debug this issue?
>  
> Regards,
> Navneeth
> 
> On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
> 
> TimeWindows and Timers are created for each window, i.e., every 5 seconds for 
> every distinct key that a task is processing. 
> Event-time windows are completed and cleaned up when a watermark is received 
> that passes the window end timestamp. 
> Therefore, there might be more than one window per key depending on the 
> watermarks.
> 
> Hope this helps,
> Fabian
> 
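To illustrate the point above about several windows per key: the following plain-Java sketch (no Flink dependency) reproduces the arithmetic of tumbling event-time windows under the assumptions of zero window offset, non-negative timestamps, and no allowed lateness. The class and method names are hypothetical. A window is treated as open until the watermark reaches its last timestamp (window end minus 1 ms).

```java
public class TumblingWindowSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Largest timestamp that still belongs to the window starting at startMs. */
    static long maxTimestamp(long startMs, long sizeMs) {
        return startMs + sizeMs - 1;
    }

    /**
     * Upper bound on the windows one key can have open at the same time:
     * every window from the first one the watermark has not yet passed
     * up to the window of the newest event seen for that key.
     */
    static long openWindowsPerKey(long newestEventMs, long watermarkMs, long sizeMs) {
        long firstOpen = windowStart(watermarkMs + 1, sizeMs);
        long lastOpen = windowStart(newestEventMs, sizeMs);
        return lastOpen < firstOpen ? 0 : (lastOpen - firstOpen) / sizeMs + 1;
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second windows, in milliseconds

        System.out.println(windowStart(27_300, size));               // 25000
        System.out.println(maxTimestamp(25_000, size));              // 29999
        // Watermark lags the newest event by about 15 seconds.
        System.out.println(openWindowsPerKey(27_300, 12_000, size)); // 4
        // Watermark has just passed the window [10000, 15000).
        System.out.println(openWindowsPerKey(27_300, 14_999, size)); // 3
    }
}
```

The number of live window/timer pairs on a task is then roughly the number of distinct keys it handles times this per-key figure, so a watermark that lags far behind (or does not advance) lets the count grow.
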
> 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
> Hi,
> 
> I'm facing issues with frequent young generation garbage collections in my 
> task managers, which happen every few seconds. I have 3 task managers with 
> 12GB heap allocated on each and I have set the config to use G1GC. My program 
> ingests binary data from a Kafka source and the message rate is around 
> 4.5k msgs/sec with around 400 bytes per msg. Below are the operators used in 
> the program.
> 
> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) -> 
> FlatMap -> Sink
> 
> I captured the histograms below at 5-second intervals and analyzed the heap 
> as well. It looks like a lot of InternalTimer and TimeWindow objects are created.
> 
> Also, I see a high usage in 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.
> 
> Window code:
> dataStream.keyBy(new MessageKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .apply(new Aggregate());
> 
> Captured at time T:
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:       2074427      481933816  [B
>    2:        357192      339368592  [D
>    3:      12759222      204147552  java.lang.Integer
>    4:         31416       85151832  [I
>    5:        900982       83872240  [C
>    6:        631888       20220416  java.util.HashMap$Node
>    7:        804203       19300872  java.lang.String
>    8:        541651       17332832  org.apache.flink.streaming.api.operators.InternalTimer
>    9:        540252       17288064  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> 
> Captured at T1 (T + 5 seconds):
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:      12084258     2282849264  [B
>    2:       1922018     1828760896  [D
>    3:      68261427     1092182832  java.lang.Integer
>    4:       2712099      291488736  [C
>    5:         54201       98798976  [I
>    6:       2028250       48678000  java.lang.String
>    7:         66080       43528136  [[B
>    8:       1401915       35580168  [Ljava.lang.Object;
>    9:        949062       30369984  java.util.HashMap$Node
>   10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer
>   11:        549979       17599328  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> 
> Captured at T2 (T1 + 5 seconds):
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:       9911982     2920384472  [B
>    2:       1584406     1510958520  [D
>    3:      56087337      897397392  java.lang.Integer
>    4:      26080337      834570784  java.util.HashMap$Node
>    5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer
>    6:      25740086      823682752  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> Thanks.
> 
> 
> 
> 
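To compare snapshots like the ones quoted above without reading them by eye, a small helper can compute the per-class growth in instance counts. This is only a sketch: it assumes rows of the form `num: #instances #bytes class-name` on a single line (as `jmap -histo` prints them), and the class `HistoDiff` and its methods are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HistoDiff {

    // Matches rows such as "   5:   25756748   824215936  some.Class"; header lines do not match.
    private static final Pattern ROW =
            Pattern.compile("^\\s*\\d+:\\s+(\\d+)\\s+(\\d+)\\s+(\\S+).*$");

    /** Maps class name to instance count for every row of one histogram. */
    static Map<String, Long> instanceCounts(String histogram) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : histogram.split("\\R")) {
            Matcher m = ROW.matcher(line);
            if (m.matches()) {
                counts.put(m.group(3), Long.parseLong(m.group(1)));
            }
        }
        return counts;
    }

    /** Instance-count change per class listed in the later snapshot. */
    static Map<String, Long> growth(String before, String after) {
        Map<String, Long> old = instanceCounts(before);
        Map<String, Long> delta = new LinkedHashMap<>();
        instanceCounts(after).forEach(
                (cls, n) -> delta.put(cls, n - old.getOrDefault(cls, 0L)));
        return delta;
    }

    public static void main(String[] args) {
        // Two rows each from the T1 and T2 histograms in this thread.
        String t1 = " num     #instances         #bytes  class name\n"
                + "   9:        949062       30369984  java.util.HashMap$Node\n"
                + "  10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer\n";
        String t2 = " num     #instances         #bytes  class name\n"
                + "   4:      26080337      834570784  java.util.HashMap$Node\n"
                + "   5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer\n";

        // Prints 25131275 for HashMap$Node and 25185916 for InternalTimer.
        growth(t1, t2).forEach((cls, d) -> System.out.println(d + "\t" + cls));
    }
}
```

If the histograms were taken with `jmap -histo` without the `:live` option, they also count objects that are already unreachable but not yet collected, so part of the growth may be garbage waiting for the next GC rather than retained state.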
