Hi Guozhang

That would certainly work for eliminating those duplicate values. As it
stands right now, this approach would be consistent with how changes are
already swallowed due to out-of-order processing with multiple threads, and
it seems like a very reasonable way forward. Thank you for the suggestion!

I have been trying to think if there are any other scenarios where we can
end up with duplicates, though I have not been able to identify any others
at the moment. I will think on it a bit more, but if anyone else has any
ideas, please chime in.

Thanks,
Adam



On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:

> One more thought regarding *c-P2: Duplicates)*: first, I want to separate
> this issue from the more general issue that today, table-table joins (not
> only foreign-key, but also co-partitioned primary-key joins) still do not
> strictly respect timestamp ordering, since the two changelog streams may
> be fetched, and hence processed, out of order, and we do not yet allow a
> record to be joined against the other table as of any given time snapshot.
> So when there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2))
> arriving at the left-hand table and one record (f-k1, v3) at the
> right-hand table, depending on the processing order we may get:
>
> (k1, (f-k1, v2-v3))
>
> or
>
> (k1, (f-k1, v1-v3))
> (k1, (f-k1, v2-v3))
>
> And this is not to be addressed by this KIP.
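>
> (To make the two possible orderings above concrete, here is a toy
> simulation; the event encoding and in-memory tables are simplified
> assumptions for illustration, not the actual Kafka Streams processors:)
>
>     import java.util.ArrayList;
>     import java.util.HashMap;
>     import java.util.List;
>     import java.util.Map;
>
>     public class OrderingToy {
>         // Replays a sequence of table updates and records the join
>         // results. Left updates are {"L", key, foreignKey, value};
>         // right updates are {"R", foreignKey, value}.
>         static List<String> run(List<String[]> events) {
>             Map<String, String[]> left = new HashMap<>();  // k -> {fk, v}
>             Map<String, String> right = new HashMap<>();   // fk -> v
>             List<String> out = new ArrayList<>();
>             for (String[] e : events) {
>                 if (e[0].equals("L")) {
>                     left.put(e[1], new String[]{e[2], e[3]});
>                     String rv = right.get(e[2]);
>                     if (rv != null)
>                         out.add("(" + e[1] + ", " + e[3] + "-" + rv + ")");
>                 } else {
>                     right.put(e[1], e[2]);
>                     for (Map.Entry<String, String[]> l : left.entrySet())
>                         if (l.getValue()[0].equals(e[1]))
>                             out.add("(" + l.getKey() + ", "
>                                     + l.getValue()[1] + "-" + e[2] + ")");
>                 }
>             }
>             return out;
>         }
>
>         public static void main(String[] args) {
>             // Both left updates processed before the right update:
>             // prints [(k1, v2-v3)]
>             System.out.println(run(List.of(
>                     new String[]{"L", "k1", "f-k1", "v1"},
>                     new String[]{"L", "k1", "f-k1", "v2"},
>                     new String[]{"R", "f-k1", "v3"})));
>             // Right update processed between the two left updates:
>             // prints [(k1, v1-v3), (k1, v2-v3)]
>             System.out.println(run(List.of(
>                     new String[]{"L", "k1", "f-k1", "v1"},
>                     new String[]{"R", "f-k1", "v3"},
>                     new String[]{"L", "k1", "f-k1", "v2"})));
>         }
>     }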
>
> What I would advocate is to fix only the issue that is introduced by this
> KIP, namely that we may get:
>
> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> (k1, (f-k1, v2-v3))
>
> I admit that this is not a correctness issue from a semantics standpoint
> alone, compared with "discarding the first result", but it may be
> confusing to users who do not expect to see these seeming duplicates.
> On the other hand, I think there is a lightweight solution to avoid it:
> we can still optimize away sending the full payload of "v1" from the left
> hand side to the right hand side, but instead of trimming off the bytes
> entirely, we can send, e.g., an MD5 hash of the bytes (I'm using MD5 here
> just as an example; we can definitely replace it with other functions).
> That way we can discard the join operation if the hash value sent back
> from the right hand side no longer matches the left hand side, i.e. we
> will only send:
>
> (k1, (f-k1, v2-v3))
>
> downstream once.
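>
> (A minimal sketch of this hash-comparison idea; the class and method
> names are illustrative assumptions, not the KIP's actual classes or
> wire format:)
>
>     import java.security.MessageDigest;
>     import java.security.NoSuchAlgorithmException;
>     import java.util.Arrays;
>
>     public final class ValueHashing {
>
>         // Ship a hash of the serialized left-hand value instead of the
>         // full payload. MD5 is just an example digest, as noted above.
>         public static byte[] hash(byte[] serializedLeftValue) {
>             try {
>                 return MessageDigest.getInstance("MD5")
>                         .digest(serializedLeftValue);
>             } catch (NoSuchAlgorithmException e) {
>                 throw new IllegalStateException("MD5 not available", e);
>             }
>         }
>
>         // On the response path: the right-hand side echoes the hash back;
>         // if the left-hand value changed in the meantime, the hashes no
>         // longer match and the stale join result is discarded.
>         public static boolean shouldForward(byte[] echoedHash,
>                                             byte[] currentLeftValue) {
>             return Arrays.equals(echoedHash, hash(currentLeftValue));
>         }
>     }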
>
> WDYT?
>
>
> Guozhang
>
>
> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Ah yes, I recall it all now. That answers the question of why I had
> > caching disabled. I can certainly re-enable it, since I believe the main
> > concern was simply about reconciling those two iterators; a lack of
> > knowledge there on my part.
> >
> >
> > Thank you John for weighing in - we certainly both do appreciate it. I
> > think that John hits it on the head though with his comment of "If it
> > turns out we're wrong about this, then it should be possible to fix the
> > semantics in place, without messing with the API."
> >
> > If anyone else would like to weigh in, your thoughts would be greatly
> > appreciated.
> >
> > Thanks
> >
> > On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > >> I don't know how to range scan over a caching store; probably one
> > > >> would have to open 2 iterators and merge them.
> > >
> > > That happens automatically. If you query a cached KTable, it ranges
> > > over the cache and the underlying RocksDB and performs the merging
> > > under the hood.
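> > >
> > > (Conceptually it is a two-way merge over the sorted cache and the
> > > sorted underlying store, with the cache winning on duplicate keys
> > > because it holds the newer value -- a simplified sketch, not the
> > > actual Streams merged-iterator implementation:)
> > >
> > >     import java.util.TreeMap;
> > >
> > >     public class CachedRangeSketch {
> > >         // Eager stand-in for the lazy iterator merge: take the
> > >         // store's entries in range, then overlay the cache's entries
> > >         // so that cached (newer) values shadow the store's values.
> > >         public static <V> TreeMap<String, V> range(
> > >                 TreeMap<String, V> cache, TreeMap<String, V> store,
> > >                 String from, String to) {
> > >             TreeMap<String, V> merged =
> > >                     new TreeMap<>(store.subMap(from, true, to, true));
> > >             merged.putAll(cache.subMap(from, true, to, true));
> > >             return merged;
> > >         }
> > >     }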
> > >
> > > >> Other than that, I still think even the regular join is broken
> > > >> with caching enabled, right?
> > >
> > > Why? To me, using the word "broken" implies it is conceptually
> > > incorrect; I don't see that here.
> > >
> > > >> I once filed a ticket, because with caching enabled it would
> > > >> return values that haven't been published downstream yet.
> > >
> > > For the bug report: I found
> > > https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
> > > this, but it is a regular bug like any other, and we should not change
> > > a design because of a bug.
> > >
> > > That range() returns values that have not been published downstream
> > > when caching is enabled is how caching works and is intended behavior.
> > > Not sure why you say it's incorrect?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 3/5/19 1:49 AM, Jan Filipiak wrote:
> > > >
> > > >
> > > > On 04.03.2019 19:14, Matthias J. Sax wrote:
> > > >> Thanks Adam,
> > > >>
> > > >> *> Q) Range scans work with caching enabled, too. Thus, there is no
> > > >> functional/correctness requirement to disable caching. I cannot
> > > >> remember why Jan's proposal added this? It might be an
> > > >> implementation detail though (maybe just remove it from the KIP?
> > > >> -- it might be misleading).
> > > >
> > > > I don't know how to range scan over a caching store; probably one
> > > > would have to open 2 iterators and merge them.
> > > >
> > > > Other than that, I still think even the regular join is broken with
> > > > caching enabled, right? I once filed a ticket, because with caching
> > > > enabled it would return values that haven't been published downstream
> > > > yet.
> > > >
> > >
> > >
> >
>
>
> --
> -- Guozhang
>
