[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904495#comment-16904495 ]

Alessandro Tagliapietra commented on KAFKA-8769:
------------------------------------------------

I'm the one who created that Stack Overflow question (and [this mailing list 
thread|https://lists.apache.org/thread.html/e8976c1acf4fb3bad9c6944658fdc9ec00237fd11ddebd15bde0f15a@%3Cusers.kafka.apache.org%3E]).
This would help our use case (the one John mentioned) of IoT devices sending 
data not in real time.

I ran into this problem because I was trying to use a grace period of 0 seconds 
and suppress after the aggregation. We want to close each device's time window 
as soon as we get newer data, because once a window is closed we take some 
decisions (along with decisions taken by the user), so we don't want old 
windows to be changed or reprocessed. We also want to be able to load 
historical data when we add a new IoT device; that data is obviously older 
than the data other IoT devices are sending in real time (we use a data 
service between the device and Kafka, and customers usually have old data they 
want to import into our system, so we just start polling that service from an 
old timestamp and catch up from there).
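
For reference, the approach I was originally trying looked roughly like this 
(just a minimal sketch; the stream name, value types, and aggregator arguments 
are placeholders rather than our exact code):
{code:java}
// Sketch of the zero-grace + suppress approach described above. With per-partition
// stream time, historical data that arrives "late" for the partition is simply dropped.
KStream<Windowed<String>, MetricSequenceList> finalWindows = pairsStream
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
  .aggregate(/* initializer, aggregator, materialized */)
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream();
{code}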

My current solution is this:

 
{code:java}
KStream<String, OutputMetricList> aggregatedStream = pairsStream
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
  .aggregate(/* ... */)
  .toStream()
  .flatTransform(new TransformerSupplier<Windowed<String>, MetricSequenceList, Iterable<KeyValue<String, SuppressedOutput>>>() {
    @Override
    public Transformer<Windowed<String>, MetricSequenceList, Iterable<KeyValue<String, SuppressedOutput>>> get() {
      return new Transformer<Windowed<String>, MetricSequenceList, Iterable<KeyValue<String, SuppressedOutput>>>() {
        // Holds the latest aggregate seen for each device key.
        private KeyValueStore<String, SuppressedOutput> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
          store = (KeyValueStore<String, SuppressedOutput>) context.getStateStore("suppress-store");
        }

        @Override
        public Iterable<KeyValue<String, SuppressedOutput>> transform(Windowed<String> windowKey, MetricSequenceList sequenceList) {
          String messageKey = windowKey.key();
          long windowEndTimestamp = windowKey.window().endTime().toEpochMilli();
          SuppressedOutput currentSuppressedOutput = new SuppressedOutput(windowEndTimestamp, sequenceList);
          SuppressedOutput storeValue = store.get(messageKey);

          // First window seen for this key: remember it, emit nothing yet.
          if (storeValue == null) {
            store.put(messageKey, currentSuppressedOutput);
            return Collections.emptyList();
          }

          // A newer window has started for this key: the stored window is final,
          // so emit it and start tracking the new one.
          if (windowEndTimestamp > storeValue.getTimestamp()) {
            store.put(messageKey, currentSuppressedOutput);
            return Collections.singletonList(KeyValue.pair(messageKey, storeValue));
          }

          // An update for an older, already-emitted window: drop it.
          if (windowEndTimestamp < storeValue.getTimestamp()) {
            return Collections.emptyList();
          }

          // Same window as the stored one: keep the latest aggregate, emit nothing yet.
          store.put(messageKey, currentSuppressedOutput);
          return Collections.emptyList();
        }

        @Override
        public void close() {
        }
      };
    }
  }, "suppress-store")
  .map((sensorId, suppressedOutput) -> {
    // ...
{code}
So I basically have a store that just saves the last aggregate for each key's 
window and makes sure each incoming record belongs either to the current window 
or to a newer one; otherwise the data gets dropped.
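
For completeness, the "suppress-store" referenced above has to be registered on 
the topology before it can be attached to the transformer. That part looks 
roughly like this (a sketch; "builder" is the StreamsBuilder and 
"suppressedOutputSerde" stands in for whatever serde you use for the 
SuppressedOutput class):
{code:java}
// Register the key-value store the transformer looks up as "suppress-store".
// suppressedOutputSerde is a placeholder for the serde of the SuppressedOutput class.
StoreBuilder<KeyValueStore<String, SuppressedOutput>> suppressStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("suppress-store"),
        Serdes.String(),
        suppressedOutputSerde);
builder.addStateStore(suppressStoreBuilder);
{code}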

 

> Consider computing stream time independently per key
> ----------------------------------------------------
>
>                 Key: KAFKA-8769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8769
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IoT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g.:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IoT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
