[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066338

## File path: docs/dev/stream/operators/process_function.md ##

@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.
-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
 ### Fault Tolerance
 Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.
-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
+Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.
-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing time.
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
Review comment: If there is nothing more to say in your opinion, then I'm fine with this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066305

## File path: docs/dev/stream/operators/process_function.md ##

@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

Review comment: Writing more docs about this might be a bigger change but moving 10 lines into a subsection on a different page can be done in this PR.
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208952984

## File path: docs/ops/state/large_state_tuning.md ##

@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of
+timers, while storing timers inside RocksDB offers higher scalability as the amount of timers in RocksDB can exceed the available main memory (spilling to disk).
+
+When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `RocksDBOptions.TIMER_SERVICE_FACTORY`.
+Possible choices are `HEAP` (to store timers on the heap, default) and `ROCKSDB` (to store timers in RocksDB).

Review comment: Usually all YAML values are lower case. These are upper case?
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950851

## File path: docs/dev/stream/operators/process_function.md ##

@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+
+
+
+Stopping an event-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+
+
+
+Note Stopping a timer has no effect if no such timer timer with the given timestamp is registered.

Review comment: remove duplicate `timer`
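For readers following the thread, the two behaviours described in the quoted docs (at most one timer per key and timestamp, and deletion being a no-op when no matching timer exists) can be illustrated with a minimal sketch. This is a plain-Java model written for illustration only: `TimerRegistrySketch` and its methods are hypothetical names, not Flink's `TimerService`, and the sketch says nothing about how Flink stores timers internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical stand-in for a per-key timer store; NOT Flink's TimerService. */
public class TimerRegistrySketch {
    // One sorted set of timestamps per key: a set cannot hold the same timestamp twice.
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    /** Returns true if a new timer was added, false if it duplicated an existing one. */
    public boolean register(String key, long timestamp) {
        return timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Returns true if a timer was removed, false if none was registered (a no-op). */
    public boolean delete(String key, long timestamp) {
        TreeSet<Long> timers = timersByKey.get(key);
        if (timers == null) {
            return false;
        }
        boolean removed = timers.remove(timestamp);
        if (timers.isEmpty()) {
            timersByKey.remove(key); // drop empty per-key sets
        }
        return removed;
    }

    /** Number of timers currently registered for the given key. */
    public int count(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    public static void main(String[] args) {
        TimerRegistrySketch registry = new TimerRegistrySketch();
        registry.register("user-1", 1000L);
        registry.register("user-1", 1000L);                   // duplicate, ignored
        System.out.println(registry.count("user-1"));         // 1
        System.out.println(registry.delete("user-1", 2000L)); // false: nothing to delete
        System.out.println(registry.delete("user-1", 1000L)); // true
    }
}
```

In this model a delete costs one lookup in a sorted set; whether that matches the cost inside Flink depends on the timer storage in use and is not something the sketch can answer.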
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208949265

## File path: docs/dev/stream/operators/process_function.md ##

@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

Review comment: Can we move this entire `Fault Tolerance` section to `dev/stream/state/checkpointing.md` for a new `Timers` section? This is a general concept that is not only relevant for `ProcessFunction`.
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950079

## File path: docs/dev/stream/operators/process_function.md ##

@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.
-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
 ### Fault Tolerance
 Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.
-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
+Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.
-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing time.
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
Review comment: It would be great if you could add more internal knowledge if there is more to say. This section is very tiny, but I guess a lot of work went into it, so we should document it accordingly.
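One behaviour in the quoted section that may merit a concrete illustration is that restored processing-time timers whose timestamp already lies in the past fire immediately. The following is a minimal plain-Java sketch of that rule only; `RestoredTimersSketch` and `fireDueTimers` are hypothetical names chosen for illustration, and the sketch does not reflect Flink's actual restore code path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of firing overdue timers after a restore; NOT Flink code. */
public class RestoredTimersSketch {

    /** Removes and returns all timers with timestamp <= now, in ascending order. */
    public static List<Long> fireDueTimers(TreeSet<Long> timers, long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        // Timers as they were checkpointed before the failure (assumed values).
        TreeSet<Long> restored = new TreeSet<>(Arrays.asList(1_000L, 2_000L, 9_000L));
        long nowAfterRestore = 5_000L; // wall-clock time when the job resumes

        // Timers at 1000 and 2000 were due while the job was down, so they fire at once.
        System.out.println(fireDueTimers(restored, nowAfterRestore)); // [1000, 2000]
        System.out.println(restored);                                 // [9000]
    }
}
```

The timer at 9000 stays registered and would fire once processing time reaches it.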
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208952554

## File path: docs/ops/state/large_state_tuning.md ##

@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of
+timers, while storing timers inside RocksDB offers higher scalability as the amount of timers in RocksDB can exceed the available main memory (spilling to disk).
+
+When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `RocksDBOptions.TIMER_SERVICE_FACTORY`.

Review comment: The option classes are mostly internal, right? So a proper YAML would be more useful I guess. Can I only set this property for an entire Flink cluster or also per-job?
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208951393

## File path: docs/ops/state/large_state_tuning.md ##

@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of

Review comment: "a user"
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950482

## File path: docs/dev/stream/operators/process_function.md ##

@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

Review comment: Are there any performance implications of deleting a timer? I guess not anymore right?