Hello Adam,

Sorry for being late on this thread. I've read through your updated wiki
and here are some thoughts:

* I agree with your assessed impediments. In fact, today although the
Global KTable have its own checkpoint files, and restoration process,
during its restoration it will always try to bootstrap the backing global
store up to the Kafka topic's end of offset, before starting any
processing. And hence on each machine node the checkpoint offsets  For the
proposed change that Global KTable also needs to trigger joins, it means
its restoration process would not fit as well. On the other hand, even for
Global KTable - Stream joins we do not have such guarantees either: imagine
if there is a crash, or even graceful shutdown scenario, when the task is
back online and Global KTable is bootstrapped, it does not guarantee to be
at the same offset position when it has stopped anyways. So I think one can
argue that users of Global KTable - KTable join should not expect this
semantics either.

* The other issue, which I mentioned above, is that the updates of the
joins triggered by the Global KTable, and hence executed by the global
thread, now needs to be propagated into the downstream operators, and more
important following the order of the join: i.e. if there is a record coming
from Global KTable, and then later another record coming from the other
KTable. It means that then the global thread and the stream thread needs to
be synchronized (note that today these threads are totally in parallel to
each other).


With that, I think we can 1) continue working on KIP-213 for local KTable
joins, and 2) continue this KIP for Global KTable - KTable joins, while
educating users that similar to Global KTable - KStream joins, the global
ktable will not Global KTable trigger joins.

Guozhang


On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Thanks for your help so far guys.
>
> While I do think that I have a fairly reasonable way forward for
> restructuring the topologies and threads, there is, unfortunately, what I
> believe is a fatal flaw that cannot be easily resolved. I have updated the
> page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> ) with the impediments to the solution, all of which revolve around
> ensuring that data consistency is maintained. It seems to me that
> GlobalKTables are not the way forward here and that I may be best
> redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).
>
> I would appreciate being proven wrong on my impediment listings, but if
> there are no other ideas I think we should close this KIP and the
> associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
> simply performed by a stream to GKT join with a groupbyKey and reduce to
> form a state-store, so I would see no need to keep it open otherwise
> (unless just for the shorthand notation).
>
> Thanks again
>
> Adam
>
> On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > Please see my comments inline.
> >
> > On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <
> adam.bellem...@gmail.com>
> > wrote:
> >
> > > Hi Guozhang
> > >
> > > *Re: Questions*
> > > *1)* I do not yet have a solution to this, but I also did not look that
> > > closely at it when I begun this KIP. I admit that I was unaware of
> > exactly
> > > how the GlobalKTable worked alongside the KTable/KStream topologies.
> You
> > > mention "It means the two topologies will be merged, and that merged
> > > topology can only be executed as a single task, by a single thread. " -
> > is
> > > the problem here that the merged topology would be parallelized to
> other
> > > threads/instances? While I am becoming familiar with how the topologies
> > are
> > > created under the hood, I am not yet fully clear on the implications of
> > > your statement. I will look into this further.
> > >
> > >
> > Yes. The issue is that today each task is executed by a single thread
> only
> > at any given time, and hence any state stores are only accessed by a
> single
> > thread (except for interactive queries, and for global tables where the
> > global update thread write to the global store, and the local thread read
> > from the global store), if we let the global store update thread to be
> also
> > triggering joins and puts send the results into the downstream operators,
> > then it means that the global store update thread can access on any state
> > stores in the subsequent part of the topology, breaking our current
> > threading model.
> >
> >
> > > *2)* " do you mean that although we have a duplicated state store:
> > > ModifiedEvents in addition to the original Events with only the
> enhanced
> > > key, this is not avoidable anyways even if we do re-keying?" Yes, that
> is
> > > correct, that is what I meant. I need to improve my knowledge around
> this
> > > component too. I have been browsing the KIP-213 discussion thread and
> > > looking at Jan's code
> > >
> > > *Re: Comments*
> > > *1) *Makes sense. I will update the diagram accordingly. Thanks!
> > >
> > > *2)* Wouldn't outer join require that we emit records from the right
> > > GlobalKTable that have no match in the left KTable? This seems
> undefined
> > to
> > > me with the current proposal (above issues aside), since multiple
> threads
> > > would be producing the same output event for a single GlobalKTable
> > update.
> > >
> > >
> > I was considering mainly about the semantics of table-table joins, that
> > whether we should add this operator inside our API. Implementation wise,
> we
> > will only have one global store update thread per instance, so there will
> > not be multiple threads producing the same output, but still there would
> be
> > other issues that we should consider indeed, as mentioned above. Again
> this
> > comment is not about implementations, but API wise if it is desirable to
> > add it.
> >
> >
> > >
> > > Questions for you both:
> > > Q1) Is a KTable always materialized? I am looking at the code under the
> > > hood, and it seems to me that it's either materialized with an explicit
> > > Materialized object, or it's given an anonymous name and the default
> > serdes
> > > are used. Am I correct in this observation?
> > >
> > >
> > A KTable is not always materialized. For example, a KTable generated from
> > `KTable#filter` or `KTable#mapValues` does not create a new materialized
> > state store, but we use the caller `KTable` 's state store for anyone who
> > wants to query it in joins.
> >
> > Moving forward, we are also trying to optimize the topology to only
> > "logically" materialize a KTable when necessary, this is summarized in
> > https://issues.apache.org/jira/browse/KAFKA-6761
> >
> >
> > >
> > > Thanks,
> > > Adam
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to