Hi, can you take a heap dump from a JVM that runs into the problem and share it with us? That would make finding the cause a lot easier.
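In case it helps, here is a minimal sketch of one way to trigger a heap dump from inside a running HotSpot JVM. The class name `HeapDumper` and the target path are hypothetical and not part of your job. The usual alternatives from outside the process are `jmap -dump:live,format=b,file=heap.hprof <pid>`, or starting the JVM with `-XX:+HeapDumpOnOutOfMemoryError` so that a dump is written automatically when the OOM occurs.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of Flink or of the job above).
// Assumes a HotSpot-based JVM, where HotSpotDiagnosticMXBean is available.
final class HeapDumper {

    private HeapDumper() {
    }

    /**
     * Writes a heap dump of the current JVM to the given file.
     *
     * @param target   destination file; use a ".hprof" suffix, which newer JDKs require
     * @param liveOnly if true, only objects that are still reachable are dumped
     */
    static Path dumpHeap(Path target, boolean liveOnly) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // dumpHeap refuses to overwrite an existing file, so remove any stale dump first.
        Files.deleteIfExists(target);
        bean.dumpHeap(target.toAbsolutePath().toString(), liveOnly);
        return target;
    }
}
```

Calling `HeapDumper.dumpHeap(Paths.get("/tmp/taskmanager.hprof"), true)` (path chosen only for illustration) once heap usage has crept up should produce a file that can be opened in a heap analyzer such as Eclipse MAT.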
Best,
Stefan

> On 15.06.2018 at 23:01, ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> I have another slow memory leak situation, this time using a basic
> TimeSession window (earlier it was GlobalWindow related, which Fabian
> helped clarify).
>
> I have a very simple data pipeline:
>
> DataStream<PlatformEvent> processedData = rawTuples
>     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
>     .trigger(new ProcessingTimePurgeTrigger())
>     .apply(new IPSLAMetricWindowFn())
>     .name("windowFunctionTuple")
>     .map(new TupleToPlatformEventMapFn())
>     .name("mapTupleEvent");
>
> Initially I didn't even have ProcessingTimePurgeTrigger; the job was using
> the default trigger. In an effort to fix this issue, I created my own
> trigger from the default ProcessingTimeTrigger with a simple override of
> the onProcessingTime method (essentially replacing FIRE with
> FIRE_AND_PURGE):
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>     return TriggerResult.FIRE_AND_PURGE;
> }
>
> This seems to have done nothing (it may have delayed the issue by a couple
> of hours - not certain). I still see heap utilization creep up slowly until
> it eventually reaches a point where GC starts to take too long, and then
> comes the dreaded OOM.
>
> For completeness, here is my window function (still using the old function
> interface). It creates a few metrics for reporting and applies its logic by
> looping over the Iterable. No state is explicitly kept in this function; I
> needed RichWindowFunction basically just to generate the metrics.
>
> public class IPSLAMetricWindowFn extends RichWindowFunction<NumericFactTuple, BasicFactTuple, String, TimeWindow> {
>
>     private static final long serialVersionUID = 1L;
>
>     private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
>
>     private Meter in;
>
>     private Meter out;
>
>     private Meter error;
>
>     @Override
>     public void open(Configuration conf) throws Exception {
>         this.in = getRuntimeContext()
>             .getMetricGroup()
>             .addGroup(AppConstants.APP_METRICS.PROCESS)
>             .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>         this.out = getRuntimeContext()
>             .getMetricGroup()
>             .addGroup(AppConstants.APP_METRICS.PROCESS)
>             .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>         this.error = getRuntimeContext()
>             .getMetricGroup()
>             .addGroup(AppConstants.APP_METRICS.PROCESS)
>             .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>         super.open(conf);
>     }
>
>     @Override
>     public void apply(String key, TimeWindow window, Iterable<NumericFactTuple> events,
>             Collector<BasicFactTuple> collector) throws Exception {
>     }
> }
>
> Appreciate any pointers on what could be causing leaks here. This seems
> pretty straight-forward.
>
> Thanks, Ashish
>