Hi all,

This proposal sounds good to me, especially since we observe that people
are already confused when they see duplicate results coming out of 1:1 joins
(which is a bug). I take this as "evidence" that we're better off
eliminating those duplicates from the start. Guozhang's proposal seems like
a lightweight solution to the problem, so FWIW, I'm in favor.

Thanks,
-John

On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Guozhang
>
> That would certainly work for eliminating those duplicate values. As it
> stands right now, this would be consistent with swallowing changes due to
> out-of-order processing with multiple threads, and seems like a very
> reasonable way forward. Thank you for the suggestion!
>
> I have been trying to think if there are any other scenarios where we can
> end up with duplicates, though I have not been able to identify any others
> at the moment. I will think on it a bit more, but if anyone else has any
> ideas, please chime in.
>
> Thanks,
> Adam
>
>
>
> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > One more thought regarding *c-P2: Duplicates)*: first I want to separate
> > this issue from the more general issue that today (not only foreign-key,
> > but also co-partitioned primary-key) table-table joins still do not
> > strictly respect timestamp ordering, since the two changelog streams may
> > be fetched and hence processed out of order, and we do not yet allow a
> > record to be joined against the other table as of a given time snapshot.
> > So when there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2))
> > arriving at the left-hand table and one record (f-k1, v3) at the
> > right-hand table, depending on the processing order we may get:
> >
> > (k1, (f-k1, v2-v3))
> >
> > or
> >
> > (k1, (f-k1, v1-v3))
> > (k1, (f-k1, v2-v3))
> >
> > And this is not to be addressed by this KIP.
> >
> > What I would advocate is to fix only the issue introduced in this KIP,
> > namely that we may have
> >
> > (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> > (k1, (f-k1, v2-v3))
> >
> > I admit that, compared with "discarding the first result", this is not a
> > correctness issue from the semantics alone, but it may be confusing to
> > users who do not expect to see the seeming duplicates. On the other hand,
> > I think there is a lightweight solution to avoid it: we can still
> > optimize away sending the full payload of "v1" from the left-hand side
> > to the right-hand side, but instead of trimming off the bytes entirely,
> > we can send, e.g., an MD5 hash of the bytes (I'm using MD5 here just as
> > an example; we can definitely replace it with other functions). That way
> > we can discard the join result if the hash value sent back from the
> > right-hand side no longer matches the left-hand side, i.e. we will only
> > send:
> >
> > (k1, (f-k1, v2-v3))
> >
> > downstream once.
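> > To make the idea concrete, here is a rough sketch of the hash check in
> > plain Java (class and method names are illustrative, not actual Streams
> > internals):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class HashCheck {

    // Instead of shipping the whole left-hand value to the right-hand side,
    // ship only a digest of its serialized bytes.
    static byte[] digest(String value) {
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // When the joined result returns, emit it only if the digest that
    // travelled with the request still matches the current left-hand value;
    // otherwise the result is stale and can be discarded.
    static boolean shouldEmit(byte[] hashFromRight, String currentLeftValue) {
        return Arrays.equals(hashFromRight, digest(currentLeftValue));
    }

    public static void main(String[] args) {
        byte[] sentWithV1 = digest("v1");
        // By the time the v1 result comes back, the left value is already v2:
        System.out.println(shouldEmit(sentWithV1, "v2")); // false -> drop stale v1-v3
        System.out.println(shouldEmit(digest("v2"), "v2")); // true  -> emit v2-v3
    }
}
```

> > The digest travels with the subscription record, so the left-hand side
> > never has to ship or store the full old payload to detect staleness.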
> >
> > WDYT?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > Ah yes, I recall it all now. That answers the question of why I had
> > > caching disabled. I can certainly re-enable it, since I believe the main
> > > concern was simply about reconciling those two iterators; a lack of
> > > knowledge there on my part.
> > >
> > >
> > > Thank you John for weighing in - we certainly both do appreciate it. I
> > > think that John hits it on the head though with his comment of "If it
> > turns
> > > out we're wrong about this, then it should be possible to fix the
> > semantics
> > > in place, without messing with the API."
> > >
> > > If anyone else would like to weigh in, your thoughts would be greatly
> > > appreciated.
> > >
> > > Thanks
> > >
> > > On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > >> I don't know how to range scan over a caching store; probably one
> > > > >> had to open two iterators and merge them.
> > > >
> > > > That happens automatically. If you query a cached KTable, it ranges
> > > > over the cache and the underlying RocksDB and performs the merging
> > > > under the hood.
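> > > > If it helps, the merge can be pictured as a range over two sorted
> > > > maps where dirty cache entries shadow the store (a toy sketch, not
> > > > the actual caching-store code):

```java
import java.util.Map;
import java.util.TreeMap;

public class MergedRange {

    // Sketch of how a caching layer can serve a range query: take the
    // (sorted) underlying store's range, then overlay the (sorted) cache's
    // range so that cache entries shadow store entries for the same key.
    static TreeMap<String, String> range(TreeMap<String, String> cache,
                                         TreeMap<String, String> store,
                                         String from, String to) {
        TreeMap<String, String> merged =
                new TreeMap<>(store.subMap(from, true, to, true));
        merged.putAll(cache.subMap(from, true, to, true)); // cache wins
        return merged;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store =
                new TreeMap<>(Map.of("a", "1", "c", "3"));
        TreeMap<String, String> cache =
                new TreeMap<>(Map.of("b", "2", "c", "3'"));
        System.out.println(range(cache, store, "a", "c"));
        // {a=1, b=2, c=3'} -- the dirty cache entry for "c" shadows the store
    }
}
```

> > > > The real implementation merges two iterators lazily rather than
> > > > materializing a map, but the shadowing semantics are the same.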
> > > >
> > > > >> Other than that, I still think even the regular join is broken
> > > > >> with caching enabled right?
> > > >
> > > > Why? To me, the word "broken" implies conceptually incorrect; I don't
> > > > see this.
> > > >
> > > > >> I once filed a ticket, because with caching enabled it would
> > > > >> return values that haven't been published downstream yet.
> > > >
> > > > For the bug report, I found
> > > > https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
> > > > this, but it is a regular bug like any other, and we should not change
> > > > the design because of a bug.
> > > >
> > > > That range() returns values that have not been published downstream
> > > > when caching is enabled is how caching works and is intended behavior.
> > > > Not sure why you say it's incorrect?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/5/19 1:49 AM, Jan Filipiak wrote:
> > > > >
> > > > >
> > > > > On 04.03.2019 19:14, Matthias J. Sax wrote:
> > > > >> Thanks Adam,
> > > > >>
> > > > >> *> Q) Range scans work with caching enabled, too. Thus, there is
> > > > >> no functional/correctness requirement to disable caching. I cannot
> > > > >> remember why Jan's proposal added this? It might be an
> > > > >> implementation detail though (maybe just remove it from the KIP?
> > > > >> -- might be misleading).
> > > > >
> > > > > I don't know how to range scan over a caching store; probably one
> > > > > had to open two iterators and merge them.
> > > > >
> > > > > Other than that, I still think even the regular join is broken with
> > > > > caching enabled right? I once filed a ticket, because with caching
> > > > > enabled it would return values that haven't been published
> > > > > downstream yet.
> > > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
