[ 
https://issues.apache.org/jira/browse/KAFKA-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322023#comment-17322023
 ] 

Andrey Bratus commented on KAFKA-7748:
--------------------------------------

Hi [~cosbor11],

For a simple use case, I worked around this problem using the Processor API 
and a wall-clock punctuation:

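{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
 * Sums values per key and forwards each sum once the key has been buffered
 * for longer than the suppress timeout, measured in wall-clock time.
 */
public class SuppressProcessor extends AbstractProcessor<String, Integer> {

    private final String fStoreName;
    private final Duration fPunctuationInterval;
    private final long fSuppressTimeoutMillis;
    private TimestampedKeyValueStore<String, Integer> stateStore;

    public SuppressProcessor(String storeName, Duration suppressTimeout, Duration punctuationInterval) {
        fStoreName = storeName;
        fSuppressTimeoutMillis = suppressTimeout.toMillis();
        fPunctuationInterval = punctuationInterval;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // The store must be registered under fStoreName and connected to this processor.
        stateStore = (TimestampedKeyValueStore<String, Integer>) context.getStateStore(fStoreName);
        context.schedule(fPunctuationInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(String key, Integer value) {
        ValueAndTimestamp<Integer> record = stateStore.get(key);
        if (record != null) {
            // Key already buffered: add to the sum and keep the first-seen timestamp.
            stateStore.put(key, ValueAndTimestamp.make(record.value() + value, record.timestamp()));
        } else {
            // First record for this key. Store wall-clock time rather than
            // context().timestamp() (the record timestamp), so that it is
            // comparable with the wall-clock timestamp passed to punctuate().
            stateStore.put(key, ValueAndTimestamp.make(value, System.currentTimeMillis()));
        }
    }

    private void punctuate(long timestamp) {
        // The iterator must be closed, hence try-with-resources.
        try (var iterator = stateStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, ValueAndTimestamp<Integer>> record = iterator.next();
                if (timestamp - record.value.timestamp() > fSuppressTimeoutMillis) {
                    // Forwards the ValueAndTimestamp wrapper; use
                    // record.value.value() to forward the bare Integer instead.
                    context().forward(record.key, record.value);
                    stateStore.delete(record.key);
                }
            }
        }
    }
}
{code}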
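
To make the timing easier to reason about, here is a minimal Kafka-free sketch of the same buffer-and-flush idea. It is an illustration only: the class WallClockSuppressBuffer, its methods and the explicit nowMillis parameter are hypothetical names, not part of Kafka Streams, and the clock is passed in by the caller so that the behaviour can be checked deterministically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Kafka-free model of the suppress-then-flush logic; all names are hypothetical. */
class WallClockSuppressBuffer {

    /** Running sum for one key plus the wall-clock time the key was first buffered. */
    private static final class Buffered {
        int sum;
        final long firstSeenMillis;

        Buffered(int sum, long firstSeenMillis) {
            this.sum = sum;
            this.firstSeenMillis = firstSeenMillis;
        }
    }

    private final long suppressTimeoutMillis;
    private final Map<String, Buffered> buffer = new LinkedHashMap<>();

    WallClockSuppressBuffer(long suppressTimeoutMillis) {
        this.suppressTimeoutMillis = suppressTimeoutMillis;
    }

    /** Models process(): add to the running sum, keeping the first-seen time. */
    void process(String key, int value, long nowMillis) {
        Buffered entry = buffer.get(key);
        if (entry == null) {
            buffer.put(key, new Buffered(value, nowMillis));
        } else {
            entry.sum += value;
        }
    }

    /** Models punctuate(): emit and evict every key buffered longer than the timeout. */
    List<Map.Entry<String, Integer>> punctuate(long nowMillis) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (nowMillis - e.getValue().firstSeenMillis > suppressTimeoutMillis) {
                emitted.add(Map.entry(e.getKey(), e.getValue().sum));
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WallClockSuppressBuffer buffer = new WallClockSuppressBuffer(1_000);
        buffer.process("a", 1, 0);
        buffer.process("a", 2, 400);
        buffer.process("b", 5, 900);
        System.out.println(buffer.punctuate(1_000)); // [] (nothing is older than 1000 ms)
        System.out.println(buffer.punctuate(1_500)); // [a=3] ("a" has waited 1500 ms)
        System.out.println(buffer.punctuate(2_000)); // [b=5] ("b" has waited 1100 ms)
    }
}
```

Note that a key is emitted at the first punctuation after its timeout has elapsed, so the actual delay lies between the suppress timeout and the timeout plus one punctuation interval.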


> Add wall clock TimeDefinition for suppression of intermediate events
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7748
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7748
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Jonathan Gordon
>            Priority: Major
>              Labels: needs-kip
>
> Currently, Kafka Streams offers the ability to suppress intermediate events 
> based on either RecordTime or WindowEndTime, which are in turn defined by 
> stream time:
> {{Suppressed.untilTimeLimit(final Duration timeToWaitForMoreEvents, final 
> BufferConfig bufferConfig)}}
> It would be helpful to have another option that would allow suppression of 
> intermediate events based on wall clock time. This would allow us to only 
> produce a limited number of aggregates independent of their stream time 
> (which in our case is event time).
> For reference, here's the relevant KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-Best-effortratelimitperkey]
> And here's the relevant Confluent Slack thread:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1544468349201700
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
