You can connect to the TaskManagers with a profiler such as jvisualvm to observe where these objects are allocated. It doesn't seem normal to have millions of them when only a couple of thousand elements come in.
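As a rough sanity check of that expectation, here is a hypothetical, simplified Java model of the bookkeeping Fabian describes in the quoted thread: one window and one end-of-window timer per distinct key per 5-second window, both kept until the watermark passes the window end. Everything in it (`WindowTimerModel`, `processElement`, `advanceWatermark`, the `"key|windowStart"` ids) is made up for illustration. It uses plain JDK collections, not Flink's `WindowOperator` or `HeapInternalTimerService`, and it assumes zero window offset, non-negative timestamps, and a timer at each window's last millisecond (end - 1) that fires once the watermark reaches it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of keyed tumbling event-time windows.
 * NOT Flink's implementation: it only mirrors the bookkeeping described in
 * the thread (one window + one timer per distinct key per window, kept until
 * the watermark passes the window end).
 */
public class WindowTimerModel {
    static final long WINDOW_SIZE_MS = 5_000L; // 5-second tumbling windows

    // Pending timers ordered by firing time; each holds "key|windowStart" ids.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    // Per-(key, window) element counts, standing in for window state.
    private final Map<String, Long> windowState = new HashMap<>();

    // Assumption: zero offset and non-negative timestamps.
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    void processElement(String key, long timestamp) {
        long start = windowStart(timestamp);
        String id = key + "|" + start;
        windowState.merge(id, 1L, Long::sum);
        // Timer at the window's last millisecond; the Set ignores repeat registrations.
        long fireAt = start + WINDOW_SIZE_MS - 1;
        timers.computeIfAbsent(fireAt, t -> new HashSet<>()).add(id);
    }

    // Fires and cleans up every window whose timer time is <= the watermark.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String id : timers.pollFirstEntry().getValue()) {
                windowState.remove(id);
                fired.add(id);
            }
        }
        return fired;
    }

    int openWindows() {
        return windowState.size();
    }

    int pendingTimers() {
        return timers.values().stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        WindowTimerModel model = new WindowTimerModel();
        // 1,000 distinct keys, each with elements in two consecutive windows.
        for (int k = 0; k < 1_000; k++) {
            model.processElement("key-" + k, 1_000L); // window [0, 5000)
            model.processElement("key-" + k, 2_000L); // same window: no new timer
            model.processElement("key-" + k, 6_000L); // window [5000, 10000)
        }
        System.out.println("open windows: " + model.openWindows());     // 2000
        System.out.println("pending timers: " + model.pendingTimers()); // 2000
        // The watermark passes the end of the first window only.
        System.out.println("fired: " + model.advanceWatermark(4_999L).size()); // 1000
        System.out.println("open windows: " + model.openWindows());     // 1000
    }
}
```

In this model, open windows and pending timers are bounded by the number of distinct (key, window) pairs, which can never exceed the number of elements: at 1.5k messages per second per TM, a 5-second window sees at most 7,500 elements, hence at most 7,500 new windows and timers per TM per window. Counts in the millions would suggest that windows are piling up because the watermark is not passing their end, i.e., the `advanceWatermark` step never fires them.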
> On 25. Jan 2018, at 14:59, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Aljoscha (in CC), do you have an idea about this issue?
>
> Thanks,
> Fabian
>
> 2018-01-24 7:06 GMT+01:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
> Thanks Fabian, but for 1.5k messages per second per TM there are several
> million InternalTimer and TimeWindow objects created within a period of 5 seconds.
> Is there a way to debug this issue?
>
> Regards,
> Navneeth
>
> On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> TimeWindows and timers are created for each window, i.e., every 5 seconds for
> every distinct key that a task is processing.
> Event-time windows are completed and cleaned up when a watermark is received
> that passes the window end timestamp.
> Therefore, there might be more than one window per key, depending on the
> watermarks.
>
> Hope this helps,
> Fabian
>
> 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan <reachnavnee...@gmail.com>:
> Hi,
>
> I'm facing issues with frequent young-generation garbage collections in my
> task managers, which happen roughly every few seconds. I have 3 task
> managers with 12 GB of heap allocated on each, and I have set the config to use
> G1GC. My program ingests binary data from a Kafka source; the message rate
> is around 4.5k msgs/sec at around 400 bytes per message. Below are the
> operators used in the program:
>
> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5 secs) ->
> FlatMap -> Sink
>
> I captured the histograms below at 5-second intervals and analyzed the heap
> as well. It looks like a lot of InternalTimer and TimeWindow objects are created.
>
> Also, I see high usage in
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.
>
> Window code:
> dataStream.keyBy(new MessageKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .apply(new Aggregate());
>
> Captured at time T:
>
>  num    #instances         #bytes  class name
> ----------------------------------------------
>   1:       2074427      481933816  [B
>   2:        357192      339368592  [D
>   3:      12759222      204147552  java.lang.Integer
>   4:         31416       85151832  [I
>   5:        900982       83872240  [C
>   6:        631888       20220416  java.util.HashMap$Node
>   7:        804203       19300872  java.lang.String
>   8:        541651       17332832  org.apache.flink.streaming.api.operators.InternalTimer
>   9:        540252       17288064  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> Captured at T1 (T + 5 seconds):
>
>  num    #instances         #bytes  class name
> ----------------------------------------------
>   1:      12084258     2282849264  [B
>   2:       1922018     1828760896  [D
>   3:      68261427     1092182832  java.lang.Integer
>   4:       2712099      291488736  [C
>   5:         54201       98798976  [I
>   6:       2028250       48678000  java.lang.String
>   7:         66080       43528136  [[B
>   8:       1401915       35580168  [Ljava.lang.Object;
>   9:        949062       30369984  java.util.HashMap$Node
>  10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer
>  11:        549979       17599328  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> Captured at T2 (T1 + 5 seconds):
>
>  num    #instances         #bytes  class name
> ----------------------------------------------
>   1:       9911982     2920384472  [B
>   2:       1584406     1510958520  [D
>   3:      56087337      897397392  java.lang.Integer
>   4:      26080337      834570784  java.util.HashMap$Node
>   5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer
>   6:      25740086      823682752  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> Thanks.