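I have a question. I am working with a processing time window, and because I need to handle the data along two dimensions, element count and time, I wrote a custom Trigger.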
Its onElement and onProcessingTime methods are as follows:

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        long timeout = System.currentTimeMillis() + 30000L;
        ctx.registerProcessingTimeTimer(timeout);
        System.out.println("new timeout " + timeout);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("current timeout " + time);
        return TriggerResult.FIRE_AND_PURGE;
    }

This is how I use it:

    DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(consumer)
        .flatMap(new CustomFlatMapFunction())
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
        .trigger(ProcessingTimeWithCountTrigger.of(5))
        .sum(1);

Why does the ctx.registerProcessingTimeTimer call in onElement seem to have no effect? onProcessingTime still fires on the original 30-second schedule.