This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new b11b010  [FLINK-14825][state-processor-api][docs] Rework state 
processor api documentation
b11b010 is described below

commit b11b010aaacfc6e65d5c703d22e39e642121ce38
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Dec 9 15:48:02 2019 -0600

    [FLINK-14825][state-processor-api][docs] Rework state processor api 
documentation
---
 docs/dev/libs/state_processor_api.md               | 557 +++++++++++++--------
 docs/dev/libs/state_processor_api.zh.md            | 557 +++++++++++++--------
 .../fig/application-my-app-state-processor-api.png | Bin 0 -> 49938 bytes
 docs/fig/database-my-app-state-processor-api.png   | Bin 0 -> 50174 bytes
 4 files changed, 712 insertions(+), 402 deletions(-)

diff --git a/docs/dev/libs/state_processor_api.md 
b/docs/dev/libs/state_processor_api.md
index acde295..df5ecdc 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -23,166 +23,103 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink's State Processor API provides powerful functionality to reading, 
writing, and modifing savepoints and checkpoints using Flink’s batch DataSet 
api.
-This is useful for tasks such as analyzing state for interesting patterns, 
troubleshooting or auditing jobs by checking for discrepancies, and 
bootstrapping state for new applications.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
+Due to the [interoperability of DataSet and Table API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api),
+you can even use the relational Table API or SQL queries to analyze and process state data.
+
+For example, you can take a savepoint of a running stream processing 
application and analyze it with a DataSet batch program to verify that the 
application behaves correctly.
+Or you can read a batch of data from any store, preprocess it, and write the 
result to a savepoint that you use to bootstrap the state of a streaming 
application.
+It is also possible to fix inconsistent state entries.
+Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started.
+For example, you can now arbitrarily modify the data types of states, adjust 
the maximum parallelism of operators, split or merge operator state, re-assign 
operator UIDs, and so on.
+
+To get started with the State Processor API, include the following library in your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-state-processor-api{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
 
 * This will be replaced by the TOC
 {:toc}
 
-## Abstraction
+## Mapping Application State to DataSets 
 
-To understand how to best interact with savepoints in a batch context it is 
important to have a clear mental model of how the data in Flink state relates 
to a traditional relational database.
+The State Processor API maps the state of a streaming application to one or 
more data sets that can be processed separately.
+In order to be able to use the API, you need to understand how this mapping 
works.
 
-A database can be thought of as one or more namespaces, each containing a 
collection of tables.
-Those tables in turn contain columns whose values have some intrinsic 
relationship between them, such as being scoped under the same key.
+But let us first have a look at what a stateful Flink job looks like.
+A Flink job is composed of operators; typically one or more source operators, 
a few operators for the actual processing, and one or more sink operators.
+Each operator runs in parallel in one or more tasks and can work with 
different types of state.
+An operator can have zero, one, or more *“operator states”* which are 
organized as lists that are scoped to the operator's tasks.
+If the operator is applied on a keyed stream, it can also have zero, one, or 
more *“keyed states”* which are scoped to a key that is extracted from each 
processed record.
+You can think of keyed state as a distributed key-value map. 
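+
+As a minimal illustration (a sketch, independent of the “MyApp” example below), a single operator
+might declare both kinds of state:
+
+{% highlight java %}
+public class StatefulMapper extends RichFlatMapFunction<Integer, Integer>
+        implements CheckpointedFunction {
+
+    // keyed state: one value per key of the keyed stream
+    private transient ValueState<Integer> keyedState;
+
+    // operator state: a list scoped to each parallel task
+    private transient ListState<Integer> operatorState;
+
+    @Override
+    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+        keyedState.update(value);
+        operatorState.add(value);
+        out.collect(value);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) {}
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        keyedState = context.getKeyedStateStore()
+            .getState(new ValueStateDescriptor<>("keyed-state", Types.INT));
+        operatorState = context.getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("operator-state", Types.INT));
+    }
+}
+{% endhighlight %}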
 
-A savepoint represents the state of a Flink job at a particular point in time 
which is made up of many operators.
-Those operators contain various kinds of state, both partitioned or keyed 
state, and non-partitioned or operator state. 
+The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”.
+Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.
 
-<div data-lang="java" markdown="1">
-{% highlight java %}
-MapStateDescriptor<Integer, Double> CURRENCY_RATES = new 
MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
- 
-class CurrencyConverter extends BroadcastProcessFunction<Transaction, 
CurrencyRate, Transaction> {
- 
-  public void processElement(
-        Transaction value,
-        ReadOnlyContext ctx,
-        Collector<Transaction> out) throws Exception {
- 
-     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
-     if (rate != null) {
-        value.amount *= rate;
-     }
-     out.collect(value);
-  }
- 
-  public void processBroadcastElement(
-        CurrencyRate value,
-        Context ctx,
-        Collector<Transaction> out) throws Exception {
-        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, 
value.rate);
-  }
-}
-  
-class Summarize extends RichFlatMapFunction<Transaction, Summary> {
-  transient ValueState<Double> totalState;
-  transient ValueState<Integer> countState;
- 
-  public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("count", Types.INT));
-  }
- 
-  public void flatMap(Transaction value, Collector<Summary> out) throws 
Exception {
-     Summary summary = new Summary();
-     summary.total = value.amount;
-     summary.count = 1;
- 
-     Double currentTotal = totalState.value();
-     if (currentTotal != null) {
-        summary.total += currentTotal;
-     }
- 
-     Integer currentCount = countState.value();
-     if (currentCount != null) {
-        summary.count += currentCount;
-     }
-     countState.update(summary.count);
- 
-     out.collect(summary);
-  }
-}
- 
-DataStream<Transaction> transactions = . . .
-BroadcastStream<CurrencyRate> rates = . . .
-transactions
-  .connect(rates)
-  .process(new CurrencyConverter())
-  .uid("currency_converter")
-  .keyBy(transaction -> transaction.accountId)
-  .flatMap(new Summarize())
-  .uid("summarize")
-{% endhighlight %}
-</div>
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/application-my-app-state-processor-api.png" width="600px" 
alt="Application: MyApp"/>
+</p>
 
-This job contains multiple operators along with various kinds of state.
-When analyzing that state we can first scope data by its operator, named by 
setting its uid.
-Within each operator we can look at the registered states.
-`CurrencyConverter` has a broadcast state, which is a type of non-partitioned 
operator state.
-In general, there is no relationship between any two elements in an operator 
state and so we can look at each value as being its own row.
-Contrast this with Summarize, which contains two keyed states.
-Because both states are scoped under the same key we can safely assume there 
exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator 
containing one _key_ column along with _n_ value columns, one for each 
registered state.
-All of this means that the state for this job could be described using the 
following pseudo-sql commands. 
+A savepoint or checkpoint of MyApp consists of the data of all states, 
organized in a way that the states of each task can be restored.
+When processing the data of a savepoint (or checkpoint) with a batch job, we 
need a mental model that maps the data of the individual tasks' states into 
data sets or tables.
+In fact, we can think of a savepoint as a database. Every operator (identified 
by its UID) represents a namespace.
+Each operator state of an operator is mapped to a dedicated table in the 
namespace with a single column that holds the state's data of all tasks.
+All keyed states of an operator are mapped to a single table consisting of a 
column for the key, and one column for each keyed state.
+The following figure shows how a savepoint of MyApp is mapped to a database.
 
-{% highlight sql %}
-CREATE NAMESPACE currency_converter;
- 
-CREATE TABLE currency_converter.rates (
-   value Tuple2<Integer, Double>
-);
- 
-CREATE NAMESPACE summarize;
- 
-CREATE TABLE summarize.keyed_state (
-   key   INTEGER PRIMARY KEY,
-   total DOUBLE,
-   count INTEGER
-);
-{% endhighlight %}
-
-In general, the savepoint ↔ database relationship can be summarized as:
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: 
MyApp"/>
+</p>
 
-    * A savepoint is a database
-    * An operator is a namespace named by its uid
-    * Each operator state represents a single table
-        * Each element in an operator state represents a single row in that 
table
-    * Each operator containing keyed state has a single “keyed_state” table
-        * Each keyed_state table has one key column mapping the key value of 
the operator
-        * Each registered state represents a single column in the table
-        * Each row in the table maps to a single key
+The figure shows how the values of Src's operator state are mapped to a table 
with one column and five rows, one row for each of the list entries across all 
parallel tasks of Src.
+Operator state os2 of the operator “Proc” is similarly mapped to an individual 
table.
+The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2.
+The keyed table holds one row for each distinct key of both keyed states.
+Since the operator “Snk” does not have any state, its namespace is empty.
 
 ## Reading State
 
-Reading state begins by specifiying the path to a valid savepoint or 
checkpoint along with the `StateBackend` that should be used to restore the 
data.
-The compatability guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
+Reading state begins by specifying the path to a valid savepoint or checkpoint 
along with the `StateBackend` that should be used to restore the data.
+The compatibility guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
-ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
RocksDBStateBackend());
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
MemoryStateBackend());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val bEnv      = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
+val bEnv      = ExecutionEnvironment.getExecutionEnvironment
+val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
 {% endhighlight %}
 </div>
 </div>
 
-When reading operator state, simply specify the operator uid, state name, and 
type information.
+### Operator State
+
+[Operator state]({{ site.baseurl 
}}/dev/stream/state/state.html#operator-state) is any non-keyed state in Flink.
+This includes, but is not limited to, any use of `CheckpointedFunction` or 
`BroadcastState` within an application.
+When reading operator state, users specify the operator uid, the state name, 
and the type information.
+
+#### Operator List State
+
+Operator state stored in a `CheckpointedFunction` using `getListState` can be 
read using `ExistingSavepoint#readListState`.
+The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataSet<Integer> listState  = savepoint.readListState(
+DataSet<Integer> listState  = savepoint.readListState(
     "my-uid",
     "list-state",
     Types.INT);
-
-DataSet<Integer> unionState = savepoint.readUnionState(
-    "my-uid",
-    "union-state",
-    Types.INT);
- 
-DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState(
-    "my-uid",
-    "broadcast-state",
-    Types.INT,
-    Types.INT);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -191,12 +128,54 @@ val listState  = savepoint.readListState(
     "my-uid",
     "list-state",
     Types.INT)
+{% endhighlight %}
+</div>
+</div>
 
-val unionState = savepoint.readUnionState(
+#### Operator Union List State
+
+Operator state stored in a `CheckpointedFunction` using `getUnionListState` 
can be read using `ExistingSavepoint#readUnionState`.
+The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
+The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> unionState = savepoint.readUnionState(
+    "my-uid",
+    "union-state",
+    Types.INT);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val unionState = savepoint.readUnionState(
     "my-uid",
     "union-state",
     Types.INT)
- 
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Broadcast State
+
+[BroadcastState]({{ site.baseurl }}/dev/stream/state/broadcast_state.html) can be read using `ExistingSavepoint#readBroadcastState`.
+The state name and type information should match those used to define the 
`MapStateDescriptor` that declared this state in the DataStream application.
+The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState(
+    "my-uid",
+    "broadcast-state",
+    Types.INT,
+    Types.INT);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 val broadcastState = savepoint.readBroadcastState(
     "my-uid",
     "broadcast-state",
@@ -206,12 +185,14 @@ val broadcastState = savepoint.readBroadcastState(
 </div>
 </div>
 
-A custom `TypeSerializer` may also be specified if one was used in the 
`StateDescriptor` for the state.
+#### Using Custom Serializers
+
+Each of the operator state readers supports using a custom `TypeSerializer` if one was used to define the `StateDescriptor` that wrote out the state.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataSet<Integer> listState = savepoint.readListState(
+DataSet<Integer> listState = savepoint.readListState(
     "uid",
     "list-state", 
     Types.INT,
@@ -224,12 +205,17 @@ val listState = savepoint.readListState(
     "uid",
     "list-state", 
     Types.INT,
-    new MyCustomIntSerializer())
+    new MyCustomIntSerializer)
 {% endhighlight %}
 </div>
 </div>
 
-When reading keyed state, users specify a KeyedStateReaderFunction to allow 
reading arbitrary columns and complex state types such as ListState, MapState, 
and AggregatingState.
+### Keyed State
+
+[Keyed state]({{ site.baseurl }}/dev/stream/state/state.html#keyed-state), or 
partitioned state, is any state that is partitioned relative to a key.
+When reading a keyed state, users specify the operator id and a 
`KeyedStateReaderFunction<KeyType, OutputType>`.
+
+The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as `ListState`, `MapState`, and `AggregatingState`.
 This means if an operator contains a stateful process function such as:
 
 <div class="codetabs" markdown="1">
@@ -239,55 +225,80 @@ public class StatefulFunctionWithTime extends 
KeyedProcessFunction<Integer, Inte
  
    ValueState<Integer> state;
  
+   ListState<Long> updateTimes;
+
    @Override
    public void open(Configuration parameters) {
       ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
       state = getRuntimeContext().getState(stateDescriptor);
+
+      ListStateDescriptor<Long> updateDescriptor = new 
ListStateDescriptor<>("times", Types.LONG);
+      updateTimes = getRuntimeContext().getListState(updateDescriptor);
    }
  
    @Override
    public void processElement(Integer value, Context ctx, Collector<Void> out) 
throws Exception {
       state.update(value + 1);
+      updateTimes.add(System.currentTimeMillis());
    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, 
Integer, Void] {
+class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, 
Void] {
  
-   var state: ValueState[Integer];
+   var state: ValueState[Integer] = _
  
-   override def open(parameters: Configuration) {
-      val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-      state = getRuntimeContext().getState(stateDescriptor);
+   var updateTimes: ListState[Long] = _ 
+
+   @throws[Exception]
+   override def open(parameters: Configuration): Unit = {
+      val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+      state = getRuntimeContext().getState(stateDescriptor)
+
+      val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
+      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }
  
-   override def processElement(value: Integer, ctx: Context, out: 
Collector[Void]) {
-      state.update(value + 1);
+   @throws[Exception]
+   override def processElement(value: Integer, ctx: Context, out: 
Collector[Void]): Unit = {
+      state.update(value + 1)
+      updateTimes.add(System.currentTimeMillis)
    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-Then it can read by defining an output type and corresponding 
KeyedStateReaderFunction. 
+Then it can be read by defining an output type and a corresponding `KeyedStateReaderFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-class KeyedState {
-  Integer key;
-  Integer value;
+DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
+
+public class KeyedState {
+  public int key;
+
+  public int value;
+
+  public List<Long> times;
 }
  
-class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
+public class ReaderFunction extends KeyedStateReaderFunction<Integer, 
KeyedState> {
+
   ValueState<Integer> state;
  
+  ListState<Long> updateTimes;
+
   @Override
   public void open(Configuration parameters) {
-     ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
+    ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
+    state = getRuntimeContext().getState(stateDescriptor);
+
+    ListStateDescriptor<Long> updateDescriptor = new 
ListStateDescriptor<>("times", Types.LONG);
+    updateTimes = getRuntimeContext().getListState(updateDescriptor);
   }
  
   @Override
@@ -295,58 +306,221 @@ class ReaderFunction extends 
KeyedStateReaderFunction<Integer, KeyedState> {
     Integer key,
     Context ctx,
     Collector<KeyedState> out) throws Exception {
- 
-     KeyedState data = new KeyedState();
-     data.key    = key;
-     data.value  = state.value();
-     out.collect(data);
+        
+    KeyedState data = new KeyedState();
+    data.key    = key;
+    data.value  = state.value();
+    data.times  = StreamSupport
+      .stream(updateTimes.get().spliterator(), false)
+      .collect(Collectors.toList()); // uses java.util.stream.StreamSupport and Collectors
+
+    out.collect(data);
   }
 }
- 
-DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-case class KeyedState(key: Int, value: Int)
+val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
+
+case class KeyedState(key: Int, value: Int, times: List[Long])
  
 class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
-  var state: ValueState[Integer];
  
-  override def open(parameters: Configuration) {
-     val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
-  }
+  var state: ValueState[Integer] = _
+
+  var updateTimes: ListState[Long] = _
  
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+     val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+     state = getRuntimeContext().getState(stateDescriptor)
+
+     val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
+     updateTimes = getRuntimeContext().getListState(updateDescriptor)
+  }
+
+  @throws[Exception]
   override def processKey(
     key: Int,
     ctx: Context,
-    out: Collector[Keyedstate]) throws Exception {
+    out: Collector[KeyedState]): Unit = {
  
-     val data = KeyedState(key, state.value())
-     out.collect(data);
+     val data = KeyedState(key, state.value(), updateTimes.get.asScala.toList) // needs import scala.collection.JavaConverters._
+     out.collect(data)
   }
 }
-
-val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
 </div>
 
-{% panel **Note:** When using a `KeyedStateReaderFunction` all state 
descriptors must be registered eagerly inside of open. Any attempt to call 
`RuntimeContext#getState`, `RuntimeContext#getListState`, or 
`RuntimeContext#getMapState` will result in a `RuntimeException`. %}
+Along with reading registered state values, each key has access to a `Context` 
with metadata such as registered event time and processing time timers.
+
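+For example, a reader can also collect the timers that are registered for each key. A minimal,
+hypothetical sketch; it assumes the reader's `Context` exposes the registered event time timers
+through a `registeredEventTimeTimers()` accessor:
+
+{% highlight java %}
+public class TimerReaderFunction extends KeyedStateReaderFunction<Integer, Long> {
+
+  @Override
+  public void processKey(Integer key, Context ctx, Collector<Long> out) throws Exception {
+    // emit every event time timer registered for this key
+    // (registeredEventTimeTimers() is an assumed accessor, see the note above)
+    for (Long timestamp : ctx.registeredEventTimeTimers()) {
+      out.collect(timestamp);
+    }
+  }
+}
+{% endhighlight %}
+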
+{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of `open`. Any attempt to call `RuntimeContext#get*State` elsewhere will result in a `RuntimeException`. %}
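+
+As mentioned in the introduction, the `DataSet`s produced by these readers can also be handed to
+the Table API for further analysis. A minimal sketch (the uid and state name are illustrative):
+
+{% highlight java %}
+ExecutionEnvironment bEnv  = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(bEnv);
+
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
+DataSet<Integer> listState  = savepoint.readListState("my-uid", "list-state", Types.INT);
+
+// the single-column DataSet becomes a table whose column is named f0;
+// count how often each distinct value occurs in the operator's list state
+Table histogram = tEnv.fromDataSet(listState)
+    .groupBy("f0")
+    .select("f0, f0.count as cnt");
+{% endhighlight %}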
 
 ## Writing New Savepoints
 
-State writers are based around the abstraction of `Savepoint`, where one 
`Savepoint` may have many operators and the state for any particular operator 
is created using a `BootstrapTransformation`.
+`Savepoint`s may also be written, which enables use cases such as bootstrapping state based on historical data.
+Each savepoint is made up of one or more `BootstrapTransformation`s (explained below), each of which defines the state for an individual operator.
 
-A `BootstrapTransformation` starts with a `DataSet` containing the values that 
are to be written into state.
-The transformation may be optionally `keyed` depending on whether or not you 
are writing keyed or operator state.
-Finally a bootstrap function is applied depending to the transformation; Flink 
supplies `KeyedStateBootstrapFunction` for writing keyed state, 
`StateBootstrapFunction` for writing non keyed state, and 
`BroadcastStateBootstrapFunction` for writing broadcast state.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int maxParallelism = 128;
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val maxParallelism = 128
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath)
+{% endhighlight %}
+</div>
+</div>
+
+The [UIDs]({{ site.baseurl }}/ops/state/savepoints.html#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; this is how Flink knows which state maps to which operator.
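+
+For instance, a job restoring the savepoint created above must assign the matching UIDs to its
+stateful operators. A hypothetical sketch (`MyStatefulFunction` is illustrative):
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.fromElements(1, 2, 3)
+    .keyBy(value -> value)
+    .map(new MyStatefulFunction())
+    .uid("uid1"); // must match the UID passed to withOperator("uid1", ...)
+{% endhighlight %}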
+
+### Operator State
+
+Simple operator state, using `CheckpointedFunction`, can be created using the 
`StateBootstrapFunction`. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+    private ListState<Integer> state;
+
+    @Override
+    public void processElement(Integer value, Context ctx) throws Exception {
+        state.add(value);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // nothing to do; the state written in processElement is snapshotted automatically
+    }
+       
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        state = context.getOperatorState().getListState(new 
ListStateDescriptor<>("state", Types.INT));
+    }
+}
+
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<Integer> data = env.fromElements(1, 2, 3);
+
+BootstrapTransformation<Integer> transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
+
+    var state: ListState[Integer] = _
+
+    @throws[Exception]
+    override def processElement(value: Integer, ctx: Context): Unit = {
+        state.add(value)
+    }
+
+    @throws[Exception]
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    }
+       
+    @throws[Exception]
+    override def initializeState(context: FunctionInitializationContext): Unit 
= {
+        state = context.getOperatorState().getListState(new 
ListStateDescriptor("state", Types.INT))
+    }
+}
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val data = env.fromElements(1, 2, 3)
+
+val transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Broadcast State
+
+[BroadcastState]({{ site.baseurl }}/dev/stream/state/broadcast_state.html) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public  class Account {
+public class CurrencyRate {
+    public String currency;
+
+    public Double rate;
+
+    public CurrencyRate() {}
+
+    public CurrencyRate(String currency, Double rate) {
+        this.currency = currency;
+        this.rate = rate;
+    }
+}
+
+public class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction<CurrencyRate> {
+
+    public static final MapStateDescriptor<String, Double> descriptor = 
+        new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);
+
+    @Override
+    public void processElement(CurrencyRate value, Context ctx) throws 
Exception {
+        ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
+    }
+}
+
+DataSet<CurrencyRate> currencyDataSet = bEnv.fromElements(
+    new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
+
+BootstrapTransformation<CurrencyRate> broadcastTransformation = 
OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+case class CurrencyRate(currency: String, rate: Double)
+
+object CurrencyBootstrapFunction {
+    val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, 
Types.DOUBLE)
+}
+
+class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction[CurrencyRate] {
+
+    @throws[Exception]
+    override def processElement(value: CurrencyRate, ctx: Context): Unit = {
+        ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
+    }
+}
+
+val currencyDataSet = bEnv.fromElements(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3))
+
+val broadcastTransformation = OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Keyed State
+
+Keyed state for `ProcessFunction`s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class Account {
     public int id;
 
     public double amount;      
@@ -382,12 +556,13 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 case class Account(id: Int, amount: Double, timestamp: Long)
- 
+
 class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, 
Account] {
    var state: ValueState[Double] = _
 
+    @throws[Exception]
     override def open(parameters: Configuration): Unit = {
-        val descriptor = new ValueStateDescriptor[Double]("total",Types.DOUBLE)
+        val descriptor = new ValueStateDescriptor("total", Types.DOUBLE)
         state = getRuntimeContext().getState(descriptor)
     }
 
@@ -403,47 +578,27 @@ val accountDataSet = bEnv.fromCollection(accounts)
 
 val transformation = OperatorTransformation
     .bootstrapWith(accountDataSet)
-    .keyBy(acc -> acc.id)
-    .transform(new AccountBootstrapper())
+    .keyBy(acc => acc.id)
+    .transform(new AccountBootstrapper)
 {% endhighlight %}
 </div>
 </div>
 
 The `KeyedStateBootstrapFunction` supports setting event time and processing 
time timers.
 The timers will not fire inside the bootstrap function and only become active 
once restored within a `DataStream` application.
-If a processing time timer is set but the state is not restored until after 
that time has passed, the timer will fire immediatly upon start.
+If a processing time timer is set but the state is not restored until after 
that time has passed, the timer will fire immediately upon start.
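+
+For example, a bootstrap function can register a timer for each key it writes. A minimal,
+hypothetical sketch that assumes the bootstrap `Context` exposes a `timerService()` as in the
+`DataStream` API:
+
+{% highlight java %}
+public class TimerBootstrapFunction extends KeyedStateBootstrapFunction<Integer, Account> {
+
+    @Override
+    public void processElement(Account value, Context ctx) throws Exception {
+        // the timer stays dormant during bootstrapping and fires once a
+        // DataStream application restores the savepoint
+        ctx.timerService().registerEventTimeTimer(value.timestamp);
+    }
+}
+{% endhighlight %}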
 
-Once one or more transformations have been created they may be combined into a 
single `Savepoint`. 
-`Savepoint`'s are created using a state backend and max parallelism, they may 
contain any number of operators. 
+<span class="label label-danger">Attention</span> If your bootstrap function 
creates timers, the state can only be restored using one of the [process]({{ 
site.baseurl }}/dev/stream/operators/process_function.html) type functions.
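+
+For instance, a job restoring such a savepoint could consume the bootstrapped timers with a
+`KeyedProcessFunction` (a hypothetical sketch):
+
+{% highlight java %}
+public class TimerConsumingFunction extends KeyedProcessFunction<Integer, Account, Account> {
+
+    @Override
+    public void processElement(Account value, Context ctx, Collector<Account> out) {
+        out.collect(value);
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Account> out) {
+        // fires for the event time timers registered by the bootstrap function
+    }
+}
+{% endhighlight %}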
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
-               
-Besides creating a savepoint from scratch, you can base on off an existing 
savepoint such as when bootstrapping a single new operator for an existing job.
+## Modifying Savepoints
+
+Besides creating a savepoint from scratch, you can base one off of an existing savepoint, such as when bootstrapping a single new operator for an existing job.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend())
     .withOperator("uid", transformation)
     .write(newPath);
 {% endhighlight %}
@@ -451,7 +606,7 @@ Savepoint
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend)
     .withOperator("uid", transformation)
     .write(newPath)
 {% endhighlight %}
diff --git a/docs/dev/libs/state_processor_api.zh.md 
b/docs/dev/libs/state_processor_api.zh.md
index acde295..df5ecdc 100644
--- a/docs/dev/libs/state_processor_api.zh.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -23,166 +23,103 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink's State Processor API provides powerful functionality to reading, 
writing, and modifing savepoints and checkpoints using Flink’s batch DataSet 
api.
-This is useful for tasks such as analyzing state for interesting patterns, 
troubleshooting or auditing jobs by checking for discrepancies, and 
bootstrapping state for new applications.
+Apache Flink's State Processor API provides powerful functionality to reading, 
writing, and modifing savepoints and checkpoints using Flink’s batch DataSet 
API.
+Due to the [interoperability of DataSet and Table 
API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api),
 you can even use relational Table API or SQL queries to analyze and process 
state data.
+
+For example, you can take a savepoint of a running stream processing 
application and analyze it with a DataSet batch program to verify that the 
application behaves correctly.
+Or you can read a batch of data from any store, preprocess it, and write the 
result to a savepoint that you use to bootstrap the state of a streaming 
application.
+It is also possible to fix inconsistent state entries.
+Finally, the State Processor API opens up many ways to evolve a stateful 
application that was previously blocked by parameter and design choices that 
could not be changed without losing all the state of the application after it 
was started.
+For example, you can now arbitrarily modify the data types of states, adjust 
the maximum parallelism of operators, split or merge operator state, re-assign 
operator UIDs, and so on.
+
+To get started with the state processor api, include the following library in 
your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-state-processor-api{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
 
 * This will be replaced by the TOC
 {:toc}
 
-## Abstraction
+## Mapping Application State to DataSets 
 
-To understand how to best interact with savepoints in a batch context it is 
important to have a clear mental model of how the data in Flink state relates 
to a traditional relational database.
+The State Processor API maps the state of a streaming application to one or 
more data sets that can be processed separately.
+In order to be able to use the API, you need to understand how this mapping 
works.
 
-A database can be thought of as one or more namespaces, each containing a 
collection of tables.
-Those tables in turn contain columns whose values have some intrinsic 
relationship between them, such as being scoped under the same key.
+But let us first have a look at what a stateful Flink job looks like.
+A Flink job is composed of operators; typically one or more source operators, 
a few operators for the actual processing, and one or more sink operators.
+Each operator runs in parallel in one or more tasks and can work with 
different types of state.
+An operator can have zero, one, or more *“operator states”* which are 
organized as lists that are scoped to the operator's tasks.
+If the operator is applied on a keyed stream, it can also have zero, one, or 
more *“keyed states”* which are scoped to a key that is extracted from each 
processed record.
+You can think of keyed state as a distributed key-value map. 
 
-A savepoint represents the state of a Flink job at a particular point in time 
which is made up of many operators.
-Those operators contain various kinds of state, both partitioned or keyed 
state, and non-partitioned or operator state. 
+The following figure shows the application “MyApp” which consists of three 
operators called “Src”, “Proc”, and “Snk”.
+Src has one operator state (os1), Proc has one operator state (os2) and two 
keyed states (ks1, ks2) and Snk is stateless.
 
-<div data-lang="java" markdown="1">
-{% highlight java %}
-MapStateDescriptor<Integer, Double> CURRENCY_RATES = new 
MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
- 
-class CurrencyConverter extends BroadcastProcessFunction<Transaction, 
CurrencyRate, Transaction> {
- 
-  public void processElement(
-        Transaction value,
-        ReadOnlyContext ctx,
-        Collector<Transaction> out) throws Exception {
- 
-     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
-     if (rate != null) {
-        value.amount *= rate;
-     }
-     out.collect(value);
-  }
- 
-  public void processBroadcastElement(
-        CurrencyRate value,
-        Context ctx,
-        Collector<Transaction> out) throws Exception {
-        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, 
value.rate);
-  }
-}
-  
-class Summarize extends RichFlatMapFunction<Transaction, Summary> {
-  transient ValueState<Double> totalState;
-  transient ValueState<Integer> countState;
- 
-  public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("count", Types.INT));
-  }
- 
-  public void flatMap(Transaction value, Collector<Summary> out) throws 
Exception {
-     Summary summary = new Summary();
-     summary.total = value.amount;
-     summary.count = 1;
- 
-     Double currentTotal = totalState.value();
-     if (currentTotal != null) {
-        summary.total += currentTotal;
-     }
- 
-     Integer currentCount = countState.value();
-     if (currentCount != null) {
-        summary.count += currentCount;
-     }
-     countState.update(summary.count);
- 
-     out.collect(summary);
-  }
-}
- 
-DataStream<Transaction> transactions = . . .
-BroadcastStream<CurrencyRate> rates = . . .
-transactions
-  .connect(rates)
-  .process(new CurrencyConverter())
-  .uid("currency_converter")
-  .keyBy(transaction -> transaction.accountId)
-  .flatMap(new Summarize())
-  .uid("summarize")
-{% endhighlight %}
-</div>
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/application-my-app-state-processor-api.png" width="600px" 
alt="Application: MyApp"/>
+</p>
 
-This job contains multiple operators along with various kinds of state.
-When analyzing that state we can first scope data by its operator, named by 
setting its uid.
-Within each operator we can look at the registered states.
-`CurrencyConverter` has a broadcast state, which is a type of non-partitioned 
operator state.
-In general, there is no relationship between any two elements in an operator 
state and so we can look at each value as being its own row.
-Contrast this with Summarize, which contains two keyed states.
-Because both states are scoped under the same key we can safely assume there 
exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator 
containing one _key_ column along with _n_ value columns, one for each 
registered state.
-All of this means that the state for this job could be described using the 
following pseudo-sql commands. 
+A savepoint or checkpoint of MyApp consists of the data of all states, 
organized in a way that the states of each task can be restored.
+When processing the data of a savepoint (or checkpoint) with a batch job, we 
need a mental model that maps the data of the individual tasks' states into 
data sets or tables.
+In fact, we can think of a savepoint as a database. Every operator (identified 
by its UID) represents a namespace.
+Each operator state of an operator is mapped to a dedicated table in the 
namespace with a single column that holds the state's data of all tasks.
+All keyed states of an operator are mapped to a single table consisting of a 
column for the key, and one column for each keyed state.
+The following figure shows how a savepoint of MyApp is mapped to a database.
 
-{% highlight sql %}
-CREATE NAMESPACE currency_converter;
- 
-CREATE TABLE currency_converter.rates (
-   value Tuple2<Integer, Double>
-);
- 
-CREATE NAMESPACE summarize;
- 
-CREATE TABLE summarize.keyed_state (
-   key   INTEGER PRIMARY KEY,
-   total DOUBLE,
-   count INTEGER
-);
-{% endhighlight %}
-
-In general, the savepoint ↔ database relationship can be summarized as:
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: 
MyApp"/>
+</p>
 
-    * A savepoint is a database
-    * An operator is a namespace named by its uid
-    * Each operator state represents a single table
-        * Each element in an operator state represents a single row in that 
table
-    * Each operator containing keyed state has a single “keyed_state” table
-        * Each keyed_state table has one key column mapping the key value of 
the operator
-        * Each registered state represents a single column in the table
-        * Each row in the table maps to a single key
+The figure shows how the values of Src's operator state are mapped to a table 
with one column and five rows, one row for each of the list entries across all 
parallel tasks of Src.
+Operator state os2 of the operator “Proc” is similarly mapped to an individual 
table.
+The keyed states ks1 and ks2 are combined to a single table with three 
columns, one for the key, one for ks1 and one for ks2.
+The keyed table holds one row for each distinct key of both keyed states.
+Since the operator “Snk” does not have any state, its namespace is empty.
 
 ## Reading State
 
-Reading state begins by specifiying the path to a valid savepoint or 
checkpoint along with the `StateBackend` that should be used to restore the 
data.
-The compatability guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
+Reading state begins by specifying the path to a valid savepoint or checkpoint 
along with the `StateBackend` that should be used to restore the data.
+The compatibility guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
-ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
RocksDBStateBackend());
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
MemoryStateBackend());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val bEnv      = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
+val bEnv      = ExecutionEnvironment.getExecutionEnvironment
+val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
 {% endhighlight %}
 </div>
 </div>
 
-When reading operator state, simply specify the operator uid, state name, and 
type information.
+### Operator State
+
+[Operator state]({{ site.baseurl 
}}/dev/stream/state/state.html#operator-state) is any non-keyed state in Flink.
+This includes, but is not limited to, any use of `CheckpointedFunction` or 
`BroadcastState` within an application.
+When reading operator state, users specify the operator uid, the state name, 
and the type information.
+
+#### Operator List State
+
+Operator state stored in a `CheckpointedFunction` using `getListState` can be 
read using `ExistingSavepoint#readListState`.
+The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataSet<Integer> listState  = savepoint.readListState(
+DataSet<Integer> listState  = savepoint.readListState<>(
     "my-uid",
     "list-state",
     Types.INT);
-
-DataSet<Integer> unionState = savepoint.readUnionState(
-    "my-uid",
-    "union-state",
-    Types.INT);
- 
-DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState(
-    "my-uid",
-    "broadcast-state",
-    Types.INT,
-    Types.INT);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -191,12 +128,54 @@ val listState  = savepoint.readListState(
     "my-uid",
     "list-state",
     Types.INT)
+{% endhighlight %}
+</div>
+</div>
 
-val unionState = savepoint.readUnionState(
+#### Operator Union List State
+
+Operator state stored in a `CheckpointedFunction` using `getUnionListState` 
can be read using `ExistingSavepoint#readUnionState`.
+The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
+The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> listState  = savepoint.readUnionState<>(
+    "my-uid",
+    "union-state",
+    Types.INT);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val listState  = savepoint.readUnionState(
     "my-uid",
     "union-state",
     Types.INT)
- 
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Broadcast State
+
+[BroadcastState]({{ site.baseurl }} /dev/stream/state/broadcast_state.html) 
can be read using `ExistingSavepoint#readBroadcastState`.
+The state name and type information should match those used to define the 
`MapStateDescriptor` that declared this state in the DataStream application.
+The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState<>(
+    "my-uid",
+    "broadcast-state",
+    Types.INT,
+    Types.INT);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 val broadcastState = savepoint.readBroadcastState(
     "my-uid",
     "broadcast-state",
@@ -206,12 +185,14 @@ val broadcastState = savepoint.readBroadcastState(
 </div>
 </div>
 
-A custom `TypeSerializer` may also be specified if one was used in the 
`StateDescriptor` for the state.
+#### Using Custom Serializers
+
+Each of the operator state readers support using custom `TypeSerializers` if 
one was used to define the `StateDescriptor` that wrote out the state. 
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataSet<Integer> listState = savepoint.readListState(
+DataSet<Integer> listState = savepoint.readListState<>(
     "uid",
     "list-state", 
     Types.INT,
@@ -224,12 +205,17 @@ val listState = savepoint.readListState(
     "uid",
     "list-state", 
     Types.INT,
-    new MyCustomIntSerializer())
+    new MyCustomIntSerializer)
 {% endhighlight %}
 </div>
 </div>
 
-When reading keyed state, users specify a KeyedStateReaderFunction to allow 
reading arbitrary columns and complex state types such as ListState, MapState, 
and AggregatingState.
+### Keyed State
+
+[Keyed state]({{ site.baseurl }}/dev/stream/state/state.html#keyed-state), or 
partitioned state, is any state that is partitioned relative to a key.
+When reading a keyed state, users specify the operator id and a 
`KeyedStateReaderFunction<KeyType, OutputType>`.
+
+The `KeyedStateReaderFunction` allows users to read arbitrary columns and 
complex state types such as ListState, MapState, and AggregatingState.
 This means if an operator contains a stateful process function such as:
 
 <div class="codetabs" markdown="1">
@@ -239,55 +225,80 @@ public class StatefulFunctionWithTime extends 
KeyedProcessFunction<Integer, Inte
  
    ValueState<Integer> state;
  
+   ListState<Long> updateTimes;
+
    @Override
    public void open(Configuration parameters) {
       ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
       state = getRuntimeContext().getState(stateDescriptor);
+
+      ListStateDescriptor<Long> updateDescriptor = new 
ListStateDescriptor<>("times", Types.LONG);
+      updateTimes = getRuntimeContext().getListState(updateDescriptor);
    }
  
    @Override
    public void processElement(Integer value, Context ctx, Collector<Void> out) 
throws Exception {
       state.update(value + 1);
+      updateTimes.add(System.currentTimeMillis());
    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, 
Integer, Void] {
+class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, 
Void] {
  
-   var state: ValueState[Integer];
+   var state: ValueState[Integer] = _
  
-   override def open(parameters: Configuration) {
-      val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-      state = getRuntimeContext().getState(stateDescriptor);
+   var updateTimes: ListState[Long] = _ 
+
+   @throws[Exception]
+   override def open(parameters: Configuration): Unit {
+      val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+      state = getRuntimeContext().getState(stateDescriptor)
+
+      val updateDescirptor = new ListStateDescriptor("times", Types.LONG)
+      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }
  
-   override def processElement(value: Integer, ctx: Context, out: 
Collector[Void]) {
-      state.update(value + 1);
+   @throws[Exception]
+   override def processElement(value: Integer, ctx: Context, out: 
Collector[Void]): Unit = {
+      state.update(value + 1)
+      updateTimes.add(System.currentTimeMillis)
    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-Then it can read by defining an output type and corresponding 
KeyedStateReaderFunction. 
+Then it can read by defining an output type and corresponding 
`KeyedStateReaderFunction`. 
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-class KeyedState {
-  Integer key;
-  Integer value;
+DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
+
+public class KeyedState {
+  public int key;
+
+  public int value;
+
+  public List<Long> times;
 }
  
-class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
+public class ReaderFunction extends KeyedStateReaderFunction<Integer, 
KeyedState> {
+
   ValueState<Integer> state;
  
+  ListState<Long> updateTimes;
+
   @Override
   public void open(Configuration parameters) {
-     ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
+    ValueStateDescriptor<Integer> stateDescriptor = new 
ValueStateDescriptor<>("state", Types.INT);
+    state = getRuntimeContext().getState(stateDescriptor);
+
+    ListStateDescriptor<Long> updateDescriptor = new 
ListStateDescriptor<>("times", Types.LONG);
+    updateTimes = getRuntimeContext().getListState(updateDescriptor);
   }
  
   @Override
@@ -295,58 +306,221 @@ class ReaderFunction extends 
KeyedStateReaderFunction<Integer, KeyedState> {
     Integer key,
     Context ctx,
     Collector<KeyedState> out) throws Exception {
- 
-     KeyedState data = new KeyedState();
-     data.key    = key;
-     data.value  = state.value();
-     out.collect(data);
+        
+    KeyedState data = new KeyedState();
+    data.key    = key;
+    data.value  = state.value();
+    data.times  = StreamSupport
+      .stream(updateTimes.get().spliterator(), false)
+      .collect(Collectors.toList());
+
+    out.collect(data);
   }
 }
- 
-DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-case class KeyedState(key: Int, value: Int)
+val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
+
+case class KeyedState(key: Int, value: Int, List[Long])
  
 class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
-  var state: ValueState[Integer];
  
-  override def open(parameters: Configuration) {
-     val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
-  }
+  var state: ValueState[Integer] = _
+
+  var updateTimes: ListState[Long] = _
  
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit {
+     val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+     state = getRuntimeContext().getState(stateDescriptor)
+
+      val updateDescirptor = new ListStateDescriptor("times", Types.LONG)
+      updateTimes = getRuntimeContext().getListState(updateDescriptor)
+    }
+ 
+
+  @throws[Exception]
   override def processKey(
     key: Int,
     ctx: Context,
-    out: Collector[Keyedstate]) throws Exception {
+    out: Collector[Keyedstate]): Unit {
  
-     val data = KeyedState(key, state.value())
-     out.collect(data);
+     val data = KeyedState(key, state.value(), updateTimes.get.asScala.toList)
+     out.collect(data)
   }
 }
-
-val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
 </div>
 
-{% panel **Note:** When using a `KeyedStateReaderFunction` all state 
descriptors must be registered eagerly inside of open. Any attempt to call 
`RuntimeContext#getState`, `RuntimeContext#getListState`, or 
`RuntimeContext#getMapState` will result in a `RuntimeException`. %}
+Along with reading registered state values, each key has access to a `Context` 
with metadata such as registered event time and processing time timers.
+
+{% panel **Note:** When using a `KeyedStateReaderFunction`, all state 
descriptors must be registered eagerly inside of open. Any attempt to call a 
`RuntimeContext#get*State` will result in a `RuntimeException`. %}
 
 ## Writing New Savepoints
 
-State writers are based around the abstraction of `Savepoint`, where one 
`Savepoint` may have many operators and the state for any particular operator 
is created using a `BootstrapTransformation`.
+`Savepoint`'s may also be written, which allows such use cases as 
bootstrapping state based on historical data.
+Each savepoint is made up of one or more `BootstrapTransformation`'s 
(explained below), each of which defines the state for an individual operator.
 
-A `BootstrapTransformation` starts with a `DataSet` containing the values that 
are to be written into state.
-The transformation may be optionally `keyed` depending on whether or not you 
are writing keyed or operator state.
-Finally a bootstrap function is applied depending to the transformation; Flink 
supplies `KeyedStateBootstrapFunction` for writing keyed state, 
`StateBootstrapFunction` for writing non keyed state, and 
`BroadcastStateBootstrapFunction` for writing broadcast state.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int maxParallelism = 128;
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val maxParallelism = 128
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath)
+{% endhighlight %}
+</div>
+</div>
+
+The [UIDs]({{ site.baseurl }}/ops/state/savepoints.html#assigning-operator-ids) 
associated with each operator must match one to one with the UIDs assigned to 
the operators in your `DataStream` application; this is how Flink knows which 
state maps to which operator.
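+
+For example, if the savepoint above is meant for the streaming job sketched 
+below (where `MyKeyedProcessFunction` and `MyMapFunction` are placeholders 
+for your own functions), the `uid` calls must use the same identifiers:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.fromElements(1, 2, 3)
+    .keyBy(value -> value)
+    .process(new MyKeyedProcessFunction())
+    .uid("uid1") // must match withOperator("uid1", transformation1)
+    .map(new MyMapFunction())
+    .uid("uid2") // must match withOperator("uid2", transformation2)
+    .print();
+{% endhighlight %}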
+
+### Operator State
+
+Simple operator state, as used by a `CheckpointedFunction`, can be created 
using a `StateBootstrapFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+    private ListState<Integer> state;
+
+    @Override
+    public void processElement(Integer value, Context ctx) throws Exception {
+        state.add(value);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+    }
+       
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        state = context.getOperatorState().getListState(new 
ListStateDescriptor<>("state", Types.INT));
+    }
+}
+
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<Integer> data = env.fromElements(1, 2, 3);
+
+BootstrapTransformation<Integer> transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
+
+    var state: ListState[Integer] = _
+
+    @throws[Exception]
+    override def processElement(value: Integer, ctx: Context): Unit = {
+        state.add(value)
+    }
+
+    @throws[Exception]
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    }
+       
+    @throws[Exception]
+    override def initializeState(context: FunctionInitializationContext): Unit 
= {
+        state = context.getOperatorState().getListState(new 
ListStateDescriptor("state", Types.INT))
+    }
+}
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val data = env.fromElements(1, 2, 3)
+
+val transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Broadcast State
+
+[BroadcastState]({{ site.baseurl }}/dev/stream/state/broadcast_state.html) 
can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast 
state in the `DataStream` API, the full state must fit in memory.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public  class Account {
+public class CurrencyRate {
+    public String currency;
+
+    public Double rate;
+
+    public CurrencyRate() {}
+
+    public CurrencyRate(String currency, Double rate) {
+        this.currency = currency;
+        this.rate = rate;
+    }
+}
+
+public class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction<CurrencyRate> {
+
+    public static final MapStateDescriptor<String, Double> descriptor = 
+        new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);
+
+    @Override
+    public void processElement(CurrencyRate value, Context ctx) throws 
Exception {
+        ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
+    }
+}
+
+DataSet<CurrencyRate> currencyDataSet = bEnv.fromElements(
+    new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
+
+BootstrapTransformation<CurrencyRate> broadcastTransformation = 
OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+case class CurrencyRate(currency: String, rate: Double)
+
+object CurrencyBootstrapFunction {
+    val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, 
Types.DOUBLE)
+}
+
+class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction[CurrencyRate] {
+
+    @throws[Exception]
+    override def processElement(value: CurrencyRate, ctx: Context): Unit = {
+        ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
+    }
+}
+
+val currencyDataSet = bEnv.fromElements(CurrencyRate("USD", 1.0), 
CurrencyRate("EUR", 1.3))
+
+val broadcastTransformation = OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Keyed State
+
+Keyed state for `ProcessFunction`s and other `RichFunction` types can be 
written using a `KeyedStateBootstrapFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class Account {
     public int id;
 
     public double amount;      
@@ -382,12 +556,13 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 case class Account(id: Int, amount: Double, timestamp: Long)
- 
+
 class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, 
Account] {
-    var state: ValueState[Double]
+    var state: ValueState[Double] = _
 
+    @throws[Exception]
     override def open(parameters: Configuration): Unit = {
-        val descriptor = new ValueStateDescriptor[Double]("total",Types.DOUBLE)
+        val descriptor = new ValueStateDescriptor("total", Types.DOUBLE)
         state = getRuntimeContext().getState(descriptor)
     }
 
@@ -403,47 +578,27 @@ val accountDataSet = bEnv.fromCollection(accounts)
 
 val transformation = OperatorTransformation
     .bootstrapWith(accountDataSet)
-    .keyBy(acc -> acc.id)
-    .transform(new AccountBootstrapper())
+    .keyBy(acc => acc.id)
+    .transform(new AccountBootstrapper)
 {% endhighlight %}
 </div>
 </div>
 
 The `KeyedStateBootstrapFunction` supports setting event time and processing 
time timers.
 The timers will not fire inside the bootstrap function and only become active 
once restored within a `DataStream` application.
-If a processing time timer is set but the state is not restored until after 
that time has passed, the timer will fire immediatly upon start.
+If a processing time timer is set but the state is not restored until after 
that time has passed, the timer will fire immediately upon start.
 
-Once one or more transformations have been created they may be combined into a 
single `Savepoint`. 
-`Savepoint`'s are created using a state backend and max parallelism, they may 
contain any number of operators. 
+<span class="label label-danger">Attention</span> If your bootstrap function 
creates timers, the state can only be restored using one of the [process]({{ 
site.baseurl }}/dev/stream/operators/process_function.html) type functions.
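+
+For example, a minimal sketch of a bootstrap function that registers an event 
+time timer for each account (reusing the `Account` type from above; the 
+one-day offset is an arbitrary choice) could look like this:
+
+{% highlight java %}
+public class TimerBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
+
+    @Override
+    public void processElement(Account value, Context ctx) throws Exception {
+        // The timer is written into the savepoint but only fires after the
+        // state has been restored by a KeyedProcessFunction.
+        long oneDay = 24 * 60 * 60 * 1000L;
+        ctx.timerService().registerEventTimeTimer(value.timestamp + oneDay);
+    }
+}
+{% endhighlight %}
+
+The restoring job can then handle these timers in the `onTimer` method of its 
+`KeyedProcessFunction`.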
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
-               
-Besides creating a savepoint from scratch, you can base on off an existing 
savepoint such as when bootstrapping a single new operator for an existing job.
+## Modifying Savepoints
+
+Besides creating a savepoint from scratch, you can base one on an existing 
savepoint, such as when bootstrapping a single new operator for an existing 
job.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend())
     .withOperator("uid", transformation)
     .write(newPath);
 {% endhighlight %}
@@ -451,7 +606,7 @@ Savepoint
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend)
     .withOperator("uid", transformation)
     .write(newPath)
 {% endhighlight %}
diff --git a/docs/fig/application-my-app-state-processor-api.png 
b/docs/fig/application-my-app-state-processor-api.png
new file mode 100644
index 0000000..ca6ef6c
Binary files /dev/null and 
b/docs/fig/application-my-app-state-processor-api.png differ
diff --git a/docs/fig/database-my-app-state-processor-api.png 
b/docs/fig/database-my-app-state-processor-api.png
new file mode 100644
index 0000000..31642fb
Binary files /dev/null and b/docs/fig/database-my-app-state-processor-api.png 
differ
