private static class MergeFunction extends
RichProcessFunction<Tuple2&lt;Integer, ObjectNode>, Tuple2<Integer,
ObjectNode>> {
        private ValueState<Tuple2&lt;Integer, ObjectNode>> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate", TypeInformation.of(getTypeOfTuple)));
        }

        @Override
        public void processElement(Tuple2<Integer, ObjectNode> tuple2,
Context context, Collector<Tuple2&lt;Integer, ObjectNode>> collector) throws
Exception {
               //XXXXXX
              
context.timerService().registerEventTimeTimer(System.currentTimeMillis() +
interval);
        }
     
        @Override
        public void onTimer(long l, OnTimerContext onTimerContext,
Collector<Tuple2&lt;Integer, ObjectNode>> collector) throws Exception {
            if (state.value() != null) {
                collector.collect(state.value());
            }
        }
    }

In my understanding.

If I set a timer in my class that extends RichProcessFunction, once the
onTimer function be called.
The object of MergeFunction can be recycled by gc. 
If I don't set a event timer in the processElement, it can be recycled by gc
after end of processElement function.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13089.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to