Hi,

Sorry for getting back to you so late, and thanks for the improved document :) I think I now get your idea.
So you are now trying (or have you already done it?) to implement a custom window assigner that would work as in Figure 3 from your document? I think that should indeed be possible and relatively easy to do without the need for API changes. I put a rough sketch of what I have in mind below.
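To make it concrete, here is a minimal sketch, assuming (as you described in Re 2a) that the key can be re-extracted from the element inside the assigner. All class and method names below are made up and untested, so please treat this as an illustration, not a ready implementation:

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Tumbling event-time windows whose start is shifted by a per-key offset,
 * so that windows of different keys end (and fire) at different points in
 * time instead of all firing at the same global window boundary.
 */
public class KeyShiftedTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;
    private final KeySelector<Object, ?> keySelector;

    private KeyShiftedTumblingEventTimeWindows(long size, KeySelector<Object, ?> keySelector) {
        this.size = size;
        this.keySelector = keySelector;
    }

    public static KeyShiftedTumblingEventTimeWindows of(Time size, KeySelector<Object, ?> keySelector) {
        return new KeyShiftedTumblingEventTimeWindows(size.toMilliseconds(), keySelector);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long offset;
        try {
            // Duplicate the keyBy() extraction logic and map the key onto a
            // deterministic offset in [0, size).
            offset = Math.floorMod((long) keySelector.getKey(element).hashCode(), size);
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from element", e);
        }
        // The element's event time is not touched; only the window
        // boundaries are shifted by the per-key offset.
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

Since the timestamps of the records are never modified (only the window boundaries move), this should also address the event-time concern from Re 3: downstream operators see exactly the same event times as without the offsets. You would pass in the same KeySelector that you already use in keyBy() (modulo a cast, since like the built-in assigners this one is parameterized on Object).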
Piotrek

> On 1 Oct 2018, at 17:48, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for the quick response. To follow up with the questions:
>
> Re 1). Yes, it is causing network I/O issues on Kafka itself.
>
> Re 2a). Actually, I thought about it last weekend and I think there's a way for a workaround: we directly duplicated the key extraction logic in our window assigner. Since the element record is passed in, it should be OK to create a customized window assigner that handles offsets based on the key, by extracting the key from the record. This was the main part of my change: to let WindowAssignerContext provide the current key information extracted from the KeyedStateBackend.
>
> Re 2b). Thanks for the explanation, we will try to profile it! We've seen some weird behaviors previously when loading up the network buffers in Flink, although it's very rare and inconsistent when trying to reproduce.
>
> Re 3). Regarding the event-time offset: I think I might not have explained my idea clearly. I added some more details to the doc. Please kindly take a look. In a nutshell, window offsets do not change the event time of records at all. We simply change how the window assigner assigns records to windows, using various different offsets.
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi,
>>
>> Thanks for the response again :)
>>
>> Re 1). Do you mean that this extra burst of external I/O network traffic is causing disturbance with other systems reading/writing from Kafka? With Kafka itself?
>>
>> Re 2a). Yes, it should be relatively simple. However, any new brick makes the overall component more and more complicated, which has long-term consequences for maintenance, refactoring, adding new features, or just making the code more difficult to read.
>>
>> Re 2b). With a setup of:
>>
>> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>>
>> RateLimitingOperator would just slow down data processing via the standard back pressure mechanism. Flink by default allocates 10% of the memory to network buffers; we could partially rely on them to buffer some smaller bursts without blocking the whole pipeline altogether. Essentially, RateLimitingOperator(maxSize = 0) would cause back pressure and slow down record emission from the WindowOperator. So yes, there would still be batch emission of the data in the WindowOperator itself, but it would be prolonged/slowed down in terms of wall time because of the downstream back pressure caused by the RateLimitingOperator.
>>
>> Btw, with your proposal, with what event time do you want to emit the delayed data? If the event time of the produced records changes based on using/not using window offsets, this can cause quite a lot of semantic problems and side effects for the downstream operators.
>>
>> Piotrek
>>
>>> On 28 Sep 2018, at 15:18, Rong Rong <walter...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> Thanks for getting back to me so quickly. Let me explain.
>>>
>>> Re 1). As I explained in the doc, we are using a basic Kafka-in, Kafka-out system with the same partition count on both sides. It is causing degraded performance in external I/O network traffic. It is definitely possible to configure more resources (e.g. a larger partition count) for the output to handle the burst, but it can also be resolved through some sort of internal smoothing (either through rate limiting as you suggested, or through the dynamic offset).
>>>
>>> Re 2a). Yes, I agree and I think I understand your concern. However, it is one simple API addition with default fallbacks that are fully backward-compatible (or I think it can be made fully compatible if I missed any corner cases).
>>>
>>> Re 2b). Yes, there could be many potential issues that cause data bursts. However, putting aside the scenarios caused by the nature of the stream (data skew, bursts), which affect both input and output, we want to address specifically the case where a smooth input *deterministically* results in a burst output. What we are proposing here is essentially the same as the case of users' custom operators. However, we can't do so unless there's an API to control the offset.
>>>
>>> Regarding the problem of rate limiting and skew, I think I missed one key point from you. I think you are right. If we introduce a *new rate limiting operator* (with size > 0) it will:
>>> - cause extra state usage within the container (moving all the components from the window operator and storing them in the rate-limit buffer at window boundaries);
>>> - not cause a data skew problem. The data skew problem I mentioned is that, if data is buffered in the window operator state longer for some records but not others, then potentially some panes will handle more late arrivals than others.
>>>
>>> However, if it is possible to get rid of the extra memory usage, we will definitely benchmark the rate-limit approach. Can you be more specific on how setting the rate-limit operator (size = 0) can resolve the burst issue? If I understand correctly, the back pressure will cause the watermark to not advance, but once it crosses the window boundary, there will still be a batch of messages emitted out of the window operator at the same time, correct?
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Re 1). Can you be more specific? What system are you using, what's happening, and how does it break?
>>>>
>>>> While delaying window firing is probably the most cost-effective solution for this particular problem, it has some disadvantages:
>>>> a) putting even more logic into an already complicated component
>>>> b) not solving potentially similar problems. I can easily imagine the same issue happening in scenarios other than "interval based operators", such as:
>>>> - input sources faster than output sinks
>>>> - data skew
>>>> - data bursts
>>>> - users' custom operators causing data bursts
>>>> - users' custom operators being prone to bursts (maybe something like AsyncOperator or something else that works with an external system), so the problem might not necessarily be limited to the sinks
>>>>
>>>> As far as I recall, there were some users reporting similar issues.
>>>>
>>>> Regarding potential drawbacks of rate limiting, I didn't understand this part:
>>>>
>>>>> However the problem is similar to delay triggers, which can provide degraded performance for skew-sensitive downstream services, such as feeding feature extraction results to a deep learning model.
>>>>
>>>> The way I could imagine the RateLimitingOperator is that it could take two parameters: a rate limit and a buffer size limit.
>>>>
>>>> With buffer size = 0, it would immediately cause back pressure if the rate is exceeded.
>>>> With buffer size > 0, it would first buffer events in the state and only cause back pressure when reaching the max buffer size.
>>>>
>>>> For the case with the WindowOperator, if windows are evicted and removed from the state, using buffer size > 0 wouldn't cause increased state usage; it would only move the state from the WindowOperator to the RateLimitingOperator.
>>>>
>>>> Piotrek
>>>>
>>>>> On 27 Sep 2018, at 17:28, Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>> Hi Piotrek,
>>>>>
>>>>> Yes, to be more clear:
>>>>> 1) the network I/O issue I am referring to is between Flink and the external sink. We did not see issues between operators.
>>>>> 2) yes, we've considered rate-limiting sink functions as well, which is also mentioned in the doc, along with some of the pros and cons we identified.
>>>>>
>>>>> This kind of problem seems to only occur in the WindowOperator so far, but yes, it can probably occur in any aligned interval-based operator.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the proposal. Could you provide more background/explanation/motivation for why you need such a feature? What do you mean by "network I/O" degradation?
>>>>>>
>>>>>> On its own, burst writes shouldn't cause problems within Flink. If they do, we might want to fix the original underlying problem, and if they are causing problems in external systems, we might also think about other approaches to fix/handle the problem (write rate limiting?), which might be more general and not fix only the bursts originating from the WindowOperator. I'm not saying that your proposal is bad or anything, but I would just like to have more context :)
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 26 Sep 2018, at 19:21, Rong Rong <walter...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Dev,
>>>>>>>
>>>>>>> I was wondering if there's any previous discussion regarding how to handle burst network I/O when deploying Flink applications with window operators.
>>>>>>>
>>>>>>> We've recently seen some significant network I/O degradation when trying to use sliding windows to perform rolling aggregations. The pattern is very periodic: output connections get no traffic for a period of time until a burst at window boundaries (in our case every 5 minutes).
>>>>>>>
>>>>>>> We have drafted a doc <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing> on how we propose to handle it to smooth the output traffic spikes. Please kindly take a look; any comments and suggestions are highly appreciated.
>>>>>>>
>>>>>>> --
>>>>>>> Rong
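P.S. In case it helps with benchmarking the rate-limiting alternative, below is a very rough sketch of the buffer size = 0 variant of the RateLimitingOperator idea from the earlier thread. The class name and parameters are made up, it is written as a plain ProcessFunction rather than a full custom operator, and it is untested, so treat it only as an illustration of how blocking the task thread translates into back pressure:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical rate limiter (buffer size = 0 variant): when the configured
 * rate is exceeded, it blocks in processElement(), which surfaces as back
 * pressure on the upstream WindowOperator and stretches its burst emission
 * out over wall time.
 */
public class RateLimitingFunction<T> extends ProcessFunction<T, T> {

    private final long maxRecordsPerSecond;

    private transient long windowStartMillis;
    private transient long emittedInWindow;

    public RateLimitingFunction(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        long now = System.currentTimeMillis();
        if (now - windowStartMillis >= 1000) {
            // Start a new one-second accounting window.
            windowStartMillis = now;
            emittedInWindow = 0;
        }
        if (emittedInWindow >= maxRecordsPerSecond) {
            // Rate exceeded: block until the current one-second window ends.
            // Blocking here is exactly what causes upstream back pressure
            // (note that it also delays checkpoint barriers while blocked).
            long sleepMillis = 1000 - (now - windowStartMillis);
            if (sleepMillis > 0) {
                Thread.sleep(sleepMillis);
            }
            windowStartMillis = System.currentTimeMillis();
            emittedInWindow = 0;
        }
        emittedInWindow++;
        out.collect(value);
    }
}

You would place it right before the sink, e.g. windowedStream.aggregate(...).process(new RateLimitingFunction<>(1000)).addSink(...), so that the WindowOperator's burst is smoothed out by the downstream back pressure instead of hitting Kafka all at once.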