Hi,

First of all, I like the idea of eagerly defined state for user functions. This 
makes for a very nice API, as can be seen in the Beam state API. I also agree 
with previous posters that completely banning lazy state makes certain use 
cases very tricky to implement: it basically puts the burden of lazy state 
evaluation on the user by forcing them to use a generic MapState in which they 
manually keep track of lazily allocated new state types. (Or some such thing.) 
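
To make the "burden on the user" point concrete, here is a rough sketch of what such a workaround might look like. It is only an illustration: a plain HashMap stands in for a single eagerly declared MapState<String, Object>, and the class and method names (LazyStateViaMapSketch, getOrCreate) are made up for this example, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyStateViaMapSketch {

    // Assumption: stands in for one eagerly declared MapState<String, Object>.
    private final Map<String, Object> genericState = new HashMap<>();

    /**
     * Returns the sub-state registered under the given name, allocating it on
     * first access. The unchecked cast is the user's responsibility: nothing
     * verifies that the same name is always used with the same type.
     */
    @SuppressWarnings("unchecked")
    <S> S getOrCreate(String name, Supplier<S> initial) {
        return (S) genericState.computeIfAbsent(name, k -> initial.get());
    }

    public static void main(String[] args) {
        LazyStateViaMapSketch state = new LazyStateViaMapSketch();

        // First access lazily allocates the "sum" sub-state.
        long[] sum = state.getOrCreate("sum", () -> new long[1]);
        sum[0] += 5;

        // Later accesses see the same sub-state; the supplier is not called again.
        long[] again = state.getOrCreate("sum", () -> new long[1]);
        System.out.println(again[0]); // prints 5
    }
}
```

The bookkeeping (names, types, initial values) lives entirely in user code here, which is exactly the part that lazy state declaration currently handles for the user.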

With this out of the way, I have some remarks on the new APIs:

What would be the fate of this style of method for getting state on 
KeyedStateBackend:

<N, S extends State> S getPartitionedState(
      N namespace,
      TypeSerializer<N> namespaceSerializer,
      StateDescriptor<S, ?> stateDescriptor) throws Exception;

This is used, for example, by the WindowOperator to retrieve state. The issue is 
that the WindowOperator is not actually aware of the concrete type of state; it 
just needs to know that it is an AppendingState so that it can put elements into 
state and retrieve them when firing. This is how it supports different 
kinds of windows (with ListState, ReducingState, FoldingState, and 
AggregatingState) without having extra code in the WindowOperator for each one.
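
As a simplified illustration of that pattern (not the actual WindowOperator code), the sketch below uses hypothetical stand-in types: a two-method AppendingState interface modelled on Flink's, two toy implementations, and a MiniWindowOperator that is written only against the interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class AppendingStateSketch {

    /** Simplified stand-in for Flink's AppendingState<IN, OUT>. */
    interface AppendingState<IN, OUT> {
        void add(IN value);
        OUT get();
    }

    /** List-like state: buffers every element. */
    static class SimpleListState<T> implements AppendingState<T, Iterable<T>> {
        private final List<T> elements = new ArrayList<>();
        public void add(T value) { elements.add(value); }
        public Iterable<T> get() { return elements; }
    }

    /** Reducing-like state: keeps a single pre-aggregated value. */
    static class SimpleReducingState<T> implements AppendingState<T, T> {
        private final BinaryOperator<T> reducer;
        private T current;
        SimpleReducingState(BinaryOperator<T> reducer) { this.reducer = reducer; }
        public void add(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }
        public T get() { return current; }
    }

    /** Hypothetical operator: it only knows that its state is an AppendingState. */
    static class MiniWindowOperator<IN, ACC> {
        private final AppendingState<IN, ACC> windowState;
        MiniWindowOperator(AppendingState<IN, ACC> windowState) {
            this.windowState = windowState;
        }
        void processElement(IN element) { windowState.add(element); }
        ACC fire() { return windowState.get(); }
    }

    public static void main(String[] args) {
        MiniWindowOperator<Integer, Integer> summing =
            new MiniWindowOperator<>(new SimpleReducingState<Integer>(Integer::sum));
        MiniWindowOperator<Integer, Iterable<Integer>> buffering =
            new MiniWindowOperator<>(new SimpleListState<Integer>());

        for (int i = 1; i <= 3; i++) {
            summing.processElement(i);
            buffering.processElement(i);
        }
        System.out.println(summing.fire());   // prints 6
        System.out.println(buffering.fire()); // prints [1, 2, 3]
    }
}
```

The operator code is identical for both kinds of state; only the state handed to it differs. Any replacement for getPartitionedState would need to keep this kind of access possible.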

How would registering a state that uses a function work with the 
annotation-based user API? I think this is problematic:

@KeyedState(
    stateId = "reducing-state",
    typeSerializerFactory = MySerializer.class,
    function = ?
)
private ReducingState<MyPojo> reducingState;

The reason why this is problematic is that annotations only allow compile-time 
constants as parameters, whereas in Flink users usually specify objects 
(anonymous inner classes or lambdas) when specifying a user function. Also, we 
cannot have a generic type here, so there’s no way of using Java type checking 
to verify that a user specifies a ReduceFunction<T> for a ReducingState<T>.
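
A small sketch of both limitations, under the assumption that the function would be passed as a class literal (the closest thing to "a function" that an annotation can carry). The KeyedState annotation, ReduceFunction, and ReducingState types below are simplified stand-ins defined just for this example, not the proposed Flink API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class AnnotationLimitSketch {

    /** Simplified stand-ins for Flink's ReduceFunction and ReducingState. */
    interface ReduceFunction<T> { T reduce(T a, T b); }
    interface ReducingState<T> { }

    /** Hypothetical annotation: members may only be constants, enums, class literals, etc. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface KeyedState {
        String stateId();
        Class<? extends ReduceFunction<?>> function();
    }

    static class StringConcat implements ReduceFunction<String> {
        public String reduce(String a, String b) { return a + b; }
    }

    static class MyFunction {
        // Compiles, although a ReduceFunction<String> is attached to a
        // ReducingState<Long>: the annotation cannot tie its type to the field's T.
        @KeyedState(stateId = "reducing-state", function = StringConcat.class)
        private ReducingState<Long> reducingState;

        // Would not compile: a lambda is not a valid annotation value.
        // @KeyedState(stateId = "other", function = (a, b) -> a + b)
    }

    /** Reads the function class back via reflection, as a framework would have to. */
    static Class<?> declaredFunction(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            return field.getAnnotation(KeyedState.class).function();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(declaredFunction(MyFunction.class, "reducingState").getSimpleName());
        // prints StringConcat
    }
}
```

The mismatch between String and Long would only surface at runtime, if at all, and the function has to be a named class with a no-argument constructor rather than an arbitrary object.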

As a side note, Beam is getting around this problem by only putting the state 
name in the annotation and the rest in a state spec, like this:

@StateId("stateId")
private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState =
    StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());

What do you think?

Best,
Aljoscha


> On 12. Jul 2017, at 08:37, hzyuemeng1 <hzyueme...@corp.netease.com> wrote:
> 
> pengwenlong?
> 
> 2017-07-12 
> 
> hzyuemeng1 
> 
> 
> 
> From: "wenlong.lwl" <wenlong88....@gmail.com>
> Sent: 2017-07-05 10:27
> Subject: Re: [DISCUSS] FLIP-22: Eager State Declaration
> To: "dev" <dev@flink.apache.org>
> Cc:
> 
> Hi all, we have jobs which create state according to the type of the key and a 
> dynamic configuration. 
> 
> E.g., key_type_1's aggregation function is average, while key_type_2's is sum. 
> 
> We need to create state dynamically because the aggregation function may 
> change at runtime, and different aggregation functions may need different 
> kinds of state to persist their data. It is really hard to declare state eagerly. 
> 
> In the FLIP, I think the main motivation for proposing eager declaration of 
> state is to make sure that all states are registered when restoring. 
> How about just persisting the state descriptors in the state handle and 
> automatically registering the states when restoring? 
> 
> On 5 July 2017 at 03:53, Chesnay Schepler <ches...@apache.org> wrote: 
> 
>> Could you add an example to the FLIP for how a user can register a state 
>> with the methods in the RichFunction interface? 
>> Currently it only contains an example for the annotation option. 
>> 
>> These methods look like they are called by the user, but that doesn't 
>> really make sense to me as after all the user has to 
>> implement them. 
>> 
>> To me a more intuitive signature would be 
>> 
>> |void registerKeyedState(StateDescriptorRegistry registry);| 
>> 
>> that is called by the system when a UDF is provided by a user who then 
>> registers all the state descriptors he has. 
>> 
>> 
>> On 04.07.2017 20:00, Tzu-Li (Gordon) Tai wrote: 
>> 
>>> Hi Flink devs! 
>>> 
>>> I would like to propose the following FLIP - Eager State Declaration for 
>>> Flink managed state: 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-22%3A+Eager+State+Declaration. 
>>> The proposal is a result of some offline discussions with Aljoscha 
>>> Krettek, Stephan Ewen, and Stefan Richter. 
>>> 
>>> With how the current managed state declaration interfaces work, users may 
>>> declare state lazily while jobs are running. 
>>> This behavior is a direct blocker for several state management features 
>>> we wish to make a reality in the future. 
>>> I also see it as an opportunity to make the interfaces for keyed / 
>>> operator managed state declarations more unified at the API level, as well 
>>> as improved user experience for general use cases. 
>>> 
>>> The most important part of the required changes is the deprecation of 
>>> existing APIs and introducing new state declaration interfaces. 
>>> Since this would be a rework of the state interfaces, it would be great 
>>> to hear thoughts on this and make sure that the proposal is what we want in 
>>> the long run! 
>>> 
>>> Happy to hear feedback on this :) 
>>> 
>>> Cheers, 
>>> Gordon 
>>> 
>> 
>> 
