-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Yes, that should work.
On 10/6/16 3:54 PM, Ali Akhtar wrote:
> Thanks! That looks perfect.
>
> Last q.. is there any shortcut to having the json string messages
> automatically get deserialized to their equivalent Java class via
> Jackson, or such?
>
> Perhaps I can write a Serde<V> impl which takes the java.lang.Class
> of the class to be mapped, and maps it via Jackson?
>
> On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
>
> Exactly. You need to set the key using KStream#selectKey() and
> re-distribute data via #through().
>
> About timestamps: you can provide a custom TimestampExtractor that
> returns the JSON embedded TS instead of record TS (as
> DefaultTimestampExtractor does)
>
> See
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
>
>
> -Matthias
>
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
>>>> Sorry, to be clear:
>>>>
>>>> - Producers post to topic A
>>>> - Consumers of topic A receive the data, parse it to find the
>>>> keys, and post the correct key + message to Topic B
>>>> - Topic B is treated as a KTable by 2nd consumer layer, and it's
>>>> this layer which does the writes to ensure 'last one wins'
>>>> (Assuming 'last one' can be determined using the timestamp in
>>>> the json of the message)
>>>>
>>>> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar
>>>> <ali.rac...@gmail.com> wrote:
>>>>
>>>>> Thanks for the reply.
>>>>>
>>>>> It's not possible to provide keys, unfortunately. (Producer
>>>>> is written by a colleague, and said colleague just wants to
>>>>> provide whatever data the API gives, and leave all
>>>>> processing of the data to me).
>>>>>
>>>>> Perhaps I can use an intermediate kafka topic, and have
>>>>> producers post to that topic w/ whatever data they receive.
>>>>> Then, another consumer can listen to that topic, and use it
>>>>> as a KTable to process data in the order of 'last one
>>>>> winning'.
>>>>>
>>>>> However, the source of truth on the time of the messages
>>>>> is embedded in the message itself, it's not the Kafka
>>>>> internal timestamp.
>>>>>
>>>>> The message is a json string, which contains a 'timestamp'
>>>>> field, containing a string timestamp, and that string
>>>>> timestamp is the source of truth on when this message was
>>>>> generated.
>>>>>
>>>>> So, is it possible to use a KTable which lets me parse the
>>>>> message and return the time which is contained inside the
>>>>> message, and use THAT time for sorting the messages?
>>>>>
>>>>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax
>>>>> <matth...@confluent.io> wrote:
>>>>>
>>>> It is not global in this sense.
>>>>
>>>> Thus, you need to ensure that records updating the same
>>>> product go to the same instance. You can ensure this by
>>>> giving all records of the same product the same key and
>>>> "groupByKey" before processing the data.
>>>>
>>>> -Matthias
>>>>
>>>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>>>>>> Thank you, State Store seems promising. But, is it
>>>>>>>> distributed, or limited to the particular instance of
>>>>>>>> my application?
>>>>>>>>
>>>>>>>> I.e., if there are 3 messages, setting product 1's
>>>>>>>> price to $1, $3, and $5, and all 3 of them go to a
>>>>>>>> different instance of my application, will they be
>>>>>>>> able to correctly identify the latest received
>>>>>>>> message using State Store?
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>> What do you mean by "message keys are random" -- do
>>>>>>>> you effectively have no keys and want all messages to
>>>>>>>> be processed as if they all have the same key?
>>>>>>>>
>>>>>>>> To access record TS in general, you need to use
>>>>>>>> Processor API. The ProcessorContext object given by
>>>>>>>> Processor#init() always returns the timestamp of the
>>>>>>>> currently processed record via #timestamp().
>>>>>>>>
>>>>>>>> Thus, you can attach a state store to your processor
>>>>>>>> and compare the timestamp of the current record with
>>>>>>>> the timestamp of the one in your store.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>>>>>>> Heya,
>>>>>>>>>>>
>>>>>>>>>>> I have some Kafka producers, which are
>>>>>>>>>>> listening to webhook events, and for each
>>>>>>>>>>> webhook event, they post its payload to a Kafka
>>>>>>>>>>> topic.
>>>>>>>>>>>
>>>>>>>>>>> Each payload contains a timestamp from the
>>>>>>>>>>> webhook source.
>>>>>>>>>>>
>>>>>>>>>>> This timestamp is the source of truth about
>>>>>>>>>>> which events happened first, which happened
>>>>>>>>>>> last, etc.
>>>>>>>>>>>
>>>>>>>>>>> I need to ensure that the last arrival of a
>>>>>>>>>>> particular type of message wins.
>>>>>>>>>>>
>>>>>>>>>>> E.g., if there are 5 messages, saying the price
>>>>>>>>>>> of a product with id 1 was set to $1, then $3,
>>>>>>>>>>> then something else, etc., before finally being
>>>>>>>>>>> set to $10, then I need to make sure that the
>>>>>>>>>>> final price for that product is $10.
>>>>>>>>>>>
>>>>>>>>>>> These messages can be out of order, and I need
>>>>>>>>>>> to determine the latest arrival based on the
>>>>>>>>>>> timestamp from the webhook source. (It's atm in
>>>>>>>>>>> a string format which can be parsed)
>>>>>>>>>>>
>>>>>>>>>>> Since KTable looks like it uses message keys
>>>>>>>>>>> to determine what happens - and in this case,
>>>>>>>>>>> the message keys are random, and the timestamp
>>>>>>>>>>> contained in the value of the message is what
>>>>>>>>>>> determines the order of the events - any
>>>>>>>>>>> pointers on what the best way to do this is?
>>>>>>>>>>>
>>>>>>>>>>> I'm using kafka streaming, latest version,
>>>>>>>>>>> Java.
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>
>
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJX9tcJAAoJECnhiMLycopPIQMQAI/alRCktJdQ9+FQIC81xoCq
DnLNTszPIdC7RzA/zqN1lvE1WtkRL8IrqqW8VsqnuermppF1KFWxGL8/x/gwC6qY
iCzhL4YU6nRWx0E9VGtfp6A9/aFsRIT5KHXHSugjfqBpcw5Wl1skKt5dsh8Hl2vT
3g5oNeCkxl/8Yh/0yJPOlpT9ie7G+WfS+fEUyYKbd9Em2He6DMixRZ69MbFaq0Wk
F0sPJcbOge7JRcPNThCud62N1j4b6sAjRZkorxrNcoU9tkAC9FNyTpCQFF+AoooI
XMsU111tsrKIynwNHWwGhm1L6GivJm1NJosM68g3/M/PB2zb8Sc1erokk6C3DdeB
CJV34brqUfpG4C8YE2W2s0ROVN38qXm7/+RLI8+qKFrrUHBgPiP6msPa5kPPisLE
sAZx0lktRQRn6k3SdtnfjETWIzXYj8cdFO8ALPmSs/81o9db8GkmaQy0e670tAcf
i/S8f8wvDyS8G/74kILEYZ1kb0WW2+Yb68C70o7hlHR78yAbouiJuK2J1iCYPluI
4aBza16f0RKdwvGbIq4ZU50bwAvEov1hukjHkuQhHWdpiQErzZufRXjvkGUQwpHw
2LquB3iWsRjROt//bjAm6x63uPt/mu/eRV0LqyD/pSkvr7EM3/WuLEprYRTbkJrc
auKVSXbzM4ChDoG7xMqi
=JF24
-----END PGP SIGNATURE-----
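
The idea discussed in this thread (give every record for a product the same key, then compare the timestamp embedded in the JSON payload with the one already stored for that key and keep the newer) can be sketched in plain Java without any Kafka dependency. This is only an illustration under assumptions, not code from the thread: the class name LastWriteWins, the payload shape, the regex-based field lookup, and the ISO-8601 timestamp format are all hypothetical (the thread only says the timestamp is a string that can be parsed). In a Kafka Streams application the two maps would roughly correspond to a state store attached to a Processor, and the parsing method to the body of a custom TimestampExtractor; a JSON library such as Jackson would normally replace the regex.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Plain-Java sketch of "last one wins" per key, ordered by the timestamp
 * embedded in the payload rather than by arrival order.
 */
final class LastWriteWins {

    // Assumption: payloads carry a string field such as
    // "timestamp":"2016-10-06T15:52:10Z" (ISO-8601, UTC).
    private static final Pattern TIMESTAMP =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]+)\"");

    // Stand-ins for a state store: newest timestamp and payload seen per key.
    private final Map<String, Long> latestTimestamp = new HashMap<>();
    private final Map<String, String> latestPayload = new HashMap<>();

    /** Returns the embedded timestamp as epoch milliseconds. */
    static long embeddedTimestampMillis(String json) {
        Matcher m = TIMESTAMP.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp field in: " + json);
        }
        return Instant.parse(m.group(1)).toEpochMilli();
    }

    /**
     * Offers a payload for a key. Returns true if it was stored, false if a
     * strictly newer payload is already stored and this one was dropped.
     * On equal timestamps the later arrival replaces the stored value.
     */
    boolean offer(String key, String json) {
        long ts = embeddedTimestampMillis(json);
        Long stored = latestTimestamp.get(key);
        if (stored != null && stored > ts) {
            return false; // out-of-order record, a newer one already won
        }
        latestTimestamp.put(key, ts);
        latestPayload.put(key, json);
        return true;
    }

    /** Returns the winning payload for a key, if any record was stored. */
    Optional<String> latest(String key) {
        return Optional.ofNullable(latestPayload.get(key));
    }

    public static void main(String[] args) {
        LastWriteWins store = new LastWriteWins();
        // The $10 update arrives first but carries the newest timestamp.
        store.offer("product-1", "{\"price\":\"10\",\"timestamp\":\"2016-10-06T15:52:10Z\"}");
        store.offer("product-1", "{\"price\":\"1\",\"timestamp\":\"2016-10-06T15:52:00Z\"}");
        store.offer("product-1", "{\"price\":\"3\",\"timestamp\":\"2016-10-06T15:52:05Z\"}");
        System.out.println(store.latest("product-1").orElse("<none>"));
    }
}
```

As noted earlier in the thread, this comparison is only correct if all records for one product reach the same instance, which is why the key has to be set and the data re-distributed before this step.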