This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new b11b010  [FLINK-14825][state-processor-api][docs] Rework state processor api documentation

b11b010 is described below

commit b11b010aaacfc6e65d5c703d22e39e642121ce38
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Dec 9 15:48:02 2019 -0600

    [FLINK-14825][state-processor-api][docs] Rework state processor api documentation
---
 docs/dev/libs/state_processor_api.md               | 557 +++++++++++++--------
 docs/dev/libs/state_processor_api.zh.md            | 557 +++++++++++++--------
 .../fig/application-my-app-state-processor-api.png | Bin 0 -> 49938 bytes
 docs/fig/database-my-app-state-processor-api.png   | Bin 0 -> 50174 bytes
 4 files changed, 712 insertions(+), 402 deletions(-)

diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.md
index acde295..df5ecdc 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -23,166 +23,103 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifing savepoints and checkpoints using Flink’s batch DataSet api.
-This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
+Due to the [interoperability of DataSet and Table API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api), you can even use relational Table API or SQL queries to analyze and process state data.
+
+For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly.
+Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application.
+It is also possible to fix inconsistent state entries.
+Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started.
+For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on.
+
+To get started with the State Processor API, include the following library in your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-state-processor-api{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
 
 * This will be replaced by the TOC
 {:toc}
 
-## Abstraction
+## Mapping Application State to DataSets
 
-To understand how to best interact with savepoints in a batch context it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.
+The State Processor API maps the state of a streaming application to one or more data sets that can be processed separately.
+To use the API, you need to understand how this mapping works.
 
-A database can be thought of as one or more namespaces, each containing a collection of tables.
-Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.
+But let us first have a look at what a stateful Flink job looks like.
+A Flink job is composed of operators: typically one or more source operators, a few operators for the actual processing, and one or more sink operators.
+Each operator runs in parallel in one or more tasks and can work with different types of state.
+An operator can have zero, one, or more *“operator states”* which are organized as lists that are scoped to the operator's tasks.
+If the operator is applied on a keyed stream, it can also have zero, one, or more *“keyed states”* which are scoped to a key that is extracted from each processed record.
+You can think of keyed state as a distributed key-value map.
 
-A savepoint represents the state of a Flink job at a particular point in time which is made up of many operators.
-Those operators contain various kinds of state, both partitioned or keyed state, and non-partitioned or operator state.
+The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”.
+Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.
 
-<div data-lang="java" markdown="1">
-{% highlight java %}
-MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
-
-class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {
-
-    public void processElement(
-            Transaction value,
-            ReadOnlyContext ctx,
-            Collector<Transaction> out) throws Exception {
-
-        Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
-        if (rate != null) {
-            value.amount *= rate;
-        }
-        out.collect(value);
-    }
-
-    public void processBroadcastElement(
-            CurrencyRate value,
-            Context ctx,
-            Collector<Transaction> out) throws Exception {
-        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
-    }
-}
-
-class Summarize extends RichFlatMapFunction<Transaction, Summary> {
-    transient ValueState<Double> totalState;
-    transient ValueState<Integer> countState;
-
-    public void open(Configuration configuration) throws Exception {
-        totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
-        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
-    }
-
-    public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
-        Summary summary = new Summary();
-        summary.total = value.amount;
-        summary.count = 1;
-
-        Double currentTotal = totalState.value();
-        if (currentTotal != null) {
-            summary.total += currentTotal;
-        }
-
-        Integer currentCount = countState.value();
-        if (currentCount != null) {
-            summary.count += currentCount;
-        }
-        countState.update(summary.count);
-
-        out.collect(summary);
-    }
-}
-
-DataStream<Transaction> transactions = . . .
-BroadcastStream<CurrencyRate> rates = . . .
-transactions
-    .connect(rates)
-    .process(new CurrencyConverter())
-    .uid("currency_converter")
-    .keyBy(transaction -> transaction.accountId)
-    .flatMap(new Summarize())
-    .uid("summarize")
-{% endhighlight %}
-</div>
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px">
+  <img src="{{ site.baseurl }}/fig/application-my-app-state-processor-api.png" width="600px" alt="Application: MyApp"/>
+</p>
 
-This job contains multiple operators along with various kinds of state.
-When analyzing that state we can first scope data by its operator, named by setting its uid.
-Within each operator we can look at the registered states.
-`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
-In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row.
-Contrast this with Summarize, which contains two keyed states.
-Because both states are scoped under the same key we can safely assume there exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
-All of this means that the state for this job could be described using the following pseudo-sql commands.
+A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored.
+When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks' states into data sets or tables.
+In fact, we can think of a savepoint as a database. Every operator (identified by its UID) represents a namespace.
+Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state's data of all tasks.
+All keyed states of an operator are mapped to a single table consisting of one column for the key and one column for each keyed state.
+The following figure shows how a savepoint of MyApp is mapped to a database.
 
-{% highlight sql %}
-CREATE NAMESPACE currency_converter;
-
-CREATE TABLE currency_converter.rates (
-   value Tuple2<Integer, Double>
-);
-
-CREATE NAMESPACE summarize;
-
-CREATE TABLE summarize.keyed_state (
-   key   INTEGER PRIMARY KEY,
-   total DOUBLE,
-   count INTEGER
-);
-{% endhighlight %}
-
-In general, the savepoint ↔ database relationship can be summarized as:
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px">
+  <img src="{{ site.baseurl }}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: MyApp"/>
+</p>
 
- * A savepoint is a database
- * An operator is a namespace named by its uid
- * Each operator state represents a single table
- * Each element in an operator state represents a single row in that table
- * Each operator containing keyed state has a single “keyed_state” table
- * Each keyed_state table has one key column mapping the key value of the operator
- * Each registered state represents a single column in the table
- * Each row in the table maps to a single key
+The figure shows how the values of Src's operator state are mapped to a table with one column and five rows, one row for each of the list entries across all parallel tasks of Src.
+Operator state os2 of the operator “Proc” is similarly mapped to an individual table.
+The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2.
+The keyed table holds one row for each distinct key of both keyed states.
+Since the operator “Snk” does not have any state, its namespace is empty.
 
 ## Reading State
 
-Reading state begins by specifiying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
-The compatability guarantees for restoring state are identical to those when restoring a `DataStream` application.
+Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
+The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
-ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend());
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val bEnv = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
+val bEnv = ExecutionEnvironment.getExecutionEnvironment
+val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
 {% endhighlight %}
 </div>
 </div>
 
-When reading operator state, simply specify the operator uid, state name, and type information.
+### Operator State
+
+[Operator state]({{ site.baseurl }}/dev/stream/state/state.html#operator-state) is any non-keyed state in Flink.
+This includes, but is not limited to, any use of `CheckpointedFunction` or `BroadcastState` within an application.
+When reading operator state, users specify the operator uid, the state name, and the type information.
+
+#### Operator List State
+
+Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`.
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataSet<Integer> listState = savepoint.readListState( +DataSet<Integer> listState = savepoint.readListState<>( "my-uid", "list-state", Types.INT); - -DataSet<Integer> unionState = savepoint.readUnionState( - "my-uid", - "union-state", - Types.INT); - -DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState( - "my-uid", - "broadcast-state", - Types.INT, - Types.INT); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> @@ -191,12 +128,54 @@ val listState = savepoint.readListState( "my-uid", "list-state", Types.INT) +{% endhighlight %} +</div> +</div> -val unionState = savepoint.readUnionState( +#### Operator Union List State + +Operator state stored in a `CheckpointedFunction` using `getUnionListState` can be read using `ExistingSavepoint#readUnionState`. +The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. +The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataSet<Integer> listState = savepoint.readUnionState<>( + "my-uid", + "union-state", + Types.INT); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val listState = savepoint.readUnionState( "my-uid", "union-state", Types.INT) - +{% endhighlight %} +</div> +</div> + + +#### Broadcast State + +[BroadcastState]({{ site.baseurl }} /dev/stream/state/broadcast_state.html) can be read using `ExistingSavepoint#readBroadcastState`. +The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application. +The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.<Integer, Integer>readBroadcastState(
+    "my-uid",
+    "broadcast-state",
+    Types.INT,
+    Types.INT);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 val broadcastState = savepoint.readBroadcastState(
     "my-uid",
     "broadcast-state",
@@ -206,12 +185,14 @@ val broadcastState = savepoint.readBroadcastState(
 </div>
 </div>
 
-A custom `TypeSerializer` may also be specified if one was used in the `StateDescriptor` for the state.
+#### Using Custom Serializers
+
+Each of the operator state readers supports using a custom `TypeSerializer` if one was used to define the `StateDescriptor` that wrote out the state.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataSet<Integer> listState = savepoint.readListState(
+DataSet<Integer> listState = savepoint.<Integer>readListState(
     "uid",
     "list-state",
     Types.INT,
@@ -224,12 +205,17 @@ val listState = savepoint.readListState(
     "uid",
     "list-state",
     Types.INT,
-    new MyCustomIntSerializer())
+    new MyCustomIntSerializer)
 {% endhighlight %}
 </div>
 </div>
 
-When reading keyed state, users specify a KeyedStateReaderFunction to allow reading arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
+### Keyed State
+
+[Keyed state]({{ site.baseurl }}/dev/stream/state/state.html#keyed-state), or partitioned state, is any state that is partitioned relative to a key.
+When reading a keyed state, users specify the operator uid and a `KeyedStateReaderFunction<KeyType, OutputType>`.
+
+The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
 This means if an operator contains a stateful process function such as:
 
 <div class="codetabs" markdown="1">
@@ -239,55 +225,80 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
 
    ValueState<Integer> state;
 
+   ListState<Long> updateTimes;
+
    @Override
    public void open(Configuration parameters) {
       ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
       state = getRuntimeContext().getState(stateDescriptor);
+
+      ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
+      updateTimes = getRuntimeContext().getListState(updateDescriptor);
    }
 
    @Override
    public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
       state.update(value + 1);
+      updateTimes.add(System.currentTimeMillis());
    }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] {
+class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] {
 
-   var state: ValueState[Integer];
+   var state: ValueState[Integer] = _
 
-   override def open(parameters: Configuration) {
-      val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-      state = getRuntimeContext().getState(stateDescriptor);
+   var updateTimes: ListState[Long] = _
+
+   @throws[Exception]
+   override def open(parameters: Configuration): Unit = {
+      val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+      state = getRuntimeContext().getState(stateDescriptor)
+
+      val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
+      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }
 
-   override def processElement(value: Integer, ctx: Context, out: Collector[Void]) {
-      state.update(value + 1);
+   @throws[Exception]
+   override def processElement(value: Integer, ctx: Context, out: Collector[Void]): Unit = {
+      state.update(value + 1)
+
+      updateTimes.add(System.currentTimeMillis)
    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-Then it can read by defining an output type and corresponding KeyedStateReaderFunction.
+Then it can be read by defining an output type and a corresponding `KeyedStateReaderFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-class KeyedState {
-  Integer key;
-  Integer value;
+DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
+
+public class KeyedState {
+  public int key;
+
+  public int value;
+
+  public List<Long> times;
 }
 
-class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
+public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
+
   ValueState<Integer> state;
 
+  ListState<Long> updateTimes;
+
   @Override
   public void open(Configuration parameters) {
-     ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
+    ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
+    state = getRuntimeContext().getState(stateDescriptor);
+
+    ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
+    updateTimes = getRuntimeContext().getListState(updateDescriptor);
   }
 
   @Override
@@ -295,58 +306,221 @@ class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
     Integer key,
     Context ctx,
     Collector<KeyedState> out) throws Exception {
-
-     KeyedState data = new KeyedState();
-     data.key    = key;
-     data.value  = state.value();
-     out.collect(data);
+
+    KeyedState data = new KeyedState();
+    data.key    = key;
+    data.value  = state.value();
+    data.times  = StreamSupport
+      .stream(updateTimes.get().spliterator(), false)
+      .collect(Collectors.toList());
+
+    out.collect(data);
   }
 }
-
-DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-case class KeyedState(key: Int, value: Int)
+val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
+
+case class KeyedState(key: Int, value: Int, times: List[Long])
 
 class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
-  var state: ValueState[Integer];
 
-  override def open(parameters: Configuration) {
-     val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
-     state = getRuntimeContext().getState(stateDescriptor);
-  }
+  var state: ValueState[Integer] = _
+
+  var updateTimes: ListState[Long] = _
 
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
+    state = getRuntimeContext().getState(stateDescriptor)
+
+    val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
+    updateTimes = getRuntimeContext().getListState(updateDescriptor)
+  }
+
+
+  @throws[Exception]
   override def processKey(
     key: Int,
     ctx: Context,
-    out: Collector[Keyedstate]) throws Exception {
+    out: Collector[KeyedState]): Unit = {
 
-     val data = KeyedState(key, state.value())
-     out.collect(data);
+    val data = KeyedState(key, state.value(), updateTimes.get.asScala.toList)
+    out.collect(data)
   }
 }
-
-val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
 </div>
 
-{% panel **Note:** When using a `KeyedStateReaderFunction` all state descriptors must be registered eagerly inside of open. Any attempt to call `RuntimeContext#getState`, `RuntimeContext#getListState`, or `RuntimeContext#getMapState` will result in a `RuntimeException`. %}
+Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers.
+
+{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of `open`. Any attempt to call a `RuntimeContext#get*State` method outside of `open` will result in a `RuntimeException`. %}
 
 ## Writing New Savepoints
 
-State writers are based around the abstraction of `Savepoint`, where one `Savepoint` may have many operators and the state for any particular operator is created using a `BootstrapTransformation`.
+`Savepoint`s may also be written, which allows use cases such as bootstrapping state based on historical data.
+Each savepoint is made up of one or more `BootstrapTransformation`s (explained below), each of which defines the state for an individual operator.
 
-A `BootstrapTransformation` starts with a `DataSet` containing the values that are to be written into state.
-The transformation may be optionally `keyed` depending on whether or not you are writing keyed or operator state.
-Finally a bootstrap function is applied depending to the transformation; Flink supplies `KeyedStateBootstrapFunction` for writing keyed state, `StateBootstrapFunction` for writing non keyed state, and `BroadcastStateBootstrapFunction` for writing broadcast state.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int maxParallelism = 128;
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val maxParallelism = 128
+
+Savepoint
+    .create(new MemoryStateBackend(), maxParallelism)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath)
+{% endhighlight %}
+</div>
+</div>
+
+The [UIDs]({{ site.baseurl }}/ops/state/savepoints.html#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows which state maps to which operator.
+
+### Operator State
+
+Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+    private ListState<Integer> state;
+
+    @Override
+    public void processElement(Integer value, Context ctx) throws Exception {
+        state.add(value);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("state", Types.INT));
+    }
+}
+
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<Integer> data = env.fromElements(1, 2, 3);
+
+BootstrapTransformation<Integer> transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
+
+    var state: ListState[Integer] = _
+
+    @throws[Exception]
+    override def processElement(value: Integer, ctx: Context): Unit = {
+        state.add(value)
+    }
+
+    @throws[Exception]
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    }
+
+    @throws[Exception]
+    override def initializeState(context: FunctionInitializationContext): Unit = {
+        state = context.getOperatorStateStore.getListState(new ListStateDescriptor("state", Types.INT))
+    }
+}
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val data = env.fromElements(1, 2, 3)
+
+val transformation = OperatorTransformation
+    .bootstrapWith(data)
+    .transform(new SimpleBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Broadcast State
+
+[BroadcastState]({{ site.baseurl }}/dev/stream/state/broadcast_state.html) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class Account {
+public class CurrencyRate {
+    public String currency;
+
+    public Double rate;
+}
+
+public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {
+
+    public static final MapStateDescriptor<String, Double> descriptor =
+        new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);
+
+    @Override
+    public void processElement(CurrencyRate value, Context ctx) throws Exception {
+        ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
+    }
+}
+
+DataSet<CurrencyRate> currencyDataSet = bEnv.fromElements(
+    new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
+
+BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+case class CurrencyRate(currency: String, rate: Double)
+
+object CurrencyBootstrapFunction {
+    val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE)
+}
+
+class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] {
+
+    @throws[Exception]
+    override def processElement(value: CurrencyRate, ctx: Context): Unit = {
+        ctx.getBroadcastState(CurrencyBootstrapFunction.descriptor).put(value.currency, value.rate)
+    }
+}
+
+val currencyDataSet = bEnv.fromElements(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3))
+
+val broadcastTransformation = OperatorTransformation
+    .bootstrapWith(currencyDataSet)
+    .transform(new CurrencyBootstrapFunction)
+{% endhighlight %}
+</div>
+</div>
+
+### Keyed State
+
+Keyed state for `ProcessFunction`s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class Account {
     public int id;
 
     public double amount;
@@ -382,12 +556,13 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 case class Account(id: Int, amount: Double, timestamp: Long)
- 
+
 class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
     var state: ValueState[Double]
 
+    @throws[Exception]
     override def open(parameters: Configuration): Unit = {
-        val descriptor = new ValueStateDescriptor[Double]("total",Types.DOUBLE)
+        val descriptor = new ValueStateDescriptor("total", Types.DOUBLE)
         state = getRuntimeContext().getState(descriptor)
     }
 
@@ -403,47 +578,27 @@ val accountDataSet = bEnv.fromCollection(accounts)
 
 val transformation = OperatorTransformation
     .bootstrapWith(accountDataSet)
-    .keyBy(acc -> acc.id)
-    .transform(new AccountBootstrapper())
+    .keyBy(acc => acc.id)
+    .transform(new AccountBootstrapper)
 {% endhighlight %}
 </div>
 </div>
 
 The `KeyedStateBootstrapFunction` supports setting event time and processing time timers.
 The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application.
-If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediatly upon start.
+If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
 
-Once one or more transformations have been created they may be combined into a single `Savepoint`.
-`Savepoint`'s are created using a state backend and max parallelism, they may contain any number of operators.
+<span class="label label-danger">Attention</span> If your bootstrap function creates timers, the state can only be restored using one of the [process]({{ site.baseurl }}/dev/stream/operators/process_function.html) type functions.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
-    .create(backend, 128)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
-
-Besides creating a savepoint from scratch, you can base on off an existing savepoint such as when bootstrapping a single new operator for an existing job.
+## Modifying Savepoints
+
+Besides creating a savepoint from scratch, you can base one on an existing savepoint, such as when bootstrapping a single new operator for an existing job.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend())
     .withOperator("uid", transformation)
     .write(newPath);
 {% endhighlight %}
@@ -451,7 +606,7 @@ Savepoint
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 Savepoint
-    .load(backend, oldPath)
+    .load(bEnv, oldPath, new MemoryStateBackend)
     .withOperator("uid", transformation)
     .write(newPath)
 {% endhighlight %}
diff --git a/docs/dev/libs/state_processor_api.zh.md b/docs/dev/libs/state_processor_api.zh.md
index acde295..df5ecdc 100644
--- a/docs/dev/libs/state_processor_api.zh.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -23,166 +23,103 @@ specific language governing permissions and limitations
 under the License.
--> -Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifing savepoints and checkpoints using Flink’s batch DataSet api. -This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications. +Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifing savepoints and checkpoints using Flink’s batch DataSet API. +Due to the [interoperability of DataSet and Table API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api), you can even use relational Table API or SQL queries to analyze and process state data. + +For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly. +Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application. +It is also possible to fix inconsistent state entries. +Finally, the State Processor API opens up many ways to evolve a stateful application that was previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started. +For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on. + +To get started with the state processor api, include the following library in your application. 
+ +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-state-processor-api{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version}}</version> + <scope>provided</scope> +</dependency> +{% endhighlight %} * This will be replaced by the TOC {:toc} -## Abstraction +## Mapping Application State to DataSets -To understand how to best interact with savepoints in a batch context it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database. +The State Processor API maps the state of a streaming application to one or more data sets that can be processed separately. +In order to be able to use the API, you need to understand how this mapping works. -A database can be thought of as one or more namespaces, each containing a collection of tables. -Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key. +But let us first have a look at what a stateful Flink job looks like. +A Flink job is composed of operators; typically one or more source operators, a few operators for the actual processing, and one or more sink operators. +Each operator runs in parallel in one or more tasks and can work with different types of state. +An operator can have zero, one, or more *“operator states”* which are organized as lists that are scoped to the operator's tasks. +If the operator is applied on a keyed stream, it can also have zero, one, or more *“keyed states”* which are scoped to a key that is extracted from each processed record. +You can think of keyed state as a distributed key-value map. -A savepoint represents the state of a Flink job at a particular point in time which is made up of many operators. -Those operators contain various kinds of state, both partitioned or keyed state, and non-partitioned or operator state. 
+The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”. +Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2) and Snk is stateless. -<div data-lang="java" markdown="1"> -{% highlight java %} -MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE); - -class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> { - - public void processElement( - Transaction value, - ReadOnlyContext ctx, - Collector<Transaction> out) throws Exception { - - Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId); - if (rate != null) { - value.amount *= rate; - } - out.collect(value); - } - - public void processBroadcastElement( - CurrencyRate value, - Context ctx, - Collector<Transaction> out) throws Exception { - ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate); - } -} - -class Summarize extends RichFlatMapFunction<Transaction, Summary> { - transient ValueState<Double> totalState; - transient ValueState<Integer> countState; - - public void open(Configuration configuration) throws Exception { - totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE)); - countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT)); - } - - public void flatMap(Transaction value, Collector<Summary> out) throws Exception { - Summary summary = new Summary(); - summary.total = value.amount; - summary.count = 1; - - Double currentTotal = totalState.value(); - if (currentTotal != null) { - summary.total += currentTotal; - } - - Integer currentCount = countState.value(); - if (currentCount != null) { - summary.count += currentCount; - } - countState.update(summary.count); - - out.collect(summary); - } -} - -DataStream<Transaction> transactions = . . . -BroadcastStream<CurrencyRate> rates = . . . 
-transactions - .connect(rates) - .process(new CurrencyConverter()) - .uid("currency_converter") - .keyBy(transaction -> transaction.accountId) - .flatMap(new Summarize()) - .uid("summarize") -{% endhighlight %} -</div> +<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px"> + <img src="{{ site.baseurl }}/fig/application-my-app-state-processor-api.png" width="600px" alt="Application: MyApp"/> +</p> -This job contains multiple operators along with various kinds of state. -When analyzing that state we can first scope data by its operator, named by setting its uid. -Within each operator we can look at the registered states. -`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state. -In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row. -Contrast this with Summarize, which contains two keyed states. -Because both states are scoped under the same key we can safely assume there exists some relationship between the two values. -Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state. -All of this means that the state for this job could be described using the following pseudo-sql commands. +A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored. +When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks' states into data sets or tables. +In fact, we can think of a savepoint as a database. Every operator (identified by its UID) represents a namespace. +Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state's data of all tasks. 
+All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state. +The following figure shows how a savepoint of MyApp is mapped to a database. -{% highlight sql %} -CREATE NAMESPACE currency_converter; - -CREATE TABLE currency_converter.rates ( - value Tuple2<Integer, Double> -); - -CREATE NAMESPACE summarize; - -CREATE TABLE summarize.keyed_state ( - key INTEGER PRIMARY KEY, - total DOUBLE, - count INTEGER -); -{% endhighlight %} - -In general, the savepoint ↔ database relationship can be summarized as: +<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px"> + <img src="{{ site.baseurl }}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: MyApp"/> +</p> - * A savepoint is a database - * An operator is a namespace named by its uid - * Each operator state represents a single table - * Each element in an operator state represents a single row in that table - * Each operator containing keyed state has a single “keyed_state” table - * Each keyed_state table has one key column mapping the key value of the operator - * Each registered state represents a single column in the table - * Each row in the table maps to a single key +The figure shows how the values of Src's operator state are mapped to a table with one column and five rows, one row for each of the list entries across all parallel tasks of Src. +Operator state os2 of the operator “Proc” is similarly mapped to an individual table. +The keyed states ks1 and ks2 are combined to a single table with three columns, one for the key, one for ks1 and one for ks2. +The keyed table holds one row for each distinct key of both keyed states. +Since the operator “Snk” does not have any state, its namespace is empty. ## Reading State -Reading state begins by specifiying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data. 
-The compatability guarantees for restoring state are identical to those when restoring a `DataStream` application. +Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data. +The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); -ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend()); +ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend()); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val bEnv = ExecutionEnvironment.getExecutionEnvironment() -val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend()) +val bEnv = ExecutionEnvironment.getExecutionEnvironment +val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend) {% endhighlight %} </div> </div> -When reading operator state, simply specify the operator uid, state name, and type information. +### Operator State + +[Operator state]({{ site.baseurl }}/dev/stream/state/state.html#operator-state) is any non-keyed state in Flink. +This includes, but is not limited to, any use of `CheckpointedFunction` or `BroadcastState` within an application. +When reading operator state, users specify the operator uid, the state name, and the type information. + +#### Operator List State + +Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`. +The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. 
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataSet<Integer> listState = savepoint.readListState( +DataSet<Integer> listState = savepoint.readListState<>( "my-uid", "list-state", Types.INT); - -DataSet<Integer> unionState = savepoint.readUnionState( - "my-uid", - "union-state", - Types.INT); - -DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState( - "my-uid", - "broadcast-state", - Types.INT, - Types.INT); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> @@ -191,12 +128,54 @@ val listState = savepoint.readListState( "my-uid", "list-state", Types.INT) +{% endhighlight %} +</div> +</div> -val unionState = savepoint.readUnionState( +#### Operator Union List State + +Operator state stored in a `CheckpointedFunction` using `getUnionListState` can be read using `ExistingSavepoint#readUnionState`. +The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application. +The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataSet<Integer> listState = savepoint.readUnionState<>( + "my-uid", + "union-state", + Types.INT); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val listState = savepoint.readUnionState( "my-uid", "union-state", Types.INT) - +{% endhighlight %} +</div> +</div> + + +#### Broadcast State + +[BroadcastState]({{ site.baseurl }} /dev/stream/state/broadcast_state.html) can be read using `ExistingSavepoint#readBroadcastState`. +The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application. +The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1. 
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<>( + "my-uid", + "broadcast-state", + Types.INT, + Types.INT); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} val broadcastState = savepoint.readBroadcastState( "my-uid", "broadcast-state", @@ -206,12 +185,14 @@ val broadcastState = savepoint.readBroadcastState( </div> </div> -A custom `TypeSerializer` may also be specified if one was used in the `StateDescriptor` for the state. +#### Using Custom Serializers + +Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataSet<Integer> listState = savepoint.readListState( +DataSet<Integer> listState = savepoint.readListState<>( "uid", "list-state", Types.INT, @@ -224,12 +205,17 @@ val listState = savepoint.readListState( "uid", "list-state", Types.INT, - new MyCustomIntSerializer()) + new MyCustomIntSerializer) {% endhighlight %} </div> </div> -When reading keyed state, users specify a KeyedStateReaderFunction to allow reading arbitrary columns and complex state types such as ListState, MapState, and AggregatingState. +### Keyed State + +[Keyed state]({{ site.baseurl }}/dev/stream/state/state.html#keyed-state), or partitioned state, is any state that is partitioned relative to a key. +When reading a keyed state, users specify the operator id and a `KeyedStateReaderFunction<KeyType, OutputType>`. + +The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState. 
This means if an operator contains a stateful process function such as: <div class="codetabs" markdown="1"> @@ -239,55 +225,80 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte ValueState<Integer> state; + ListState<Long> updateTimes; + @Override public void open(Configuration parameters) { ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT); state = getRuntimeContext().getState(stateDescriptor); + + ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG); + updateTimes = getRuntimeContext().getListState(updateDescriptor); } @Override public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception { state.update(value + 1); + updateTimes.add(System.currentTimeMillis()); } } {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] { +class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] { - var state: ValueState[Integer]; + var state: ValueState[Integer] = _ - override def open(parameters: Configuration) { - val stateDescriptor = new ValueStateDescriptor("state", Types.INT); - state = getRuntimeContext().getState(stateDescriptor); + var updateTimes: ListState[Long] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit { + val stateDescriptor = new ValueStateDescriptor("state", Types.INT) + state = getRuntimeContext().getState(stateDescriptor) + + val updateDescirptor = new ListStateDescriptor("times", Types.LONG) + updateTimes = getRuntimeContext().getListState(updateDescriptor) } - override def processElement(value: Integer, ctx: Context, out: Collector[Void]) { - state.update(value + 1); + @throws[Exception] + override def processElement(value: Integer, ctx: Context, out: Collector[Void]): Unit = { + state.update(value + 1) + 
updateTimes.add(System.currentTimeMillis) } } {% endhighlight %} </div> </div> -Then it can read by defining an output type and corresponding KeyedStateReaderFunction. +Then it can read by defining an output type and corresponding `KeyedStateReaderFunction`. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -class KeyedState { - Integer key; - Integer value; +DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction()); + +public class KeyedState { + public int key; + + public int value; + + public List<Long> times; } -class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> { +public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> { + ValueState<Integer> state; + ListState<Long> updateTimes; + @Override public void open(Configuration parameters) { - ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT); - state = getRuntimeContext().getState(stateDescriptor); + ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT); + state = getRuntimeContext().getState(stateDescriptor); + + ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG); + updateTimes = getRuntimeContext().getListState(updateDescriptor); } @Override @@ -295,58 +306,221 @@ class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> { Integer key, Context ctx, Collector<KeyedState> out) throws Exception { - - KeyedState data = new KeyedState(); - data.key = key; - data.value = state.value(); - out.collect(data); + + KeyedState data = new KeyedState(); + data.key = key; + data.value = state.value(); + data.times = StreamSupport + .stream(updateTimes.get().spliterator(), false) + .collect(Collectors.toList()); + + out.collect(data); } } - -DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction()); {% endhighlight %} </div> <div 
data-lang="scala" markdown="1"> {% highlight scala %} -case class KeyedState(key: Int, value: Int) +val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction) + +case class KeyedState(key: Int, value: Int, List[Long]) class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] { - var state: ValueState[Integer]; - override def open(parameters: Configuration) { - val stateDescriptor = new ValueStateDescriptor("state", Types.INT); - state = getRuntimeContext().getState(stateDescriptor); - } + var state: ValueState[Integer] = _ + + var updateTimes: ListState[Long] = _ + @throws[Exception] + override def open(parameters: Configuration): Unit { + val stateDescriptor = new ValueStateDescriptor("state", Types.INT) + state = getRuntimeContext().getState(stateDescriptor) + + val updateDescirptor = new ListStateDescriptor("times", Types.LONG) + updateTimes = getRuntimeContext().getListState(updateDescriptor) + } + + + @throws[Exception] override def processKey( key: Int, ctx: Context, - out: Collector[Keyedstate]) throws Exception { + out: Collector[Keyedstate]): Unit { - val data = KeyedState(key, state.value()) - out.collect(data); + val data = KeyedState(key, state.value(), updateTimes.get.asScala.toList) + out.collect(data) } } - -val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction()); {% endhighlight %} </div> </div> -{% panel **Note:** When using a `KeyedStateReaderFunction` all state descriptors must be registered eagerly inside of open. Any attempt to call `RuntimeContext#getState`, `RuntimeContext#getListState`, or `RuntimeContext#getMapState` will result in a `RuntimeException`. %} +Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers. + +{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of open. 
Any attempt to call a `RuntimeContext#get*State` will result in a `RuntimeException`. %} ## Writing New Savepoints -State writers are based around the abstraction of `Savepoint`, where one `Savepoint` may have many operators and the state for any particular operator is created using a `BootstrapTransformation`. +`Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data. +Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator. -A `BootstrapTransformation` starts with a `DataSet` containing the values that are to be written into state. -The transformation may be optionally `keyed` depending on whether or not you are writing keyed or operator state. -Finally a bootstrap function is applied depending to the transformation; Flink supplies `KeyedStateBootstrapFunction` for writing keyed state, `StateBootstrapFunction` for writing non keyed state, and `BroadcastStateBootstrapFunction` for writing broadcast state. +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +int maxParallelism = 128; + +Savepoint + .create(new MemoryStateBackend(), maxParallelism) + .withOperator("uid1", transformation1) + .withOperator("uid2", transformation2) + .write(savepointPath); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val maxParallelism = 128 + +Savepoint + .create(new MemoryStateBackend(), maxParallelism) + .withOperator("uid1", transformation1) + .withOperator("uid2", transformation2) + .write(savepointPath) +{% endhighlight %} +</div> +</div> + +The [UIDs]({{ site.baseurl}}/ops/state/savepoints.html#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator. 
+ +### Operator State + +Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> { + + private ListState<Integer> state; + + @Override + public void processElement(Integer value, Context ctx) throws Exception { + state.add(value); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + state = context.getOperatorState().getListState(new ListStateDescriptor<>("state", Types.INT)); + } +} + +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnviornment(); +DataSet<Integer> data = env.fromElements(1, 2, 3); + +BootstrapTransformation transformation = OperatorTransformation + .bootstrapWith(data) + .transform(new SimpleBootstrapFunction()); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] { + + var ListState[Integer] state = _ + + @throws[Exception] + override def processElement(value: Integer, ctx: Context): Unit = { + state.add(value) + } + + @throws[Exception] + override def snapshotState(context: FunctionSnapshotContext): Unit = { + } + + @throws[Exception] + override def initializeState(context: FunctionInitializationContext): Unit = { + state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT)) + } +} + +val env = ExecutionEnvironment.getExecutionEnviornment +val data = env.fromElements(1, 2, 3) + +BootstrapTransformation transformation = OperatorTransformation + .bootstrapWith(data) + .transform(new SimpleBootstrapFunction) +{% endhighlight %} +</div> +</div> + +### Broadcast State + +[BroadcastState]({{ site.baseurl }} 
/dev/stream/state/broadcast_state.html) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -public class Account { +public class CurrencyRate { + public String currency; + + public Double rate; +} + +public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> { + + public static final MapStateDescriptor<String, Double> descriptor = + new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE); + + @Override + public void processElement(CurrencyRate value, Context ctx) throws Exception { + ctx.getBroadcastState(descriptor).put(value.currency, value.rate); + } +} + +DataSet<CurrencyRate> currencyDataSet = bEnv.fromCollection( + new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3)); + +BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation + .bootstrapWith(currencyDataSet) + .transform(new CurrencyBootstrapFunction()); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +case class CurrencyRate(currency: String, rate: Double) + +object CurrencyBootstrapFunction { + val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE) +} + +class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] { + + @throws[Exception] + override processElement(value: CurrencyRate, ctx: Context): Unit = { + ctx.getBroadcastState(descriptor).put(value.currency, value.rate) + } +} + +val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3)) + +val broadcastTransformation = OperatorTransformation + .bootstrapWith(currencyDataSet) + .transform(new CurrencyBootstrapFunction) +{% endhighlight %} +</div> +</div> + +### Keyed State + +Keyed state for `ProcessFunction`'s and other `RichFunction` types can be 
written using a `KeyedStateBootstrapFunction`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class Account { public int id; public double amount; @@ -382,12 +556,13 @@ BootstrapTransformation<Account> transformation = OperatorTransformation <div data-lang="scala" markdown="1"> {% highlight scala %} case class Account(id: Int, amount: Double, timestamp: Long) - + class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] { var state: ValueState[Double] + @throws[Exception] override def open(parameters: Configuration): Unit = { - val descriptor = new ValueStateDescriptor[Double]("total",Types.DOUBLE) + val descriptor = new ValueStateDescriptor("total",Types.DOUBLE) state = getRuntimeContext().getState(descriptor) } @@ -403,47 +578,27 @@ val accountDataSet = bEnv.fromCollection(accounts) val transformation = OperatorTransformation .bootstrapWith(accountDataSet) - .keyBy(acc -> acc.id) - .transform(new AccountBootstrapper()) + .keyBy(acc => acc.id) + .transform(new AccountBootstrapper) {% endhighlight %} </div> </div> The `KeyedStateBootstrapFunction` supports setting event time and processing time timers. The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application. -If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediatly upon start. +If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start. -Once one or more transformations have been created they may be combined into a single `Savepoint`. -`Savepoint`'s are created using a state backend and max parallelism, they may contain any number of operators. 
+<span class="label label-danger">Attention</span> If your bootstrap function creates timers, the state can only be restored using one of the [process]({{ site.baseurl }}/dev/stream/operators/process_function.html) type functions. -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -Savepoint - .create(backend, 128) - .withOperator("uid1", transformation1) - .withOperator("uid2", transformation2) - .write(savepointPath); -{% endhighlight %} -</div> -<div data-lang="scala" markdown="1"> -{% highlight scala %} -Savepoint - .create(backend, 128) - .withOperator("uid1", transformation1) - .withOperator("uid2", transformation2) - .write(savepointPath) -{% endhighlight %} -</div> -</div> - -Besides creating a savepoint from scratch, you can base on off an existing savepoint such as when bootstrapping a single new operator for an existing job. +## Modifying Savepoints + +Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job. 
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} Savepoint - .load(backend, oldPath) + .load(bEnv, new MemoryStateBackend(), oldPath) .withOperator("uid", transformation) .write(newPath); {% endhighlight %} @@ -451,7 +606,7 @@ Savepoint <div data-lang="scala" markdown="1"> {% highlight scala %} Savepoint - .load(backend, oldPath) + .load(bEnv, new MemoryStateBackend, oldPath) .withOperator("uid", transformation) .write(newPath) {% endhighlight %} diff --git a/docs/fig/application-my-app-state-processor-api.png b/docs/fig/application-my-app-state-processor-api.png new file mode 100644 index 0000000..ca6ef6c Binary files /dev/null and b/docs/fig/application-my-app-state-processor-api.png differ diff --git a/docs/fig/database-my-app-state-processor-api.png b/docs/fig/database-my-app-state-processor-api.png new file mode 100644 index 0000000..31642fb Binary files /dev/null and b/docs/fig/database-my-app-state-processor-api.png differ
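The new "Mapping Application State to DataSets" section in this patch treats a savepoint as a database. In that model:

- Each operator UID is a namespace.
- Each operator state becomes a single-column table holding the list entries of all parallel tasks.
- All keyed states of an operator are combined into one table, with a key column, one column per keyed state, and one row per distinct key.

As a rough illustration only, the stdlib-only Java sketch below models that mapping on in-memory collections. `SavepointTables`, `operatorStateTable`, and `keyedStateTable` are hypothetical names. They are not part of Flink's State Processor API, which exposes state as `DataSet`s through methods such as `readListState` and `readKeyedState`. Representing a missing value for a key as `null` is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, stdlib-only model of the "savepoint as a database" view.
 * None of these types or methods belong to Flink's State Processor API.
 */
final class SavepointTables {

    /** One operator state -> one single-column table: every list entry of every parallel task is a row. */
    static <T> List<T> operatorStateTable(List<List<T>> entriesPerTask) {
        List<T> rows = new ArrayList<>();
        for (List<T> taskEntries : entriesPerTask) {
            rows.addAll(taskEntries);
        }
        return rows;
    }

    /**
     * All keyed states of one operator -> one table keyed by the state key,
     * with one column per keyed state and one row per distinct key across all states.
     * Assumption of this sketch: a state with no value for a key contributes null.
     */
    static <K> Map<K, Map<String, Object>> keyedStateTable(Map<String, Map<K, Object>> statesByName) {
        Set<K> keys = new LinkedHashSet<>();
        for (Map<K, Object> state : statesByName.values()) {
            keys.addAll(state.keySet()); // union of keys over all keyed states
        }
        Map<K, Map<String, Object>> table = new LinkedHashMap<>();
        for (K key : keys) {
            Map<String, Object> row = new LinkedHashMap<>();
            for (Map.Entry<String, Map<K, Object>> state : statesByName.entrySet()) {
                row.put(state.getKey(), state.getValue().get(key)); // column name = state name
            }
            table.put(key, row);
        }
        return table;
    }
}
```

Read against the figures in the patch:

- Src's operator state os1 would correspond to an `operatorStateTable` over the lists of its parallel tasks.
- Proc's keyed states ks1 and ks2 would correspond to a single `keyedStateTable` call.
- Snk is stateless, so it would contribute no tables.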