Re: Handling out of order messages without KTables

2016-10-06 Thread Matthias J. Sax
Yes, that should work. On 10/6/16 3:54 PM, Ali Akhtar wrote: > Thanks! That looks perfect. > > Last q.. is there any shortcut to having the json string messages > automatically get serialized to their equivalent Java class via > Jackson, or such?

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thanks! That looks perfect. Last q.. is there any shortcut to having the json string messages automatically get serialized to their equivalent Java class via Jackson, or such? Perhaps I can write a Serde impl which takes the java.lang.Class of the class to be mapped, and maps it via Jackson? On

Re: Handling out of order messages without KTables

2016-10-06 Thread Matthias J. Sax
Exactly. You need to set the key using KStream#selectKey() and re-distribute data via #through(). About timestamps: you can provide a custom TimestampExtractor that returns the JSON-embedded TS instead of record TS (as DefaultTimestampExtractor

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Sorry, to be clear:
- Producers post to topic A
- Consumers of topic A receive the data, parse it to find the keys, and post the correct key + message to Topic B
- Topic B is treated as a KTable by 2nd consumer layer, and it's this layer which does the writes to ensure 'last one wins' (Assuming

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thanks for the reply. It's not possible to provide keys, unfortunately. (Producer is written by a colleague, and said colleague just wants to provide whatever data the API gives, and leave all processing of the data to me). Perhaps I can use an intermediate Kafka topic, and have producers post to

Re: Handling out of order messages without KTables

2016-10-06 Thread Matthias J. Sax
It is not global in this sense. Thus, you need to ensure that records updating the same product go to the same instance. You can ensure this by giving all records of the same product the same key and "groupByKey" before processing the data. -
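
The reason a shared key sends all updates for one product to the same instance can be sketched in plain Java. This is only an illustration under stated assumptions: `KeyRouting` and `instanceFor` are hypothetical names, and the hash used here is `String.hashCode()`, not the hash Kafka's default partitioner actually uses. The point is just that the target is a pure function of the key, so equal keys always land in the same place.

```java
/** Hypothetical helper: deterministic key-to-instance routing (not Kafka's partitioner). */
class KeyRouting {
    /**
     * Maps a record key to one of numInstances buckets.
     * floorMod keeps the result in [0, numInstances) even for negative hash codes.
     */
    static int instanceFor(String key, int numInstances) {
        if (numInstances <= 0) {
            throw new IllegalArgumentException("numInstances must be positive");
        }
        return Math.floorMod(key.hashCode(), numInstances);
    }
}
```

With this sketch, `KeyRouting.instanceFor("product-1", 3)` returns the same value on every call, so every update keyed "product-1" would be handled by one instance; records without a common key carry no such guarantee.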

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thank you, State Store seems promising. But, is it distributed, or limited to the particular instance of my application? I.e., if there are 3 messages, setting product 1's price to $1, $3, and $5, and all 3 of them go to a different instance of my application, will they be able to correctly
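
The "last one wins" behaviour asked about here can be sketched with an in-memory map standing in for a state store. This is a minimal illustration, not Kafka Streams code: `LatestPriceStore`, `apply`, and `priceOf` are hypothetical names, and the sketch assumes every update for a given product reaches the same store, which is exactly the condition the question is about.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a per-instance state store. */
class LatestPriceStore {
    /** A stored price together with the timestamp embedded in its payload. */
    private static final class Update {
        final long timestamp;
        final double price;

        Update(long timestamp, double price) {
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    private final Map<String, Update> latest = new HashMap<>();

    /**
     * Applies an update only if its embedded timestamp is strictly newer
     * than the one already stored. Returns true if the update was applied.
     */
    boolean apply(String productId, long timestamp, double price) {
        Update current = latest.get(productId);
        if (current != null && current.timestamp >= timestamp) {
            return false; // stale or duplicate update: ignore it
        }
        latest.put(productId, new Update(timestamp, price));
        return true;
    }

    /** Returns the latest known price, or null if the product is unknown. */
    Double priceOf(String productId) {
        Update current = latest.get(productId);
        return current == null ? null : current.price;
    }
}
```

If the three price updates carry timestamps 100 ($1), 200 ($3), and 300 ($5) but arrive as $5, $1, $3, the store keeps $5: the first call to `apply` succeeds and the two older updates are rejected. The comparison only works if all three updates are checked against the same store.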

Re: Handling out of order messages without KTables

2016-10-06 Thread Matthias J. Sax
What do you mean by "message keys are random" -- do you effectively have no keys and want all messages to be processed as if they all have the same key? To access record TS in general, you need to use Processor API. The given ProcessorContext

Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Heya, I have some Kafka producers, which are listening to webhook events, and for each webhook event, they post its payload to a Kafka topic. Each payload contains a timestamp from the webhook source. This timestamp is the source of truth about which events happened first, which happened last,