Hi Simon,

Good question!

For event time semantics, we reuse the window operator can keep the correct
behavior which is the same as the current window operator. The window
operator will trigger based on the watermark.

About your example, the window of three partitions will trigger normally.
For the delayed partition, it should not trigger if there is no correct
watermark. The behavior is the same as input.keyBy(0).window in event time
semantics.

For processing idle partition scenarios, currently, Flink allows
calling markAsTemporarilyIdle to send StreamStatus.IDLE to the downstream.

Best,
Vino

Shu Su <barley...@163.com> 于2019年6月24日周一 下午9:13写道:

> Hi vino
>
>
> Thanks for proposal.
> For Local Aggregation I have a question about doing this in window
> aggregation. As we know , window aggregation like sliding window should
> based on
> Time trigger, and there may exists a problem in event time if we do local
> aggregation. For example if I want to do a 5s sliding window with count agg:
>
>
> 1. I have input with 4 parallelism and data are firstly randomly pass in 4
> partitions.
> 2. We do LocalAggregation in each of them and we get a partial count
> result.
> 3. Forward partial result to a node with same key then do the final
> aggregation.
>
>
> It seems no problem but what will happen if data skew in event time ? If
> we have a continuous time sequence in 3 of 4 input partitions, for example
> , we have a continuous time sequence in partition 1, 2, 3 but data to
> partition 4 was delay for some reason, and we just get 3 partial result for
> the moment, does final aggregation need to wait for the 4th partial result
> because of data delay ? If so , how long we need to wait for ? If not, does
> it mean that
> The final aggregation will wait forever ?
>
>
> Thanks,
> Simon
>
>
> On 06/18/2019 10:06,vino yang<yanghua1...@gmail.com> wrote:
> Hi Jark,
>
> We have done a comparative test. The effect is obvious.
>
> From our observation, the optimized effect mainly depends on two factors:
>
>
> - the degree of the skew: this factor depends on users business ;
> - the size of the window: localKeyBy support all the type of window
> which provided by Flink. Obviously, the larger the size of the window, the
> more obvious the effect.
>
> In production, we can not decide the first factor. About the second factor,
> it's the result of a trade-off. The size of the window affects the latency
> of the pre-aggregation. That's to say:
>
>
> - the larger the size of the window, the more obvious the effect;
> - the larger the size of the window, the larger latency of the result
>
> Best,
> Vino
>
> Jark Wu <imj...@gmail.com> 于2019年6月17日周一 下午7:32写道:
>
> Hi Vino,
>
> Thanks for the proposal.
>
> Regarding to the "input.keyBy(0).sum(1)" vs
> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
> some benchmark?
> Because I'm curious about how much performance improvement can we get by
> using count window as the local operator.
>
> Best,
> Jark
>
>
>
> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Hequn,
>
> Thanks for your reply.
>
> The purpose of localKeyBy API is to provide a tool which can let users do
> pre-aggregation in the local. The behavior of the pre-aggregation is
> similar to keyBy API.
>
> So the three cases are different, I will describe them one by one:
>
> 1. input.keyBy(0).sum(1)
>
> *In this case, the result is event-driven, each event can produce one sum
> aggregation result and it is the latest one from the source start.*
>
> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>
> *In this case, the semantic may have a problem, it would do the local sum
> aggregation and will produce the latest partial result from the source
> start for every event. *
> *These latest partial results from the same key are hashed to one node to
> do the global sum aggregation.*
> *In the global aggregation, when it received multiple partial results
> (they
> are all calculated from the source start) and sum them will get the wrong
> result.*
>
> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>
> *In this case, it would just get a partial aggregation result for the 5
> records in the count window. The partial aggregation results from the
> same
> key will be aggregated globally.*
>
> So the first case and the third case can get the *same* result, the
> difference is the output-style and the latency.
>
> Generally speaking, the local key API is just an optimization API. We do
> not limit the user's usage, but the user has to understand its semantics
> and use it correctly.
>
> Best,
> Vino
>
> Hequn Cheng <chenghe...@gmail.com> 于2019年6月17日周一 下午4:18写道:
>
> Hi Vino,
>
> Thanks for the proposal, I think it is a very good feature!
>
> One thing I want to make sure is the semantics for the `localKeyBy`.
> From
> the document, the `localKeyBy` API returns an instance of `KeyedStream`
> which can also perform sum(), so in this case, what's the semantics for
> `localKeyBy()`. For example, will the following code share the same
> result?
> and what're the differences between them?
>
> 1. input.keyBy(0).sum(1)
> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>
> Would also be great if we can add this into the document. Thank you
> very
> much.
>
> Best, Hequn
>
>
> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com>
> wrote:
>
> Hi Aljoscha,
>
> I have looked at the "*Process*" section of FLIP wiki page.[1] This
> mail
> thread indicates that it has proceeded to the third step.
>
> When I looked at the fourth step(vote step), I didn't find the
> prerequisites for starting the voting process.
>
> Considering that the discussion of this feature has been done in the
> old
> thread. [2] So can you tell me when should I start voting? Can I
> start
> now?
>
> Best,
> Vino
>
> [1]:
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> [2]:
>
>
>
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>
> leesf <leesf0...@gmail.com> 于2019年6月13日周四 上午9:19写道:
>
> +1 for the FLIP, thank vino for your efforts.
>
> Best,
> Leesf
>
> vino yang <yanghua1...@gmail.com> 于2019年6月12日周三 下午5:46写道:
>
> Hi folks,
>
> I would like to start the FLIP discussion thread about supporting
> local
> aggregation in Flink.
>
> In short, this feature can effectively alleviate data skew. This
> is
> the
> FLIP:
>
>
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
>
>
> *Motivation* (copied from FLIP)
>
> Currently, keyed streams are widely used to perform aggregating
> operations
> (e.g., reduce, sum and window) on the elements that have the same
> key.
> When
> executed at runtime, the elements with the same key will be sent
> to
> and
> aggregated by the same task.
>
> The performance of these aggregating operations is very sensitive
> to
> the
> distribution of keys. In the cases where the distribution of keys
> follows a
> powerful law, the performance will be significantly downgraded.
> More
> unluckily, increasing the degree of parallelism does not help
> when
> a
> task
> is overloaded by a single key.
>
> Local aggregation is a widely-adopted method to reduce the
> performance
> degraded by data skew. We can decompose the aggregating
> operations
> into
> two
> phases. In the first phase, we aggregate the elements of the same
> key
> at
> the sender side to obtain partial results. Then at the second
> phase,
> these
> partial results are sent to receivers according to their keys and
> are
> combined to obtain the final result. Since the number of partial
> results
> received by each receiver is limited by the number of senders,
> the
> imbalance among receivers can be reduced. Besides, by reducing
> the
> amount
> of transferred data the performance can be further improved.
>
> *More details*:
>
> Design documentation:
>
>
>
>
>
>
>
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>
> Old discussion thread:
>
>
>
>
>
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>
> JIRA: FLINK-12786 <
> https://issues.apache.org/jira/browse/FLINK-12786
>
>
> We are looking forwards to your feedback!
>
> Best,
> Vino
>
>
>
>
>
>
>

Reply via email to