Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/984#discussion_r48799230
  
    --- Diff: docs/documentation/State-checkpointing.md ---
    @@ -0,0 +1,147 @@
    +# State support in core storm
    +Storm core has abstractions for bolts to save and retrieve the state of their operations. There is a default in-memory
    +based state implementation and also a Redis-backed implementation that provides state persistence.
    +
    +## State management
    +Bolts that require their state to be managed and persisted by the framework should implement the `IStatefulBolt` interface or
    +extend the `BaseStatefulBolt` and implement the `void initState(T state)` method. The `initState` method is invoked by the framework
    +during bolt initialization with the previously saved state of the bolt. This is invoked after `prepare` but before the bolt starts
    +processing any tuples.
    +
    +Currently the only kind of `State` implementation that is supported is `KeyValueState`, which provides key-value mapping.
    +
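    +The word count example below uses this abstraction through its `get` and `put` operations. Roughly, the shape of the abstraction looks like the following (a simplified sketch for illustration, not necessarily the exact interface shipped in storm-core):
    +
    +```java
    +// Simplified sketch of a key-value state abstraction. The three operations
    +// shown here are the ones exercised by the word count example below; the
    +// real interface may declare more.
    +public interface KeyValueState<K, V> {
    +    void put(K key, V value);      // store or overwrite the value for a key
    +    V get(K key);                  // fetch the value, or null if the key is absent
    +    V get(K key, V defaultValue);  // fetch the value, or the default if the key is absent
    +}
    +```
    +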
    +For example, a word count bolt could use the key-value state abstraction for the word counts as follows.
    +
    +1. Extend the BaseStatefulBolt and type parameterize it with KeyValueState, which would store the mapping of word to count.
    +2. The bolt gets initialized with its previously saved state in the `initState` method. This will contain the word counts
    +last committed by the framework during the previous run.
    +3. In the execute method, update the word count.
    +
    + ```java
    + public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
    + private KeyValueState<String, Long> wordCounts;
    + private OutputCollector collector;
    + ...
    +     @Override
    +     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +       this.collector = collector;
    +     }
    +     @Override
    +     public void initState(KeyValueState<String, Long> state) {
    +       wordCounts = state;
    +     }
    +     @Override
    +     public void execute(Tuple tuple) {
    +       String word = tuple.getString(0);
    +       Long count = wordCounts.get(word, 0L);
    +       count++;
    +       wordCounts.put(word, count);
    +       collector.emit(tuple, new Values(word, count));
    +       collector.ack(tuple);
    +     }
    + ...
    + }
    + ```
    +4. The framework periodically checkpoints the state of the bolt (default every second). The frequency
    +can be changed by setting the storm config `topology.state.checkpoint.interval.ms`.
    +5. For state persistence, use a state provider that supports persistence by setting `topology.state.provider` in the
    +storm config. E.g. to use the Redis-based key-value state implementation, set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider`
    +in storm.yaml. The provider implementation jar should be in the classpath, which in this case means putting the `storm-redis-*.jar`
    +in the extlib directory.
    +6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a
    +JSON config with the following properties (a programmatic sketch of setting these keys follows below).
    +
    + ```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation 
class.",
    +   "jedisPoolConfig": {
    +     "host": "localhost",
    +     "port": 6379,
    +     "timeout": 2000,
    +     "database": 0,
    +     "password": "xyz"
    +     }
    + }
    + ```
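    +
    +The same keys can also be set programmatically on the topology `Config` before submitting the topology. The sketch below is illustrative only: the interval and Redis settings are example values, the package name (`org.apache.storm.Config`, or `backtype.storm.Config` in older releases) depends on the Storm version, and whether a given key is honored from the topology config or only from storm.yaml may also vary.
    +
    +```java
    +import org.apache.storm.Config;
    +
    +public class StateConfigSketch {
    +    // Build a topology Config carrying the state settings documented above.
    +    public static Config stateConfig() {
    +        Config conf = new Config();
    +        // checkpoint interval in milliseconds (example value)
    +        conf.put("topology.state.checkpoint.interval.ms", 2000);
    +        // use the Redis backed key-value state provider
    +        conf.put("topology.state.provider",
    +                 "org.apache.storm.redis.state.RedisKeyValueStateProvider");
    +        // provider properties as a JSON string, mirroring the config shown above
    +        conf.put("topology.state.provider.config",
    +                 "{\"jedisPoolConfig\": {\"host\": \"localhost\", \"port\": 6379}}");
    +        return conf;
    +    }
    +}
    +```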
    +
    +## Checkpoint mechanism
    +Checkpointing is triggered by an internal checkpoint spout at the specified `topology.state.checkpoint.interval.ms`. If there is
    +at least one `IStatefulBolt` in the topology, the checkpoint spout is automatically added by the topology builder. For stateful topologies,
    +the topology builder wraps the `IStatefulBolt` in a `StatefulBoltExecutor`, which handles the state commits on receiving the checkpoint tuples.
    +The non-stateful bolts are wrapped in a `CheckpointTupleForwarder`, which just forwards the checkpoint tuples so that they
    +can flow through the topology DAG. The checkpoint tuples flow through a separate internal stream, namely `$checkpoint`. The topology builder
    +wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.
    +
    +```
    +              default                         default               default
    +[spout1]   ---------------> [statefulbolt1] ----------> [bolt1] --------------> [statefulbolt2]
    +                          |                 ---------->         -------------->
    +                          |                   ($chpt)               ($chpt)
    +                          |
    +[$checkpointspout] _______| ($chpt)
    +```
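    +
    +A topology shaped like the one above is declared in the usual way; nothing checkpoint-specific needs to be wired by hand. The sketch below assumes the `WordCountBolt` from the earlier example, while the spout and the other bolt classes are hypothetical placeholders, and the package prefix (`org.apache.storm`, or `backtype.storm` in older releases) depends on the Storm version.
    +
    +```java
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +
    +public class StatefulTopologySketch {
    +    // Declare a linear topology matching the diagram above. Because stateful
    +    // bolts are present, the topology builder adds the checkpoint spout and
    +    // wires the $checkpoint stream automatically.
    +    public static StormTopology build() {
    +        TopologyBuilder builder = new TopologyBuilder();
    +        builder.setSpout("spout1", new WordSpout());                 // hypothetical spout class
    +        builder.setBolt("statefulbolt1", new WordCountBolt())        // stateful bolt from the example above
    +               .shuffleGrouping("spout1");
    +        builder.setBolt("bolt1", new SomeStatelessBolt())            // hypothetical non-stateful bolt
    +               .shuffleGrouping("statefulbolt1");
    +        builder.setBolt("statefulbolt2", new AnotherStatefulBolt())  // hypothetical stateful bolt
    +               .shuffleGrouping("bolt1");
    +        return builder.createTopology();
    +    }
    +}
    +```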
    +
    +At checkpoint intervals, checkpoint tuples are emitted by the checkpoint spout. On receiving a checkpoint tuple, the state of the bolt
    +is saved and then the checkpoint tuple is forwarded to the next component. Each bolt waits for the checkpoint to arrive on all its input
    +streams before it saves its state, so that the saved state is consistent across the topology. Once the checkpoint spout receives
    +an ACK from all the bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout.
    +
    +The state commit works like a three-phase commit protocol with prepare and commit phases, so that the state across the topology is saved
    +in a consistent and atomic manner.
    +
    +### Recovery
    +The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully
    +prepared, a `rollback` message is sent across the topology so that if a bolt has some prepared transactions they can be discarded.
    +If the previous transaction was prepared successfully but not committed, a `commit` message is sent across the topology so that
    +the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state.
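    +
    +Bolts that keep side state of their own (for example buffered writes to an external system) can react to these prepare/commit/rollback transitions by overriding the hook methods on the stateful bolt API (see the hooks section below). The method names used in this sketch (`prePrepare`, `preCommit`, `preRollback`) are assumed from the `IStatefulBolt` interface and may differ across versions; the bodies are placeholders.
    +
    +```java
    +public class ExternalWriterBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
    +  ...
    +    @Override
    +    public void prePrepare(long txid) {
    +      // called when the checkpoint prepare for transaction `txid` is being processed,
    +      // e.g. flush buffered writes so they can be made durable along with the state
    +    }
    +    @Override
    +    public void preCommit(long txid) {
    +      // called when the prepared transaction `txid` is about to be committed
    +    }
    +    @Override
    +    public void preRollback() {
    +      // called when a prepared transaction is about to be rolled back during recovery,
    +      // e.g. discard any buffered writes belonging to it
    +    }
    +  ...
    +}
    +```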
    +
    +Recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message or if a worker crashes in
    +the middle. Thus, when the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets
    +initialized with its previous state and checkpointing continues from the point where it left off.
    +
    +### Guarantee
    +Storm relies on the acking mechanism to replay tuples in case of failures. It is possible that the state is committed
    +but the worker crashes before acking the tuples. In this case the tuples are replayed, causing duplicate state updates.
    +Also, the `StatefulBoltExecutor` currently continues to process tuples from a stream after it has received a checkpoint
    +tuple on that stream, while it waits for the checkpoint to arrive on its other input streams before saving the state. This can also cause
    +duplicate state updates during recovery.
    +
    +The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee.
    +
    +### IStateful bolt hooks
    +IStatefult bolt interface provides hook methods where in the stateful bolts could implement some custom actions.
    --- End diff --
    
    IStatefult -> IStateful

