[ https://issues.apache.org/jira/browse/KAFKA-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322023#comment-17322023 ]
Andrey Bratus commented on KAFKA-7748:
--------------------------------------

Hi [~cosbor11],

for a simple use case I worked around this problem using the Processor API and punctuation:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class SuppressProcessor extends AbstractProcessor<String, Integer> {

    private final String fStoreName;
    private final Duration fPunctuationInterval;
    private final long fSuppressTimeoutMillis;
    private TimestampedKeyValueStore<String, Integer> stateStore;

    public SuppressProcessor(String storeName, Duration suppressTimeout, Duration punctuationInterval) {
        fStoreName = storeName;
        fSuppressTimeoutMillis = suppressTimeout.toMillis();
        fPunctuationInterval = punctuationInterval;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        stateStore = (TimestampedKeyValueStore<String, Integer>) context.getStateStore(fStoreName);
        // Check the buffer on wall-clock time, independent of stream time.
        context.schedule(fPunctuationInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(String key, Integer value) {
        ValueAndTimestamp<Integer> stored = stateStore.get(key);
        if (stored != null) {
            // Key already buffered: add to the sum, keep the timestamp of the first record.
            stateStore.put(key, ValueAndTimestamp.make(stored.value() + value, stored.timestamp()));
        } else {
            // context().timestamp() is the record timestamp, while punctuate() receives
            // wall-clock time, so this assumes record timestamps are close to wall-clock time.
            stateStore.put(key, ValueAndTimestamp.make(value, context().timestamp()));
        }
    }

    private void punctuate(long timestamp) {
        try (var iterator = stateStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, ValueAndTimestamp<Integer>> entry = iterator.next();
                if (timestamp - entry.value.timestamp() > fSuppressTimeoutMillis) {
                    // Forwards the ValueAndTimestamp wrapper; use entry.value.value()
                    // to forward the plain Integer instead.
                    context().forward(entry.key, entry.value);
                    stateStore.delete(entry.key);
                }
            }
        }
    }
}
{code}

> Add wall clock TimeDefinition for suppression of intermediate events
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7748
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7748
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Jonathan Gordon
>            Priority: Major
>              Labels: needs-kip
>
> Currently, Kafka Streams offers the ability to suppress intermediate events
> based on either RecordTime or WindowEndTime, which are in turn defined by
> stream time:
> {{Suppressed.untilTimeLimit(final Duration timeToWaitForMoreEvents, final
> BufferConfig bufferConfig)}}
> It would be helpful to have another option that would allow suppression of
> intermediate events based on wall clock time. This would allow us to only
> produce a limited number of aggregates independent of their stream time
> (which in our case is event time).
> For reference, here's the relevant KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-Best-effortratelimitperkey]
> And here's the relevant Confluent Slack thread:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1544468349201700
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
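
For anyone who wants to check the buffer-and-flush behaviour of the workaround without a Kafka dependency, the logic can be modelled in plain Java. This is only a minimal sketch under assumptions: the class name WallClockSuppressBuffer and its methods are hypothetical, an in-memory map stands in for the state store, and the caller passes the current wall-clock time explicitly (the processor in the comment stores the record timestamp instead).

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical plain-Java model of wall-clock suppression; not part of Kafka Streams. */
final class WallClockSuppressBuffer {

    /** Running sum for a key plus the wall-clock time at which the key was first buffered. */
    private static final class Buffered {
        int sum;
        final long firstSeenMillis;

        Buffered(int sum, long firstSeenMillis) {
            this.sum = sum;
            this.firstSeenMillis = firstSeenMillis;
        }
    }

    private final long suppressTimeoutMillis;
    // In-memory stand-in for the state store (assumption made for illustration).
    private final Map<String, Buffered> buffer = new LinkedHashMap<>();

    WallClockSuppressBuffer(long suppressTimeoutMillis) {
        this.suppressTimeoutMillis = suppressTimeoutMillis;
    }

    /** Adds a value to the buffered sum; the first-seen time is set only once per key. */
    void process(String key, int value, long nowMillis) {
        Buffered existing = buffer.get(key);
        if (existing != null) {
            existing.sum += value;
        } else {
            buffer.put(key, new Buffered(value, nowMillis));
        }
    }

    /** Removes and returns every key buffered for strictly longer than the timeout. */
    Map<String, Integer> punctuate(long nowMillis) {
        Map<String, Integer> emitted = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Buffered>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> entry = it.next();
            if (nowMillis - entry.getValue().firstSeenMillis > suppressTimeoutMillis) {
                emitted.put(entry.getKey(), entry.getValue().sum);
                it.remove();
            }
        }
        return emitted;
    }
}
```

Passing the clock in as a parameter keeps the model deterministic, so the timeout boundary (strictly greater than, as in the processor) can be checked directly in a unit test.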