Exactly. You need to set the key using KStream#selectKey() and
re-distribute data via #through().
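
A minimal sketch of that re-keying step, against the 0.10.x-era DSL that the 3.0.1 docs describe (the topic names and the extractProductId() helper are made up for illustration, and the kafka-streams dependency is assumed):

```java
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> raw = builder.stream("raw-events");

// Derive the key (e.g. the product id) from the JSON payload;
// extractProductId() is a hypothetical helper.
KStream<String, String> keyed =
        raw.selectKey((key, value) -> extractProductId(value));

// through() writes to an intermediate topic and reads it back, so all
// records with the same key land in the same partition -- and therefore
// on the same application instance.
KStream<String, String> repartitioned = keyed.through("keyed-events");
```

(Later Streams versions deprecated through() in favour of repartition(), but for the 3.0.1 release referenced below, through() is the right call.)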

About timestamps: you can provide a custom TimestampExtractor that
returns the timestamp embedded in the JSON payload instead of the
record timestamp (which is what the default extractor returns).

See
http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
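
A sketch of such an extractor, assuming the embedded timestamp is an ISO-8601 string in a "timestamp" field (as described further down the thread). The regex-based field extraction is only for illustration; a real implementation would use a JSON library. The Streams wiring at the bottom needs the kafka-streams dependency, so it is shown as a comment:

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PayloadTimestamps {
    // Pull the "timestamp" field out of the JSON payload.
    private static final Pattern TS =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]+)\"");

    static long extract(String json, long fallbackMs) {
        Matcher m = TS.matcher(json);
        if (m.find()) {
            // Assumes ISO-8601, e.g. "2016-10-06T14:59:00Z"
            return Instant.parse(m.group(1)).toEpochMilli();
        }
        return fallbackMs; // e.g. fall back to the record timestamp
    }
}

/* Plugging it into Streams (0.10.x-era, one-arg extract signature):
 *
 * public class PayloadTimestampExtractor implements TimestampExtractor {
 *     public long extract(ConsumerRecord<Object, Object> record) {
 *         return PayloadTimestamps.extract((String) record.value(),
 *                                          record.timestamp());
 *     }
 * }
 *
 * streamsProps.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
 *                  PayloadTimestampExtractor.class);
 */
```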


-Matthias

On 10/6/16 2:59 PM, Ali Akhtar wrote:
> Sorry, to be clear:
> 
> - Producers post to topic A
> - Consumers of topic A receive the data, parse it to find the keys,
>   and post the correct key + message to Topic B
> - Topic B is treated as a KTable by the 2nd consumer layer, and it's
>   this layer which does the writes to ensure 'last one wins'
>   (assuming 'last one' can be determined using the timestamp in the
>   JSON of the message)
> 
> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> 
>> Thanks for the reply.
>> 
>> It's not possible to provide keys, unfortunately. (The producer is
>> written by a colleague, and said colleague just wants to forward
>> whatever data the API gives, and leave all processing of the data
>> to me.)
>> 
>> Perhaps I can use an intermediate kafka topic, and have producers
>> post to that topic w/ whatever data they receive. Then, another
>> consumer can listen to that topic, and use it as a KTable to
>> process data in the order of 'last one winning'.
>> 
>> However, the source of truth on the time of the messages is
>> embedded in the message itself; it's not the Kafka internal
>> timestamp.
>> 
>> The message is a json string, which contains a 'timestamp'
>> field, containing a string timestamp, and that string timestamp
>> is the source of truth on when this message was generated.
>> 
>> So, is it possible to use a KTable which lets me parse the
>> message and return the time which is contained inside the
>> message, and use THAT time for sorting the messages?
>> 
>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
>> 
> It is not global in this sense.
> 
> Thus, you need to ensure that records updating the same product go
> to the same instance. You can ensure this by giving all records of
> the same product the same key and applying "groupByKey" before
> processing the data.
> 
> -Matthias
> 
> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>>> Thank you, State Store seems promising. But, is it
>>>>> distributed, or limited to the particular instance of my
>>>>> application?
>>>>> 
>>>>> I.e if there are 3 messages, setting product 1's price to
>>>>> $1, $3, and $5, and all 3 of them go to a different
>>>>> instance of my application, will they be able to correctly
>>>>> identify the latest received message using State Store?
>>>>> 
>>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax 
>>>>> <matth...@confluent.io> wrote:
>>>>> 
>>>>> What do you mean by "message keys are random" -- do you 
>>>>> effectively have no keys and want all messages to be
>>>>> processed as if they all have the same key?
>>>>> 
>>>>> To access the record TS in general, you need to use the
>>>>> Processor API. The ProcessorContext object given to
>>>>> Processor#init() always returns the timestamp of the record
>>>>> currently being processed via #timestamp().
>>>>> 
>>>>> Thus, you can attach a state store to your processor and
>>>>> compare the timestamps of the current record with the
>>>>> timestamp of the one in your store.
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>>>> Heya,
>>>>>>>> 
>>>>>>>> I have some Kafka producers, which are listening to
>>>>>>>> webhook events, and for each webhook event, they post
>>>>>>>> its payload to a Kafka topic.
>>>>>>>> 
>>>>>>>> Each payload contains a timestamp from the webhook
>>>>>>>> source.
>>>>>>>> 
>>>>>>>> This timestamp is the source of truth about which
>>>>>>>> events happened first, which happened last, etc.
>>>>>>>> 
>>>>>>>> I need to ensure that the last arrival of a
>>>>>>>> particular type of message wins.
>>>>>>>> 
>>>>>>>> E.g, if there are 5 messages, saying the price of a
>>>>>>>> product with id 1, was set to $1, then $3, then
>>>>>>>> something else, etc, before finally being set to $10,
>>>>>>>> then I need to make sure that the final price for
>>>>>>>> that product is $10.
>>>>>>>> 
>>>>>>>> These messages can be out of order, and I need to
>>>>>>>> determine the latest arrival based on the timestamp
>>>>>>>> from the webhook source. (It's atm in a string format
>>>>>>>> which can be parsed.)
>>>>>>>> 
>>>>>>>> Since KTable looks like it uses message keys to
>>>>>>>> determine what happens - and in this case, the
>>>>>>>> message keys are random, and the timestamp contained
>>>>>>>> in the value of the message is what determines the
>>>>>>>> order of the events - any pointers on what the best
>>>>>>>> way to do this is?
>>>>>>>> 
>>>>>>>> I'm using Kafka Streams, latest version, Java.
>>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>> 
>> 
> 
