Hi,
I think we have to rethink a bit how the state interfaces work. Having an
interface where you snapshot a type T and restore a type T are not well
suited for dealing with job updates/state updates. For example, let's look
at the current Checkpointed<T> interface. The value that you return is
serialized using Java Serialization. This means, that you can never change
the fields of that type T, i.e. you can never update your data type if you
want to restore from a savepoint of an earlier version. The only workaround
to this is to change Checkpointed<T> to Checkpointed<Serializable> and
manually check the type of the object that you get in restore and cast it.
In the long run, this will lead to everyone implementing
Checkpointed<Serializable> and doing manual (non type safe) casts.

Cheers,
Aljoscha

On Sun, 14 Aug 2016 at 06:54 Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
> Yes I think it makes sense. :)
>
> Gyula
>
> On Fri, Aug 12, 2016, 17:02 Ufuk Celebi <u...@apache.org> wrote:
>
> > I will update the design doc with more details for the Checkpointed
> > variants and remove Option 2 (I think that's an orthogonal thing).
> >
> > The way I see it now, we should have base CheckpointedBase interface,
> > have the current Checkpointed interface be a subclass for not
> > repartitionable state. Then we have two other List-based variants:
> >
> > 1) Union List => on restore all state is unioned (what is currently in
> > the design doc)
> >
> > 2) List => on restore state is automatically redistributed (if
> > parallelism stays the same, state should go to the same sub tasks, but
> > no guarantees when changed parallelism).
> >
> > ====
> >
> > Regarding the other thing you and Aljoscha discussed: I feel like that
> > should be handled as part of the side input effort. Does that make
> > sense?
> >
> >
> >
> > On Fri, Aug 12, 2016 at 3:11 PM, Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> > > Hi Aljoscha,
> > >
> > > Yes this is pretty much how I think about it as well.
> > >
> > > Basically the state in this case would be computed from the side inputs
> > > with the same state update logic on all operators. I think it is
> imprtant
> > > that operators compute their own state or at least observe all state
> > > changes otherwise a lot of things can get weird.
> > >
> > > Lets say for instance I am building a dynamic filter where new filter
> > > conditions are added /removed on the fly. For the sake of my argument
> > lets
> > > also assume that initializing a new filter condition is a heavy
> > operation.
> > > The global state in this case is the union of all filter conditions.
> > >
> > > If at any point in time the operators could only observe the current
> > state
> > > we might end up with a very inefficient code, while if we observe all
> > state
> > > changes individually  (add 1 new filter) we can jus instantiate the new
> > > filter without worrying about the other ones.
> > >
> > > I am not completely sure if its clear what I am trying to say :D
> > >
> > > Gyula
> > >
> > > On Fri, Aug 12, 2016, 14:28 Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> > >
> > >> Hi Gyula,
> > >> I was thinking about this as well, in the context of side-inputs,
> which
> > >> would be a generalization of your use case. If I'm not mistaken. In my
> > head
> > >> I was calling it global state. Essentially, this state would be the
> > same on
> > >> all operators and when checkpointing you would only have to checkpoint
> > the
> > >> state of operator 0. Upon restore you would distribute this state to
> all
> > >> operators again.
> > >>
> > >> Is this what you had in mind?
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >> On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >>
> > >> > Hi,
> > >> > Let me try to explain what I mean by broadcast states.
> > >> >
> > >> > I think it is a very common pattern that people broadcast control
> > >> messages
> > >> > to operators that also receive normal input events.
> > >> >
> > >> > some examples: broadcast a model for prediction, broadcast some
> > >> information
> > >> > that should be the same at all subtasks but is evolving over time.
> At
> > the
> > >> > same time these operators usually also do normal event processing
> > based
> > >> on
> > >> > the broadcasted input stream.
> > >> >
> > >> > There is currently no proper solution for this provided by the api.
> We
> > >> can
> > >> > of course use connected operators or wrapper types and broadcast one
> > of
> > >> the
> > >> > input but there are several limitations. We cant use keyed states
> for
> > >> > instance becase that requires both inputs to be keyed (so we cant
> > >> > broadcast).
> > >> >
> > >> > Cheers,
> > >> > Gyula
> > >> >
> > >> > On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <u...@apache.org> wrote:
> > >> >
> > >> > > Comments inline.
> > >> > >
> > >> > > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.f...@gmail.com
> >
> > >> > wrote:
> > >> > > > Option 1:
> > >> > > > I think the main problem here is sending all the state
> everywhere
> > >> will
> > >> > > not
> > >> > > > scale at all. I think this will even fail for some internal
> Flink
> > >> > > operators
> > >> > > > (window timers I think are kept like this, maybe Im wrong here).
> > The
> > >> > > > general problem here what we don't have with the key-value
> states
> > is
> > >> > that
> > >> > > > the system can't do the repartitioning automatically. I think we
> > >> should
> > >> > > try
> > >> > > > to make abstractions that would allow the system to do this.
> > >> > >
> > >> > > The state size can definitely become a problem. For Kafka sources
> > for
> > >> > > example I don' think it would be problematic, but the timers it
> > might
> > >> > > be, yes. It definitely depends on the use case.
> > >> > >
> > >> > > In theory, we could also redistribute the list elements
> > automatically,
> > >> > > for example in a round robing fashion. The question is whether
> this
> > >> > > will be enough in general.
> > >> > >
> > >> > > >
> > >> > > > Option 2:
> > >> > > > To be honest I don't completely get this approach, what do the
> > >> indices
> > >> > > mean
> > >> > > > in the get set methods? What happens if the same index is used
> > from
> > >> > > > multiple operators?
> > >> > > > This may also suffers in scalability like option 1 (but as I
> said
> > I
> > >> > dont
> > >> > > > get this completely :()
> > >> > >
> > >> > > Yes, I don't like it either. It's actually similar to Option 1
> (from
> > >> > > runtime perspective). I think the main question with Option 2 is
> > >> > > whether we expose the API as an interface or a state class. If we
> go
> > >> > > for this kind of interface we could parameterize the restore
> > behaviour
> > >> > > via the descriptor (e.g. flag to merge/union etc.). That should be
> > >> > > more extensible than providing interfaces.
> > >> > >
> > >> > > > I think another approach could be (might be similar what option
> 2
> > is
> > >> > > trying
> > >> > > > to achieve) to provide a Set<T>  (or Map) like  abstraction to
> > keep
> > >> the
> > >> > > non
> > >> > > > partitioned states. Users could add/remove things from it at
> > their on
> > >> > > will,
> > >> > > > but the system would be free to redistribute the Sets between
> the
> > >> > > > operators. In practice this would mean for instance that the
> Kafka
> > >> > > sources
> > >> > > > would store (partition, offset) tuples in the set but and every
> > time
> > >> in
> > >> > > the
> > >> > > > open method they would check what is assigned to them (the
> system
> > is
> > >> > free
> > >> > > > to decide). This of course would only work well if we can assume
> > that
> > >> > > > distributing the states by equal numbers is desirable.
> > >> > >
> > >> > > I think the same point applies to redistributing the list
> > >> > > automatically (what I meant with whether it is "general enough").
> I
> > >> > > think what you describe here could be the list w/o unioning it.
> > >> > >
> > >> > > >
> > >> > > > Broadcast states:
> > >> > > > This might be a good time to think about broadcast states.
> > >> > > Non-partitioned
> > >> > > > states that are the same at all subtasks, I think this comes up
> > in a
> > >> > lot
> > >> > > of
> > >> > > > use-cases (I know at least one myself haha) and it is pretty
> > straight
> > >> > > > forward from a runtime perspective, the bigger question is the
> > API.
> > >> > >
> > >> > > Can you explain this a little more?
> > >> > >
> > >> > > ========
> > >> > >
> > >> > > Another open question (not addressed in the FLIP yet) is how we
> > treat
> > >> > > operators that have both keyed and non-keyed state. The current
> API
> > >> > > kind of moves this question to the user.
> > >> > >
> > >> >
> > >>
> >
>

Reply via email to