You can go ahead and make the change. I just think that this is quite
fragile. For example, this depends on the reduce function returning the
right object for reuse. If we hand in the copied object as the first input
and the ReduceFunction reuses the second input then we again have a
reference to the same (input) object in the state. Several components have
to work together to make this intricate dance work and if someone changes
the order in the reduce function for some reason this might break.
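
To make the concern concrete, here is a minimal sketch in plain Java. It does not use any Flink classes: `Acc`, `CopyFirstReducingState`, `REUSE_FIRST`, `REUSE_SECOND` and `stateAliasesInput` are hypothetical names made up for illustration, and `Acc.copy()` merely stands in for the deep copy a TypeSerializer would perform. It assumes the state copies only the first value and afterwards stores whatever the reduce function returns.

```java
import java.util.function.BinaryOperator;

public class ReduceReuseSketch {

    /** Hypothetical mutable accumulator type. */
    public static final class Acc {
        long sum;
        Acc(long sum) { this.sum = sum; }
        /** Stand-in for a deep copy done by a serializer. */
        Acc copy() { return new Acc(sum); }
    }

    /** Hypothetical heap state that copies only the first value added. */
    public static final class CopyFirstReducingState {
        private final BinaryOperator<Acc> reduce;
        private Acc current;

        CopyFirstReducingState(BinaryOperator<Acc> reduce) { this.reduce = reduce; }

        void add(Acc input) {
            if (current == null) {
                current = input.copy();                 // one-time copy
            } else {
                current = reduce.apply(current, input); // stores whatever comes back
            }
        }

        Acc get() { return current; }
    }

    /** Mutates and returns the first argument (the copy held in state). */
    public static final BinaryOperator<Acc> REUSE_FIRST =
        (a, b) -> { a.sum += b.sum; return a; };

    /** Mutates and returns the second argument (the caller's input object). */
    public static final BinaryOperator<Acc> REUSE_SECOND =
        (a, b) -> { b.sum += a.sum; return b; };

    /** True if the state ends up holding a reference to the caller's input. */
    public static boolean stateAliasesInput(BinaryOperator<Acc> fn) {
        CopyFirstReducingState state = new CopyFirstReducingState(fn);
        state.add(new Acc(1));
        Acc second = new Acc(2);
        state.add(second);
        return state.get() == second;
    }

    public static void main(String[] args) {
        System.out.println("reuse first:  aliased = " + stateAliasesInput(REUSE_FIRST));
        System.out.println("reuse second: aliased = " + stateAliasesInput(REUSE_SECOND));
    }
}
```

Both functions compute the same sum, but only the one that reuses its first argument keeps the state detached from the input object. The other silently puts the input reference back into the state.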

Big +1 on documenting the shortcomings. :-)

On Wed, 23 Nov 2016 at 12:34 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Aljoscha,
>
> sure, there are many issues with holding the state as objects on the heap.
> However, I think we don't have to solve all problems related to that in
> order to add a small fix that solves one specific issue.
> I would not explicitly expose the fix to users but it would be nice if we
> could implement more efficient code for internal functions.
>
> Moreover, I think we should extend the documentation and clearly point out
> the limitations regarding modifying state objects.
>
> Best, Fabian
>
>
>
> 2016-11-23 12:07 GMT+01:00 sjk <shijinkui...@163.com>:
>
> > Hi Fabian Hueske, sorry, I mistook this for the whole PR #2792.
> >
> > > On Nov 23, 2016, at 17:10, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > Why do you think that this means "much code changes"?
> > > I think it would actually be a pretty lightweight change in
> > > HeapReducingState.
> > >
> > > The proposal is to copy the *first* value that goes into a
> ReducingState.
> > > The copy would be done by a TypeSerializer and hence be a deep copy.
> > > This will allow us to reuse the copy in each invocation of the
> > ReduceFunction
> > > instead of creating a new result object of the same type that was
> > initially
> > > copied.
> > >
> > > I think the savings of reusing the object in each invocation of the
> > > ReduceFunction and not creating a new object should amortize the
> one-time
> > > object copy.
> > >
> > > Fabian
> > >
> > > 2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>:
> > >
> > >> Hi, Fabian
> > >>
> > >> So much code changes. Can you show us the key code changes for the
> > >> object copy?
> > >> An object reference may hold further deep references, which can be a
> > >> time bomb.
> > >> Can we create a new object from its data, or use Kryo directly for
> > >> object serialization?
> > >> I would prefer not to copy objects.
> > >>
> > >>
> > >>> On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>
> > >>> Does anybody have objections against copying the first record that
> goes
> > >>> into the ReduceState?
> > >>>
> > >>> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >>>
> > >>>> That's right, yes.
> > >>>>
> > >>>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >>>>
> > >>>>> Right, but that would be a much bigger change than "just" copying
> the
> > >>>>> *first* record that goes into the ReduceState, or am I missing
> > >> something?
> > >>>>>
> > >>>>>
> > >>>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >>>>>
> > >>>>>> To bring over my comment from the Github PR that started this
> > >>>> discussion:
> > >>>>>>
> > >>>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with
> > the
> > >>>>>> HeapStateBackend. The RocksDB backend does not suffer from this
> > >>>> problem.
> > >>>>> I
> > >>>>>> think in the long run we should migrate the HeapStateBackend to
> > always
> > >>>>> keep
> > >>>>>> data in serialised form, then we also won't have this problem
> > anymore.
> > >>>>>>
> > >>>>>> So I'm very much in favour of keeping data serialised. Copying
> data
> > >>>> would
> > >>>>>> only ever be a stopgap solution.
> > >>>>>>
> > >>>>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > >>>>>>
> > >>>>>>> Another approach that would solve the problem for our use case
> > >>>> (object
> > >>>>>>> re-usage for incremental window ReduceFunctions) would be to copy
> > the
> > >>>>>> first
> > >>>>>>> object that is put into the state.
> > >>>>>>> This would be a change on the ReduceState, not on the overall
> state
> > >>>>>>> backend, which should be feasible, no?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > >>>>>>>
> > >>>>>>>> -1 for copying objects.
> > >>>>>>>>
> > >>>>>>>> Storing serialized data where possible is good, but copying
> all
> > >>>>>> objects
> > >>>>>>>> by default is not a good idea, in my opinion.
> > >>>>>>>> A lot of scenarios use data types that are hellishly expensive
> to
> > >>>>> copy.
> > >>>>>>>> Even the current copy on chain handover is a problem.
> > >>>>>>>>
> > >>>>>>>> Let's not introduce even more copies.
> > >>>>>>>>
> > >>>>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl>
> > >>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> it will come with performance overhead when updating the state,
> > >>>>> but I
> > >>>>>>>>> think it'll be possible to perform asynchronous snapshots using
> > >>>>>>>>> HeapStateBackend (probably some changes to underlying data
> > >>>>> structures
> > >>>>>>>> would
> > >>>>>>>>> be needed) - which would bring more predictable performance.
> > >>>>>>>>>
> > >>>>>>>>> thanks,
> > >>>>>>>>> maciek
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi,
> > >>>>>>>>>> I would be in favour of this since it brings things in line
> with
> > >>>>> the
> > >>>>>>>>>> RocksDB backend. This will, however, come with quite the
> > >>>>> performance
> > >>>>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com
> >
> > >>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> Hi everybody,
> > >>>>>>>>>>>
> > >>>>>>>>>>> when implementing a ReduceFunction for incremental
> aggregation
> > >>>> of
> > >>>>>>> SQL /
> > >>>>>>>>>>> Table API window aggregates we noticed that the
> > >>>> HeapStateBackend
> > >>>>>> does
> > >>>>>>>> not
> > >>>>>>>>>>> store copies but holds references to the original objects. In
> > >>>>> case
> > >>>>>>> of a
> > >>>>>>>>>>> SlidingWindow, the same object is referenced from different
> > >>>>> window
> > >>>>>>>> panes.
> > >>>>>>>>>>> Therefore, it is not possible to modify these objects (in
> order
> > >>>>> to
> > >>>>>>>> avoid
> > >>>>>>>>>>> object instantiations, see discussion [1]).
> > >>>>>>>>>>>
> > >>>>>>>>>>> Other state backends serialize their data such that the
> > >>>> behavior
> > >>>>> is
> > >>>>>>> not
> > >>>>>>>>>>> consistent across backends.
> > >>>>>>>>>>> If we want to have light-weight tests, we have to create new
> > >>>>>> objects
> > >>>>>>> in
> > >>>>>>>>>>> the
> > >>>>>>>>>>> ReduceFunction causing unnecessary overhead.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I would propose to copy objects when storing them in a
> > >>>>>>>> HeapStateBackend.
> > >>>>>>>>>>> This would ensure that objects returned from state to the
> user
> > >>>>>> behave
> > >>>>>>>>>>> identically for different state backends.
> > >>>>>>>>>>>
> > >>>>>>>>>>> We created a related JIRA [2] that asks to copy records that
> go
> > >>>>>> into
> > >>>>>>> an
> > >>>>>>>>>>> incremental ReduceFunction. The scope is narrower and
> would
> > >>>>>> solve
> > >>>>>>>> our
> > >>>>>>>>>>> problem, but would leave the inconsistent behavior of state
> > >>>>>> backends
> > >>>>>>> in
> > >>>>>>>>>>> place.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do others think?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers, Fabian
> > >>>>>>>>>>>
> > >>>>>>>>>>> [1]
> > >>>>> https://github.com/apache/flink/pull/2792#discussion_r88653721
> > >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> > >>
> > >>
> >
> >
> >
>