I am probably being too OCD anyway. It will almost never happen that messages from another VM in the same network on EC2 arrive out of order, right?
On 13 Oct 2016 8:47 pm, "Ali Akhtar" <ali.rac...@gmail.com> wrote:

> Makes sense. Thanks
>
> On 13 Oct 2016 12:42 pm, "Michael Noll" <mich...@confluent.io> wrote:
>
>> > But if they arrive out of order, I have to detect / process that
>> > myself in the processor logic.
>>
>> Yes -- if your processing logic depends on the specific ordering of
>> messages (which is the case for you), then you must manually implement
>> this ordering-specific logic at the moment.
>>
>> Other use cases may not need to do that and "just work" even with
>> out-of-order data. If, for example, you are counting objects or are
>> computing the sum of numbers, then you do not need to do anything
>> special.
>>
>> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar <ali.rac...@gmail.com>
>> wrote:
>>
>> > Thanks Matthias.
>> >
>> > So, if I'm understanding this right, Kafka will not discard messages
>> > which arrive out of order.
>> >
>> > What it will do is show messages in the order in which they arrive.
>> >
>> > But if they arrive out of order, I have to detect / process that
>> > myself in the processor logic.
>> >
>> > Is that correct?
>> >
>> > Thanks.
>> >
>> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax
>> > <matth...@confluent.io> wrote:
>> >
>> > > Last question first: A KTable is basically an infinite window over
>> > > the whole stream, providing a single result (that gets updated when
>> > > new data arrives). If you use windows, you cut the overall stream
>> > > into finite subsets and get a result per window. Thus, I guess you
>> > > do not need windows (if I understood your use case correctly).
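Michael's distinction can be illustrated with a small stand-alone sketch (plain Java, no Kafka dependencies; the `Bid` class and method names are illustrative, not Kafka APIs): a sum gives the same result in any arrival order, while naively keeping the last record seen does not.

```java
import java.util.Arrays;
import java.util.List;

public class OrderSensitivity {

    // Illustrative bid record: an amount plus its event timestamp.
    static final class Bid {
        final long timestamp; // event time in ms
        final int amount;
        Bid(long timestamp, int amount) {
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    // Order-insensitive: the sum of all bids is the same no matter
    // in which order the records arrive.
    static int sum(List<Bid> bids) {
        return bids.stream().mapToInt(b -> b.amount).sum();
    }

    // Order-sensitive: naively keeping the last record seen gives the
    // wrong "latest price" when records arrive out of order.
    static int lastSeen(List<Bid> bids) {
        int last = 0;
        for (Bid b : bids) last = b.amount;
        return last;
    }

    public static void main(String[] args) {
        // Bid 2 ($5 at t=100) arrives before the older bid 1 ($3 at t=0).
        List<Bid> outOfOrder = Arrays.asList(new Bid(100, 5), new Bid(0, 3));

        System.out.println(sum(outOfOrder));      // 8 -- order does not matter
        System.out.println(lastSeen(outOfOrder)); // 3 -- the stale bid overwrote the newer one
    }
}
```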
>> > >
>> > > However, in the current state of the Kafka Streams DSL, you will
>> > > not be able to use a KTable directly (see the suggestion below for
>> > > how to work around this), because the DSL does (currently) not give
>> > > you access to the timestamp of the current record (thus, you cannot
>> > > know whether a record is late or not). You would need to use the
>> > > Processor API, which exposes the current record's timestamp via the
>> > > ProcessorContext object provided in init().
>> > >
>> > > Your reasoning about partitions and Streams instances is correct.
>> > > However, the following two points are not:
>> > >
>> > > > - Because I'm using a KTable, the timestamp of the messages is
>> > > > extracted, and I'm not shown the older bid because I've already
>> > > > processed the later bid. The older bid is ignored.
>> > >
>> > > and
>> > >
>> > > > - Because of this, the replica already knows which timestamps it
>> > > > has processed, and is able to ignore the older messages.
>> > >
>> > > Late-arriving records are not dropped but processed regularly.
>> > > Thus, your KTable aggregate function will be called for the
>> > > late-arriving record, too (but as described above, you currently
>> > > have no way to know that it is a late record).
>> > >
>> > > Last but not least, your last statement is a valid concern:
>> > >
>> > > > Also, what will happen if bid 2 arrived and got processed, and
>> > > > then the particular replica crashed, and was restarted. The
>> > > > restarted replica won't have any memory of which timestamps it
>> > > > has previously processed.
>> > > >
>> > > > So if bid 2 got processed, replica crashed and restarted, and
>> > > > then bid 1 arrived, what would happen in that case?
>> > >
>> > > In order to make this work, you would need to store the timestamp
>> > > in your store next to the actual data. Then you can compare the
>> > > timestamp of the latest result (safely stored in operator state)
>> > > with the timestamp of the current record.
>> > >
>> > > Does this make sense?
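Matthias's suggestion -- store the timestamp next to the value and compare it on every update -- can be sketched in plain Java. `PriceStore` and `TimestampedValue` are illustrative names, not Kafka APIs; in Kafka Streams the `HashMap` would be a fault-tolerant state store backed by a changelog topic, which is why the guard also survives a crash and restart.

```java
import java.util.HashMap;
import java.util.Map;

public class PriceStore {

    // What gets stored per key: the value plus the timestamp it carried.
    static final class TimestampedValue {
        final int value;
        final long timestamp;
        TimestampedValue(int value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, TimestampedValue> store = new HashMap<>();

    // Apply an update only if it is newer than what is already stored.
    // Returns true if the update was applied, false if it was stale.
    public boolean update(String key, int value, long timestamp) {
        TimestampedValue current = store.get(key);
        if (current != null && timestamp <= current.timestamp) {
            return false; // late record: the stored result is newer, ignore it
        }
        store.put(key, new TimestampedValue(value, timestamp));
        return true;
    }

    public Integer get(String key) {
        TimestampedValue tv = store.get(key);
        return tv == null ? null : tv.value;
    }

    public static void main(String[] args) {
        PriceStore prices = new PriceStore();
        prices.update("bid_a", 5, 100); // bid 2 ($5 at t=100) processed first
        prices.update("bid_a", 3, 0);   // bid 1 ($3 at t=0) arrives late -> ignored
        System.out.println(prices.get("bid_a")); // 5
    }
}
```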
>> > >
>> > > To fix your issue, you could add a .transformValues() before your
>> > > KTable, which allows you to access the timestamp of a record. If
>> > > you add this timestamp to your value and pass it on to the KTable
>> > > afterwards, you can access it there and it also gets stored
>> > > reliably.
>> > >
>> > > <bid_id : bid_value> => transformValues => <bid_id : {bid_value,
>> > > timestamp}> => aggregate
>> > >
>> > > Hope this helps.
>> > >
>> > > -Matthias
>> > >
>> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
>> > > > P.S., does my scenario require using windows, or can it be
>> > > > achieved using just KTable?
>> > > >
>> > > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar
>> > > > <ali.rac...@gmail.com> wrote:
>> > > >
>> > > >> Heya,
>> > > >>
>> > > >> Say I'm building a live auction site, with different products.
>> > > >> Different users will bid on different products. And each time
>> > > >> they do, I want to update the product's price, so it should
>> > > >> always have the latest price in place.
>> > > >>
>> > > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
>> > > >> the same product 100 ms later.
>> > > >>
>> > > >> The second bid arrives first and the price is updated to $5.
>> > > >> Then the first bid arrives. I want the price to not be updated
>> > > >> in this case, as this bid is older than the one I've already
>> > > >> processed.
>> > > >>
>> > > >> Here's my understanding of how I can achieve this with Kafka
>> > > >> Streams -- is my understanding correct?
>> > > >>
>> > > >> - I have a topic for receiving bids. The topic has N partitions,
>> > > >> and I have N replicas of my application, which hook up with
>> > > >> Kafka Streams, up and running.
>> > > >>
>> > > >> - I assume each replica of my app will listen to a different
>> > > >> partition of the topic.
>> > > >>
>> > > >> - A user makes a bid on product A.
>> > > >>
>> > > >> - This is pushed to the topic with the key bid_a.
>> > > >>
>> > > >> - Another user makes a bid. This is also pushed with the same
>> > > >> key (bid_a).
>> > > >>
>> > > >> - The 2nd bid arrives first, and gets processed. Then the first
>> > > >> (older) bid arrives.
>> > > >>
>> > > >> - Because I'm using a KTable, the timestamp of the messages is
>> > > >> extracted, and I'm not shown the older bid because I've already
>> > > >> processed the later bid. The older bid is ignored.
>> > > >>
>> > > >> - All bids on product A go to the same topic partition, and
>> > > >> hence the same replica of my app, because they all have the key
>> > > >> bid_a.
>> > > >>
>> > > >> - Because of this, the replica already knows which timestamps it
>> > > >> has processed, and is able to ignore the older messages.
>> > > >>
>> > > >> Is the above understanding correct?
>> > > >>
>> > > >> Also, what will happen if bid 2 arrived and got processed, and
>> > > >> then the particular replica crashed, and was restarted? The
>> > > >> restarted replica won't have any memory of which timestamps it
>> > > >> has previously processed.
>> > > >>
>> > > >> So if bid 2 got processed, replica crashed and restarted, and
>> > > >> then bid 1 arrived, what would happen in that case?
>> > > >>
>> > > >> Thanks.
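The suggested chain -- enrich the value with the record timestamp, then aggregate keeping the newest value -- can be simulated end to end in plain Java (no Kafka dependencies). `Record`, `ValueWithTs`, and `run` are illustrative stand-ins; in a real topology the timestamp would come from the Processor API's ProcessorContext and the resulting map would be a KTable's state store.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates: <bid_id : bid_value> => transformValues => <bid_id : {bid_value, timestamp}> => aggregate
public class BidPipeline {

    static final class Record {
        final String key;
        final int value;
        final long timestamp; // record metadata; in Kafka Streams, the record timestamp
        Record(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final class ValueWithTs {
        final int value;
        final long timestamp;
        ValueWithTs(int value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Processes records in arrival order and returns the resulting "table".
    static Map<String, ValueWithTs> run(List<Record> records) {
        Map<String, ValueWithTs> table = new HashMap<>();
        for (Record r : records) {
            // transformValues analogue: copy the record timestamp into the value
            ValueWithTs candidate = new ValueWithTs(r.value, r.timestamp);
            // aggregate analogue: keep whichever value has the newer timestamp
            table.merge(r.key, candidate,
                    (oldVal, newVal) -> newVal.timestamp > oldVal.timestamp ? newVal : oldVal);
        }
        return table;
    }

    public static void main(String[] args) {
        // Bid 2 ($5 at t=100) arrives before the older bid 1 ($3 at t=0).
        List<Record> arrivals = Arrays.asList(
                new Record("bid_a", 5, 100),
                new Record("bid_a", 3, 0));
        System.out.println(run(arrivals).get("bid_a").value); // 5 -- the stale bid is ignored
    }
}
```

Because the timestamp is stored inside the aggregated value, a restarted instance restoring its state would still see it and reject the late bid, which addresses the crash scenario above.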