[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954859#comment-15954859
 ] 

ASF GitHub Bot commented on FLINK-5991:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3508#discussion_r109610444
  
    --- Diff: docs/dev/stream/state.md ---
    @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream
     
     ## Using Managed Operator State
     
    -A stateful function can implement either the more general `CheckpointedFunction`
    +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
     interface, or the `ListCheckpointed<T extends Serializable>` interface.
     
    -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
    -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
    -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    -while `(test2, 2)` will go to task 1.
    -
    -##### ListCheckpointed
    +#### CheckpointedFunction
     
    -The `ListCheckpointed` interface requires the implementation of two methods:
    -
    -{% highlight java %}
    -List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    -
    -void restoreState(List<T> state) throws Exception;
    -{% endhighlight %}
    -
    -On `snapshotState()` the operator should return a list of objects to checkpoint and
    -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
    -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
    -
    -##### CheckpointedFunction
    -
    -The `CheckpointedFunction` interface also requires the implementation of two methods:
    +The `CheckpointedFunction` interface provides access to non-keyed state with different
    +redistribution schemes. It requires the implementation of two methods:
     
     {% highlight java %}
     void snapshotState(FunctionSnapshotContext context) throws Exception;
     
     void initializeState(FunctionInitializationContext context) throws Exception;
     {% endhighlight %}
     
    -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
    -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
    +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
    +is called every time the user-defined function is initialized, be that when the function is first initialized
    +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
     only the place where different types of state are initialized, but also where state recovery logic is included.
     
    -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
    -uses state to buffer elements before sending them to the outside world:
    +Currently, list-style managed operator state is supported. The state
    +is expected to be a `List` of *serializable* objects, independent from each other,
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
    +non-keyed state can be redistributed. Depending on the state accessing method,
    +the following redistribution schemes are defined:
    +
    +  - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
    --- End diff --
    
    Could use "Round-Robin redistribution". Maybe...
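
    For illustration, here is a minimal plain-Java sketch of what a
    round-robin style split of list state could look like when rescaling.
    It does not use any Flink classes, and the class and method names
    (RoundRobinRedistributionSketch, redistribute) are made up for this
    example. It only models the idea from the doc text (concatenate all
    lists, then hand the elements out across the new subtasks); it is not a
    description of how Flink actually assigns elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names exist in Flink.
class RoundRobinRedistributionSketch {

    // Concatenates the lists checkpointed by the old subtasks, then deals
    // the elements out one at a time to the new subtasks.
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("newParallelism must be >= 1");
        }

        // The whole state is logically the concatenation of all lists.
        List<T> all = new ArrayList<>();
        for (List<T> state : oldSubtaskStates) {
            all.addAll(state);
        }

        // One (initially empty) list per new subtask.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }

        // Element i goes to subtask i % newParallelism.
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelism 1 with two buffered elements, rescaled to parallelism 2.
        List<List<String>> old = Arrays.asList(Arrays.asList("(test1, 2)", "(test2, 2)"));
        System.out.println(redistribute(old, 2)); // [[(test1, 2)], [(test2, 2)]]
    }
}
```

    With the BufferingSink example from the removed paragraph, this gives
    `(test1, 2)` to task 0 and `(test2, 2)` to task 1.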


> Expose Broadcast Operator State through public APIs
> ---------------------------------------------------
>
>                 Key: FLINK-5991
>                 URL: https://issues.apache.org/jira/browse/FLINK-5991
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, but it 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are waiting on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state in the 1.3 release as well.
> This JIRA is also the place to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> 
> stateDescriptor);
> <T extends Serializable> ListState<T> 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if we can 
> rely on a contract that the value doesn't change. Alternatively, we could 
> expose a defining method (e.g. {{getListStateType()}}) that is called only 
> once in the operator. This would break user code, but can be considered 
> because {{ListCheckpointed}} is marked as {{PublicEvolving}}.
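
To make option 2 easier to discuss, the following is a rough plain-Java sketch of the "defining method" variant. Everything in it is hypothetical: the TypedListCheckpointed interface, the ListStateType enum, the PartitionTracker function and the stateFor helper are illustrative names only, not existing Flink classes. The toy stateFor method stands in for the runtime, and the way it splits PARTITIONABLE state is an assumption made for the example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; these types are hypothetical, not Flink API.
class ListStateTypeSketch {

    enum ListStateType { PARTITIONABLE, BROADCAST }

    // Hypothetical ListCheckpointed variant with the proposed defining method.
    interface TypedListCheckpointed<T extends Serializable> {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

        void restoreState(List<T> state) throws Exception;

        // Expected to be called only once per operator, so the value cannot change.
        ListStateType getListStateType();
    }

    // Toy user function that remembers which partition ids it has seen.
    static class PartitionTracker implements TypedListCheckpointed<Integer> {
        final List<Integer> known = new ArrayList<>();

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) {
            return new ArrayList<>(known);
        }

        @Override
        public void restoreState(List<Integer> state) {
            known.addAll(state);
        }

        @Override
        public ListStateType getListStateType() {
            return ListStateType.BROADCAST;
        }
    }

    // Toy stand-in for the runtime: picks the state handed to one subtask.
    // BROADCAST: every subtask receives the full union of the old state.
    // PARTITIONABLE: each subtask receives only its share (assumed round-robin here).
    static <T extends Serializable> List<T> stateFor(
            int subtask, int parallelism, List<T> unionOfOldState, ListStateType type) {
        if (type == ListStateType.BROADCAST) {
            return new ArrayList<>(unionOfOldState);
        }
        List<T> share = new ArrayList<>();
        for (int i = subtask; i < unionOfOldState.size(); i += parallelism) {
            share.add(unionOfOldState.get(i));
        }
        return share;
    }

    public static void main(String[] args) {
        List<Integer> checkpointed = Arrays.asList(0, 1, 2, 3);

        PartitionTracker restored = new PartitionTracker();
        ListStateType type = restored.getListStateType(); // queried once
        restored.restoreState(stateFor(1, 2, checkpointed, type));

        System.out.println(restored.known); // [0, 1, 2, 3]
    }
}
```

If getListStateType() returned PARTITIONABLE instead, subtask 1 of 2 would only see `[1, 3]` in this sketch, which is the difference the two state types are meant to capture.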



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
