[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/5236 ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5236#discussion_r159465985 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -442,23 +458,15 @@ public int numEventTimeTimers() { } public int numProcessingTimeTimers(N namespace) { - int count = 0; - for (InternalTimertimer : processingTimeTimersQueue) { - if (timer.getNamespace().equals(namespace)) { - count++; - } - } - return count; + return processingTimeNumberTimers.column(namespace).keySet().stream() + .mapToInt(k -> processingTimeNumberTimers.get(k, namespace)) + .sum(); } public int numEventTimeTimers(N namespace) { --- End diff -- `@Override` annotation is missing ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5236#discussion_r159465653 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -442,23 +458,15 @@ public int numEventTimeTimers() { } public int numProcessingTimeTimers(N namespace) { --- End diff -- `@Override` annotation is missing ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5236#discussion_r159464428 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -475,4 +483,38 @@ public int getLocalKeyGroupRangeStartIdx() { public Set>[] getProcessingTimeTimersPerKeyGroup() { return this.processingTimeTimersByKeyGroup; } + + private void incrementProcessingTimeTimer(K key, N namespace) { + incrementTimeTimer(processingTimeNumberTimers, key, namespace); + } + + private void incrementEventTimeTimer(K key, N namespace) { + incrementTimeTimer(eventTimeNumberTimers, key, namespace); + } + + private void incrementTimeTimer(Table table, K key, N namespace) { + if (table.contains(key, namespace)) { + table.put(key, namespace, table.get(key, namespace) + 1); + } else { + table.put(key, namespace, 1); + } + } + + private void decrementProcessingTimeTimer(K key, N namespace) { + decrementTimeTimer(processingTimeNumberTimers, key, namespace); + } + + private void decrementEventTimeTimer(K key, N namespace) { + decrementTimeTimer(eventTimeNumberTimers, key, namespace); + } + + private void decrementTimeTimer(Table table, K key, N namespace) { + if (table.contains(key, namespace)) { --- End diff -- Can it actually happen that the `(key, namespace)` is not in the table? If it should not happen, maybe a `checkState` is better. ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5236#discussion_r159462006 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -262,16 +277,17 @@ public void onProcessingTime(long time) throws Exception { } } + private int i = 0; --- End diff -- This line does not look right. ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5236#discussion_r159461777 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -442,23 +458,15 @@ public int numEventTimeTimers() { } public int numProcessingTimeTimers(N namespace) { - int count = 0; - for (InternalTimertimer : processingTimeTimersQueue) { - if (timer.getNamespace().equals(namespace)) { - count++; - } - } - return count; + return processingTimeNumberTimers.column(namespace).keySet().stream() --- End diff -- How about something like: ``` public int numProcessingTimeTimers(N namespace) { return processingTimeNumberTimers.column(namespace) .entrySet() .stream() .mapToInt(Map.Entry::getValue) .sum(); } ``` It avoids the additional key lookup in `mapToInt` ---
[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5236 [FLINK-8322] support getting number of existing timers in TimerService ## What is the purpose of the change There are pretty common use cases where users want to use timers as scheduled threads - e.g. add a timer to wake up x hours later and do something (reap old data usually) only if there's no existing timers, basically we only want at most 1 timer exists for the key all the time ## Brief change log Discussed with Aljoscha that we should use extra data structures to keep track number of timers. I decided to use Guava's Table to store ``. ## Verifying this change This change added tests and can be verified as follows: `HeapInternalTimerServiceTest`, `KeyedProcessOperatorTest`, and `KeyedCoProcessOperatorTest`. ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8322 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5236.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5236 commit 8578eb6678ad54169e1769b8e69391a388d1d40a Author: Bowen Li Date: 2018-01-02T19:21:28Z update local branch commit b81a8e2aebe4d9cd61baef3904d1e6bef0d1702a Author: Bowen Li Date: 2018-01-03T07:51:09Z [FLINK-8322] support getting number of existing timers in TimerService commit 483c8a20ef6c2390d1491724755fbd95e88eb278 Author: Bowen Li Date: 2018-01-03T08:03:48Z add comments commit 4c225df508ebb03acc0cd68c77681977161403f6 Author: Bowen Li Date: 2018-01-03T08:38:24Z update unit tests commit 8876add25b7fbf576c045148cc35cee4086cb901 Author: Bowen Li Date: 2018-01-03T09:12:57Z add unit tests ---