Exactly. You need to set the key using KStream#selectKey() and re-distribute the data via #through().
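As a sketch of what the mapper passed to selectKey() might do: assuming (hypothetically) that each JSON payload carries an "id" field naming the product, deriving the key is plain string parsing. The field name and payload shape are assumptions, since the actual format isn't shown in this thread; the commented line shows where it would plug into the topology.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyFromPayload {
    // Hypothetical field name: assumes each JSON payload carries a product "id".
    private static final Pattern ID = Pattern.compile("\"id\"\\s*:\\s*\"?(\\w+)\"?");

    // This is the logic a KeyValueMapper handed to KStream#selectKey() would run, e.g.:
    //   stream.selectKey((key, value) -> extractId(value)).through("keyed-topic");
    // ("keyed-topic" is a made-up topic name for illustration.)
    public static String extractId(String json) {
        Matcher m = ID.matcher(json);
        return m.find() ? m.group(1) : null;   // null when no id is present
    }

    public static void main(String[] args) {
        System.out.println(extractId("{\"id\": \"1\", \"price\": 10, \"timestamp\": \"2016-10-06T14:59:00Z\"}"));
    }
}
```

After re-distributing through the keyed topic, all updates for the same product land in the same partition and hence the same application instance.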
About timestamps: you can provide a custom TimestampExtractor that returns the JSON-embedded TS instead of the record TS (as DefaultTimestampExtractor does). See
http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor

-Matthias

On 10/6/16 2:59 PM, Ali Akhtar wrote:
> Sorry, to be clear:
>
> - Producers post to topic A
> - Consumers of topic A receive the data, parse it to find the keys, and post the correct key + message to topic B
> - Topic B is treated as a KTable by the 2nd consumer layer, and it's this layer which does the writes to ensure 'last one wins' (assuming 'last one' can be determined using the timestamp in the JSON of the message)
>
> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> Thanks for the reply.
>>
>> It's not possible to provide keys, unfortunately. (The producer is written by a colleague, and said colleague just wants to pass along whatever data the API gives, and leave all processing of the data to me.)
>>
>> Perhaps I can use an intermediate Kafka topic, and have producers post to that topic with whatever data they receive. Then, another consumer can listen to that topic, and use it as a KTable to process data in the order of 'last one winning'.
>>
>> However, the source of truth on the time of the messages is embedded in the message itself; it's not the Kafka internal timestamp.
>>
>> The message is a JSON string which contains a 'timestamp' field, containing a string timestamp, and that string timestamp is the source of truth on when this message was generated.
>>
>> So, is it possible to use a KTable which lets me parse the message, return the time which is contained inside the message, and use THAT time for sorting the messages?
>>
>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> It is not global in this sense.
>>>
>>> Thus, you need to ensure that records updating the same product go to the same instance.
>>> You can ensure this by giving all records of the same product the same key, and using "groupByKey" before processing the data.
>>>
>>> -Matthias
>>>
>>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>> Thank you, State Store seems promising. But is it distributed, or limited to the particular instance of my application?
>>>>
>>>> I.e. if there are 3 messages, setting product 1's price to $1, $3, and $5, and all 3 of them go to a different instance of my application, will they be able to correctly identify the latest received message using State Store?
>>>>
>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> What do you mean by "message keys are random" -- do you effectively have no keys, and want all messages to be processed as if they all have the same key?
>>>>>
>>>>> To access the record TS in general, you need to use the Processor API. The ProcessorContext object given to Processor#init() always returns the timestamp of the currently processed record via #timestamp().
>>>>>
>>>>> Thus, you can attach a state store to your processor and compare the timestamp of the current record with the timestamp of the one in your store.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>> Heya,
>>>>>>
>>>>>> I have some Kafka producers which are listening to webhook events, and for each webhook event, they post its payload to a Kafka topic.
>>>>>>
>>>>>> Each payload contains a timestamp from the webhook source.
>>>>>>
>>>>>> This timestamp is the source of truth about which events happened first, which happened last, etc.
>>>>>>
>>>>>> I need to ensure that the last arrival of a particular type of message wins.
>>>>>> E.g., if there are 5 messages saying the price of a product with id 1 was set to $1, then $3, then something else, etc., before finally being set to $10, then I need to make sure that the final price for that product is $10.
>>>>>>
>>>>>> These messages can be out of order, and I need to determine the latest arrival based on the timestamp from the webhook source. (It's atm in a string format which can be parsed.)
>>>>>>
>>>>>> Since KTable looks like it uses message keys to determine what happens - and in this case, the message keys are random, and the timestamp contained in the value of the message is what determines the order of the events - any pointers on what the best way to do this is?
>>>>>>
>>>>>> I'm using Kafka Streams, latest version, Java.
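To make the 'last one wins' comparison concrete, here is a minimal plain-Java sketch (no Kafka dependency) of what the Processor-API approach above boils down to: parse the embedded timestamp out of the value, and keep the record only if it is newer than the one already in the per-key store. The field names ("timestamp", "price") and the ISO-8601 format are assumptions, since the actual payload shape isn't shown in this thread.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LastOneWins {
    // Assumed payload shape: {"id":"1","price":3,"timestamp":"2016-10-06T14:59:00Z"}
    private static final Pattern TS = Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]+)\"");

    // Stand-ins for the per-key state store: latest embedded timestamp
    // seen so far, and the value that carried it.
    private final Map<String, Instant> latest = new HashMap<>();
    private final Map<String, String> value = new HashMap<>();

    // Mirrors what a Processor with an attached state store would do:
    // accept the new value only if its embedded timestamp is newer.
    public void update(String key, String json) {
        Matcher m = TS.matcher(json);
        if (!m.find()) return;                  // no embedded timestamp: ignore
        Instant ts = Instant.parse(m.group(1)); // source-of-truth timestamp
        Instant seen = latest.get(key);
        if (seen == null || ts.isAfter(seen)) {
            latest.put(key, ts);
            value.put(key, json);
        }
        // else: an older, out-of-order record arrived late; drop it
    }

    public String current(String key) { return value.get(key); }
}
```

With the embedded-timestamp comparison done explicitly like this, the arrival order of records on the topic no longer matters: the $10 update wins for product 1 even if it is delivered before the $3 one.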