Hi,

Long technical story ahead, sorry about that.

I'm dealing with a special case. My input topic receives records containing
an id in the key (plus another field used for partitioning), and a version
number in the value alongside various metrics. Records with the same id are
sent every 5 seconds, and the version number increments each time.
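
For illustration, a record might look roughly like this (field names are
made up):

key:   { id: "device-42", zone: "eu-1" }                <- id + partitioning field
value: { version: 7, metric_a: 12.5, metric_b: 3, ... } <- version + metrics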

The metrics in the record value are used in aggregations to compute
`sums` and `counts` (then stored in a DB to compute averages), and to
build a few other data structures like cumulative time buckets. If the
aggregation receives the same record with updated metrics, I have to
decrement `sum` by the previous record's metric value and increment it by
the new metric value. Also, `count` should be incremented by 1 only if the
record is seen for the first time (which is not the same as
"version number = 1").

To implement this, we would write a processor that computes the diff of the
metrics by storing the last version of each record in its state. This diff
is sent to the aggregation; it also tells whether the record was seen for
the first time (so `count` is incremented). I think this can only be
written with the low-level API.
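
Something along these lines, assuming the old low-level `Processor<K, V>`
API; `Metrics`, `MetricsDiff` and the store name are hypothetical, just to
show the shape of it:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class DiffProcessor implements Processor<String, Metrics> {

    private ProcessorContext context;
    private KeyValueStore<String, Metrics> lastSeen;

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.lastSeen = (KeyValueStore<String, Metrics>) context.getStateStore("last-seen");
    }

    public void process(String key, Metrics value) {
        Metrics previous = lastSeen.get(key);
        if (previous != null && previous.version >= value.version) {
            return; // out-of-order or duplicate version, nothing to forward
        }
        boolean firstSeen = previous == null;
        // MetricsDiff.between is a hypothetical helper computing new minus old,
        // carrying the firstSeen flag so the aggregation knows to bump count.
        lastSeen.put(key, value);
        context.forward(key, MetricsDiff.between(previous, value, firstSeen));
    }

    public void punctuate(long timestamp) { } // required by older API versions, unused here

    public void close() { }
}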
That could work well, except we have a dozen types of records, with a few
metrics each, and quite a few fields to compute in the aggregations. Each
time we deal with this kind of "duplicate" record, we would have to write
all the diff-computation code again, and the aggregation algorithm becomes
far less trivial (we deal with cumulative time buckets, if one knows what I
mean).

So we came up with another idea, which does not feel right in a *streaming*
environment and seems quite inefficient:

====
The goal is to "buffer" records until we're reasonably sure no new version
will be received. If a new version does arrive after that, it's ignored.
A generic low-level processor would be used in topologies which receive the
same records with updated metrics and an incremented version (a rough code
sketch follows the algorithm below).

One state store: contains the records, used to know whether a record was
already received and when, and whether it was already forwarded.

Algorithm:

On each new record:
- GET the previous record in the store by Key
- ignore the new record if:
-- the record version is lower than the one in the store
-- the record timestamp is at least 5 minutes newer than the one in the store
- PUT (and thus replace) the record in the store

Every 1 minute:
- for each record in the store
-- if the record has the field "forwarded == true"
--- DELETE it from the store if the record is one hour old
-- else
--- if the timestamp is more than 5 minutes old
---- PUT the record in the store with the field "forwarded" set to true
---- forward the record
====
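
A rough sketch of that processor, again with the old `Processor<K, V>` API
and old-style punctuation scheduling; `Envelope` is a hypothetical wrapper
carrying the version, the ingestion timestamp we would add to the value,
and the "forwarded" flag (all names, including the store name, are made up):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class BufferingProcessor implements Processor<String, Envelope> {

    private static final long SETTLE_MS = 5 * 60 * 1000L;     // no newer version expected after this
    private static final long RETENTION_MS = 60 * 60 * 1000L; // drop forwarded records after one hour

    private ProcessorContext context;
    private KeyValueStore<String, Envelope> store;

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, Envelope>) context.getStateStore("buffer");
        context.schedule(60 * 1000L); // punctuate every minute (old-style scheduling)
    }

    public void process(String key, Envelope incoming) {
        Envelope previous = store.get(key);
        if (previous != null) {
            if (incoming.version < previous.version) {
                return; // older version, ignore
            }
            if (incoming.ingestionTime - previous.ingestionTime >= SETTLE_MS) {
                return; // the stored record has already settled (and been forwarded), ignore
            }
        }
        store.put(key, incoming); // insert or replace with the newer version
    }

    public void punctuate(long now) {
        // no secondary index, so we scan the whole store (one of the caveats below)
        try (KeyValueIterator<String, Envelope> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Envelope> entry = it.next();
                Envelope env = entry.value;
                if (env.forwarded) {
                    if (now - env.ingestionTime >= RETENTION_MS) {
                        store.delete(entry.key); // forwarded long ago, clean up
                    }
                } else if (now - env.ingestionTime >= SETTLE_MS) {
                    env.forwarded = true;
                    store.put(entry.key, env);
                    context.forward(entry.key, env); // finally emit the settled record
                }
            }
        }
    }

    public void close() { }
}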

Caveats:
- low-level processors don't have access to the record's ingestion
timestamp, so we would have to add it to the record value before producing
the record
- no secondary indexes, so we do a complete iteration over the store on
each `punctuate`
- it feels so wrong

Any suggestions? It feels like a KStream of KTable records...

Thanks.
