GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/1265
[FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant This adds method state() on Trigger context that should be used to create an OperatorState to deal with fault-tolerant state. WindowAssigner now has a method getWindowSerializer() that is used to get a TypeSerializer for the Windows that it assigns. The Serializer for the Key is retrieved from the input KeyedStream and the serializer for the input elements is already available. During checkpointing all currently in-flight windows (per key, per window) are serialized using the TypeSerializers. The state that is accessible in Triggers using state() is kept in a HashMap<String, Serializable>, this is serialized using java serialization. This introduces the restriction that the element must be Serializable when using DeltaTrigger. I did not yet take the step of integrating triggers with the operator-provided key-value state since this would require state to be very dynamic and also allow deletion of state. @StephanEwen could you please have a look at how the state checkpoint/restore is implemented. Also the triggers have a state interface that differs from the state interface that user functions have, for now. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink window-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1265.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1265 ---- commit fb5733f52df8eabfd88f51fb39f83930f27befbd Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2015-10-11T09:37:29Z [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant This adds method state() on Trigger context that should be used to create an OperatorState to deal with fault-tolerant state. WindowAssigner now has a method getWindowSerializer() that is used to get a TypeSerializer for the Windows that it assigns. The Serializer for the Key is retrieved from the input KeyedStream and the serializer for the input elements is already available. During checkpointing all currently in-fligh windows (per key, per window) are serialized using the TypeSerializers. The state that is accessible in Triggers using state() is kept in a HashMap<String, Serializable>, this is serialized using java serialization. commit 07d96f5cc41ae70006699c3f0a6986252565e3df Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2015-10-17T11:35:24Z Replace Trigger.onTime by Trigger.onProcessingTime/onEventTime This also renames WatermarkTrigger to EventTimeTrigger and ContinuousWatermarkTrigger to ContinuousEventTimeTrigger. ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---