Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6062#discussion_r191711210
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
---
@@ -199,17 +186,9 @@ public long currentWatermark() {
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
- InternalTimer<K, N> timer = new InternalTimer<>(time, (K)
keyContext.getCurrentKey(), namespace);
-
- // make sure we only put one timer per key into the queue
- Set<InternalTimer<K, N>> timerSet =
getProcessingTimeTimerSetForTimer(timer);
- if (timerSet.add(timer)) {
-
- InternalTimer<K, N> oldHead =
processingTimeTimersQueue.peek();
+ InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+ if (processingTimeTimersQueue.scheduleTimer(time, (K)
keyContext.getCurrentKey(), namespace)) {
--- End diff --
Good point, I would suggest we do this in another PR.
---