Repository: flink Updated Branches: refs/heads/master 490162259 -> e09c4c530
[docs][state] add missing Java syntax highlighting to documentation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35d63477 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35d63477 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35d63477 Branch: refs/heads/master Commit: 35d63477446f47f523315576592d06ffe95a5fcd Parents: 4901622 Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Feb 6 11:03:45 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Mon Feb 6 17:09:18 2017 +0100 ---------------------------------------------------------------------- docs/dev/stream/state.md | 88 +++++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/35d63477/docs/dev/stream/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md index 124ce68..4d1cfab 100644 --- a/docs/dev/stream/state.md +++ b/docs/dev/stream/state.md @@ -230,9 +230,11 @@ while `(test2, 2)` will go to task 1. The `ListCheckpointed` interface requires the implementation of two methods: - List<T> snapshotState(long checkpointId, long timestamp) throws Exception; +{% highlight java %} +List<T> snapshotState(long checkpointId, long timestamp) throws Exception; - void restoreState(List<T> state) throws Exception; +void restoreState(List<T> state) throws Exception; +{% endhighlight %} On `snapshotState()` the operator should return a list of objects to checkpoint and `restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always @@ -242,9 +244,11 @@ return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The `CheckpointedFunction` interface also requires the implementation of two methods: - void snapshotState(FunctionSnapshotContext context) throws Exception; +{% highlight java %} +void snapshotState(FunctionSnapshotContext context) throws Exception; - void initializeState(FunctionInitializationContext context) throws Exception; +void initializeState(FunctionInitializationContext context) throws Exception; +{% endhighlight %} Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not @@ -253,57 +257,61 @@ only the place where different types of state are initialized, but also where st This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that uses state to buffer elements before sending them to the outside world: - public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { +{% highlight java %} +public class BufferingSink + implements SinkFunction<Tuple2<String, Integer>>, + CheckpointedFunction, + CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - private final int threshold; + private final int threshold; - private transient ListState<Tuple2<String, Integer>> checkpointedState; + private transient ListState<Tuple2<String, Integer>> checkpointedState; - private List<Tuple2<String, Integer>> bufferedElements; + private List<Tuple2<String, Integer>> bufferedElements; - public BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } + public BufferingSink(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink } + bufferedElements.clear(); } + } - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - checkpointedState.clear(); - for (Tuple2<String, Integer> element : bufferedElements) { - checkpointedState.add(element); - } + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkpointedState.clear(); + for (Tuple2<String, Integer> element : bufferedElements) { + checkpointedState.add(element); } + } - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - checkpointedState = context.getOperatorStateStore(). - getSerializableListState("buffered-elements"); + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + checkpointedState = context.getOperatorStateStore(). + getSerializableListState("buffered-elements"); - if (context.isRestored()) { - for (Tuple2<String, Integer> element : checkpointedState.get()) { - bufferedElements.add(element); - } + if (context.isRestored()) { + for (Tuple2<String, Integer> element : checkpointedState.get()) { + bufferedElements.add(element); } } + } - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + // this is from the CheckpointedRestoring interface. + this.bufferedElements.addAll(state); } +} +{% endhighlight %} The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize