Heya,
I have some Kafka producers that listen for webhook events; for
each webhook event, they post its payload to a Kafka topic.
Each payload contains a timestamp from the webhook source.
This timestamp is the source of truth about which events happened first,
which happened last, e
What do you mean by "message keys are random" -- do you effectively
have no keys and want all messages to be processed as if they all have
the same key?
To access the record timestamp in general, you need to use the Processor API. The
given ProcessorContext object
Thank you, the state store seems promising. But is it distributed, or limited
to a particular instance of my application?
I.e., if there are 3 messages setting product 1's price to $1, $3, and $5,
and all 3 of them go to different instances of my application, will they
be able to correctly identify
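To make the "last one wins" question concrete, here is a minimal plain-Java sketch. It is not the Kafka Streams state store API. A HashMap stands in for one instance's store, and `PriceUpdate`, `LastWriteWinsStore` and their fields are hypothetical names. An update is applied only if its source timestamp is newer than the stored one, so out-of-order arrival still resolves to $5. This only works if every update for a product reaches the same store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical update type: the product, the webhook's own timestamp
// (epoch millis), and the new price.
final class PriceUpdate {
    final String productId;
    final long sourceTimestamp;
    final int priceDollars;

    PriceUpdate(String productId, long sourceTimestamp, int priceDollars) {
        this.productId = productId;
        this.sourceTimestamp = sourceTimestamp;
        this.priceDollars = priceDollars;
    }
}

// The map stands in for a per-instance state store keyed by product.
final class LastWriteWinsStore {
    private final Map<String, PriceUpdate> latest = new HashMap<>();

    /** Stores the update only if it is newer than the current one; returns true if stored. */
    boolean apply(PriceUpdate update) {
        PriceUpdate current = latest.get(update.productId);
        if (current != null && current.sourceTimestamp >= update.sourceTimestamp) {
            return false; // older (or equal-timestamp) update: keep what we have
        }
        latest.put(update.productId, update);
        return true;
    }

    /** Latest known price, or null if the product has never been seen. */
    Integer priceOf(String productId) {
        PriceUpdate u = latest.get(productId);
        return u == null ? null : u.priceDollars;
    }
}

class LastWriteWinsDemo {
    public static void main(String[] args) {
        LastWriteWinsStore store = new LastWriteWinsStore();
        // Updates arrive out of order: $5 (t=300) before $3 (t=200).
        store.apply(new PriceUpdate("product-1", 100L, 1));
        store.apply(new PriceUpdate("product-1", 300L, 5));
        store.apply(new PriceUpdate("product-1", 200L, 3)); // ignored: older than t=300
        System.out.println(store.priceOf("product-1")); // prints 5
    }
}
```

On equal timestamps, this sketch keeps the first value it saw. That tie-break is an arbitrary choice here.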
It is not global in this sense.
Thus, you need to ensure that records updating the same product go to
the same instance. You can ensure this by giving all records of the
same product the same key and calling "groupByKey" before processing the data.
-Mat
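Why the same key means the same instance can be shown with a simplified model. A record's partition is a deterministic function of its key, and each partition is processed by a single instance at a time. This is a plain-Java sketch, not Kafka code. `KeyRouter` is a hypothetical name, and `String.hashCode()` is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of key-based partitioning (not Kafka's actual partitioner).
final class KeyRouter {
    private final int numPartitions;

    KeyRouter(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Deterministically maps a key to a partition in [0, numPartitions). */
    int partitionFor(String key) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

class KeyRouterDemo {
    public static void main(String[] args) {
        KeyRouter router = new KeyRouter(3);
        List<Integer> targets = new ArrayList<>();
        // Three price updates for the same product, all keyed by the product id.
        for (int i = 0; i < 3; i++) {
            targets.add(router.partitionFor("product-1"));
        }
        System.out.println(targets); // the same partition index, three times
    }
}
```

Because the mapping depends only on the key, the $1, $3 and $5 updates for product 1 all land in one partition, and therefore on one instance's store. Records without a key get no such guarantee.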
Thanks for the reply.
It's not possible to provide keys, unfortunately. (The producer is written by a
colleague, who just wants to pass along whatever data the API gives
and leave all processing of the data to me.)
Perhaps I can use an intermediate Kafka topic, and have producers post to
Sorry, to be clear:
- Producers post to topic A
- Consumers of topic A receive the data, parse it to find the keys, and
post the correct key + message to topic B
- Topic B is treated as a KTable by a 2nd consumer layer, and it's this layer
that does the writes to ensure 'last one wins' (Assuming 'la
Exactly. You need to set the key using KStream#selectKey() and
re-distribute data via #through().
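The re-keying step can be modeled in plain Java. This sketch does not use the KStream API. `selectKey` derives a key from each unkeyed payload, and `redistribute` groups the keyed records by partition, which is roughly what writing them through an intermediate topic achieves. Several things here are assumptions: `Rekey` is a hypothetical class, payloads are taken as already parsed into maps, and `"productId"` is a hypothetical field name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of stage 1 (topic A -> topic B), not Kafka Streams code.
final class Rekey {
    static final String KEY_FIELD = "productId"; // hypothetical payload field

    /** selectKey analogue: pair each payload with the key extracted from it. */
    static List<Map.Entry<String, Map<String, String>>> selectKey(List<Map<String, String>> payloads) {
        List<Map.Entry<String, Map<String, String>>> keyed = new ArrayList<>();
        for (Map<String, String> payload : payloads) {
            String key = payload.get(KEY_FIELD);
            if (key == null) {
                throw new IllegalArgumentException("payload has no " + KEY_FIELD + ": " + payload);
            }
            keyed.add(new SimpleEntry<>(key, payload));
        }
        return keyed;
    }

    /** through() analogue: route each keyed record to a partition of the target topic. */
    static Map<Integer, List<Map<String, String>>> redistribute(
            List<Map.Entry<String, Map<String, String>>> keyed, int numPartitions) {
        Map<Integer, List<Map<String, String>>> partitions = new TreeMap<>();
        for (Map.Entry<String, Map<String, String>> record : keyed) {
            // String.hashCode() is a stand-in for Kafka's own key hashing.
            int p = Math.floorMod(record.getKey().hashCode(), numPartitions);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(record.getValue());
        }
        return partitions;
    }
}
```

After this step, every update for a given product sits in one partition, so the second layer can apply "last one wins" per key.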
About timestamps: you can provide a custom TimestampExtractor that
returns the JSON-embedded TS instead of the record TS (as
DefaultTimestampExtractor does
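The logic a custom extractor's `extract(...)` method could contain is sketched below in plain Java. The exact signature of that method depends on the Kafka version. The sketch prefers the timestamp embedded in the payload and falls back to the record's own timestamp. Assumptions: the payload carries epoch millis in a hypothetical top-level `"timestamp"` field. The regex is only a dependency-free stand-in for a real JSON parser such as Jackson, and it would also match a nested field of the same name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-in for the body of a custom TimestampExtractor (not the Kafka interface itself).
final class EmbeddedTimestamps {
    // Hypothetical field name; a real implementation would parse the JSON properly.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    /** Returns the payload's embedded timestamp, or recordTimestamp if none is usable. */
    static long extract(String jsonPayload, long recordTimestamp) {
        if (jsonPayload != null) {
            Matcher m = TS_FIELD.matcher(jsonPayload);
            if (m.find()) {
                try {
                    return Long.parseLong(m.group(1));
                } catch (NumberFormatException overflow) {
                    // too many digits for a long: fall back to the record timestamp
                }
            }
        }
        return recordTimestamp;
    }
}
```

Falling back to the record timestamp is one possible choice. Failing or skipping the record instead may be preferable if every payload is expected to carry the field.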
Thanks! That looks perfect.
Last question: is there any shortcut for having the JSON string messages
automatically deserialized into their equivalent Java class via Jackson, or
similar?
Perhaps I can write a Serde impl which takes the java.lang.Class of the
class to be mapped, and maps it via Jackson?
Yes, that should work.
On 10/6/16 3:54 PM, Ali Akhtar wrote: