Repository: flink
Updated Branches:
  refs/heads/release-1.3 a390cee33 -> 8b213f578
[FLINK-6478] [doc] Document how to upgrade state serializers

This closes #4006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8abf9392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8abf9392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8abf9392

Branch: refs/heads/release-1.3
Commit: 8abf9392ea2cce4d40987b50ccb681c76857dedf
Parents: a390cee
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Mon May 29 10:53:11 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Thu Jun 1 14:05:05 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md | 137 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 137 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8abf9392/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 276842d..97f0c29 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -429,3 +429,140 @@ public static class CounterSource
 
 Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
+## Custom Serialization for Managed State
+
+This section is intended as a guideline for users who require custom serialization for their state. It covers how
+to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you are simply using
+Flink's own serializers, this section is irrelevant and can be skipped.
+
+### Using custom serializers
+
+As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
+to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
+[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
+
+It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed state,
+simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
+
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        // your own TypeSerializer implementation (body elided)
+        new TypeSerializer<Tuple2<String, Integer>>() {...});
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+
+### Handling serializer upgrades and compatibility
+
+Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
+specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
+that comes with the `StateDescriptor` used to access the state in the restored job) is checked for compatibility
+and, if the check passes, replaces the previous serializer for the state.
+
+A compatible serializer is one that is capable of reading the previously serialized bytes of the state,
+and that writes the state in the same binary format.
+Flink checks the new serializer's compatibility through the following two methods
+of the `TypeSerializer` abstract class:
+
+{% highlight java %}
+public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+{% endhighlight %}
+
+In short, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
+point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
+checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
+is provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This method
+serves both as a check of whether the new serializer is compatible and as a hook to reconfigure the new serializer
+so that it becomes compatible, if possible.
+
+Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e., when the
+same serializer is used for the state in the restored job, the serializers reconfigure themselves to be compatible
+with their previous configuration.
+
+The following subsections give guidelines for implementing these two methods in custom serializers.
+
+#### Implementing the `snapshotConfiguration` method
+
+The serializer's configuration snapshot should capture enough information so that, on restore, the new serializer
+for the state can use it to determine whether or not it is compatible.
+This typically includes information about the serializer's parameters or the binary format of the serialized data;
+generally, anything that allows the new serializer to decide whether it can read the previously serialized
+bytes, and whether it writes in the same binary format.
+
+How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below
+is the base class for all serializer configuration snapshot implementations, `TypeSerializerConfigSnapshot`:
+
+{% highlight java %}
+public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+  public abstract int getVersion();
+  public void read(DataInputView in) throws IOException {...}
+  public void write(DataOutputView out) throws IOException {...}
+}
+{% endhighlight %}
+
+The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
+implementations contain logic to read and write the version of the configuration snapshot, so they should be extended
+(i.e., overriding implementations should call `super.read(in)` and `super.write(out)`) rather than completely overridden.
+
+The version of the configuration snapshot is determined through the `getVersion` method. Versioning the serializer
+configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
+may change over time. By default, configuration snapshots are only compatible with the current version (as returned by
+`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions`
+method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to
+determine the version of the written configuration and adapt the read logic to that specific version.
+
+<span class="label label-danger">Attention</span> The version of the serializer's configuration snapshot is **not**
+related to upgrading the serializer.
+The exact same serializer can have different implementations of its
+configuration snapshot, for example when more information is added to the configuration to allow more comprehensive
+compatibility checks in the future.
+
+One requirement when implementing a `TypeSerializerConfigSnapshot` is that an empty (no-argument) constructor must be
+present. The empty constructor is required when reading the configuration snapshot from checkpoints.
+
+#### Implementing the `ensureCompatibility` method
+
+The `ensureCompatibility` method should contain logic that performs checks against the information about the previous
+serializer carried over via the provided `TypeSerializerConfigSnapshot`, doing one of the following:
+
+ * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it becomes
+   compatible. Afterwards, acknowledge to Flink that the serializer is compatible.
+
+ * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with
+   using the new serializer.
+
+The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method:
+
+ * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to
+   be compatible, and Flink can proceed with the job with the serializer as is.
+
+ * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
+   reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
+   is performed by using the previous serializer to read the restored state bytes into objects, which are then serialized
+   again using the new serializer.
+
+ * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
+   to `CompatibilityResult.requiresMigration()`, but in case the previous serializer cannot be found or loaded
+   to read the restored state bytes for the migration, the provided `TypeDeserializer` can be used as a fallback.
+
+<span class="label label-danger">Attention</span> As of Flink 1.3, if the result of the compatibility check
+indicates that state migration needs to be performed, the job simply fails to restore from the checkpoint, as state
+migration is not yet available. The ability to migrate state will be introduced in future releases.
+
+### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code
+
+Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state
+values, the availability of the classes within the classpath may affect restore behaviour.
+
+`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. If the new
+serializer acknowledges that it is incompatible and requires state migration, the previous serializer must be present
+to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified
+(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore cannot
+proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when
+acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`.
+
+The classes of the `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as the
+snapshots are fundamental to compatibility checks on upgraded serializers and cannot be restored if their classes
+are not present.
+Since configuration snapshots are written to checkpoints using custom serialization, the implementation
+of the class may be changed freely, as long as compatibility of the configuration change is handled using the versioning
+mechanisms in `TypeSerializerConfigSnapshot`.
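The versioning and compatibility-check mechanics documented in this patch can be sketched in plain Java, without Flink on the classpath. The block below is a minimal, self-contained model under stated assumptions: every name in it (`Compatibility`, `VersionedSnapshot`, `StringSerializerSnapshot`, `CharsetStringSerializer`, `SnapshotIO`) is hypothetical and only mirrors the roles of `CompatibilityResult`, `TypeSerializerConfigSnapshot` and `TypeSerializer`. It is not Flink's API, and the charset-based configuration with its version-2 `lengthPrefixed` flag is invented purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical stand-in for CompatibilityResult; not Flink's class.
enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

// Hypothetical base class modelled on the described TypeSerializerConfigSnapshot:
// write() stores the version first; read() reads it back and rejects any
// version not listed in getCompatibleVersions().
abstract class VersionedSnapshot {
    private int readVersion = -1;
    abstract int getVersion();
    int[] getCompatibleVersions() { return new int[] {getVersion()}; } // default: current version only
    int getReadVersion() { return readVersion; }
    void write(DataOutputStream out) throws IOException { out.writeInt(getVersion()); }
    void read(DataInputStream in) throws IOException {
        readVersion = in.readInt();
        for (int v : getCompatibleVersions()) {
            if (v == readVersion) return;
        }
        throw new IOException("Incompatible snapshot version: " + readVersion);
    }
}

// Snapshot of a hypothetical charset-based string serializer.
// Version 1 stored only the charset name; version 2 adds a format flag.
final class StringSerializerSnapshot extends VersionedSnapshot {
    String charsetName;
    boolean lengthPrefixed = true; // assumption: version-1 data was always length-prefixed
    StringSerializerSnapshot() {} // empty constructor, used to instantiate before read()
    StringSerializerSnapshot(String charsetName, boolean lengthPrefixed) {
        this.charsetName = charsetName;
        this.lengthPrefixed = lengthPrefixed;
    }
    @Override int getVersion() { return 2; }
    @Override int[] getCompatibleVersions() { return new int[] {1, 2}; }
    @Override void write(DataOutputStream out) throws IOException {
        super.write(out); // extend, don't replace: the version is written first
        out.writeUTF(charsetName);
        out.writeBoolean(lengthPrefixed);
    }
    @Override void read(DataInputStream in) throws IOException {
        super.read(in); // reads and checks the version
        charsetName = in.readUTF();
        if (getReadVersion() >= 2) {
            lengthPrefixed = in.readBoolean(); // only present from version 2 on
        }
    }
}

// Hypothetical serializer; only its configuration handling is modelled.
final class CharsetStringSerializer {
    private Charset charset;
    CharsetStringSerializer(Charset charset) { this.charset = charset; }
    Charset charset() { return charset; }
    StringSerializerSnapshot snapshotConfiguration() {
        return new StringSerializerSnapshot(charset.name(), true);
    }
    // Plays the role of ensureCompatibility: reconfigure to the previous
    // charset if possible, otherwise report that migration is required.
    Compatibility ensureCompatibility(StringSerializerSnapshot previous) {
        if (!previous.lengthPrefixed || !Charset.isSupported(previous.charsetName)) {
            return Compatibility.REQUIRES_MIGRATION;
        }
        charset = Charset.forName(previous.charsetName);
        return Compatibility.COMPATIBLE;
    }
}

// Helpers standing in for writing and reading checkpoint metadata.
final class SnapshotIO {
    interface SnapshotWriter { void write(DataOutputStream out) throws IOException; }
    static byte[] toBytes(SnapshotWriter writer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writer.write(out);
        } catch (IOException e) { throw new UncheckedIOException(e); }
        return bytes.toByteArray();
    }
    // Simulates bytes written by the older, version-1 snapshot (charset name only).
    static byte[] legacyV1Bytes(String charsetName) {
        return toBytes(out -> { out.writeInt(1); out.writeUTF(charsetName); });
    }
    static StringSerializerSnapshot readStringSnapshot(byte[] data) {
        StringSerializerSnapshot snapshot = new StringSerializerSnapshot(); // empty constructor
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            snapshot.read(in);
        } catch (IOException e) { throw new UncheckedIOException(e); }
        return snapshot;
    }
}
```

In this model, a snapshot written as version 1 stays readable because `getCompatibleVersions` lists both versions and `read` branches on `getReadVersion`. A new serializer configured for a different charset reconfigures itself to the restored one and reports `COMPATIBLE`, while an unavailable restored charset yields `REQUIRES_MIGRATION`, the analogue of `CompatibilityResult.requiresMigration()`, which as of Flink 1.3 makes the restore fail.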