[FLINK-3575] [docs] Update 'Working With State' section in docs

This closes #1760


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c3c7345
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c3c7345
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c3c7345

Branch: refs/heads/release-1.0
Commit: 5c3c7345d1711989883a787a62ef91a72bf4c4f7
Parents: 91587892
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Mar 3 16:53:53 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 4 20:53:44 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/state.md | 219 +++++++++++++++++++++++++++-----------
 1 file changed, 157 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c3c7345/docs/apis/streaming/state.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/state.md b/docs/apis/streaming/state.md
index df68a1b..f32d504 100644
--- a/docs/apis/streaming/state.md
+++ b/docs/apis/streaming/state.md
@@ -26,7 +26,8 @@ under the License.
 
 All transformations in Flink may look like functions (in the functional 
processing terminology), but
 are in fact stateful operators. You can make *every* transformation (`map`, 
`filter`, etc) stateful
-by declaring local variables or using Flink's state interface. You can 
register any local variable
+by using Flink's state interface or checkpointing instance fields of your 
function. You can register
+any instance field
 as ***managed*** state by implementing an interface. In this case, and also in 
the case of using
 Flink's native state interface, Flink will automatically take consistent 
snapshots of your state
 periodically, and restore its value in the case of a failure.
@@ -34,102 +35,130 @@ periodically, and restore its value in the case of a 
failure.
 The end effect is that updates to any form of state are the same under 
failure-free execution and
 execution under failures.
 
-First, we look at how to make local variables consistent under failures, and 
then we look at
+First, we look at how to make instance fields consistent under failures, and 
then we look at
 Flink's state interface.
 
 By default state checkpoints will be stored in-memory at the JobManager. For 
proper persistence of large
 state, Flink supports storing the checkpoints on file systems (HDFS, S3, or 
any mounted POSIX file system),
 which can be configured in the `flink-conf.yaml` or via 
`StreamExecutionEnvironment.setStateBackend(…)`.
+See [state backends]({{ site.baseurl }}/apis/streaming/state_backends.html) 
for information
+about the available state backends and how to configure them.
 
 * ToC
 {:toc}
 
+## Using the Key/Value State Interface
 
-## Checkpointing Local Variables
+The Key/Value state interface provides access to different types of state that 
are all scoped to
+the key of the current input element. This means that this type of state can 
only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
 
-Local variables can be checkpointed by using the `Checkpointed` interface.
+Now, we will first look at the different types of state available and then we 
will see
+how they can be used in a program. The available state primitives are:
 
-When the user-defined function implements the `Checkpointed` interface, the 
`snapshotState(…)` and `restoreState(…)`
-methods will be executed to draw and restore function state.
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element, mentioned above, so there will 
possibly be one value
+for each key that the operation sees). The value can be set using `update(T)` 
and retrieved using
+`T value()`.
 
-In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on
-completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` 
method.
-Note that there is no guarantee for the user function to receive a 
notification if a failure happens between
-checkpoint completion and notification. The notifications should hence be 
treated in a way that notifications from
-later checkpoints can subsume missing notifications.
+* `ListState<T>`: This keeps a list of elements. You can append elements and 
retrieve an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`, the 
Iterable can
+be retrieved using `Iterable<T> get()`.
 
-For example the same counting, reduce function shown for `OperatorState`s by 
using the `Checkpointed` interface instead:
+* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. The interface is the same as for `ListState` but elements 
added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
 
