yunfengzhou-hub commented on code in PR #24672: URL: https://github.com/apache/flink/pull/24672#discussion_r1569902836
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ########## @@ -117,6 +120,7 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> { startIdx = Math.min(keyGroupIdx, startIdx); } this.localKeyGroupRangeStartIdx = startIdx; + this.processingTimeCallback = this::onProcessingTime; Review Comment: How about making `onProcessingTime` a protected method? This way we won't need to introduce processingTimeCallback. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java: ########## @@ -179,11 +180,60 @@ <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService( return timerService; } + @Override + public <N> InternalTimerService<N> getAsyncInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable, + AsyncExecutionController<K> asyncExecutionController) { + checkNotNull(keySerializer, "Timers can only be used on keyed operators."); + + // the following casting is to overcome type restrictions. + TimerSerializer<K, N> timerSerializer = + new TimerSerializer<>(keySerializer, namespaceSerializer); + + InternalTimerServiceAsyncImpl<K, N> timerService = + registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController); + + timerService.startTimerService( + timerSerializer.getKeySerializer(), + timerSerializer.getNamespaceSerializer(), + triggerable); + + return timerService; + } + + <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService( + String name, + TimerSerializer<K, N> timerSerializer, + AsyncExecutionController<K> asyncExecutionController) { + InternalTimerServiceAsyncImpl<K, N> timerService = + (InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name); + if (timerService == null) { + + timerService = + new InternalTimerServiceAsyncImpl<>( + taskIOMetricGroup, + localKeyGroupRange, + keyContext, + processingTimeService, + createTimerPriorityQueue( + PROCESSING_TIMER_PREFIX + name, timerSerializer), + createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), + cancellationContext, + asyncExecutionController); + + timerServices.put(name, timerService); + } + return timerService; + } + Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() { return Collections.unmodifiableMap(timerServices); } - private <N> + protected <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue( Review Comment: This method seems not used in subclasses, so `private` might be enough. Same for `restoreStateForKeyGroup`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org