I did not study your code snippet, but yes, it sounds like a valid approach from your description.

How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?

For punctuations, there is always some jitter, because it's not possible to run a punctuation at the very exact point in time when it is scheduled to run. Thus, a punctuation might fire a little delayed anyway. You can also not control the "anchor point" directly, because it depends on the point in time when you register the punctuation.

Also note, that a WindowedStore is basically still a key-value store, ie a single key-value pair models one window. The main difference is the timestamp that is use to expired rows eventually, what just implies that expired rows are dropped (without any notification).

Thus, the only thing you can do is, to run the punctuation frequently enough to keep latency low enough to detect windows that are logically closed to forward the corresponding result.

But you cannot really "bind" the punctuation with the state store expiration, and window-store also does not support deletes... Thus, I am wondering if using a plain key-value store might be more useful for your case? Using a plain kv-store, whenever the punctuation runs you can find closed windows, forward the result and also delete the row explicitly, which give you more control.

Hope this helps.

-Matthias

On 11/2/21 10:29 AM, Luigi Cerone wrote:
I'm using Kafka Streams in a deduplication events problem over short time
windows (<= 1 minute).
First I've tried to tackle the problem by using DSL API with
[`.suppress(Suppressed.untilWindowCloses(...))`](
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, given the fact that wall-clock time is not yet supported
(I've seen the [KIP 424](
https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then, I've followed this [official Confluent example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
in which low level Processor API is used and it was working fine but has
one major limitation for my use-case. The single event (obtained by
deduplication) is emitted at the **beginning** of the time window,
subsequent duplicated events are "suppressed". In my use case I need the
reverse of that, meaning that a single event should be emitted at the end
of the window.
I'm asking for suggestions on how to implement this use case with Processor
API.

My idea was to use the Processor API with a custom [Transformer](
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html
).
The transformer would store in a [WindowStore](
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
the distinct keys received without returning any KeyValue. Simultaneously,
I'd schedule a punctuator running with an interval equal to the size of the
window in the WindowStore. This punctuator will iterate over the elements
in the store and forward them downstream.
The following are some core parts of the logic:

DeduplicationTransformer (slightly modified from [official Confluent
example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
):
```java
     @Override
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context) {
         this.context = context;
         eventIdStore = (WindowStore<E, V>)
context.getStateStore(this.storeName);

         // Schedule punctuator for this transformer.
         context.schedule(Duration.ofMillis(this.windowSizeMs),
PunctuationType.WALL_CLOCK_TIME,
             new DeduplicationPunctuator<E, V>(eventIdStore, context,
this.windowSizeMs));
     }

     @Override
     public KeyValue<K, V> transform(final K key, final V value) {
         final E eventId = idExtractor.apply(key, value);
         if (eventId == null) {
             return KeyValue.pair(key, value);
         } else {
             if (!isDuplicate(eventId)) {
                 rememberNewEvent(eventId, value, context.timestamp());
             }
             return null;
         }
     }
```

DeduplicationPunctuator:
```java
     public DeduplicationPunctuator(WindowStore<E, V> eventIdStore,
ProcessorContext context,
         long retainPeriodMs) {
         this.eventIdStore = eventIdStore;
         this.context = context;
         this.retainPeriodMs = retainPeriodMs;
     }

     @Override
     public void punctuate(long invocationTime) {
         LOGGER.info("Punctuator invoked at {}, searching from {}", new
Date(invocationTime), new Date(invocationTime-retainPeriodMs));

         KeyValueIterator<Windowed<E>, V> it =
             eventIdStore.fetchAll(invocationTime - retainPeriodMs,
invocationTime + retainPeriodMs);

         while (it.hasNext()) {
             KeyValue<Windowed<E>, V> next = it.next();

             LOGGER.info("Punctuator running on {}", next.key.key());

             context.forward(next.key.key(), next.value);
             // Delete from store with tombstone
             eventIdStore.put(next.key.key(), null, invocationTime);
             context.commit();
         }

         it.close();
     }
```

Is this a valid approach?
With the previous code, I'm running some integration tests and I've some
synchronization issues. How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


Also as an alternative approach, I was wondering (I've googled with no
result), if there is any event triggered by window closing to which I can
attach a callback in order to iterate over store and publish only distinct
events.

Thanks.

My question is also here: https://stackoverflow.com/q/69725131/4837677

Reply via email to