Thanks for the feedback. I will leave this open for a few more days and then adopt it as a FLIP, taking Greg's and Aljoscha's comments into account.
On Sun, Jul 2, 2017 at 10:13 PM, Ufuk Celebi <u...@apache.org> wrote:

> Thanks for the write-up and illustrations. :-) +1 to do this.
>
> I'm fine with both proposed "changed behaviour" variants, but lean towards option 1: change the default, make the change explicit in the release notes, and add a good docs page about configuring object reuse (ideally re-using your illustrations from the FLIP).
>
> I see that option 2 (keep COPY_PER_OPERATOR as the default for upgraded jobs if nothing else is configured) is nice in order to prevent any surprises for users upgrading from 1.3 to 1.4. But if I understand it correctly, we only postpone the problem to their first 1.4 savepoint + restore, at which point the behaviour would still change, right? If the answer is yes, I think that in the long run this might be more confusing than simply changing the default (option 1).
>
> – Ufuk
>
>
> On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Thank you for the reply and for the support!
> >
> > @Greg, controlling object reuse on a per-operator basis is definitely a good way to follow up. My first thought would be to keep this proposal slim and deal with the "default" logic, and have a follow-up effort to make this controllable per operator.
> >
> > @Greg When you mention the "surprises" about object reuse in the DataSet API, what cases and behavior do you have in mind there?
> >
> > Stephan
> >
> >
> > On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> +1 for changing the default if so many people encountered problems with serialisation costs.
> >>
> >> The first two modes don't require any code changes, correct? Only the last one would require changes to the stream input processors.
> >>
> >> We should also keep this issue in mind: https://issues.apache.org/jira/browse/FLINK-3974 (i.e. we always need to make shallow copies of the StreamRecord).
> >>
> >> Best,
> >> Aljoscha
> >>
> >> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
> >> >
> >> > Stephan,
> >> >
> >> > Fully supporting this FLIP. We originally encountered pretty big surprises when we observed the object copy behavior causing significant performance degradation for our massively parallel use case.
> >> >
> >> > In our case (and I think this SHOULD be the assumption for all streaming use cases), we assume object immutability for all records throughout the processing pipeline. As a result, I don't see a need to distinguish different object reuse behaviors for different (chained) operators, or, at the very extreme, even a need to support COPY_PER_OPERATOR other than for backward compatibility, which we probably have to provide. I am also a fan of failing fast if the user asserts incorrect assumptions.
> >> >
> >> > One piece of feedback on FLIP-21 itself: I am not clear on the difference between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly the same thing in the new proposal? The model figures, however, seem to indicate that they are slightly different. Can you elaborate a bit more?
> >> >
> >> > Z.
> >> >
> >> >
> >> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote:
> >> >> Hi Stephan,
> >> >>
> >> >> Would this be an appropriate time to discuss allowing reuse to be a per-operator configuration? Object reuse for chained operators has led to considerable surprise for some users of the DataSet API. This came up during the rework of the object reuse documentation for the DataSet API. With annotations, a Function could mark whether input/iterator or output/collected objects should be copied or reused.
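Greg's per-operator idea can be sketched in plain Java. Everything below is hypothetical: `@ObjectHandling`, `MapFn`, and `ReusePlanner` are invented names for illustration and are not part of Flink. The sketch assumes a function class declares via a runtime-retained annotation whether it tolerates a reused input object, and a chaining runtime reads that declaration to decide whether a defensive copy is needed in front of it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// HYPOTHETICAL annotation (not a Flink API): a function declares how it treats objects.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ObjectHandling {
    // true: the function never holds on to its input, so receiving a reused object is safe
    boolean inputMayBeReused() default false;
    // true: the function may emit the same output instance repeatedly
    boolean outputIsReused() default false;
}

// Minimal stand-in for a user function interface (assumption, not Flink's MapFunction).
interface MapFn<T, R> {
    R map(T value);
}

@ObjectHandling(inputMayBeReused = true)
final class Doubler implements MapFn<int[], int[]> {
    @Override
    public int[] map(int[] value) {
        return new int[] {value[0] * 2}; // reads the input, never stores it
    }
}

final class Unannotated implements MapFn<int[], int[]> {
    @Override
    public int[] map(int[] value) {
        return value; // no declaration, so a cautious runtime keeps copying in front of it
    }
}

final class ReusePlanner {
    // Copy before handing an object to fnClass unless it explicitly opted into reuse.
    static boolean needsInputCopy(Class<?> fnClass) {
        ObjectHandling decl = fnClass.getAnnotation(ObjectHandling.class);
        return decl == null || !decl.inputMayBeReused();
    }
}
```

With this sketch, `needsInputCopy(Doubler.class)` is `false` and `needsInputCopy(Unannotated.class)` is `true`. Defaulting to "copy" when no annotation is present mirrors Greg's point that reuse is safer when asserted locally per operator than assumed for the whole program.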
> >> >>
> >> >> My distant observation is that it is safer to locally assert reuse at the operator level than to assume or guarantee the safety of object reuse across an entire program. It could also be handy to mix operators receiving copyable objects with operators not requiring copyable objects.
> >> >>
> >> >> Greg
> >> >>
> >> >>
> >> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> >> >>>
> >> >>> Hi all!
> >> >>>
> >> >>> I would like to propose the following FLIP:
> >> >>>
> >> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> >> >>>
> >> >>> The FLIP is motivated by the fact that many users run into an unnecessary kind of performance problem caused by an old design artifact.
> >> >>>
> >> >>> The required change should be reasonably small, and would help many users and Flink's general standing.
> >> >>>
> >> >>> Happy to hear thoughts!
> >> >>>
> >> >>> Stephan
> >> >>>
> >> >>> ======================================
> >> >>>
> >> >>> FLIP text is below. Pictures with illustrations are only in the Wiki; they are not supported on the mailing list.
> >> >>> -------------------------------------------------------------------------------------------------
> >> >>>
> >> >>> Motivation
> >> >>>
> >> >>> The default behavior of the streaming runtime is to copy every element between chained operators.
> >> >>>
> >> >>> That operation was introduced for "safety" reasons, to reduce the number of cases where users can create incorrect programs by reusing mutable objects (a discouraged pattern, but possible). For example, when using state backends that keep the state as objects on the heap, reusing mutable objects can theoretically create cases where the same object is used in multiple state mappings.
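The state-mapping hazard described in the Motivation can be reproduced without Flink. In this toy sketch a plain `HashMap` stands in for a heap-based keyed state backend (an assumption for illustration, not Flink's state API), and `MutableCounter` and `ReuseHazardDemo` are made-up names. The "user function" reuses one mutable output object for every record it emits.

```java
import java.util.HashMap;
import java.util.Map;

// Toy mutable record type (hypothetical).
final class MutableCounter {
    long count;

    MutableCounter(long count) {
        this.count = count;
    }
}

final class ReuseHazardDemo {
    // A HashMap plays the role of on-heap keyed state (assumption for illustration).
    // When copyBeforeStoring is false, the single reused instance is stored under every key.
    static Map<String, MutableCounter> fill(boolean copyBeforeStoring) {
        Map<String, MutableCounter> heapState = new HashMap<>();
        MutableCounter reused = new MutableCounter(0); // the user's reused output object
        String[] keys = {"a", "b"};
        for (int i = 0; i < keys.length; i++) {
            reused.count = i + 1; // overwrite the reused object with this record's value
            MutableCounter stored = copyBeforeStoring ? new MutableCounter(reused.count) : reused;
            heapState.put(keys[i], stored);
        }
        return heapState;
    }
}
```

With `fill(false)`, keys `"a"` and `"b"` map to the same instance and both report a count of 2, so the value written for `"a"` is silently lost. With `fill(true)` they hold 1 and 2. The per-operator copy guards against the first case; as the FLIP notes below, keyed operations sit right after a shuffle and receive freshly deserialized objects anyway.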
> >> >>>
> >> >>> The effect is that many people who try Flink get much lower performance than they could possibly get. From empirical evidence, almost all users that I (Stephan) have been in touch with eventually run into this issue.
> >> >>>
> >> >>> There are multiple observations about that design:
> >> >>>
> >> >>> - Object copies are extremely costly. While some simple types are copied virtually for free (types reliably detected as immutable are not copied at all), many real pipelines use types like Avro, Thrift, JSON, etc., which are very expensive to copy.
> >> >>>
> >> >>> - Keyed operations currently only occur after shuffles. These operations are hence the first in a pipeline and will never receive a reused object anyway. That means that for the most critical operations, this precaution is unnecessary.
> >> >>>
> >> >>> - The mode is inconsistent with the contract of the DataSet API, which does not copy at each step.
> >> >>>
> >> >>> - To prevent these copies, users can select {{enableObjectReuse()}}, which is misleading, since it does not really reuse mutable objects, but avoids additional copies.
> >> >>>
> >> >>> Proposal
> >> >>>
> >> >>> Summary
> >> >>>
> >> >>> I propose to change the default behavior of the DataStream runtime to be the same as the DataSet runtime. That means that new objects are created on every deserialization, and no copies are made as the objects are passed along the pipelines.
> >> >>>
> >> >>> Details
> >> >>>
> >> >>> I propose to drop the execution config flag {{objectReuse}} and instead introduce an {{ObjectReuseMode}} enumeration with better control of what should happen.
> >> >>> There will be three different types:
> >> >>>
> >> >>> - DEFAULT
> >> >>>   - This is the default in the DataSet API.
> >> >>>   - This will become the default in the DataStream API.
> >> >>>   - This happens in the DataStream API when {{enableObjectReuse()}} is activated.
> >> >>>
> >> >>> - COPY_PER_OPERATOR
> >> >>>   - The current default in the DataStream API.
> >> >>>
> >> >>> - FULL_REUSE
> >> >>>   - This happens in the DataSet API when {{enableObjectReuse()}} is chosen.
> >> >>>
> >> >>> An illustration of the modes is as follows:
> >> >>>
> >> >>> DEFAULT
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> >> >>>
> >> >>> COPY_PER_OPERATOR
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> >> >>>
> >> >>> FULL_REUSE
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
> >> >>>
> >> >>> New or Changed Public Interfaces
> >> >>>
> >> >>> Interfaces changed
> >> >>>
> >> >>> The interface of the {{ExecutionConfig}} adds the method {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> >> >>>
> >> >>> Behavior changed
> >> >>>
> >> >>> The default object passing behavior changes, meaning that it can affect the correctness of prior DataStream programs that assume the original "COPY_PER_OPERATOR" behavior.
> >> >>>
> >> >>> Migration Plan and Compatibility
> >> >>>
> >> >>> Interfaces
> >> >>>
> >> >>> No interface migration path is needed, because the interfaces are not broken; merely some methods get deprecated.
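To make the difference between the three proposed modes concrete (including Zhenzhong's DEFAULT vs. FULL_REUSE question), here is a toy allocation model based on our reading of the FLIP text: DEFAULT creates a new object per deserialization and never copies; COPY_PER_OPERATOR additionally copies at every hand-off between chained operators; FULL_REUSE deserializes into one reused instance. The `ObjectReuseMode` enum mirrors the proposed enumeration but is not an existing Flink class, and `Event`/`ChainModel` are hypothetical names. This is not how Flink's runtime is implemented.

```java
// Hypothetical enum mirroring the three modes proposed in FLIP-21 (not an existing Flink class).
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

// Toy mutable record.
final class Event {
    long value;

    Event(long value) {
        this.value = value;
    }
}

// Toy model of one task: each record is deserialized, then passed through a chain of operators.
final class ChainModel {
    private int allocations;

    private Event deserialize(long raw, Event reuseTarget, ObjectReuseMode mode) {
        if (mode == ObjectReuseMode.FULL_REUSE && reuseTarget != null) {
            reuseTarget.value = raw; // deserialize into the same instance again
            return reuseTarget;
        }
        allocations++;
        return new Event(raw); // fresh object per deserialization
    }

    private Event copy(Event e) {
        allocations++;
        return new Event(e.value);
    }

    // Number of Event allocations for `records` records flowing through `chainLength` chained operators.
    static int allocationsFor(ObjectReuseMode mode, int records, int chainLength) {
        ChainModel task = new ChainModel();
        Event reuseTarget = null;
        for (long raw = 0; raw < records; raw++) {
            Event current = task.deserialize(raw, reuseTarget, mode);
            reuseTarget = current;
            // The first operator gets the deserialized object; each later operator is a chained hand-off.
            for (int op = 1; op < chainLength; op++) {
                if (mode == ObjectReuseMode.COPY_PER_OPERATOR) {
                    current = task.copy(current); // defensive copy between chained operators
                }
            }
        }
        return task.allocations;
    }
}
```

For 1000 records through a chain of 3 operators, the model yields 1000 allocations for DEFAULT, 3000 for COPY_PER_OPERATOR (one extra copy per record at each of the two hand-offs), and 1 for FULL_REUSE. For expensive-to-copy types such as Avro or Thrift records, the extra copies are the cost the Motivation section describes.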
> >> >>>
> >> >>> Behavior change
> >> >>>
> >> >>> Variant 1:
> >> >>>
> >> >>> - Change the behavior, and make it explicit in the release notes that we did so and which cases are affected.
> >> >>> - This may actually be feasible, because the affected cases are quite pathological corner cases that only very bad implementations should encounter (see below).
> >> >>>
> >> >>> Variant 2:
> >> >>>
> >> >>> - When users set the mode explicitly, that mode is always used.
> >> >>> - When the mode is not explicitly set, we follow this strategy:
> >> >>>   - Change the CLI such that we know when users upgrade existing jobs (the savepoint to start from has a version prior to 1.4).
> >> >>>   - Use DEFAULT as the default for jobs that do not start from a savepoint, or that start from a savepoint >= 1.4.
> >> >>>   - Use COPY_PER_OPERATOR as the default for upgraded jobs.
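The default-selection rule of Variant 2 is a small decision function. The sketch below assumes that the savepoint version is known (the FLIP proposes a CLI change for this) and that versions are Flink 1.x identified by their minor number; `ReuseModeResolver` and the enum are hypothetical names, not Flink APIs.

```java
import java.util.Optional;

// Hypothetical enum mirroring the three modes proposed in FLIP-21 (not an existing Flink class).
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

final class ReuseModeResolver {
    // Assumption: versions are Flink 1.x, identified by their minor number (3 for 1.3, 4 for 1.4).
    private static final int FIRST_VERSION_WITH_NEW_DEFAULT = 4;

    // explicitMode: the mode the user configured, if any.
    // savepointMinorVersion: minor version of the savepoint the job starts from; empty if none.
    static ObjectReuseMode resolve(Optional<ObjectReuseMode> explicitMode,
                                   Optional<Integer> savepointMinorVersion) {
        if (explicitMode.isPresent()) {
            return explicitMode.get(); // an explicitly set mode always wins
        }
        boolean upgradedJob = savepointMinorVersion
                .map(v -> v < FIRST_VERSION_WITH_NEW_DEFAULT)
                .orElse(false);
        return upgradedJob ? ObjectReuseMode.COPY_PER_OPERATOR : ObjectReuseMode.DEFAULT;
    }
}
```

Under this rule, a job restored from a 1.3 savepoint with no explicit mode gets COPY_PER_OPERATOR, but once the same job is restored from a 1.4 savepoint it resolves to DEFAULT. That is the deferred behavior change Ufuk points out in his reply.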