yunfengzhou-hub commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1569902836


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##########
@@ -117,6 +120,7 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
             startIdx = Math.min(keyGroupIdx, startIdx);
         }
         this.localKeyGroupRangeStartIdx = startIdx;
+        this.processingTimeCallback = this::onProcessingTime;

Review Comment:
   How about making `onProcessingTime` a protected method? That way we won't need to introduce `processingTimeCallback`.
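
   A minimal sketch of the suggestion above, not the actual Flink code: `BaseTimerService`, `AsyncTimerService`, `fire`, and `log` are hypothetical stand-ins introduced only for illustration. It assumes the async variant is a subclass that needs different processing-time behavior, in which case overriding a protected method gives the same effect as swapping a callback field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InternalTimerServiceImpl; not the real Flink class.
class BaseTimerService {
    final List<String> log = new ArrayList<>();

    // Hypothetical entry point that a processing-time service would invoke.
    final void fire(long timestamp) {
        onProcessingTime(timestamp);
    }

    // protected rather than private, so a subclass can override it directly
    // and no separate callback field is needed.
    protected void onProcessingTime(long timestamp) {
        log.add("sync:" + timestamp);
    }
}

// Hypothetical stand-in for the async timer service subclass.
class AsyncTimerService extends BaseTimerService {
    @Override
    protected void onProcessingTime(long timestamp) {
        log.add("async:" + timestamp);
    }
}

class TimerCallbackSketch {
    public static void main(String[] args) {
        BaseTimerService timers = new AsyncTimerService();
        timers.fire(42L);
        // Dynamic dispatch selects the subclass override.
        System.out.println(timers.log);
    }
}
```

   With this shape the base class keeps a single code path, and the subclass changes behavior purely through the override.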



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java:
##########
@@ -179,11 +180,60 @@ <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
         return timerService;
     }
 
+    @Override
+    public <N> InternalTimerService<N> getAsyncInternalTimerService(
+            String name,
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            Triggerable<K, N> triggerable,
+            AsyncExecutionController<K> asyncExecutionController) {
+        checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
+
+        // the following casting is to overcome type restrictions.
+        TimerSerializer<K, N> timerSerializer =
+                new TimerSerializer<>(keySerializer, namespaceSerializer);
+
+        InternalTimerServiceAsyncImpl<K, N> timerService =
+                registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);
+
+        timerService.startTimerService(
+                timerSerializer.getKeySerializer(),
+                timerSerializer.getNamespaceSerializer(),
+                triggerable);
+
+        return timerService;
+    }
+
+    <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
+            String name,
+            TimerSerializer<K, N> timerSerializer,
+            AsyncExecutionController<K> asyncExecutionController) {
+        InternalTimerServiceAsyncImpl<K, N> timerService =
+                (InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
+        if (timerService == null) {
+
+            timerService =
+                    new InternalTimerServiceAsyncImpl<>(
+                            taskIOMetricGroup,
+                            localKeyGroupRange,
+                            keyContext,
+                            processingTimeService,
+                            createTimerPriorityQueue(
+                                    PROCESSING_TIMER_PREFIX + name, timerSerializer),
+                            createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
+                            cancellationContext,
+                            asyncExecutionController);
+
+            timerServices.put(name, timerService);
+        }
+        return timerService;
+    }
+
     Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
         return Collections.unmodifiableMap(timerServices);
     }
 
-    private <N>
+    protected <N>
             KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(

Review Comment:
   This method does not seem to be used in subclasses, so `private` might be enough. The same applies to `restoreStateForKeyGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
