Hi,

Sorry for getting back to you so late, and thanks for the improved document :)
I think I now understand your idea.

You are now trying (or have you already done it?) to implement a custom window 
assigner that would work as shown in [Figure 3] of your document? 


I think that indeed should be possible and relatively easy to do without the 
need for API changes.
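
Just to illustrate what I have in mind, below is a rough sketch of such an
assigner (only a sketch under my assumptions: MyEvent and its getKey()
method are hypothetical stand-ins for your record type and key extraction
logic, the per-key offset is derived from the key's hash as an example, and
I used tumbling windows for brevity):

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class KeyOffsetTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    public KeyOffsetTumblingWindows(long size) {
        this.size = size;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Duplicate the key extraction logic here, since the raw element is
        // passed in; derive a deterministic offset in [0, size) from the key.
        long offset = Math.floorMod(((MyEvent) element).getKey().hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

You would plug it in via stream.keyBy(...).window(new
KeyOffsetTumblingWindows(Time.minutes(5).toMilliseconds())), and the same
idea should extend to the sliding case.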

Piotrek

> On 1 Oct 2018, at 17:48, Rong Rong <walter...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thanks for the quick response. To follow up with the questions:
> Re 1). Yes it is causing network I/O issues on Kafka itself.
> 
> Re 2a). Actually, I thought about it last weekend and I think there's a way
> to work around it: we can directly duplicate the key extraction logic in our
> window assigner. Since the element record is passed in, it should be OK to
> create a customized window assigner that handles per-key offsets by
> extracting the key from the record.
> This was the main part of my original change: letting WindowAssignerContext
> provide the current key information extracted from the KeyedStateBackend.
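> Roughly, the proposed addition would have looked like the following
> (a hypothetical sketch of the API change, not existing Flink code):
> 
> public abstract static class WindowAssignerContext {
>     // existing method
>     public abstract long getCurrentProcessingTime();
> 
>     // proposed addition: the current key, as known to the
>     // KeyedStateBackend of the window operator
>     public abstract Object getCurrentKey();
> }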
> 
> Re 2b). Thanks for the explanation, we will try to profile it! We've seen
> some weird behavior previously when loading up the network buffers in
> Flink, although it's very rare and inconsistent when we try to reproduce it.
> 
> Re 3) Regarding the event time offset: I think I might not have explained my
> idea clearly. I added some more details to the doc. Please kindly take a
> look.
> In a nutshell, window offsets do not change the event time of records at
> all. We simply change how the window assigner assigns records to windows,
> using various different offsets; see the example below.
> 
> --
> Rong
> 
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> 
>> Hi,
>> 
>> Thanks for the response again :)
>> 
>> Re 1). Do you mean that this extra burst of external network I/O traffic is
>> causing disturbance to other systems reading/writing from Kafka? Or to
>> Kafka itself?
>> 
>> Re 2a) Yes, it should be relatively simple; however, every new brick makes
>> the overall component more and more complicated, which has long-term
>> consequences for maintenance, refactoring, adding new features, and simply
>> making the code harder to read.
>> 
>> Re 2b) With a setup of:
>> 
>> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>> 
>> RateLimitingOperator would just slow down data processing via the standard
>> back pressure mechanism. Flink by default allocates 10% of its memory to
>> network buffers; we could partially rely on them to buffer some smaller
>> bursts without blocking the whole pipeline altogether. Essentially,
>> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
>> record emission from the WindowOperator. So yes, there would still be a
>> batch emission of the data in the WindowOperator itself, but it would be
>> prolonged/slowed down in terms of wall time because of the downstream back
>> pressure caused by the RateLimitingOperator.
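>> To sketch what I mean (not an existing Flink operator, just a minimal
>> illustration of the maxSize = 0 case expressed as a user function;
>> blocking inside the user code is what translates into back pressure
>> upstream):
>> 
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>> 
>> public class ThrottleMap<T> extends RichMapFunction<T, T> {
>> 
>>     private final long recordsPerSecond;
>>     private transient long windowStart;
>>     private transient long count;
>> 
>>     public ThrottleMap(long recordsPerSecond) {
>>         this.recordsPerSecond = recordsPerSecond;
>>     }
>> 
>>     @Override
>>     public void open(Configuration parameters) {
>>         windowStart = System.currentTimeMillis();
>>         count = 0;
>>     }
>> 
>>     @Override
>>     public T map(T value) throws Exception {
>>         count++;
>>         if (count >= recordsPerSecond) {
>>             long elapsed = System.currentTimeMillis() - windowStart;
>>             if (elapsed < 1000) {
>>                 // block the task thread -> back pressure upstream
>>                 Thread.sleep(1000 - elapsed);
>>             }
>>             windowStart = System.currentTimeMillis();
>>             count = 0;
>>         }
>>         return value;
>>     }
>> }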
>> 
>> Btw, with your proposal, with what event time do you want to emit the
>> delayed data? If the event time of the produced records changes based on
>> using/not using window offsets, this can cause quite a lot of semantic
>> problems and side effects for the downstream operators.
>> 
>> Piotrek
>> 
>>> On 28 Sep 2018, at 15:18, Rong Rong <walter...@gmail.com> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for getting back to me so quickly. Let me explain.
>>> 
>>> Re 1). As I explained in the doc, we are using a basic Kafka-in, Kafka-out
>>> system with the same partition count on both sides. The bursts are causing
>>> degraded performance in external network I/O traffic.
>>> It is definitely possible to configure more resources (e.g. a larger
>>> partition count) for the output to handle the burst, but it can also be
>>> resolved through some sort of internal smoothing (either through rate
>>> limiting as you suggested, or through the dynamic offset).
>>> 
>>> Re 2a). Yes, I agree and I think I understand your concern. However, it is
>>> one simple API addition with default fallbacks that are fully
>>> backward-compatible (or I think it can be made fully compatible if I
>>> missed any corner cases).
>>> Re 2b). Yes, there could be many potential issues that cause data bursts.
>>> However, putting aside the scenarios caused by the nature of the stream
>>> (data skew, bursts), which affect both input and output, we want to
>>> specifically address the case where a smooth input *deterministically*
>>> results in a burst output. What we are proposing here is essentially like
>>> the case of users' custom operators, except that we can't do it unless
>>> there's an API to control the offset.
>>> 
>>> Regarding the problem of rate limiting and skew: I think I missed one key
>>> point from you, and I think you are right. If we introduce a *new rate
>>> limiting operator* (with size > 0), it will
>>> - cause extra state usage within the container (moving all the
>>> components from the window operator and storing them in the rate limit
>>> buffer at window boundaries);
>>> - not cause the data skew problem: the data skew problem I mentioned is
>>> that, if data is buffered in window operator state longer for some records
>>> but not others, then potentially some panes will handle more late
>>> arrivals than others.
>>> 
>>> However, if it is possible to get rid of the extra memory usage, we will
>>> definitely benchmark the rate-limit approach. Can you be more specific on
>>> how setting the rate-limit operator (size = 0) can resolve the burst
>>> issue?
>>> If I understand correctly, the backpressure will cause the watermark to
>>> not advance, but once it crosses the window boundary, there will still be
>>> a batch of messages emitted out of the window operator at the same time,
>>> correct?
>>> 
>>> Thanks,
>>> Rong
>>> 
>>> 
>>> 
>>> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Re 1. Can you be more specific? What system are you using, what's
>>>> happening, and how does it break?
>>>> 
>>>> While delaying window firings is probably the most cost-effective
>>>> solution for this particular problem, it has some disadvantages:
>>>> a) putting even more logic into an already complicated component
>>>> b) not solving potentially similar problems. I can easily imagine the
>>>> same issue happening in scenarios other than “interval based operators”,
>>>> such as:
>>>>       - input sources faster than output sinks
>>>>       - data skew
>>>>       - data bursts
>>>>       - users' custom operators causing data bursts
>>>>       - users' custom operators being prone to bursts (maybe something
>>>> like AsyncOperator or something else that works with an external
>>>> system) - so the problem might not necessarily be limited to the sinks
>>>> 
>>>> As far as I recall, there were some users reporting similar issues.
>>>> 
>>>> Regarding potential drawbacks of rate limiting, I didn’t understand this
>>>> part:
>>>> 
>>>>> However the problem is similar to delay triggers which can provide
>>>>> degraded performance for skew sensitive downstream service, such as
>>>>> feeding feature extraction results to deep learning model.
>>>> 
>>>> 
>>>> The way I could imagine RateLimitingOperator is that it could take two
>>>> parameters: a rate limit and a buffer size limit.
>>>> 
>>>> With buffer size = 0, it would immediately cause back pressure if the
>>>> rate is exceeded.
>>>> With buffer size > 0, it would first buffer events in state and only
>>>> cause the back pressure when reaching the max buffer size.
>>>> 
>>>> For the case with the WindowOperator, if windows are evicted and removed
>>>> from the state, using buffer size > 0 wouldn't cause increased state
>>>> usage; it would only move the state from the WindowOperator to the
>>>> RateLimitingOperator. A rough sketch of that variant is below.
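>>>> (Hypothetical and simplified sketch: the names and parameters are mine,
>>>> and it registers one processing-time timer per incoming element, which a
>>>> real implementation would want to coalesce.)
>>>> 
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>> 
>>>> import org.apache.flink.api.common.state.ListState;
>>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>>> import org.apache.flink.util.Collector;
>>>> 
>>>> public class BufferedEmit<K, T> extends KeyedProcessFunction<K, T, T> {
>>>> 
>>>>     private final int sliceSize;        // elements released per timer
>>>>     private final long sliceIntervalMs; // wall-time gap between slices
>>>>     private final TypeInformation<T> typeInfo;
>>>>     private transient ListState<T> buffer;
>>>> 
>>>>     public BufferedEmit(int sliceSize, long sliceIntervalMs,
>>>>             TypeInformation<T> typeInfo) {
>>>>         this.sliceSize = sliceSize;
>>>>         this.sliceIntervalMs = sliceIntervalMs;
>>>>         this.typeInfo = typeInfo;
>>>>     }
>>>> 
>>>>     @Override
>>>>     public void open(Configuration parameters) {
>>>>         // window results are parked here, so the state only moves from
>>>>         // the WindowOperator into this operator
>>>>         buffer = getRuntimeContext().getListState(
>>>>                 new ListStateDescriptor<>("emit-buffer", typeInfo));
>>>>     }
>>>> 
>>>>     @Override
>>>>     public void processElement(T value, Context ctx, Collector<T> out)
>>>>             throws Exception {
>>>>         buffer.add(value);
>>>>         ctx.timerService().registerProcessingTimeTimer(
>>>>                 ctx.timerService().currentProcessingTime() + sliceIntervalMs);
>>>>     }
>>>> 
>>>>     @Override
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out)
>>>>             throws Exception {
>>>>         // release one slice per firing; re-buffer the rest and schedule
>>>>         // the next slice, spreading the burst over wall time
>>>>         Iterable<T> values = buffer.get();
>>>>         if (values == null) {
>>>>             return;
>>>>         }
>>>>         List<T> remaining = new ArrayList<>();
>>>>         int emitted = 0;
>>>>         for (T value : values) {
>>>>             if (emitted < sliceSize) {
>>>>                 out.collect(value);
>>>>                 emitted++;
>>>>             } else {
>>>>                 remaining.add(value);
>>>>             }
>>>>         }
>>>>         buffer.update(remaining);
>>>>         if (!remaining.isEmpty()) {
>>>>             ctx.timerService().registerProcessingTimeTimer(
>>>>                     ctx.timerService().currentProcessingTime() + sliceIntervalMs);
>>>>         }
>>>>     }
>>>> }
>>>> 
>>>> It would sit between the window and the sink, e.g.
>>>> windowedStream.aggregate(...).keyBy(...).process(new BufferedEmit<>(...)).addSink(...).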
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 27 Sep 2018, at 17:28, Rong Rong <walter...@gmail.com> wrote:
>>>>> 
>>>>> Hi Piotrek,
>>>>> 
>>>>> Yes, to be more clear:
>>>>> 1) the network I/O issue I am referring to is between Flink and the
>>>>> external sink. We did not see issues between operators.
>>>>> 2) yes, we've considered rate-limiting sink functions as well, which is
>>>>> also mentioned in the doc, along with some of the pros and cons we
>>>>> identified.
>>>>> 
>>>>> This kind of problem seems to only occur in the WindowOperator so far,
>>>>> but yes, it can probably occur with any aligned interval-based operator.
>>>>> 
>>>>> --
>>>>> Rong
>>>>> 
>>>>> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <pi...@data-artisans.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Thanks for the proposal. Could you provide more
>>>>>> background/explanation/motivation for why you need such a feature?
>>>>>> What do you mean by “network I/O” degradation?
>>>>>> 
>>>>>> On their own, burst writes shouldn't cause problems within Flink. If
>>>>>> they do, we might want to fix the original underlying problem, and if
>>>>>> they are causing problems in external systems, we might also think
>>>>>> about other approaches to fix/handle the problem (write rate
>>>>>> limiting?), which might be more general and not fix only bursts
>>>>>> originating from the WindowOperator. I'm not saying that your proposal
>>>>>> is bad or anything, but I would just like to have more context :)
>>>>>> 
>>>>>> Piotrek.
>>>>>> 
>>>>>>> On 26 Sep 2018, at 19:21, Rong Rong <walter...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Dev,
>>>>>>> 
>>>>>>> I was wondering if there's any previous discussion regarding how to
>>>>>>> handle burst network I/O when deploying Flink applications with
>>>>>>> window operators.
>>>>>>> 
>>>>>>> We've recently seen some significant network I/O degradation when
>>>>>>> trying to use sliding windows to perform rolling aggregations. The
>>>>>>> pattern is very periodic: output connections get no traffic for a
>>>>>>> period of time until a burst at window boundaries (in our case every
>>>>>>> 5 minutes).
>>>>>>> 
>>>>>>> We have drafted a doc on how we propose to handle it and smooth the
>>>>>>> output traffic spikes:
>>>>>>> 
>>>>>>> https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing
>>>>>>> 
>>>>>>> Please kindly take a look; any comments and suggestions are highly
>>>>>>> appreciated.
>>>>>>> 
>>>>>>> --
>>>>>>> Rong
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
