[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17332630#comment-17332630
 ] 

Philip Bourke edited comment on KAFKA-8147 at 4/26/21, 5:34 PM:
----------------------------------------------------------------

I'm looking to implement this fix but it seems like it only works depending on 
the order of the 
{{BufferConfig}}

If I do this it works -
{code:java}
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
BufferConfig.maxRecords(
 
maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}

But not if I set the {{withLoggingEnabled}} before {{emitEarlyWhenFull}}.
Is it expected that the {{BufferConfig}} should be set in a particular order?


was (Author: philbour):
I'm looking to implement this fix but it seems like it only works depending on 
the order of the {{BufferConfig}}
If I do this it works -
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
BufferConfig.maxRecords(
                        
maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig))
But not if I set the {{withLoggingEnabled }}before {{emitEarlyWhenFull}}.
Is it expected that the {{BufferConfig }}should be set in a particular order?

> Add changelog topic configuration to KTable suppress
> ----------------------------------------------------
>
>                 Key: KAFKA-8147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8147
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Maarten
>            Assignee: highluck
>            Priority: Minor
>              Labels: kip
>             Fix For: 2.6.0
>
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to