Thanks for the feedback. Will leave this open for some more days, and adopt
it as a FLIP, taking Greg's and Aljoscha's comments into account.

On Sun, Jul 2, 2017 at 10:13 PM, Ufuk Celebi <u...@apache.org> wrote:

> Thanks for the write up and illustrations. :-) +1 to do this.
>
> I'm fine with both proposed "changed behaviour" variants, but lean
> towards option 1: change the default, make the change explicit in the
> release notes and add a good docs page about configuring object reuse
> (ideally re-using your illustrations from the FLIP).
>
> I see that option 2 (keep COPY_PER_OPERATOR as default for upgraded
> jobs if nothing else is configured) is nice in order to prevent any
> surprises for users upgrading from 1.3 to 1.4. But if I understand it
> correctly we only postpone the problem to their first 1.4 savepoint +
> restore at which point the behaviour would still change, right? If the
> answer is yes, I think that this might be more confusing than simply
> changing the default (option 1) in the long run.
>
> – Ufuk
>
>
> On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
> > Thank you for the reply and for the support!
> >
> > @Greg, controlling object reuse on a per-operator basis is definitely a
> > good way to follow up. My first thought would be to keep this proposal
> > slim and deal with the "default" logic, and have a follow-up effort to
> > make this controllable per operator.
> >
> > @Greg When you mention the "surprises" about object reuse in the DataSet
> > API, what cases and behavior do you have in mind there?
> >
> > Stephan
> >
> >
> > On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> +1 for changing the default if so many people encountered problems with
> >> serialisation costs.
> >>
> >> The first two modes don’t require any code changes, correct? Only the
> >> last one would require changes to the stream input processors.
> >>
> >> We should also keep this issue in mind:
> >> https://issues.apache.org/jira/browse/FLINK-3974
> >> i.e. we always need to make shallow copies of the StreamRecord.
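Aljoscha's last point can be illustrated with a small, self-contained Java sketch. `Envelope` is a hypothetical stand-in for a record wrapper such as `StreamRecord`, not Flink's actual class. The scenario is an assumed example, not a restatement of FLINK-3974: one record is handed to two chained consumers, and the first consumer overwrites the wrapper it received. It shows why the wrapper may need a shallow copy even when the payload itself is never copied.

```java
final class ShallowCopyDemo {
    // Hands one record to two chained consumers; the first consumer rewrites the wrapper it gets.
    // Returns the payload that the second consumer observes afterwards.
    static String forwardToTwo(Envelope<String> in, boolean shallowCopy) {
        Envelope<String> forFirst = shallowCopy ? in.shallowCopy() : in;
        Envelope<String> forSecond = shallowCopy ? in.shallowCopy() : in;
        forFirst.replace(forFirst.value.toUpperCase());
        return forSecond.value;
    }

    public static void main(String[] args) {
        System.out.println(forwardToTwo(new Envelope<>("event", 1L), false)); // prints "EVENT"
        System.out.println(forwardToTwo(new Envelope<>("event", 1L), true));  // prints "event"
    }
}

// Hypothetical stand-in for a record wrapper such as Flink's StreamRecord (not the real class).
final class Envelope<T> {
    T value;
    long timestamp;

    Envelope(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Overwrites the payload in place, the way an operator may reuse its output wrapper.
    Envelope<T> replace(T newValue) {
        this.value = newValue;
        return this;
    }

    // Shallow copy: a new wrapper object that shares the same payload object.
    Envelope<T> shallowCopy() {
        return new Envelope<>(value, timestamp);
    }
}
```

The copy is cheap because only the wrapper is duplicated. The payload object is shared, so this is compatible with not copying user objects between chained operators.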
> >>
> >> Best,
> >> Aljoscha
> >>
> >> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
> >> >
> >> > Stephan,
> >> >
> >> > Fully supporting this FLIP. We originally encountered pretty big
> >> > surprises observing the object copy behavior causing significant
> >> > performance degradation for our massively parallel use case.
> >> >
> >> > In our case (and I think this SHOULD be the assumption for all streaming
> >> > use cases), we assume object immutability for all records throughout the
> >> > processing pipeline. As a result, I don't see a need to distinguish
> >> > different object reuse behaviors for different (chained) operators (or,
> >> > taken to the extreme, even a need to support COPY_PER_OPERATOR, other
> >> > than for the backward compatibility we probably have to provide). I am
> >> > also a fan of failing fast if users assert incorrect assumptions.
> >> >
> >> > One piece of feedback on FLIP-21 itself: I am not very clear on the
> >> > difference between the DEFAULT and FULL_REUSE enumeration values. Aren't
> >> > they exactly the same thing in the new proposal? However, the model
> >> > figures seem to indicate they are slightly different. Can you elaborate
> >> > a bit more?
> >> >
> >> > Z.
> >> >
> >> >
> >> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote:
> >> >> Hi Stephan,
> >> >>
> >> >> Would this be an appropriate time to discuss allowing reuse to be a
> >> >> per-operator configuration? Object reuse for chained operators has led
> >> >> to considerable surprise for some users of the DataSet API. This came up
> >> >> during the rework of the object reuse documentation for the DataSet API.
> >> >> With annotations, a Function could mark whether input/iterator or
> >> >> output/collected objects should be copied or reused.
> >> >>
> >> >> My distant observation is that it is safer to locally assert reuse at
> >> >> the operator level than to assume or guarantee the safety of object
> >> >> reuse across an entire program. It could also be handy to mix operators
> >> >> receiving copyable objects with operators not requiring copyable
> >> >> objects.
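Greg's annotation idea might look roughly like the following Java sketch. Everything in it is hypothetical: `ObjectReuseHint`, `MapFn`, `LengthFn`, `EchoFn`, and `ReuseHintDemo` are illustrative names, not Flink APIs. The runtime decision is reduced to a single `mustCopyInput` check. Functions without the annotation are treated conservatively, which matches the idea of asserting reuse locally per operator instead of globally for the whole program.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class ReuseHintDemo {
    // Without a hint the runtime would conservatively copy inputs; a hint opts a function into reuse.
    static boolean mustCopyInput(Object function) {
        ObjectReuseHint hint = function.getClass().getAnnotation(ObjectReuseHint.class);
        return hint == null || !hint.inputReusable();
    }

    public static void main(String[] args) {
        System.out.println(mustCopyInput(new LengthFn())); // prints "false"
        System.out.println(mustCopyInput(new EchoFn()));   // prints "true"
    }
}

// Hypothetical per-function annotation; not part of any Flink release.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ObjectReuseHint {
    // The function does not hold on to its input, so the runtime may pass reused objects.
    boolean inputReusable() default false;
    // The function may reuse the objects it emits (the output/collected side).
    boolean reusesOutput() default false;
}

// Hypothetical minimal function interface standing in for a Flink MapFunction.
interface MapFn<I, O> {
    O map(I in);
}

@ObjectReuseHint(inputReusable = true)
final class LengthFn implements MapFn<String, Integer> {
    public Integer map(String in) {
        return in.length();
    }
}

final class EchoFn implements MapFn<String, String> {
    public String map(String in) {
        return in;
    }
}
```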
> >> >>
> >> >> Greg
> >> >>
> >> >>
> >> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> >> >>>
> >> >>> Hi all!
> >> >>>
> >> >>> I would like to propose the following FLIP:
> >> >>>
> >> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> >> >>>
> >> >>> The FLIP is motivated by the fact that many users run into an
> >> >>> unnecessary kind of performance problem caused by an old design
> >> >>> artifact.
> >> >>>
> >> >>> The required change should be reasonably small, and would help many
> >> >>> users and Flink's general standing.
> >> >>>
> >> >>> Happy to hear thoughts!
> >> >>>
> >> >>> Stephan
> >> >>>
> >> >>> ======================================
> >> >>>
> >> >>> FLIP text is below. Pictures with illustrations are only in the Wiki,
> >> >>> since they are not supported on the mailing list.
> >> >>> ------------------------------------------------------------
> >> >>>
> >> >>> Motivation
> >> >>>
> >> >>> The default behavior of the streaming runtime is to copy every element
> >> >>> between chained operators.
> >> >>>
> >> >>> That operation was introduced for "safety" reasons, to reduce the
> >> >>> number of cases where users can create incorrect programs by reusing
> >> >>> mutable objects (a discouraged pattern, but possible). For example, when
> >> >>> using state backends that keep the state as objects on the heap, reusing
> >> >>> mutable objects can theoretically create cases where the same object is
> >> >>> used in multiple state mappings.
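The heap-state hazard described above can be reproduced without any Flink dependency. The following minimal Java sketch is illustrative only: `ReuseHazardDemo` and `MutableEvent` are hypothetical names, and a plain `HashMap` stands in for a heap-based state backend. A function that reuses one mutable output object, followed by an operator that stores incoming objects by reference, ends up with two state entries that point at the same object.

```java
import java.util.HashMap;
import java.util.Map;

final class ReuseHazardDemo {
    // Stand-in for heap-based keyed state: values are stored by reference, never copied.
    static final Map<String, MutableEvent> heapState = new HashMap<>();

    // A user function that (incorrectly) reuses one mutable output object for every record.
    static final MutableEvent reused = new MutableEvent();

    static MutableEvent emitReusing(String key, long count) {
        reused.key = key;
        reused.count = count;
        return reused;
    }

    public static void main(String[] args) {
        // The downstream operator stores each incoming object directly in state.
        MutableEvent first = emitReusing("a", 1);
        heapState.put(first.key, first);
        MutableEvent second = emitReusing("b", 2);
        heapState.put(second.key, second);

        // Both entries alias the same object, so the entry for "a" now shows b's data.
        MutableEvent underA = heapState.get("a");
        System.out.println(underA.key + " " + underA.count); // prints "b 2"
    }
}

// A mutable record type: the kind of object that is risky to reuse.
final class MutableEvent {
    String key;
    long count;
}
```

Copying each record before handing it to the next chained operator would break this aliasing, which is the protection the current default buys at the cost of a copy per record.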
> >> >>>
> >> >>> The effect is that many people who try Flink get much lower performance
> >> >>> than they could. From empirical evidence, almost all users that I
> >> >>> (Stephan) have been in touch with eventually run into this issue.
> >> >>>
> >> >>> There are multiple observations about that design:
> >> >>>
> >> >>>  - Object copies are extremely costly. While some simple types copy
> >> >>>    virtually for free (types reliably detected as immutable are not
> >> >>>    copied at all), many real pipelines use types like Avro, Thrift,
> >> >>>    JSON, etc., which are very expensive to copy.
> >> >>>
> >> >>>  - Keyed operations currently only occur after shuffles. These
> >> >>>    operations are hence always the first in a chained pipeline and will
> >> >>>    never receive a reused object anyway. That means that for the most
> >> >>>    critical operations, this precaution is unnecessary.
> >> >>>
> >> >>>  - The mode is inconsistent with the contract of the DataSet API, which
> >> >>>    does not copy at each step.
> >> >>>
> >> >>>  - To prevent these copies, users can select {{enableObjectReuse()}},
> >> >>>    which is misleading, since it does not really reuse mutable objects,
> >> >>>    but avoids additional copies.
> >> >>>
> >> >>>
> >> >>> Proposal
> >> >>>
> >> >>> Summary
> >> >>>
> >> >>> I propose to change the default behavior of the DataStream runtime to be
> >> >>> the same as the DataSet runtime. That means that new objects are created
> >> >>> on every deserialization, and no copies are made as the objects are
> >> >>> passed along the pipeline.
> >> >>>
> >> >>> Details
> >> >>>
> >> >>> I propose to drop the execution config flag {{objectReuse}} and instead
> >> >>> introduce an {{ObjectReuseMode}} enumeration with better control of what
> >> >>> should happen. There will be three different modes:
> >> >>>
> >> >>>  - DEFAULT
> >> >>>     - This is the default in the DataSet API.
> >> >>>     - This will become the default in the DataStream API.
> >> >>>     - This happens in the DataStream API when {{enableObjectReuse()}} is
> >> >>>       activated.
> >> >>>
> >> >>>  - COPY_PER_OPERATOR
> >> >>>     - The current default in the DataStream API.
> >> >>>
> >> >>>  - FULL_REUSE
> >> >>>     - This happens in the DataSet API when {{enableObjectReuse()}} is
> >> >>>       chosen.
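The three proposed modes can be summarized as a Java enum. This is a hypothetical sketch of the proposal, not an existing Flink type. The two boolean properties are an assumed way of describing each mode. The claim that FULL_REUSE may also reuse deserialized instances comes from the DataSet API's object-reuse semantics, not from the text above. `fromLegacyFlag` encodes the mapping from today's boolean flag as stated in the list.

```java
final class ObjectReuseModeDemo {
    public static void main(String[] args) {
        // The legacy boolean flag maps to different modes depending on the API.
        System.out.println(ObjectReuseMode.fromLegacyFlag(true, false));  // prints "COPY_PER_OPERATOR"
        System.out.println(ObjectReuseMode.fromLegacyFlag(true, true));   // prints "DEFAULT"
        System.out.println(ObjectReuseMode.fromLegacyFlag(false, false)); // prints "DEFAULT"
        System.out.println(ObjectReuseMode.fromLegacyFlag(false, true));  // prints "FULL_REUSE"
    }
}

// Hypothetical sketch of the proposed enumeration; no such type exists in Flink today.
enum ObjectReuseMode {
    // Fresh object per deserialization, no copies between chained operators.
    DEFAULT(false, false),
    // Fresh object per deserialization, plus a copy for every chained operator.
    COPY_PER_OPERATOR(false, true),
    // The runtime may also reuse deserialized instances (assumed from DataSet object-reuse semantics).
    FULL_REUSE(true, false);

    final boolean mayReuseDeserializedObjects;
    final boolean copiesBetweenChainedOperators;

    ObjectReuseMode(boolean mayReuseDeserializedObjects, boolean copiesBetweenChainedOperators) {
        this.mayReuseDeserializedObjects = mayReuseDeserializedObjects;
        this.copiesBetweenChainedOperators = copiesBetweenChainedOperators;
    }

    // What the existing enableObjectReuse()/disableObjectReuse() flag corresponds to in each API.
    static ObjectReuseMode fromLegacyFlag(boolean dataStreamApi, boolean objectReuseEnabled) {
        if (dataStreamApi) {
            return objectReuseEnabled ? DEFAULT : COPY_PER_OPERATOR;
        }
        return objectReuseEnabled ? FULL_REUSE : DEFAULT;
    }
}
```

Under that assumption, DEFAULT and FULL_REUSE differ only in whether deserialized objects may be reused. That is the distinction Zhenzhong asks about earlier in the thread.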
> >> >>>
> >> >>>
> >> >>> An illustration of the modes is as follows:
> >> >>>
> >> >>> DEFAULT
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> >> >>>
> >> >>> COPY_PER_OPERATOR
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> >> >>>
> >> >>>
> >> >>> FULL_REUSE
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
> >> >>>
> >> >>> New or Changed Public Interfaces
> >> >>>
> >> >>> Interfaces changed
> >> >>>
> >> >>> The {{ExecutionConfig}} interface adds the method
> >> >>> {{setObjectReuseMode(ObjectReuseMode)}} and deprecates the methods
> >> >>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> >> >>>
> >> >>>
> >> >>> Behavior changed
> >> >>>
> >> >>> The default object passing behavior changes, meaning that it can affect
> >> >>> the correctness of prior DataStream programs that assume the original
> >> >>> "COPY_PER_OPERATOR" behavior.
> >> >>>
> >> >>> Migration Plan and Compatibility
> >> >>>
> >> >>> Interfaces
> >> >>>
> >> >>> No interface migration path is needed, because the interfaces are not
> >> >>> broken; some methods merely get deprecated.
> >> >>>
> >> >>> Behavior change
> >> >>>
> >> >>> Variant 1:
> >> >>>
> >> >>>  - Change the behavior, and make it explicit in the release notes that
> >> >>>    we did that and which cases are affected.
> >> >>>  - This may actually be feasible, because the affected cases are quite
> >> >>>    pathological corner cases that only very bad implementations should
> >> >>>    encounter (see below).
> >> >>>
> >> >>>
> >> >>> Variant 2:
> >> >>>
> >> >>>  - When users set the mode, that mode is always used.
> >> >>>  - When the mode is not explicitly set, we follow this strategy:
> >> >>>     - Change the CLI such that we know when users upgrade existing jobs
> >> >>>       (the savepoint to start from has a version prior to 1.4).
> >> >>>     - Use DEFAULT as the default for jobs that do not start from a
> >> >>>       savepoint, or that start from a savepoint >= 1.4.
> >> >>>     - Use COPY_PER_OPERATOR as the default for upgraded jobs.
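The Variant 2 rules can be written down as a small resolution function. This Java sketch is hypothetical: `ReuseModeResolver`, the `{major, minor}` encoding of the savepoint version, and the enum are illustrative names. It does not model the CLI change itself, only the decision that would follow from it.

```java
import java.util.Optional;

final class ReuseModeResolver {
    // First release in which DEFAULT would become the DataStream default (1.4, per Variant 2).
    static final int CUTOVER_MAJOR = 1;
    static final int CUTOVER_MINOR = 4;

    // True if version major.minor is older than the cutover release.
    static boolean isBeforeCutover(int major, int minor) {
        return major < CUTOVER_MAJOR || (major == CUTOVER_MAJOR && minor < CUTOVER_MINOR);
    }

    /**
     * @param explicitMode     the mode the user configured, if any
     * @param savepointVersion {major, minor} of the Flink version that wrote the savepoint,
     *                         or empty if the job does not start from a savepoint
     */
    static ObjectReuseMode resolve(Optional<ObjectReuseMode> explicitMode, Optional<int[]> savepointVersion) {
        // 1. An explicitly configured mode always wins.
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        // 2. Jobs upgraded from a pre-1.4 savepoint keep the old behavior.
        if (savepointVersion.isPresent()) {
            int[] v = savepointVersion.get();
            if (isBeforeCutover(v[0], v[1])) {
                return ObjectReuseMode.COPY_PER_OPERATOR;
            }
        }
        // 3. Fresh jobs and jobs restored from a 1.4+ savepoint get the new default.
        return ObjectReuseMode.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), Optional.empty()));                     // prints "DEFAULT"
        System.out.println(resolve(Optional.empty(), Optional.of(new int[] {1, 3})));        // prints "COPY_PER_OPERATOR"
        System.out.println(resolve(Optional.empty(), Optional.of(new int[] {1, 4})));        // prints "DEFAULT"
        System.out.println(resolve(Optional.of(ObjectReuseMode.FULL_REUSE), Optional.empty())); // prints "FULL_REUSE"
    }
}

// Hypothetical enum mirroring the three proposed modes; not an existing Flink type.
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }
```

The second and third calls show Ufuk's concern from the top of the thread. An upgraded job resolves to COPY_PER_OPERATOR, but once it takes a savepoint with 1.4 and is restored from it, the resolved default switches to DEFAULT.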
> >>
> >>
>
