Thanks! That looks perfect. Last question: is there any shortcut to having the JSON string messages automatically deserialized into their equivalent Java classes via Jackson, or something similar?
Perhaps I can write a Serde<V> implementation which takes the java.lang.Class of the class to be mapped, and maps it via Jackson?

On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Exactly. You need to set the key using KStream#selectKey() and re-distribute the data via #through().
>
> About timestamps: you can provide a custom TimestampExtractor that returns the timestamp embedded in the JSON instead of the record timestamp (which is what DefaultTimestampExtractor returns).
>
> See http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
>
> -Matthias
>
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
>> Sorry, to be clear:
>>
>> - Producers post to topic A.
>> - Consumers of topic A receive the data, parse it to find the keys, and post the correct key + message to topic B.
>> - Topic B is treated as a KTable by the 2nd consumer layer, and it's this layer which does the writes to ensure 'last one wins' (assuming 'last one' can be determined using the timestamp in the JSON of the message).
>>
>> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> Thanks for the reply.
>>>
>>> It's not possible to provide keys, unfortunately. (The producer is written by a colleague, and said colleague just wants to provide whatever data the API gives, and leave all processing of the data to me.)
>>>
>>> Perhaps I can use an intermediate Kafka topic, and have producers post to that topic with whatever data they receive. Then another consumer can listen to that topic and use it as a KTable to process data in 'last one wins' order.
>>>
>>> However, the source of truth on the time of the messages is embedded in the message itself; it is not the Kafka internal timestamp.
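The custom TimestampExtractor suggested above could look roughly like the sketch below. It is a minimal sketch under several assumptions: the class name JsonTimestampExtractor is hypothetical, jackson-databind is on the classpath, values reach the extractor as JSON Strings (String serdes), and the 'timestamp' field holds ISO-8601 text such as 2016-10-06T20:52:00Z. It targets the two-argument extract(record, partitionTime) of newer Kafka Streams releases; older 0.10.x releases, including the one the linked 3.0.1 docs describe, declare a single-argument extract(record) instead.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Hypothetical extractor: uses the ISO-8601 'timestamp' field embedded in the
 * JSON value as the record's event time instead of the Kafka record timestamp.
 */
public class JsonTimestampExtractor implements TimestampExtractor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        // Assumes String serdes, so the value arrives here as raw JSON text.
        if (value instanceof String) {
            try {
                JsonNode root = MAPPER.readTree((String) value);
                JsonNode ts = (root == null) ? null : root.get("timestamp");
                if (ts != null && ts.isTextual()) {
                    return Instant.parse(ts.asText()).toEpochMilli();
                }
            } catch (IOException | DateTimeParseException e) {
                // Unparseable JSON or timestamp: fall through to the record timestamp.
            }
        }
        // No usable embedded timestamp: use the timestamp Kafka stored with the record.
        return record.timestamp();
    }
}
```

It would be registered through the default.timestamp.extractor setting (timestamp.extractor in older releases). If the field is missing or unparseable it falls back to the Kafka record timestamp. As far as I know, the extracted time alone does not make a plain (non-versioned) KTable drop older out-of-order updates; those are still applied in arrival order, so an explicit timestamp comparison is still needed.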
>>>
>>> The message is a JSON string which contains a 'timestamp' field holding a string timestamp, and that string timestamp is the source of truth for when the message was generated.
>>>
>>> So, is it possible to use a KTable that lets me parse the message, return the time contained inside it, and use THAT time for ordering the messages?
>>>
>>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> It is not global in this sense.
>>>>
>>>> Thus, you need to ensure that records updating the same product go to the same instance. You can ensure this by giving all records of the same product the same key and calling "groupByKey" before processing the data.
>>>>
>>>> -Matthias
>>>>
>>>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>>> Thank you, State Store seems promising. But is it distributed, or limited to the particular instance of my application?
>>>>>
>>>>> I.e., if there are 3 messages setting product 1's price to $1, $3, and $5, and each of them goes to a different instance of my application, will they be able to correctly identify the latest received message using the State Store?
>>>>>
>>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> What do you mean by "message keys are random"? Do you effectively have no keys and want all messages to be processed as if they all had the same key?
>>>>>>
>>>>>> To access the record TS in general, you need to use the Processor API. The ProcessorContext object given by Processor#init() always returns the timestamp of the currently processed record via #timestamp().
>>>>>>
>>>>>> Thus, you can attach a state store to your processor and compare the timestamp of the current record with the timestamp of the one in your store.
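Stripped of the Kafka plumbing, the comparison described above (look up what the store holds for this key and keep the incoming record only if its timestamp is not older) comes down to a few lines. This is a minimal in-memory sketch, not the Processor API itself: PriceUpdate and LastWriteWins are hypothetical names, the payload is assumed to be already parsed, the timestamp string is assumed to be ISO-8601, and a plain HashMap stands in for the state store, so there is no persistence or fault tolerance.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Sketch of "last one wins" by embedded event time; a HashMap stands in for a key-value state store. */
public class LastWriteWins {

    /** Hypothetical, already-parsed webhook payload. */
    public static final class PriceUpdate {
        final String productId;
        final long priceCents;
        final Instant eventTime;

        public PriceUpdate(String productId, long priceCents, String isoTimestamp) {
            this.productId = productId;
            this.priceCents = priceCents;
            // Assumes ISO-8601 text such as 2016-10-06T10:00:00Z.
            this.eventTime = Instant.parse(isoTimestamp);
        }
    }

    private final Map<String, PriceUpdate> store = new HashMap<>();

    /** Stores the update unless the store already holds a newer one for the same product. */
    public boolean apply(PriceUpdate update) {
        PriceUpdate current = store.get(update.productId);
        if (current != null && update.eventTime.isBefore(current.eventTime)) {
            return false; // late arrival that is older than what we already have: drop it
        }
        store.put(update.productId, update); // equal timestamps: the newer arrival wins
        return true;
    }

    /** Latest known price in cents, or null if the product has not been seen. */
    public Long priceOf(String productId) {
        PriceUpdate latest = store.get(productId);
        return latest == null ? null : latest.priceCents;
    }
}
```

Applying $1 stamped 10:00, then $10 stamped 10:04, then a late $3 stamped 10:01 leaves product 1 at $10 (1000 cents): the late update is rejected because it is older than what the map already holds.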
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>>> Heya,
>>>>>>>
>>>>>>> I have some Kafka producers which listen to webhook events, and for each webhook event they post its payload to a Kafka topic.
>>>>>>>
>>>>>>> Each payload contains a timestamp from the webhook source.
>>>>>>>
>>>>>>> This timestamp is the source of truth about which events happened first, which happened last, etc.
>>>>>>>
>>>>>>> I need to ensure that the last arrival of a particular type of message wins.
>>>>>>>
>>>>>>> E.g., if there are 5 messages saying the price of the product with id 1 was set to $1, then $3, then something else, etc., before finally being set to $10, then I need to make sure that the final price for that product is $10.
>>>>>>>
>>>>>>> These messages can be out of order, and I need to determine the latest arrival based on the timestamp from the webhook source. (It's currently a string that can be parsed.)
>>>>>>>
>>>>>>> Since KTable looks like it uses message keys to determine what happens, and in this case the message keys are random while the timestamp contained in the message value is what determines the order of the events, any pointers on the best way to do this?
>>>>>>>
>>>>>>> I'm using Kafka Streams, latest version, Java.
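Putting the replies together for this original question: re-key each record by product with selectKey(), let groupByKey() route all updates for one product to the same instance, and reduce to whichever value carries the later embedded timestamp. A minimal sketch, assuming Kafka Streams 2.1 or later (StreamsBuilder, Grouped, Materialized; not the 0.10-era KStreamBuilder API) plus jackson-databind, and a hypothetical payload with string 'productId' and ISO-8601 'timestamp' fields. LatestPriceTopology and the topic names passed to build() are illustrative only, and malformed records simply throw.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

/** Hypothetical topology: re-key raw JSON by product id, keep the value with the latest embedded timestamp. */
public class LatestPriceTopology {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Assumed payload shape: {"productId": "...", "timestamp": "<ISO-8601>", ...}
    static JsonNode parse(String json) {
        try {
            return MAPPER.readTree(json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Instant eventTime(String json) {
        return Instant.parse(parse(json).get("timestamp").asText());
    }

    /** Reducer logic: keep the value with the later embedded timestamp; ties go to the newer arrival. */
    public static String later(String current, String incoming) {
        return eventTime(incoming).isBefore(eventTime(current)) ? current : incoming;
    }

    public static Topology build(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            // Replace the random key with the product id taken from the payload.
            .selectKey((ignoredKey, json) -> parse(json).get("productId").asText())
            // The key changed, so groupByKey() repartitions through an internal topic.
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // Reduce into a KTable backed by a local state store: one value per product.
            .reduce(LatestPriceTopology::later, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Because the key changes before groupByKey(), Kafka Streams repartitions on its own, so no explicit through() call appears here. The comparison in later(), rather than arrival order, decides which update survives.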
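For the question at the top of this message, a Serde<V> that takes the java.lang.Class to map and delegates to Jackson takes only a few dozen lines. A minimal sketch, assuming jackson-databind and kafka-clients on the classpath and Jackson-mappable target classes; JacksonSerde and forClass are hypothetical names, not part of Kafka. Jackson failures are wrapped in Kafka's SerializationException, and nulls pass through as nulls.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

/** Hypothetical helper: builds a Serde that maps JSON bytes to/from instances of the given class via Jackson. */
public final class JacksonSerde {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JacksonSerde() {}

    public static <V> Serde<V> forClass(final Class<V> type) {
        Serializer<V> serializer = new Serializer<V>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public byte[] serialize(String topic, V data) {
                if (data == null) {
                    return null; // keep null values (e.g. tombstones) as null
                }
                try {
                    return MAPPER.writeValueAsBytes(data);
                } catch (IOException e) {
                    throw new SerializationException("Could not write " + type.getName() + " as JSON", e);
                }
            }

            @Override
            public void close() {}
        };

        Deserializer<V> deserializer = new Deserializer<V>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public V deserialize(String topic, byte[] bytes) {
                if (bytes == null) {
                    return null;
                }
                try {
                    return MAPPER.readValue(bytes, type);
                } catch (IOException e) {
                    throw new SerializationException("Could not read JSON as " + type.getName(), e);
                }
            }

            @Override
            public void close() {}
        };

        // Wraps the pair into a single Serde with no-op configure/close.
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
```

A call such as JacksonSerde.forClass(PriceUpdate.class), with PriceUpdate standing in for your own POJO, could then be used wherever a value serde is expected, for example in place of Serdes.String() when reading the topic.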