private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> { private ValueState<Tuple2<Integer, ObjectNode>> state;
@Override public void open(Configuration parameters) throws Exception { state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate", TypeInformation.of(getTypeOfTuple))); } @Override public void processElement(Tuple2<Integer, ObjectNode> tuple2, Context context, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception { //XXXXXX context.timerService().registerEventTimeTimer(System.currentTimeMillis() + interval); } @Override public void onTimer(long l, OnTimerContext onTimerContext, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception { if (state.value() != null) { collector.collect(state.value()); } } } In my understanding. If I set a timer in my class that extends RichProcessFunction, once the onTimer function be called. The object of MergeFunction can be recycled by gc. If I don't set a event timer in the processElement, it can be recycled by gc after end of processElement function. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13089.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.