Hi Guozhang,

That would certainly work for eliminating those duplicate values. As it stands right now, this would be consistent with swallowing changes due to out-of-order processing with multiple threads, and seems like a very reasonable way forward. Thank you for the suggestion!
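To make sure I follow the proposal correctly, here is a rough sketch of the hash-comparison idea (class and method names are made up for illustration; this is not the actual KIP implementation, and MD5 is just the example digest from your mail):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Sketch only: instead of shipping the full left-hand value with the
// subscription to the right-hand side, we ship a hash of it. When the
// subscription response comes back, we recompute the hash of the *current*
// left-hand value and swallow the result if it no longer matches.
public class HashedSubscriptionSketch {

    static byte[] hash(byte[] value) {
        try {
            // MD5 is just the example from the discussion; any digest works.
            return MessageDigest.getInstance("MD5").digest(value);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Called on the left-hand side when a subscription response returns.
    // originalHash travelled with the subscription instead of the payload.
    static boolean shouldForwardJoinResult(byte[] originalHash,
                                           byte[] currentLeftValue) {
        // If the left-hand value changed while the request was in flight,
        // the hashes differ and we drop the stale join result.
        return Arrays.equals(originalHash, hash(currentLeftValue));
    }
}
```

So in the v1/v2 scenario, the response computed against v1 would carry hash(v1), the left-hand table would by then hold v2, and the stale result would be discarded.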
I have been trying to think of any other scenarios where we can end up with duplicates, though I have not been able to identify any others at the moment. I will think on it a bit more, but if anyone else has any ideas, please chime in.

Thanks,
Adam

On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:

> One more thought regarding *c-P2: Duplicates)*: first I want to separate
> this issue from the more general issue that today, table-table joins (not
> only foreign-key, but also co-partitioned primary-key joins) still do not
> strictly respect timestamp ordering, since the two changelog streams may
> be fetched and hence processed out of order, and we do not yet allow a
> record to be joined with the other table at any given time snapshot. So
> when two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2)) arrive at
> the left-hand table and one record (f-k1, v3) arrives at the right-hand
> table, depending on the processing order we may get:
>
> (k1, (f-k1, v2-v3))
>
> or
>
> (k1, (f-k1, v1-v3))
> (k1, (f-k1, v2-v3))
>
> And this is not to be addressed by this KIP.
>
> What I would advocate is to fix only the issue that is introduced by this
> KIP, namely that we may get
>
> (k1, (f-k1, v2-v3)) // this should actually be v1-v3
> (k1, (f-k1, v2-v3))
>
> I admit that, compared with "discarding the first result", this is not a
> correctness issue from the semantics alone, but it may be confusing to
> users who do not expect to see the seeming duplicates.
> On the other hand, I think there is a lightweight solution to avoid it:
> we can still optimize away sending the full payload of "v1" from the
> left-hand side to the right-hand side, but instead of trimming off the
> bytes entirely, we can send, e.g., an MD5 hash of the bytes (I'm using
> MD5 here just as an example; we can definitely replace it with other
> functions). That way we can discard the join result if the hash value
> sent back from the right-hand side no longer matches the left-hand side,
> i.e. we will only send:
>
> (k1, (f-k1, v2-v3))
>
> downstream once.
>
> WDYT?
>
>
> Guozhang
>
>
> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Ah yes, I recall it all now. That answers the question as to why I had
> > caching disabled. I can certainly re-enable it, since I believe the
> > main concern was simply about reconciling those two iterators; a lack
> > of knowledge there on my part.
> >
> > Thank you, John, for weighing in; we both certainly appreciate it. I
> > think John hits it on the head, though, with his comment: "If it turns
> > out we're wrong about this, then it should be possible to fix the
> > semantics in place, without messing with the API."
> >
> > If anyone else would like to weigh in, your thoughts would be greatly
> > appreciated.
> >
> > Thanks
> >
> > On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > >> I don't know how to range scan over a caching store; probably one
> > > >> would have to open 2 iterators and merge them.
> > >
> > > That happens automatically. If you query a cached KTable, it ranges
> > > over the cache and the underlying RocksDB and performs the merging
> > > under the hood.
> > >
> > > >> Other than that, I still think even the regular join is broken
> > > >> with caching enabled, right?
> > >
> > > Why?
> > > To me, if you use the word "broken", it implies conceptually
> > > incorrect; I don't see this.
> > >
> > > >> I once filed a ticket, because with caching enabled it would
> > > >> return values that haven't been published downstream yet.
> > >
> > > For the bug report, I found
> > > https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
> > > fix this, but it is a regular bug like any other, and we should not
> > > change a design because of a bug.
> > >
> > > That range() returns values that have not been published downstream
> > > when caching is enabled is how caching works, and is intended
> > > behavior. Not sure why you say it's incorrect?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 3/5/19 1:49 AM, Jan Filipiak wrote:
> > > >
> > > >
> > > > On 04.03.2019 19:14, Matthias J. Sax wrote:
> > > >> Thanks Adam,
> > > >>
> > > >> *> Q) Range scans work with caching enabled, too. Thus, there is
> > > >> no functional/correctness requirement to disable caching. I
> > > >> cannot remember why Jan's proposal added this? It might be an
> > > >> implementation detail though (maybe just remove it from the KIP?
> > > >> -- might be misleading).
> > > >
> > > > I don't know how to range scan over a caching store; probably one
> > > > would have to open 2 iterators and merge them.
> > > >
> > > > Other than that, I still think even the regular join is broken
> > > > with caching enabled, right? I once filed a ticket, because with
> > > > caching enabled it would return values that haven't been published
> > > > downstream yet.
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
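The cache/store merge that Matthias describes (ranging over the cache and the underlying RocksDB and merging under the hood) can be sketched roughly as below. This is a toy illustration with made-up names, not the actual Kafka Streams internals, which merge lazily via two sorted iterators rather than materializing a map; the key point is only that cache entries take precedence over not-yet-flushed store entries for the same key.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy sketch: a range scan over a cached store can be served by combining
// the sorted cache view with the sorted persistent-store view, letting the
// cache win on key collisions because it holds the newest (possibly
// not-yet-flushed) values. The real implementation streams this merge
// through two iterators instead of building an intermediate map.
public class MergedRangeScanSketch {

    static NavigableMap<String, String> mergedRange(
            NavigableMap<String, String> cache,
            NavigableMap<String, String> store,
            String from, String to) {
        NavigableMap<String, String> result = new TreeMap<>();
        // Store entries first ...
        result.putAll(store.subMap(from, true, to, true));
        // ... then cache entries overwrite them, since the cache is newer.
        result.putAll(cache.subMap(from, true, to, true));
        return result;
    }
}
```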