Hi John

Thanks for the explanation. I wasn't sure how KTable repartition topics
were handled with regards to cleanup but I just wanted to double check to
see if it could cause an issue.

@Matthias
My inclination is to keep the DSL topologies consistent with one another. I
am a bit concerned about scope creep into the header domain, and I am not
sure how much performance would be improved vs. additional complexity. I
think if we go down this approach we should consider a new type of internal
topic so that it's not confused with existing repartition and changelog
topic types. I am more inclined to go with keeping it consistent and
separated into a normal repartition topic and a normal changelog topic
otherwise.

Thanks
Adam






On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I guess Adam suggests, to use compaction for the repartition topic and
> don't purge data. Doing this, would allow us to avoid a store changelog
> topic for the "subscription store" on the RHS. This would be a nice
> optimization.
>
> But the concern about breaking compaction is correct. However, I see it
> as an optimization only and thus, if we keep the topic as plain
> repartition topic and use a separate store changelog topic the issue
> resolves itself.
>
> Maybe we could use headers thought to get this optimization. Do you
> think it's worth to do this optimization or just stick with the simple
> design and two topics (repartition plus changelog)?
>
>
>
> @Adam: thanks for updating the Wiki page. LGTM.
>
>
> -Matthias
>
>
> On 3/11/19 9:24 AM, John Roesler wrote:
> > Hey Adam,
> >
> > That's a good observation, but it wouldn't be a problem for repartition
> > topics because Streams aggressively deletes messages from the reparation
> > topics once it knows they are handled. Thus, we don't need to try and
> cater
> > to the log compactor.
> >
> > Thanks,
> > -John
> >
> > On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com
> >
> > wrote:
> >
> >> For the sake of expediency, I updated the KIP with what I believe we
> have
> >> discussed.
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
> >>
> >>
> >>
> >> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <
> adam.bellem...@gmail.com>
> >> wrote:
> >>
> >>> My only concern was around compaction of records in the repartition
> >> topic.
> >>> This would simply mean that these records would stick around as their
> >> value
> >>> isn't truly null. Though I know about the usage of compaction on
> >> changelog
> >>> topics, I am a bit fuzzy on where we use compaction in other internal
> >>> topics. So long as this doesn't cause concern I can certainly finish
> off
> >>> the KIP today.
> >>>
> >>> Thanks
> >>>
> >>> Adam
> >>>
> >>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I agree that the LHS side must encode this information and tell the
> RHS
> >>>> if a tombstone requires a reply or not.
> >>>>
> >>>>>> Would this pose some sort of verbosity problem in the internal
> >> topics,
> >>>>>> especially if we have to rebuild state off of them?
> >>>>
> >>>> I don't see an issue atm. Can you elaborate how this relates to
> rebuild
> >>>> state?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> >>>>> Hi Matthias
> >>>>>
> >>>>> I have been mulling over the unsubscribe / delete optimization, and I
> >>>> have
> >>>>> one main concern. I believe that the RHS can only determine whether
> to
> >>>>> propagate the tombstone or not based on the value passed over from
> the
> >>>> LHS.
> >>>>> This value would need to be non-null, and so wouldn't the internal
> >>>>> repartition topics end up containing many non-null "tombstone"
> values?
> >>>>>
> >>>>> ie:
> >>>>> Normal tombstone (propagate):     (key=123, value=null)
> >>>>> Don't-propagate-tombstone:          (key=123, value=("don't propagate
> >>>> me,
> >>>>> but please delete state"))
> >>>>>
> >>>>> Would this pose some sort of verbosity problem in the internal
> topics,
> >>>>> especially if we have to rebuild state off of them?
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> SGTM.
> >>>>>>
> >>>>>> I also had the impression that those duplicates are rather an error
> >>>> than
> >>>>>> an case of eventual consistency. Using hashing to avoid sending the
> >>>>>> payload is a good idea IMHO.
> >>>>>>
> >>>>>> @Adam: can you update the KIP accordingly?
> >>>>>>
> >>>>>>  - add the optimization to not send a reply from RHS to LHS on
> >>>>>> unsubscribe (if not a tombstone)
> >>>>>>  - explain why using offsets to avoid duplicates does not work
> >>>>>>  - add hashing to avoid duplicates
> >>>>>>
> >>>>>> Beside this, I don't have any further comments. Excited to finally
> >> get
> >>>>>> this in!
> >>>>>>
> >>>>>> Let us know when you have updated the KIP so we can move forward
> with
> >>>>>> the VOTE. Thanks a lot for your patience! This was a very loooong
> >> shot!
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> This proposal sounds good to me, especially since we observe that
> >>>> people
> >>>>>>> are already confused when the see duplicate results coming out of
> >> 1:1
> >>>>>> joins
> >>>>>>> (which is a bug). I take this as "evidence" that we're better off
> >>>>>>> eliminating those duplicates from the start. Guozhang's proposal
> >> seems
> >>>>>> like
> >>>>>>> a lightweight solution to the problem, so FWIW, I'm in favor.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <
> >>>> adam.bellem...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang
> >>>>>>>>
> >>>>>>>> That would certainly work for eliminating those duplicate values.
> >> As
> >>>> it
> >>>>>>>> stands right now, this would be consistent with swallowing changes
> >>>> due
> >>>>>> to
> >>>>>>>> out-of-order processing with multiple threads, and seems like a
> >> very
> >>>>>>>> reasonable way forward. Thank you for the suggestion!
> >>>>>>>>
> >>>>>>>> I have been trying to think if there are any other scenarios where
> >> we
> >>>>>> can
> >>>>>>>> end up with duplicates, though I have not been able to identify
> any
> >>>>>> others
> >>>>>>>> at the moment. I will think on it a bit more, but if anyone else
> >> has
> >>>> any
> >>>>>>>> ideas, please chime in.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Adam
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
> >>>>>> separate
> >>>>>>>>> this issue with the more general issue that today (not only
> >>>>>> foreign-key,
> >>>>>>>>> but also co-partition primary-key) table-table joins is still not
> >>>>>>>> strictly
> >>>>>>>>> respecting the timestamp ordering since the two changelog streams
> >>>> may
> >>>>>> be
> >>>>>>>>> fetched and hence processed out-of-order and we do not allow a
> >>>> record
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>> joined with the other table at any given time snapshot yet. So
> >>>> ideally
> >>>>>>>> when
> >>>>>>>>> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1,
> v2))
> >>>>>> coming
> >>>>>>>>> at the left hand table and one record (f-k1, v3) at the right
> hand
> >>>>>> table,
> >>>>>>>>> depending on the processing ordering we may get:
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v1-v3))
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> And this is not to be addressed by this KIP.
> >>>>>>>>>
> >>>>>>>>> What I would advocate is to fix the issue that is introduced in
> >> this
> >>>>>> KIP
> >>>>>>>>> alone, that is we may have
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> I admit that it does not have correctness issue from the
> semantics
> >>>>>> along,
> >>>>>>>>> comparing it with "discarding the first result", but it may be
> >>>>>> confusing
> >>>>>>>>> from user's observation who do not expect to see the seemingly
> >>>>>>>> duplicates.
> >>>>>>>>> On the other hand, I think there's a light solution to avoid it,
> >>>> which
> >>>>>> is
> >>>>>>>>> that we can still optimize away to not send the full payload of
> >> "v1"
> >>>>>> from
> >>>>>>>>> left hand side to right hand side, but instead of just trimming
> >> off
> >>>> the
> >>>>>>>>> whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm
> >> using
> >>>> MD5
> >>>>>>>>> here just as an example, we can definitely replace it with other
> >>>>>>>>> functions), by doing which we can discard the join operation if
> >> the
> >>>>>> hash
> >>>>>>>>> value sent back from the right hand side does not match with the
> >>>> left
> >>>>>>>> hand
> >>>>>>>>> side any more, i.e. we will only send:
> >>>>>>>>>
> >>>>>>>>> (k1, (f-k1, v2-v3))
> >>>>>>>>>
> >>>>>>>>> to down streams once.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <
> >>>>>> adam.bellem...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Ah yes, I recall it all now. That answers that question as to
> >> why I
> >>>>>> had
> >>>>>>>>>> caching disabled. I can certainly re-enable it since I believe
> >> the
> >>>>>> main
> >>>>>>>>>> concern was simply about reconciling those two iterators. A lack
> >> of
> >>>>>>>>>> knowledge there on my part.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thank you John for weighing in - we certainly both do appreciate
> >>>> it. I
> >>>>>>>>>> think that John hits it on the head though with his comment of
> >> "If
> >>>> it
> >>>>>>>>> turns
> >>>>>>>>>> out we're wrong about this, then it should be possible to fix
> the
> >>>>>>>>> semantics
> >>>>>>>>>> in place, without messing with the API."
> >>>>>>>>>>
> >>>>>>>>>> If anyone else would like to weigh in, your thoughts would be
> >>>> greatly
> >>>>>>>>>> appreciated.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <
> >>>> matth...@confluent.io
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>>> I dont know how to range scan over a caching store, probably
> >> one
> >>>>>>>> had
> >>>>>>>>>>>>> to open 2 iterators and merge them.
> >>>>>>>>>>>
> >>>>>>>>>>> That happens automatically. If you query a cached KTable, it
> >>>> ranges
> >>>>>>>>> over
> >>>>>>>>>>> the cache and the underlying RocksDB and performs the merging
> >>>> under
> >>>>>>>> the
> >>>>>>>>>>> hood.
> >>>>>>>>>>>
> >>>>>>>>>>>>> Other than that, I still think even the regualr join is
> broken
> >>>>>>>> with
> >>>>>>>>>>>>> caching enabled right?
> >>>>>>>>>>>
> >>>>>>>>>>> Why? To me, if you use the word "broker", it implies
> >> conceptually
> >>>>>>>>>>> incorrect; I don't see this.
> >>>>>>>>>>>
> >>>>>>>>>>>> I once files a ticket, because with caching
> >>>>>>>>>>>>>> enabled it would return values that havent been published
> >>>>>>>>> downstream
> >>>>>>>>>>> yet.
> >>>>>>>>>>>
> >>>>>>>>>>> For the bug report, if found
> >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still
> need
> >>>> to
> >>>>>>>> fix
> >>>>>>>>>>> this, but it is a regular bug as any other, and we should not
> >>>> change
> >>>>>>>> a
> >>>>>>>>>>> design because of a bug.
> >>>>>>>>>>>
> >>>>>>>>>>> That range() returns values that have not been published
> >>>> downstream
> >>>>>>>> if
> >>>>>>>>>>> caching is enabled is how caching works and is intended
> >> behavior.
> >>>> Not
> >>>>>>>>>>> sure why say it's incorrect?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
> >>>>>>>>>>>>> Thanks Adam,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there
> >> is
> >>>>>>>> no
> >>>>>>>>>>>>> functional/correctness requirement to disable caching. I
> >> cannot
> >>>>>>>>>>>>> remember why Jan's proposal added this? It might be an
> >>>>>>>>>>>>> implementation detail though (maybe just remove it from the
> >> KIP?
> >>>>>>>>>>>>> -- might be miss leading).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I dont know how to range scan over a caching store, probably
> >> one
> >>>>>>>> had
> >>>>>>>>>>>> to open 2 iterators and merge them.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Other than that, I still think even the regualr join is broken
> >>>> with
> >>>>>>>>>>>> caching enabled right? I once files a ticket, because with
> >>>> caching
> >>>>>>>>>>>> enabled it would return values that havent been published
> >>>>>>>> downstream
> >>>>>>>>>> yet.
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>
> >
>
>

Reply via email to