Hi all,

This proposal sounds good to me, especially since we observe that people
are already confused when they see duplicate results coming out of 1:1
joins (which is a bug). I take this as "evidence" that we're better off
eliminating those duplicates from the start. Guozhang's proposal seems
like a lightweight solution to the problem, so FWIW, I'm in favor.
Thanks,
-John

On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Guozhang
>
> That would certainly work for eliminating those duplicate values. As it
> stands right now, this would be consistent with swallowing changes due
> to out-of-order processing with multiple threads, and seems like a very
> reasonable way forward. Thank you for the suggestion!
>
> I have been trying to think whether there are any other scenarios where
> we can end up with duplicates, though I have not been able to identify
> any others at the moment. I will think on it a bit more, but if anyone
> else has any ideas, please chime in.
>
> Thanks,
> Adam
>
> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > One more thought regarding *c-P2: Duplicates)*: first I want to
> > separate this issue from the more general issue that today,
> > table-table joins (not only foreign-key, but also co-partitioned
> > primary-key joins) still do not strictly respect timestamp ordering,
> > since the two changelog streams may be fetched, and hence processed,
> > out of order, and we do not yet allow a record to be joined with the
> > other table at a given time snapshot. So when there are two changelog
> > records (k1, (f-k1, v1)), (k1, (f-k1, v2)) arriving at the left-hand
> > table and one record (f-k1, v3) at the right-hand table, depending on
> > the processing order we may get:
> >
> > (k1, (f-k1, v2-v3))
> >
> > or
> >
> > (k1, (f-k1, v1-v3))
> > (k1, (f-k1, v2-v3))
> >
> > And this is not to be addressed by this KIP.
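(Editor's aside: the order-dependence Guozhang describes above can be
illustrated with a toy simulation. This is not Kafka Streams code; the
class and method names are hypothetical, and it only models two
materialized tables joined on a shared foreign key, emitting a result on
every update that finds a match.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a foreign-key table-table join: which results are
// emitted depends on how the two changelogs are interleaved.
public class OrderingDemo {
    final Map<String, String> left = new HashMap<>();   // k1 -> left value
    final Map<String, String> right = new HashMap<>();  // f-k1 -> right value
    final List<String> emitted = new ArrayList<>();

    // All left records reference the foreign key "f-k1" in this toy.
    void leftUpdate(String key, String value) {
        left.put(key, value);
        String rv = right.get("f-k1");
        if (rv != null) emitted.add(key + ": " + value + "-" + rv);
    }

    void rightUpdate(String fk, String value) {
        right.put(fk, value);
        for (Map.Entry<String, String> e : left.entrySet()) {
            emitted.add(e.getKey() + ": " + e.getValue() + "-" + value);
        }
    }

    public static void main(String[] args) {
        // Ordering A: both left updates are processed before the right one.
        OrderingDemo a = new OrderingDemo();
        a.leftUpdate("k1", "v1");
        a.leftUpdate("k1", "v2");
        a.rightUpdate("f-k1", "v3");
        System.out.println(a.emitted); // [k1: v2-v3]

        // Ordering B: the right update lands between the two left updates.
        OrderingDemo b = new OrderingDemo();
        b.leftUpdate("k1", "v1");
        b.rightUpdate("f-k1", "v3");
        b.leftUpdate("k1", "v2");
        System.out.println(b.emitted); // [k1: v1-v3, k1: v2-v3]
    }
}
```

Both orderings converge on v2-v3 as the final result, which is why this
is an ordering/intermediate-result question rather than a correctness
one, matching the distinction drawn above.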
> > What I would advocate is to fix only the issue introduced by this
> > KIP, namely that we may emit:
> >
> > (k1, (f-k1, v2-v3)) // this should actually be v1-v3
> > (k1, (f-k1, v2-v3))
> >
> > I admit that this is not a correctness issue from the semantics
> > alone, compared with "discarding the first result", but it may be
> > confusing to users who do not expect to see the seeming duplicates.
> > On the other hand, I think there is a lightweight solution to avoid
> > it: we can still optimize by not sending the full payload of "v1"
> > from the left-hand side to the right-hand side, but instead of
> > trimming off all the bytes, we can send, e.g., an MD5 hash of the
> > bytes (I'm using MD5 here just as an example; we can definitely
> > replace it with other functions). By doing so, we can discard the
> > join result if the hash value sent back from the right-hand side no
> > longer matches the left-hand side, i.e. we will only send:
> >
> > (k1, (f-k1, v2-v3))
> >
> > downstream once.
> >
> > WDYT?
> >
> > Guozhang
> >
> > On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare
> > <adam.bellem...@gmail.com> wrote:
> >
> > > Ah yes, I recall it all now. That answers the question as to why I
> > > had caching disabled. I can certainly re-enable it, since I believe
> > > the main concern was simply about reconciling those two iterators;
> > > a lack of knowledge there on my part.
> > >
> > > Thank you John for weighing in; we certainly both appreciate it. I
> > > think that John hits it on the head, though, with his comment: "If
> > > it turns out we're wrong about this, then it should be possible to
> > > fix the semantics in place, without messing with the API."
> > >
> > > If anyone else would like to weigh in, your thoughts would be
> > > greatly appreciated.
> > >
> > > Thanks
> > >
> > > On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax
> > > <matth...@confluent.io> wrote:
> > >
> > > > >> I don't know how to range scan over a caching store; probably
> > > > >> one had to open 2 iterators and merge them.
> > > >
> > > > That happens automatically. If you query a cached KTable, it
> > > > ranges over the cache and the underlying RocksDB and performs the
> > > > merging under the hood.
> > > >
> > > > >> Other than that, I still think even the regular join is broken
> > > > >> with caching enabled, right?
> > > >
> > > > Why? To me, the word "broken" implies conceptually incorrect; I
> > > > don't see this.
> > > >
> > > > >>> I once filed a ticket, because with caching enabled it would
> > > > >>> return values that haven't been published downstream yet.
> > > >
> > > > For the bug report, I found
> > > > https://issues.apache.org/jira/browse/KAFKA-6599. We still need
> > > > to fix this, but it is a regular bug like any other, and we
> > > > should not change a design because of a bug.
> > > >
> > > > That range() returns values that have not been published
> > > > downstream when caching is enabled is how caching works and is
> > > > intended behavior. I'm not sure why you say it's incorrect.
> > > >
> > > > -Matthias
> > > >
> > > > On 3/5/19 1:49 AM, Jan Filipiak wrote:
> > > > >
> > > > > On 04.03.2019 19:14, Matthias J. Sax wrote:
> > > > >> Thanks Adam,
> > > > >>
> > > > >> *> Q) Range scans work with caching enabled, too. Thus, there
> > > > >> is no functional/correctness requirement to disable caching. I
> > > > >> cannot remember why Jan's proposal added this? It might be an
> > > > >> implementation detail though (maybe just remove it from the
> > > > >> KIP? -- might be misleading).
> > > > >
> > > > > I don't know how to range scan over a caching store; probably
> > > > > one had to open 2 iterators and merge them.
> > > > > Other than that, I still think even the regular join is broken
> > > > > with caching enabled, right? I once filed a ticket, because
> > > > > with caching enabled it would return values that haven't been
> > > > > published downstream yet.
> >
> > --
> > -- Guozhang
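(Editor's aside: Guozhang's hash-comparison proposal from earlier in the
thread can be sketched roughly as follows. This is illustrative code,
not the KIP implementation; the class and helper names are hypothetical.
The idea is that the left-hand side ships a digest of its serialized
value instead of the full payload, and a returning join response is
dropped when that digest no longer matches the current left-hand value.)

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch of the hash-comparison idea: send a digest of the
// left-hand value to the right-hand side instead of the full payload,
// and discard stale join responses on the way back.
public class ValueHashCheck {
    // Digest of the serialized left-hand value (MD5 used here only as
    // an example, as in the proposal; any digest function would do).
    static byte[] digest(byte[] serializedValue) {
        try {
            return MessageDigest.getInstance("MD5").digest(serializedValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // A join response echoes back the digest it was sent with; forward
    // the result only if the left-hand value is still the same.
    static boolean shouldForward(byte[] digestFromResponse, byte[] currentLeftValue) {
        return Arrays.equals(digestFromResponse, digest(currentLeftValue));
    }

    public static void main(String[] args) {
        byte[] sent = digest("v1".getBytes());
        // Left value unchanged: the single result is forwarded...
        System.out.println(shouldForward(sent, "v1".getBytes())); // true
        // ...but a stale response for v1 is dropped once v2 has arrived,
        // so only one (k1, (f-k1, v2-v3)) record reaches downstream.
        System.out.println(shouldForward(sent, "v2".getBytes())); // false
    }
}
```

Under this sketch, the duplicate (k1, (f-k1, v2-v3)) record from the
scenario above would be suppressed, since the first response's digest
(computed over v1) no longer matches the stored v2.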