[ 
https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531836#comment-17531836
 ] 

Matthias J. Sax commented on KAFKA-13870:
-----------------------------------------

I think you mix up two concepts: windowing is about "grouping" records 
according to their timestamp. Thus, the _window-size_ you define via 
`TimeWindows.withSizeAndGrace()` (or similar) defines into which window a 
record falls into.

On the other hand suppression has nothing to do with the definition of window 
bounds, but it's about when to emit (potentially partial) results for a window 
– however, if a partial result is emitted, the window is not closed.

It seems your request is more about a window definition rather than suppress, 
and you want some kind of "count based" window? It might be possible to add, 
but only if your requirement are clear. Note that the suppress buffer size has 
nothing to do with the window definition. The buffer size basically define how 
many windows the buffer can hold before it need to emit a partial result and 
drop a window from the buffer.

Thus, it's not clear to me what semantics you really need?

For now, you could still build it manually using the Processor API.

> support both Suppressed untilTimeLimit and maxBytes without using 
> emitEarlyWhenFull()
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13870
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Anil
>            Priority: Major
>
> My use case is to use  ** *untilTimeLimit* with *maxBytes,* but when the 
> buffer is full, the application is breaking, but with using 
> *{{emitEarlyWhenFull}}* {{{}application is not breaking but{}}}{*}{{}}{*} it 
> sends out the same key record multiple times in a particular window when the 
> buffer exceeds max bytes 
> for eg:-
> *Suppressed.untilTimeLimit(Duration.ofMinutes(15),Suppressed.BufferConfig.maxBytes(10000).emitEarlyWhenFull())*
>  
> messages flow : (A,1) (A,2) (A,3) -> aggregation result : (A,6) . suppose 
> here, the buffer is full, (A,6) will be sent downstream. Let's suppose (A,4) 
> comes now in the same tumbling window.
>  
> current response:- the aggregation will continue and eventually *(A,10)* will 
> be emitted
>  
> but our application expected *(A,4),*  so the request for the feature is that 
> window should be happening with window time(untilTimeLimit) or 
> Buffer(maxByte) should full, in either of these two conditions met, a new 
> window should be created and data should be emitted 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to