Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2218#discussion_r129110842
--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
An example topology `SlidingWindowTopology` shows how to use the APIs to
compute a sliding window sum and a tumbling window average.
+## Stateful windowing
+The default windowing implementation in Storm stores the tuples in memory until they are
+processed and expired from the window. This limits the use cases to windows that fit
+entirely in memory. Also, the source tuples cannot be acked until the window expires,
+which requires large message timeouts (`topology.message.timeout.secs` should be larger
+than the window length + sliding interval). This also adds extra load due to the complex
+acking and anchoring requirements.
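
As a rough plain-Java illustration of the timeout constraint described above (the numbers and the helper method are hypothetical, not Storm API):

```java
public class TimeoutSketch {
    // Hypothetical helper illustrating the constraint above:
    // topology.message.timeout.secs must exceed window length + sliding interval.
    static int requiredTimeoutSecs(int windowLengthSecs, int slidingIntervalSecs) {
        // add a little headroom on top of the strict lower bound
        return windowLengthSecs + slidingIntervalSecs + 30;
    }

    public static void main(String[] args) {
        // e.g. a 10-minute window sliding every minute needs a timeout above 660s;
        // in topology code the result could then be applied via conf.setMessageTimeoutSecs(...)
        System.out.println(requiredTimeoutSecs(600, 60));
    }
}
```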
+
+To address the above limitations and to support larger window sizes, Storm provides
+stateful windowing support via `IStatefulWindowedBolt`. User bolts should typically
+extend `BaseStatefulWindowedBolt` for the windowing operations, with the framework
+automatically managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message,
+the framework can use it to periodically checkpoint the last expired and evaluated
+message ids, to avoid duplicate window evaluations in case of failures or restarts.
+During recovery, the tuples with message ids lower than the last expired id are
+discarded, and tuples with message ids between the last expired and last evaluated ids
+are fed into the system without activating any triggers. The tuples beyond the last
+evaluated message id are processed as usual. This can be enabled by setting the
+`messageIdField` as shown below:
+
+```java
+topologyBuilder.setBolt("mybolt",
+                        new MyStatefulWindowedBolt()
+                                .withWindow(...) // windowing configurations
+                                .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
+                        parallelism)
+               .shuffleGrouping("spout");
+```
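
The recovery behavior described above can be sketched as plain-Java classification logic (a hypothetical helper for illustration, not part of the Storm API):

```java
public class RecoverySketch {
    public enum Action { DISCARD, REPLAY_WITHOUT_TRIGGERS, PROCESS }

    // Classify a replayed tuple by its monotonically increasing message id,
    // given the checkpointed last expired and last evaluated ids.
    public static Action classify(long msgId, long lastExpired, long lastEvaluated) {
        if (msgId < lastExpired) {
            return Action.DISCARD; // already expired from the window
        } else if (msgId <= lastEvaluated) {
            return Action.REPLAY_WITHOUT_TRIGGERS; // rebuilds window state without re-evaluation
        } else {
            return Action.PROCESS; // processed as usual
        }
    }
}
```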
+
+However, this option is feasible only if the sources can provide a monotonically
+increasing identifier in the tuple, and the same identifier is maintained when messages
+are re-emitted in case of failures. With this option the tuples in the window are still
+held in memory.
+
+For more details, take a look at the sample topology `StatefulWindowingTopology` in
+storm-starter, which will help you get started.
+
+### Window checkpointing
+
+With window checkpointing, the monotonically increasing id is no longer required, since
+the framework transparently saves the state of the window periodically into the
+configured state backend. The saved state includes the tuples in the window, any system
+state required to recover the state of processing, and also the user state.
+
+```java
+topologyBuilder.setBolt("mybolt",
+                        new MyStatefulPersistentWindowedBolt()
+                                .withWindow(...) // windowing configurations
+                                .withPersistence() // persist the window state
+                                .withMaxEventsInMemory(25000), // max number of events to be cached in memory
+                        parallelism)
+               .shuffleGrouping("spout");
+```
+
+`withPersistence` instructs the framework to transparently save the tuples in the window,
+along with any associated system and user state, to the state backend.
+`withMaxEventsInMemory` is an optional configuration that specifies the maximum number of
+tuples that may be kept in memory. The tuples are transparently loaded from the state
+backend as required, and the ones that are most likely to be used again are retained in
+memory.
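
The "most likely to be used again" retention described above behaves like a bounded LRU cache; here is a minimal plain-Java sketch of the idea (not Storm's actual implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowEventCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEventsInMemory;

    public WindowEventCache(int maxEventsInMemory) {
        super(16, 0.75f, true); // access-order: recently used entries survive longest
        this.maxEventsInMemory = maxEventsInMemory;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // evict the least recently used entry once the cap is exceeded;
        // in Storm, evicted tuples would remain loadable from the state backend
        return size() > maxEventsInMemory;
    }
}
```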
+
+The state backend can be configured by setting the topology state provider config:
+
+```java
+// use redis for state persistence
+conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+```
+Currently Storm supports Redis and HBase as state backends, using the underlying state
+checkpointing framework for saving the window state. For more details on state
+checkpointing, see [State-checkpointing.md](State-checkpointing.md).
+
+Here is an example of a persistent windowed bolt that uses window checkpointing to save
+its state. The `initState` method is invoked with the last saved (user) state at
+initialization time. The `execute` method is invoked based on the configured windowing
+parameters, and the tuples in the active window can be accessed via an iterator as shown
+below.
+
+```java
+public class MyStatefulPersistentWindowedBolt extends BaseStatefulWindowedBolt<KeyValueState<K, V>> {
+    private KeyValueState<K, V> state;
+
+    @Override
+    public void initState(KeyValueState<K, V> state) {
+        this.state = state;
+        // ...
+        // restore the state from the last saved state.
+        // ...
+    }
+
+    @Override
+    public void execute(TupleWindow window) {
+        // iterate over tuples in the current window
+        Iterator<Tuple> it = window.getIter();
+        while (it.hasNext()) {
+            Tuple tuple = it.next();
+            // compute some result based on the tuples in the window
+        }
+
+        // possibly update any state to be maintained across windows
+        state.put(STATE_KEY, updatedValue);
+
+        // emit the results downstream
+        collector.emit(new Values(result));
+    }
+}
+```
+
+Note: In case of persistent windowed bolts, use `TupleWindow.getIter` to retrieve an
+iterator over the events in the window. If the number of tuples in the window is huge,
+invoking `TupleWindow.get` would try to load all the tuples into memory and may throw an
+OOM exception.
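
The distinction matters because an iterator lets a result be computed one tuple at a time, whereas materializing the whole window builds the full list in memory first; a plain-Java illustration of the streaming style (not Storm API):

```java
import java.util.Iterator;

public class IterSketch {
    // Streaming style: holds only one element at a time,
    // analogous to consuming TupleWindow.getIter.
    public static long sum(Iterator<Integer> it) {
        long total = 0;
        while (it.hasNext()) {
            total += it.next();
        }
        return total;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = java.util.Arrays.asList(1, 2, 3, 4, 5).iterator();
        System.out.println(sum(it)); // 15
    }
}
```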
+
+For more details, take a look at the sample topology `PersistentWindowingTopology` in
+storm-starter, which will help you get started.
--- End diff --
Nit: storm-starter, and maybe link