-{% highlight java %}
-public class CounterSum extends ReduceFunction<Long>, Checkpointed<Long> {
+All types of state also have a method `clear()` that clears the state for the 
currently
+active key (i.e. the key of the input element).
 
-    // persistent counter
-    private long counter = 0;
+It is important to keep in mind that these state objects are only used for 
interfacing
+with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depend on the key of the input element. So the value you get in one invocation 
of your
+user function can be different from the one you get in another invocation if 
the key of
+the element is different.
 
-    @Override
-    public Long reduce(Long value1, Long value2) {
-        counter++;
-        return value1 + value2;
-    }
+To get a state handle you have to create a `StateDescriptor` this holds the 
name of the state
+(as we will later see you can create several states, and they have to have 
unique names so
+that you can reference them), the type of the values that the state holds and 
possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
+want to retrieve you create one of `ValueStateDescriptor`, 
`ListStateDescriptor` or
+`ReducingStateDescriptor`.
 
-    // regularly persists state during normal operation
-    @Override
-    public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
-        return counter;
-    }
+State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
+Please see [here]({{ site.baseurl 
}}/apis/common/#specifying-transformation-functions) for
+information about that but we will also see an example shortly. The 
`RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
 
-    // restores state on recovery from failure
-    @Override
-    public void restoreState(Long state) {
-        counter = state;
-    }
-}
-{% endhighlight %}
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
 
-## Using the Key/Value State Interface
+This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
 
-The state interface gives access to key/value states, which are a collection 
of key/value pairs.
-Because the state is partitioned by the keys (distributed accross workers), it 
can only be used
-on the `KeyedStream`, created via `stream.keyBy(…)` (which means also that 
it is usable in all
-types of functions on keyed windows).
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
 
-The handle to the state can be obtained from the function's `RuntimeContext`. 
The state handle will
-then give access to the value mapped under the key of the current record or 
window - each key consequently
-has its own value.
+    /**
+     * The ValueState handle. The first field is the count, the second field a 
running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
 
-The following code sample shows how to use the key/value state inside a reduce 
function.
-When creating the state handle, one needs to supply a name for that state (a 
function can have multiple states
-of different types), the type of the state (used to create efficient 
serializers), and the default value (returned
-as a value for keys that do not yet have a value associated).
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
 
-{% highlight java %}
-public class CounterSum extends RichReduceFunction<Long> {
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
 
-    /** The state handle */
-    private OperatorState<Long> counter;
+        // update the count
+        currentSum.f0 += 1;
 
-    @Override
-    public Long reduce(Long value1, Long value2) {
-        counter.update(counter.value() + 1);
-        return value1 + value2;
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+
+        // update the state
+        sum.update(currentSum);
+
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
     }
 
     @Override
     public void open(Configuration config) {
-        counter = getRuntimeContext().getKeyValueState("myCounter", 
Long.class, 0L);
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
+        sum = getRuntimeContext().getState(descriptor);
     }
 }
+
+// this can be used in a streaming program like this (assuming we have a 
StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), 
Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
 {% endhighlight %}
 
-State updated by this is usually kept locally inside the flink process (unless 
one configures explicitly
-an external state backend). This means that lookups and updates are process 
local and this very fast.
+This example implements a poor man's counting window. We key the tuples by the 
first field
+(in the example all have the same key `1`). The function stores the count and 
a running sum in
+a `ValueState`, once the count reaches 2 it will emit the average and clear 
the state so that
+we start over from `0`. Note that this would keep a different state value for 
each different input
+key if we had tuples with different values in the first field.
 
-The important implication of having the keys set implicitly is that it forces 
programs to group the stream
-by key (via the `keyBy()` function), making the key partitioning transparent 
to Flink. That allows the system
-to efficiently restore and redistribute keys and state.
+### State in the Scala DataStream API
 
-The Scala API has shortcuts that for stateful `map()` or `flatMap()` functions 
on `KeyedStream`, which give the
-state of the current key as an option directly into the function, and return 
the result with a state update:
+In addition to the interface described above, the Scala API has shortcuts for 
stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. 
The user function
+gets the current value of the `ValueState` in an `Option` and must return an 
updated value that
+will be used to update the state.
 
 {% highlight scala %}
 val stream: DataStream[(String, Int)] = ...
@@ -143,6 +172,70 @@ val counts: DataStream[(String, Int)] = stream
     })
 {% endhighlight %}
 
+## Checkpointing Instance Fields
+
+Instance fields can be checkpointed by using the `Checkpointed` interface.
+
+When the user-defined function implements the `Checkpointed` interface, the 
`snapshotState(…)` and `restoreState(…)`
+methods will be executed to draw and restore function state.
+
+In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on
+completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` 
method.
+Note that there is no guarantee for the user function to receive a 
notification if a failure happens between
+checkpoint completion and notification. The notifications should hence be 
treated in a way that notifications from
+later checkpoints can subsume missing notifications.
+
+The above example for `ValueState` can be implemented using instance fields 
like this:
+
+{% highlight java %}
+
+public class CountWindowAverage
+        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+        implements Checkpointed<Tuple2<Long, Long>> {
+
+    private Tuple2<Long, Long> sum = null;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
+
+        // update the count
+        sum.f0 += 1;
+
+        // add the second field of the input value
+        sum.f1 += input.f1;
+
+
+        // if the count reaches 2, emit the average and clear the state
+        if (sum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        if (sum == null) {
+            // only recreate if null
+            // restoreState will be called before open()
+            // so this will already set the sum to the restored value
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    // regularly persists state during normal operation
+    @Override
+    public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+        return sum;
+    }
+
+    // restores state on recovery from failure
+    @Override
+    public void restoreState(Tuple2<Long, Long> state) {
+        sum = state;
+    }
+}
+{% endhighlight %}
+
 ## Stateful Source Functions
 
 Stateful sources require a bit more care as opposed to other operators.
@@ -150,7 +243,9 @@ In order to make the updates to the state and output 
collection atomic (required
 on failure/recovery), the user is required to get a lock from the source's 
context.
 
 {% highlight java %}
-public static class CounterSource extends RichParallelSourceFunction<Long>, 
Checkpointed<Long> {
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements Checkpointed<Long> {
 
     /**  current offset for exactly once semantics */
     private long offset;

Reply via email to