Thanks for the quick replies.

> Overall, it seems that the main concern with this FLIP is that the 2%
> throughput saving might not be worth the added implementation complexity.
>

Yes, my main concern is that the performance improvement may not be worth
the added complexity.

And I'd like to point out that even the 2% figure was achieved with a
relatively small record size. Records in WordCount consist of a string
token and an integer count. While I don't know the exact average length of
the tokens, I'd expect the record size to be ~20B, which in my experience
is much smaller than most real workloads. Since you are arguing that this
improvement is important for batch workloads, have you tried it with the
widely-used TPC-DS benchmark? I doubt whether the difference can be
observed with that benchmark.
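
To put rough numbers on the record-size argument, here is a minimal sketch
in Java. It only counts serialized bytes, assuming (as stated earlier in
this thread) that the optimization removes a 1-byte tag per record. The
class and method names are hypothetical, and a byte ratio is only a rough
intuition, not a throughput prediction.

```java
// Hypothetical back-of-the-envelope helper; not part of Flink.
class TagOverhead {
    // Size of the per-record tag discussed in this thread (assumed 1 byte).
    static final int TAG_BYTES = 1;

    /**
     * Fraction of serialized bytes saved by dropping the tag from a record
     * whose payload (without tag) is payloadBytes long.
     */
    static double savedFraction(int payloadBytes) {
        return (double) TAG_BYTES / (payloadBytes + TAG_BYTES);
    }

    public static void main(String[] args) {
        // Payload sizes are illustrative: a tiny value, a WordCount-like
        // record (~20B, an estimate), and the 100B / 1KB sizes asked about.
        int[] payloadSizes = {1, 20, 100, 1024};
        for (int size : payloadSizes) {
            System.out.printf(
                    "payload=%dB saved=%.2f%%%n", size, 100 * savedFraction(size));
        }
    }
}
```

For a ~20B payload the tag is about 1/21 of the bytes, and for a 1KB
payload it is under 0.1%, which is why I'd expect the gain to shrink on
workloads with larger records.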

And even assuming there's a 2% performance improvement for all batch
scenarios (which I don't think is the case), its value still depends on the
development phase of the project. Setting a long-term goal to have the best
possible performance is nice, but that doesn't mean we should squeeze out
every possible bit of performance improvement immediately. IMHO, we should
first have reasonable performance, as well as a reasonable feature set,
usability, stability, etc., which is likely harder to achieve if we are not
cautious with changes that increase complexity.

I agree that my concerns (1-4) can be addressed one way or another, but
at a price.

All in all, I think this is about whether we think the benefit is worth the
price. Admittedly, this is hard to quantify, and there's probably no right
or wrong answer. It's rather a technical choice that depends on individual
judgement. Maybe we should also hear what others think about it.

## Concerning StreamElement

I think my major point here is that this improvement relies on the
following assumption, which is implicit and not guaranteed to hold.

Assumption: In scenarios where we don't emit timestamps for records,
StreamRecord without timestamp is the only possible sub-type of
StreamElement.

We should always be careful with adding new assumptions, because once added
they are hard to take back and may cause problems for future development.
This specific assumption doesn't come from any contract. It just happens to
be true with the current implementation.
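
To illustrate the concern, here is a minimal sketch in Java. All names
(`Element`, `DataRecord`, `ControlMarker`, `writeTagged`, `writeTagless`)
are hypothetical and do not mirror Flink's actual classes or serializers.
It only shows why a tagless encoding depends on there being exactly one
element type on the channel.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified model; these are NOT Flink's actual classes.
class TaglessSketch {
    interface Element {}

    // Stands in for a record without timestamp.
    static final class DataRecord implements Element {
        final int value;
        DataRecord(int value) { this.value = value; }
    }

    // Stands in for some future sub-type that does not exist today.
    static final class ControlMarker implements Element {
        final long id;
        ControlMarker(long id) { this.id = id; }
    }

    static final byte TAG_RECORD = 0;
    static final byte TAG_MARKER = 1;

    // Tagged path: one extra byte per element, but any sub-type can be sent.
    static byte[] writeTagged(Element e) {
        if (e instanceof DataRecord) {
            return ByteBuffer.allocate(1 + Integer.BYTES)
                    .put(TAG_RECORD).putInt(((DataRecord) e).value).array();
        }
        if (e instanceof ControlMarker) {
            return ByteBuffer.allocate(1 + Long.BYTES)
                    .put(TAG_MARKER).putLong(((ControlMarker) e).id).array();
        }
        throw new IllegalArgumentException("unknown element type");
    }

    // Tagless path: saves the tag byte, but is only valid while DataRecord
    // is the sole element type; anything else cannot be encoded.
    static byte[] writeTagless(Element e) {
        if (!(e instanceof DataRecord)) {
            throw new IllegalStateException(
                    "tagless path cannot encode " + e.getClass().getSimpleName());
        }
        return ByteBuffer.allocate(Integer.BYTES)
                .putInt(((DataRecord) e).value).array();
    }

    public static void main(String[] args) {
        Element record = new DataRecord(42);
        System.out.println("tagged bytes:  " + writeTagged(record).length);
        System.out.println("tagless bytes: " + writeTagless(record).length);
    }
}
```

In this sketch, adding `ControlMarker` later forces a choice between
keeping the tag (losing the saving) and routing the new type through some
other channel, which is the kind of constraint I'm worried about.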

Best,

Xintong



On Fri, Aug 11, 2023 at 10:43 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Xintong,
>
> Thanks for the detailed explanation of your concern. Let me chime in and
> provide my thoughts on this issue.
>
> Please see my comments for each point inline.
>
> Overall, it seems that the main concern with this FLIP is that the 2%
> throughput saving might not be worth the added implementation complexity.
>
> IMO, it really depends on what is Flink's *long-term* performance goal in
> the batch-only scenario. Do we just need Flink to perform reasonably well
> (maybe with 80-90% throughput of that of Spark) for batch, or do we expect
> Flink to have best possible performance with minimal possible cost?
>
> If it is the former goal, then I agree it is debatable whether this FLIP is
> worthwhile. And I would be OK if we decide to postpone this FLIP
> indefinitely. However, we (the whole Flink developer community) need to be
> aware that this effectively tells users: you should only use Flink to run a
> batch-only job if your job needs to be run in stream mode, and Flink is
> not intended to be an optimal engine for batch computation.
>
> If it is the latter goal, the implementation complexity involved in this
> FLIP will be an inevitable one that we should take rather than avoid. Also
> note that it is better to make infra changes sooner rather than later. I would
> say the resulting code (associated with this complexity) is also a valuable
> asset of Flink (or any other engine) that intends to be stream-batch
> unified.
>
> IMO, we should aim for the 2nd goal and give users (and developers) the
> idea that it is a good investment to migrate their existing batch-only jobs
> (and related infra) to Flink because Flink should be able to achieve the
> best performance for batch-only jobs in the long term.
>
> What do you think?
>
> Best,
> Dong
>
>
> On Thu, Aug 10, 2023 at 9:16 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
> > Hi Yunfeng,
> >
> > Thanks for preparing this FLIP. I respect the effort you have already
> > put into the PoC implementation and benchmarks. However, I have to say
> > I'm quite concerned about this proposal.
> >
> > 1. The FLIP is based on the assumption that in non-timestamp scenarios
> > StreamRecord is the only possible sub-type of StreamElement. This
> > assumption is true with the current sub-types, but is not true by
> > definition, and can be broken if we want to introduce more sub-types in
> > the future. To put it differently, introducing this optimization would
> > limit our flexibility to extend StreamElement in the future, because
> > doing so may invalidate this optimization and thus introduce regressions.
> >
>
> The goal of this FLIP is to avoid the unnecessary per-record overhead (e.g.
> related to timestamp) that is currently involved to process StreamRecord in
> scenarios where StreamRecord does not need timestamp (which is known at job
> compile time). I suppose we both agree that this can be achieved as of the
> current Flink implementation.
>
> In the future, if we have more use cases that require the addition of new
> sub-types of StreamElement, I believe we should still be able to address
> such use cases without breaking the optimization introduced in this FLIP.
> The intuition is that if the use-case knows for sure there is only one type
> of StreamRecord, then the job should **always** be able to hardcode this
> logic at compile time instead of checking the record type on a per-record
> basis at runtime.
>
> Also note that we might not have to add new subtypes of StreamRecord in
> order to achieve the future use-case. It seems like an implementation
> detail that should be decided based on the concrete use-case.
>
> If this is not the case, can you please provide a concrete use-case (can be
> a fabricated one) where this optimization cannot be achieved? This can
> help us evaluate whether this concern is realistic.
>
>
> >
> > 2. Changing LatencyMarker into a RuntimeEvent is problematic. The purpose
> > of LatencyMarker is to measure the end-to-end latency of a record
> > traveling through the data flow. RuntimeEvents are consumed with higher
> > priority than StreamRecord in the shuffle layer, which may introduce bias
> > on the measurement result.
> >
>
> Thank you for mentioning this. I think we can update the proposed change
> section (and the Flink implementation) so that LatencyMarker will be
> consumed with the same priority as the StreamRecord. Would that address your
> concern (apart from the possible implementation complexity)?
>
>
> >
> > 3. The proposed configuration option is, TBH, hard for users to
> > understand. It requires in-depth understanding of the Flink internals.
> > But this is probably not a big problem, if the plan is to eventually make
> > this feature default and remove the configuration option.
> >
>
> I guess we both agree that it won't be a problem in the long term since we
> should be able to remove this config with new operator APIs (e.g. we can
> tighten the operator API, or add operator attribute to specify whether it
> needs timestamp).
>
> In the short term, this also won't be a problem since optimization is
> turned off by default and users won't need to even know this config.
> Advanced users can take advantage of this config to optimize throughput of
> their jobs.
>
>
> > 4. It complicates the system by having multiple code paths for
> > StreamElement (de)serialization, and the logic for deciding which path to
> > be used. Admittedly, this is not super expensive, but still worth
> > comparison with the benefit.
> >
>
> Yes, I agree it can add complexity to the Flink implementation. This is
> probably an inevitable complexity if we want Flink to achieve optimal
> performance for the target scenario where computation does not need
> timestamp.
>
> And yes, the impact of this optimization can be smaller as the per-record
> size increases. On the other hand, its impact can also be larger as the
> per-record size decreases. Given that word-count is kind of typical and we
> don't know the concrete average per-record size that a user's job might
> have, it seems better to just do this optimization for peace of mind, as
> opposed to telling users that "Your Flink job performance can be
> considerably suboptimal if your job's average record size is too small".
>
>
> > 5. The benefit is limited.
> >   a. It only benefits non-timestamp scenarios
> >   b. For the applicable scenarios, it only saves the (de)serialization /
> > transmission cost of 1 byte / record. The larger the record size, the
> > less benefit.
> >   c. As Jark already pointed out, the 20% improvement is achieved with a
> > simple Boolean type record. WordCount (where the record size is just 1
> > string token + 1 integer, typically 20B or so?) shows only 2-3%
> > throughput improvement. Not to mention the workloads with larger record
> > size or higher computation load.
> >
>
> I agree a 2% saving is not significant. However, it is already observable and
> the absolute saving can be significant for users with large scale Flink
> deployment.
>
> Given that Flink is intended for large scale deployment (particularly as
> Flink is trying to be stream-batch unified), I am not sure we should give
> up 2% throughput optimization (at the cost of Flink users) so as to keep
> Flink implementation simpler.
>
>
> > Given the above, I personally don't think the benefit of this proposal is
> > worth the cost.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Mon, Aug 7, 2023 at 4:24 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com
> >
> > wrote:
> >
> > > Hi Matt,
> > >
> > > Thanks for letting me learn about usages of latency markers in
> > > production scenarios. I have updated the POC code and the FLIP's
> > > description, and now the optimization's requirement no longer
> > > includes disabling latency-tracking. The main idea is to treat
> > > LatencyMarker as a RuntimeEvent during network communication, which
> > > means it will be serialized by EventSerializer instead of
> > > StreamElementSerializer, so the tags of StreamElements can be removed
> > > regardless of latency tracking. Please refer to the FLIP and POC code
> > > for more details. Hope this could resolve your concerns.
> > >
> > > Best,
> > > Yunfeng
> > >
> > > On Mon, Jul 17, 2023 at 2:13 PM Matt Wang <wang...@163.com> wrote:
> > > >
> > > > Hi Yunfeng,
> > > >
> > > >
> > > > Thank you for testing point 1 again, and I look forward to the
> > > > performance results of TPC-DS later.
> > > >
> > > > We use a latency marker to monitor the end-to-end latency of Flink
> > > > jobs. If the latencyTrackingInterval is set too small (like 5ms), it
> > > > will have a large impact on performance. But if the
> > > > latencyTrackingInterval is configured to be relatively large, such as
> > > > 10s, this impact can be ignored.
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Matt Wang
> > > >
> > > >
> > > > ---- Replied Message ----
> > > > | From | Yunfeng Zhou<flink.zhouyunf...@gmail.com> |
> > > > | Date | 07/14/2023 20:30 |
> > > > | To | <dev@flink.apache.org> |
> > > > | Subject | Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement |
> > > > Hi Matt,
> > > >
> > > > 1. I tried adding the tag serialization process back to my POC
> > > > code and ran the benchmark again; this time the performance
> > > > improvement is roughly reduced by half. It seems that both the
> > > > serialization and the judgement process make a major contribution to
> > > > the overhead reduction in this specific scenario, but in a production
> > > > environment where a distributed cluster is deployed, I believe the
> > > > reduction in serialization would be the more significant reason for
> > > > the performance improvement.
> > > >
> > > > 2. According to the latency-tracking section in Flink document[1], it
> > > > seems that users would only enable latency markers for debugging
> > > > purposes, instead of using it in production code. Could you please
> > > > illustrate a bit more about the scenarios that would be limited when
> > > > latency markers are disabled?
> > > >
> > > > 3. I plan to benchmark the performance of this POC against TPC-DS and
> > > > hope that it could cover the common use cases that you are concerned
> > > > about. I believe there would still be performance improvement when
> > > > the size of each StreamRecord increases, though the improvement will
> > > > not be as obvious as that currently in FLIP.
> > > >
> > > > Best regards,
> > > > Yunfeng
> > > >
> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
> > > >
> > > > On Thu, Jul 13, 2023 at 5:51 PM Matt Wang <wang...@163.com> wrote:
> > > >
> > > > Hi Yunfeng,
> > > >
> > > > Thanks for the proposal. The POC showed a performance improvement of
> > > > 20%, which is very exciting. But I have some questions:
> > > > 1. Is the performance improvement here mainly due to the reduction of
> > > > serialization, or is it due to the judgment overhead caused by tags?
> > > > 2. Watermark is not needed in some scenarios, but the latency marker
> > > > is a useful function. If the latency marker cannot be used, it will
> > > > greatly limit the usage scenarios. Can the solution design retain the
> > > > capability of the latency marker?
> > > > 3. The data of the POC test is of long type. Here I want to see how
> > > > much benefit there will be if it is a string with a length of 100B
> > > > or 1KB.
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Matt Wang
> > > >
> > > >
> > > > ---- Replied Message ----
> > > > | From | Yunfeng Zhou<flink.zhouyunf...@gmail.com> |
> > > > | Date | 07/13/2023 14:52 |
> > > > | To | <dev@flink.apache.org> |
> > > > | Subject | Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement |
> > > > Hi Jing,
> > > >
> > > > Thanks for reviewing this FLIP.
> > > >
> > > > 1. I did change the names of some APIs in the FLIP compared with the
> > > > original version according to which I implemented the POC. As the
> > > > core optimization logic remains the same and the POC's performance
> > > > can still reflect the current FLIP's expected improvement, I have not
> > > > updated the POC code after that. I'll add a note on the benchmark
> > > > section of the FLIP saying that the namings in the POC code might be
> > > > outdated, and FLIP is still the source of truth for our proposed
> > > > design.
> > > >
> > > > 2. This FLIP could bring a fixed reduction on the workload of the
> > > > per-record serialization path in Flink, so if the absolute time cost
> > > > by non-optimized components could be lower, the performance
> > > > improvement of this FLIP would be more obvious. That's why I chose to
> > > > enable object-reuse and to transmit Boolean values in serialization.
> > > > If it would be more widely regarded as acceptable for a benchmark to
> > > > adopt more commonly-applied behavior(for object reuse, I believe
> > > > disable is more common), I would be glad to update the benchmark
> > > > result to disable object reuse.
> > > >
> > > > Best regards,
> > > > Yunfeng
> > > >
> > > >
> > > > On Thu, Jul 13, 2023 at 6:37 AM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > > >
> > > > Hi Yunfeng,
> > > >
> > > > Thanks for the proposal. It makes sense to offer the optimization. I
> > > > have some NIT questions.
> > > >
> > > > 1. I guess you changed your thoughts while coding the POC. I found
> > > > pipeline.enable-operator-timestamp in the code, but
> > > > pipeline.force-timestamp-support is defined in the FLIP.
> > > > 2. About the benchmark example, why did you enable object reuse?
> > > > Since it is an optimization of serde, will the benchmark be better
> > > > if it is disabled?
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Jul 10, 2023 at 11:54 AM Yunfeng Zhou <
> > > flink.zhouyunf...@gmail.com>
> > > > wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > > > support optimizing StreamRecord's serialization performance.
> > > >
> > > > Currently, a StreamRecord would be converted into a 1-byte tag (+
> > > > 8-byte timestamp) + N-byte serialized value during the serialization
> > > > process. In scenarios where timestamps and watermarks are not needed,
> > > > and latency tracking is enabled, this process would include
> > > > unnecessary information in the serialized byte array. This FLIP aims
> > > > to avoid such overhead and increases Flink job's performance during
> > > > serialization.
> > > >
> > > > Please refer to the FLIP document for more details about the proposed
> > > > design and implementation. We welcome any feedback and opinions on
> > > > this proposal.
> > > >
> > > > Best regards, Dong and Yunfeng
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-330%3A+Support+specifying+record+timestamp+requirement
> > > >
> > >
> >
>
