Hi,

I would be in favour of this since it brings the heap backend in line with the RocksDB backend. It will, however, come with a performance overhead whose size depends on how fast the TypeSerializer can copy objects.
Cheers,
Aljoscha

On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> when implementing a ReduceFunction for incremental aggregation of SQL /
> Table API window aggregates we noticed that the HeapStateBackend does not
> store copies but holds references to the original objects. In case of a
> SlidingWindow, the same object is referenced from different window panes.
> Therefore, it is not possible to modify these objects (in order to avoid
> object instantiations, see discussion [1]).
>
> Other state backends serialize their data such that the behavior is not
> consistent across backends.
> If we want to have light-weight tests, we have to create new objects in the
> ReduceFunction causing unnecessary overhead.
>
> I would propose to copy objects when storing them in a HeapStateBackend.
> This would ensure that objects returned from state to the user behave
> identical for different state backends.
>
> We created a related JIRA [2] that asks to copy records that go into an
> incremental ReduceFunction. The scope is more narrow and would solve our
> problem, but would leave the inconsistent behavior of state backends in
> place.
>
> What do others think?
>
> Cheers, Fabian
>
> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> [2] https://issues.apache.org/jira/browse/FLINK-5105
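
For readers following the thread, here is a minimal sketch of the aliasing problem described in the quoted message and of how copying on store avoids it. It deliberately does not use Flink's classes: `PaneStore`, `Sum`, and `run` are hypothetical stand-ins for a heap-backed state store, a mutable accumulator, and a sliding window that assigns each record to two panes. The `onStore` hook is an assumption that plays the role a serializer copy would play; it is not the actual HeapStateBackend code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class HeapCopyDemo {

    /** Mutable accumulator, standing in for a record that is reused to avoid allocations. */
    static final class Sum {
        long value;
        Sum(long value) { this.value = value; }
        Sum copy() { return new Sum(value); }
    }

    /** Toy heap-backed store keyed by window pane (hypothetical, not Flink's API). */
    static final class PaneStore {
        private final Map<String, Sum> panes = new HashMap<>();
        private final UnaryOperator<Sum> onStore;

        PaneStore(UnaryOperator<Sum> onStore) { this.onStore = onStore; }

        /** In-place reduce: mutates the stored accumulator instead of creating a new one. */
        void add(String pane, Sum in) {
            Sum acc = panes.get(pane);
            if (acc == null) {
                // First record for this pane: store it (by reference or as a copy).
                panes.put(pane, onStore.apply(in));
            } else {
                acc.value += in.value;
            }
        }

        long get(String pane) { return panes.get(pane).value; }
    }

    /** Feeds records 1 and 2 into two overlapping panes and returns the sum held by pane "A". */
    static long run(UnaryOperator<Sum> onStore) {
        PaneStore store = new PaneStore(onStore);
        for (long v : new long[] {1, 2}) {
            Sum record = new Sum(v);
            store.add("A", record);
            store.add("B", record);
        }
        return store.get("A");
    }

    public static void main(String[] args) {
        // Storing references: both panes share one object, so it is updated twice.
        System.out.println("by reference: " + run(UnaryOperator.identity())); // prints 5
        // Copying on store: each pane owns its accumulator.
        System.out.println("with copy:    " + run(Sum::copy));                // prints 3
    }
}
```

With references, pane "A" reports 5 instead of the expected 3 because the in-place update for pane "B" hits the same object. The copy costs one extra allocation per stored object, which is the overhead mentioned above.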