Hi Dian,

A different opinion is fine with me. If there is a better solution or
there are obvious deficiencies in our design, we are very happy to accept
it and improve our design.

I agree with you that a customized local aggregate operator is more
flexible in terms of the trigger mechanism. However, I have two questions
about your reply.

1) When, why, and how do we judge that memory is exhausted?

IMO, the operator sits at a high level of abstraction; when implementing
it, we should not have to care about whether memory is exhausted.

2) If the local aggregate operator rarely needs to operate on state, what
do you think about fault tolerance?

We reuse Flink's state concept because we benefit from its fault
tolerance; we need to guarantee correct semantics.
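
To make question 2) concrete: if the local aggregate operator keeps its
partial results purely in memory, it has to implement its own snapshot logic
to survive failures. Below is only a rough sketch of what that would look
like with CheckpointedFunction (the class and field names are illustrative,
not part of our design doc):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: an in-memory pre-aggregation that snapshots its buffered
// partial sums into operator state so they are not lost on failure.
public class LocalSumSketch extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>
        implements CheckpointedFunction {

    private final Map<String, Long> buffer = new HashMap<>();
    private transient ListState<Tuple2<String, Long>> checkpointedBuffer;

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        // accumulate a partial sum per key; the flush policy (time/count) is omitted here
        buffer.merge(value.f0, value.f1, Long::sum);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            checkpointedBuffer.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "local-agg-buffer",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        for (Tuple2<String, Long> entry : checkpointedBuffer.get()) {
            buffer.merge(entry.f0, entry.f1, Long::sum);
        }
    }
}

By building local aggregation on the window operator instead, this state
handling comes from the existing WindowOperator implementation.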

Best,
Vino


On Tue, Jun 4, 2019, at 10:31 PM, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Vino,
>
> It may seem similar to the window operator but there are also a few key
> differences. For example, the local aggregate operator can send out its
> results at any time, while the window operator can only send out results
> at the end of the window (without early fire). This means that the local
> aggregate operator can send out results not only when the trigger time is
> reached, but also when memory is exhausted. This difference makes the
> optimization possible, as it means that the local aggregate operator rarely
> needs to operate on state.
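>
> (To make this concrete, a rough sketch of that behaviour; the names and the
> size threshold below are only illustrative, not an actual proposal:)
>
>     // flush the in-memory partial sums downstream when a size threshold is hit,
>     // so under normal operation nothing has to be written to Flink state
>     private final Map<String, Long> partials = new HashMap<>();
>     private final int maxKeys = 10_000;   // illustrative stand-in for "memory is exhausted"
>
>     void add(String key, long value, Collector<Tuple2<String, Long>> out) {
>         partials.merge(key, value, Long::sum);
>         if (partials.size() >= maxKeys) {
>             flush(out);
>         }
>     }
>
>     void flush(Collector<Tuple2<String, Long>> out) {
>         for (Map.Entry<String, Long> e : partials.entrySet()) {
>             out.collect(Tuple2.of(e.getKey(), e.getValue()));
>         }
>         partials.clear();
>     }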
>
> I admit that window operator can solve part of the problem (the data skew)
> and just wonder if we can do more. Using window operator at present seems
> OK for me as it can indeed solve part of the problems. We just need to
> think a little more in the design and make sure that the current solution
> is consistent with future optimizations.
>
> Thanks,
>
> Dian
>
> On Jun 4, 2019, at 5:22 PM, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think about it more deeply, you will
> find that your implementation needs to provide an operator which looks like
> a window operator: you need to use state, receive an aggregation function,
> and specify the trigger time. It looks like a lightweight window operator,
> right?
>
> We try to reuse the functions Flink already provides and reduce complexity.
> IMO, it is more user-friendly because users are familiar with the window API.
>
> Best,
> Vino
>
>
> On Jun 4, 2019, at 4:19 PM, Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding using a window to buffer the input elements, personally I don't
> think it's a good solution, for the following reasons:
> 1) As we know, WindowOperator stores the accumulated results in state; this
> is not necessary for the local aggregate operator.
> 2) For WindowOperator, each input element is accumulated into state. This is
> also not necessary for the local aggregate operator; keeping the input
> elements in memory is enough.
>
> Thanks,
> Dian
>
> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Ken,
>
> Thanks for your reply.
>
> As I said before, we try to reuse Flink's state concept (for fault
> tolerance and to guarantee exactly-once semantics), so we did not consider
> a cache.
>
> In addition, if we use Flink's state, OOM-related issues are not a key
> problem we need to consider.
>
> Best,
> Vino
>
> On Jun 4, 2019, at 1:37 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi all,
>
> Cascading implemented this “map-side reduce” functionality with an LRU
> cache.
>
> That worked well, as then the skewed keys would always be in the cache.
>
> The API let you decide the size of the cache, in terms of number of
> entries.
>
> Having a memory limit would have been better for many of our use cases,
> though from what I recall there’s no good way to estimate the in-memory size
> of objects.
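>
> (Roughly that pattern, sketched in Java rather than Cascading’s actual code:
> an access-ordered LinkedHashMap whose evicted entries are emitted downstream
> as partial results, so the hot keys tend to stay cached:)
>
>     // sketch only: LRU cache of partial sums; evicting an entry emits it downstream,
>     // while frequently seen (skewed) keys remain in the cache and keep combining
>     private static final int MAX_ENTRIES = 100_000;
>
>     private final Map<String, Long> cache =
>             new LinkedHashMap<String, Long>(MAX_ENTRIES, 0.75f, true) {
>                 @Override
>                 protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
>                     if (size() > MAX_ENTRIES) {
>                         emitPartial(eldest.getKey(), eldest.getValue()); // hypothetical emit hook
>                         return true;
>                     }
>                     return false;
>                 }
>             };
>
>     void add(String key, long value) {
>         cache.merge(key, value, Long::sum);
>     }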
>
> — Ken
>
> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Piotr,
>
> The localKeyBy API returns an instance of KeyedStream (we just added an
> inner flag to identify the local mode), which Flink has already provided.
> Users can call all the APIs (especially the *window* APIs) that KeyedStream
> provides.
>
> So if users want to use local aggregation, they should call the window API
> to build a local window, which means users should (or say "can") specify the
> window length and other information based on their needs.
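>
> (For example, something along these lines — the localKeyBy call is the API
> we propose, so it does not exist in Flink yet; the rest is the standard
> KeyedStream/window API:)
>
>     // pre-aggregate per key in 5-second local windows, then shuffle only the
>     // partial results and combine them on the globally keyed stream
>     DataStream<Tuple2<String, Long>> input = ...;  // e.g. (word, 1) pairs
>
>     input
>         .localKeyBy(value -> value.f0)                       // proposed API, local-mode KeyedStream
>         .timeWindow(Time.seconds(5))                         // local window chosen by the user
>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))      // partial sums on the sender side
>         .keyBy(value -> value.f0)                            // redistribute the partial results
>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));     // final sums on the receiver side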
>
> I think you described an idea different from ours. We do not try to react
> after some predefined threshold is triggered; we tend to give users the
> discretion to make such decisions.
>
> Our design tends to reuse the concepts and functions Flink provides, like
> state and window (IMO, we do not need to worry about OOM and the other
> issues you mentioned).
>
> Best,
> Vino
>
> On Jun 3, 2019, at 4:30 PM, Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
>
> +1 for the idea from my side. I’ve even attempted to add a similar feature
> quite some time ago, but didn’t get enough traction [1].
>
> I’ve read through your document and I couldn’t find it mentioning anywhere
> when the pre-aggregated result should be emitted down the stream. I think
> that’s one of the most crucial decisions, since a wrong decision here can
> lead to a decrease in performance or to an explosion of memory/state
> consumption (both with bounded and unbounded data streams). For streaming
> it can also lead to increased latency.
>
> Since this is also a decision that’s impossible to make automatically in a
> perfectly reliable way, first and foremost I would expect this to be
> configurable via the API, with maybe some predefined triggers, like on
> watermark (for windowed operations), on checkpoint barrier (to decrease
> state size?), on element count, maybe on memory usage (much easier to
> estimate with known/predefined types, like in SQL)… and with some option to
> implement a custom trigger.
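>
> (Something like the following shape, purely as an illustration — none of
> these interfaces or method names exist in Flink:)
>
>     // hypothetical: a pluggable trigger deciding when the local aggregation flushes
>     public interface LocalAggregationTrigger<T> {
>         boolean onElement(T element, long bufferedCount);   // e.g. flush every N elements
>         boolean onWatermark(long watermark);                // flush on watermark for windowed ops
>         boolean onCheckpoint(long checkpointId);            // flush to keep checkpointed state small
>     }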
>
> Also, what would work best would be to have some form of memory-consumption
> priority. For example, if we are running out of memory for the HashJoin/final
> aggregation, instead of spilling to disk or crashing the job with an OOM it
> would probably be better to prune/dump the pre/local aggregation state. But
> that’s another story.
>
> [1] https://github.com/apache/flink/pull/4626
>
> Piotrek
>
> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>
> Excited, and a big +1 for this feature.
>
> On Jun 3, 2019, at 3:37 PM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Nice feature.
> Looking forward to having it in Flink.
>
> Regards,
> Xiaogang
>
> On Jun 3, 2019, at 3:31 PM, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi all,
>
> As we mentioned at some conferences, such as Flink Forward SF 2019 and
> QCon Beijing 2019, our team has implemented "Local aggregation" in our
> internal Flink fork. This feature can effectively alleviate data skew.
>
> Currently, keyed streams are widely used to perform aggregating operations
> (e.g., reduce, sum and window) on the elements that have the same key. When
> executed at runtime, the elements with the same key will be sent to and
> aggregated by the same task.
>
> The performance of these aggregating operations is very sensitive to the
> distribution of keys. In cases where the distribution of keys follows a
> power law, the performance will be significantly degraded. Even more
> unluckily, increasing the degree of parallelism does not help when a task
> is overloaded by a single key.
>
> Local aggregation is a widely-adopted method to reduce the performance
> degradation caused by data skew. We can decompose the aggregating operations
> into two phases. In the first phase, we aggregate the elements of the same
> key at the sender side to obtain partial results. Then, in the second phase,
> these partial results are sent to receivers according to their keys and are
> combined to obtain the final result. Since the number of partial results
> received by each receiver is limited by the number of senders, the imbalance
> among receivers can be reduced. Besides, by reducing the amount of
> transferred data, the performance can be further improved.
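>
> As a made-up illustration of the effect: if one hot key occurs 10,000,000
> times across 100 sender subtasks, the receiver task that owns this key would
> normally have to process all 10,000,000 records, while with local
> aggregation it only receives one partial result per sender per flush, i.e.
> at most 100 records per flush interval, no matter how skewed the input is.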
>
> The design documentation is here:
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>
>
> Any comments and feedback are welcome and appreciated.
>
> Best,
> Vino
>
>
>
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
