[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-12 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5236


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5236#discussion_r159465985
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -442,23 +458,15 @@ public int numEventTimeTimers() {
}
 
public int numProcessingTimeTimers(N namespace) {
-   int count = 0;
-   for (InternalTimer timer : processingTimeTimersQueue) {
-   if (timer.getNamespace().equals(namespace)) {
-   count++;
-   }
-   }
-   return count;
+   return processingTimeNumberTimers.column(namespace).keySet().stream()
+   .mapToInt(k -> processingTimeNumberTimers.get(k, namespace))
+   .sum();
}
 
public int numEventTimeTimers(N namespace) {
--- End diff --

The `@Override` annotation is missing on this method.
  


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5236#discussion_r159465653
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -442,23 +458,15 @@ public int numEventTimeTimers() {
}
 
public int numProcessingTimeTimers(N namespace) {
--- End diff --

The `@Override` annotation is missing on this method.


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5236#discussion_r159464428
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -475,4 +483,38 @@ public int getLocalKeyGroupRangeStartIdx() {
public Set<InternalTimer<K, N>>[] getProcessingTimeTimersPerKeyGroup() {
return this.processingTimeTimersByKeyGroup;
}
+
+   private void incrementProcessingTimeTimer(K key, N namespace) {
+   incrementTimeTimer(processingTimeNumberTimers, key, namespace);
+   }
+
+   private void incrementEventTimeTimer(K key, N namespace) {
+   incrementTimeTimer(eventTimeNumberTimers, key, namespace);
+   }
+
+   private void incrementTimeTimer(Table<K, N, Integer> table, K key, N namespace) {
+   if (table.contains(key, namespace)) {
+   table.put(key, namespace, table.get(key, namespace) + 1);
+   } else {
+   table.put(key, namespace, 1);
+   }
+   }
+
+   private void decrementProcessingTimeTimer(K key, N namespace) {
+   decrementTimeTimer(processingTimeNumberTimers, key, namespace);
+   }
+
+   private void decrementEventTimeTimer(K key, N namespace) {
+   decrementTimeTimer(eventTimeNumberTimers, key, namespace);
+   }
+
+   private void decrementTimeTimer(Table<K, N, Integer> table, K key, N namespace) {
+   if (table.contains(key, namespace)) {
--- End diff --

Can it actually happen that `(key, namespace)` is not in the table? If it
should never happen, a `checkState` may be better than silently skipping the
decrement.
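
A minimal sketch of the `checkState` variant, using only the JDK: a nested `HashMap` stands in for Guava's `Table`, and the local `checkState` helper mimics the precondition helpers of that name. The class name `TimerCountTable`, the `count` method, and the choice to drop an entry once its count reaches zero are assumptions for illustration, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-(key, namespace) timer counter; not the PR's implementation. */
final class TimerCountTable<K, N> {

    // key -> (namespace -> number of registered timers)
    private final Map<K, Map<N, Integer>> counts = new HashMap<>();

    void increment(K key, N namespace) {
        counts.computeIfAbsent(key, k -> new HashMap<>())
            .merge(namespace, 1, Integer::sum);
    }

    void decrement(K key, N namespace) {
        Map<N, Integer> row = counts.get(key);
        // Fail fast instead of silently ignoring an unknown (key, namespace).
        checkState(row != null && row.containsKey(namespace),
            "no timer registered for the given (key, namespace)");
        int remaining = row.get(namespace) - 1;
        if (remaining == 0) {
            // Assumption: zero-count entries are removed rather than kept.
            row.remove(namespace);
            if (row.isEmpty()) {
                counts.remove(key);
            }
        } else {
            row.put(namespace, remaining);
        }
    }

    int count(K key, N namespace) {
        Map<N, Integer> row = counts.get(key);
        return row == null ? 0 : row.getOrDefault(namespace, 0);
    }

    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

With this shape, a decrement for a pair that was never incremented surfaces as an `IllegalStateException` at the call site, which makes a bookkeeping bug visible instead of hiding it.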


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5236#discussion_r159462006
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -262,16 +277,17 @@ public void onProcessingTime(long time) throws Exception {
}
}
 
+   private int i = 0;
--- End diff --

This line does not look right. The field `i` looks like a leftover; is it intended?


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5236#discussion_r159461777
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -442,23 +458,15 @@ public int numEventTimeTimers() {
}
 
public int numProcessingTimeTimers(N namespace) {
-   int count = 0;
-   for (InternalTimer timer : processingTimeTimersQueue) {
-   if (timer.getNamespace().equals(namespace)) {
-   count++;
-   }
-   }
-   return count;
+   return processingTimeNumberTimers.column(namespace).keySet().stream()
--- End diff --

How about something like: 
```
public int numProcessingTimeTimers(N namespace) {
    return processingTimeNumberTimers.column(namespace)
        .entrySet()
        .stream()
        .mapToInt(Map.Entry::getValue)
        .sum();
}
```
It avoids the additional lookup by key inside `mapToInt`.
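
To illustrate the difference, here is a small JDK-only sketch. A plain `Map<String, Integer>` stands in for the map that `column(namespace)` returns (row key to count); the class and method names are hypothetical. Both methods return the same sum, but the second reads each value directly from its entry rather than looking it up again by key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ColumnSumSketch {

    // Variant from the diff: iterate the keys, then look each value up again.
    static int sumViaKeyLookup(Map<String, Integer> column) {
        return column.keySet().stream()
            .mapToInt(k -> column.get(k))
            .sum();
    }

    // Suggested variant: read the value straight from each entry.
    static int sumViaEntries(Map<String, Integer> column) {
        return column.entrySet().stream()
            .mapToInt(Map.Entry::getValue)
            .sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> column = new LinkedHashMap<>();
        column.put("key-a", 2);
        column.put("key-b", 3);
        System.out.println(sumViaKeyLookup(column));
        System.out.println(sumViaEntries(column));
    }
}
```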
  


---


[GitHub] flink pull request #5236: [FLINK-8322] support getting number of existing ti...

2018-01-03 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5236

[FLINK-8322] support getting number of existing timers in TimerService

## What is the purpose of the change


It is a fairly common use case to use timers like scheduled threads: for
example, register a timer that wakes up x hours later and does something
(usually reaping old data), but only if no timer already exists. In other
words, we want at most one timer to exist per key at any time.
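
The use case above can be sketched as follows. This is a self-contained illustration, not Flink code: `SimpleTimerService`, `numTimers`, `registerTimer` and `registerCleanupTimerIfAbsent` are hypothetical stand-ins for a timer service that can report how many timers exist for a key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical in-memory timer service used only to illustrate the pattern. */
final class SimpleTimerService {

    // key -> timestamps of the timers registered for that key
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    void registerTimer(String key, long timestamp) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    int numTimers(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    /** Registers a cleanup timer only if the key has none; returns true if one was registered. */
    boolean registerCleanupTimerIfAbsent(String key, long now, long delayMillis) {
        if (numTimers(key) > 0) {
            return false; // a timer already exists, so keep at most one per key
        }
        registerTimer(key, now + delayMillis);
        return true;
    }
}
```

Without a way to ask for the number of existing timers, the caller would have to track this itself, for example in a separate piece of keyed state.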

## Brief change log

Discussed with Aljoscha that we should use extra data structures to keep
track of the number of timers. I decided to use Guava's `Table` to store the
timer count for each `(key, namespace)` pair.

## Verifying this change


This change added tests and can be verified as follows: 
`HeapInternalTimerServiceTest`, `KeyedProcessOperatorTest`, and 
`KeyedCoProcessOperatorTest`.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5236.patch



commit 8578eb6678ad54169e1769b8e69391a388d1d40a
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit b81a8e2aebe4d9cd61baef3904d1e6bef0d1702a
Author: Bowen Li 
Date:   2018-01-03T07:51:09Z

[FLINK-8322] support getting number of existing timers in TimerService

commit 483c8a20ef6c2390d1491724755fbd95e88eb278
Author: Bowen Li 
Date:   2018-01-03T08:03:48Z

add comments

commit 4c225df508ebb03acc0cd68c77681977161403f6
Author: Bowen Li 
Date:   2018-01-03T08:38:24Z

update unit tests

commit 8876add25b7fbf576c045148cc35cee4086cb901
Author: Bowen Li 
Date:   2018-01-03T09:12:57Z

add unit tests




---