Thanks for your help so far, guys.

While I do think that I have a fairly reasonable way forward for
restructuring the topologies and threads, there is, unfortunately, what I
believe is a fatal flaw that cannot be easily resolved. I have updated the
page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
) with the impediments to the solution, all of which revolve around
ensuring that data consistency is maintained. It seems to me that
GlobalKTables are not the way forward here and that I may be best
redirecting my efforts towards KIP-213 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
).

I would appreciate being proven wrong on my impediment listings, but if
there are no other ideas I think we should close this KIP and the
associated JIRA. A KTable to GlobalKTable join driven just by the KTable can
already be expressed as a stream to GKT join followed by a groupByKey and
reduce to form a state store, so I see no need to keep it open otherwise
(unless just for the shorthand notation).
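
To illustrate that last point, here is a minimal plain-Java model of the
pattern. It is a sketch of the semantics only and does not use the Kafka
Streams API: the class, method, and field names are hypothetical, the two maps
stand in for the GlobalKTable and the reduced state store, and tombstones are
not modelled. In the DSL I would expect the same shape to be roughly
KTable#toStream, a KStream join against the GlobalKTable, then groupByKey and
reduce.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of a KTable-driven join against a GlobalKTable.
class KTableDrivenJoinSketch {
    // Stand-in for the GlobalKTable: fully replicated lookup data.
    final Map<String, String> globalTable = new HashMap<>();
    // Stand-in for the state store formed by groupByKey + reduce.
    final Map<String, String> resultStore = new HashMap<>();

    // Assumed value layout "<globalKey>|<payload>"; extracts the lookup key.
    final Function<String, String> keyMapper = v -> v.substring(0, v.indexOf('|'));
    // Combines the KTable-side value with the matching global value.
    final BiFunction<String, String, String> joiner = (v, g) -> v + "@" + g;

    // A GlobalKTable update only changes the lookup data; it emits nothing.
    void onGlobalUpdate(String key, String value) {
        globalTable.put(key, value);
    }

    // Each KTable changelog record is treated as a stream record and joined.
    void onTableUpdate(String key, String value) {
        String globalValue = globalTable.get(keyMapper.apply(value));
        if (globalValue == null) {
            return; // inner join: no match in the global table, nothing emitted
        }
        String joined = joiner.apply(value, globalValue);
        // groupByKey + reduce keeping the latest joined value per key
        resultStore.merge(key, joined, (oldValue, newValue) -> newValue);
    }

    public static void main(String[] args) {
        KTableDrivenJoinSketch sketch = new KTableDrivenJoinSketch();
        sketch.onGlobalUpdate("c1", "Alice");
        sketch.onTableUpdate("order-1", "c1|book");
        System.out.println(sketch.resultStore.get("order-1"));

        // Updating only the global side leaves the stored result untouched.
        sketch.onGlobalUpdate("c1", "Alicia");
        System.out.println(sketch.resultStore.get("order-1"));
    }
}
```

Both prints show the same joined value: the store is only refreshed when the
KTable side changes, which is exactly the one-directional behaviour, and why
the bi-directional case needs something more.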

Thanks again

Adam

On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Adam,
>
> Please see my comments inline.
>
> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi Guozhang
> >
> > *Re: Questions*
> > *1)* I do not yet have a solution to this, but I also did not look that
> > closely at it when I began this KIP. I admit that I was unaware of exactly
> > how the GlobalKTable worked alongside the KTable/KStream topologies. You
> > mention "It means the two topologies will be merged, and that merged
> > topology can only be executed as a single task, by a single thread." - is
> > the problem here that the merged topology would be parallelized to other
> > threads/instances? While I am becoming familiar with how the topologies are
> > created under the hood, I am not yet fully clear on the implications of
> > your statement. I will look into this further.
> >
> >
> Yes. The issue is that today each task is executed by only a single thread
> at any given time, and hence any state store is accessed by only a single
> thread. (The exceptions are interactive queries, and global tables, where
> the global update thread writes to the global store and the local thread
> reads from it.) If we let the global store update thread also trigger joins
> and send the results to the downstream operators, then that thread could
> access any state store in the subsequent part of the topology, breaking our
> current threading model.
>
>
> > *2)* " do you mean that although we have a duplicated state store:
> > ModifiedEvents in addition to the original Events with only the enhanced
> > key, this is not avoidable anyways even if we do re-keying?" Yes, that is
> > correct, that is what I meant. I need to improve my knowledge around this
> > component too. I have been browsing the KIP-213 discussion thread and
> > looking at Jan's code.
> >
> > *Re: Comments*
> > *1) *Makes sense. I will update the diagram accordingly. Thanks!
> >
> > *2)* Wouldn't an outer join require that we emit records from the right
> > GlobalKTable that have no match in the left KTable? This seems undefined to
> > me with the current proposal (above issues aside), since multiple threads
> > would be producing the same output event for a single GlobalKTable update.
> >
> >
> I was mainly considering the semantics of table-table joins, that is,
> whether we should add this operator to our API. Implementation-wise, we
> will have only one global store update thread per instance, so there will
> not be multiple threads producing the same output, but there would still be
> other issues to consider, as mentioned above. Again, this comment is not
> about the implementation, but about whether it is desirable to add this to
> the API.
>
>
> >
> > Questions for you both:
> > Q1) Is a KTable always materialized? I am looking at the code under the
> > hood, and it seems to me that it's either materialized with an explicit
> > Materialized object, or it's given an anonymous name and the default serdes
> > are used. Am I correct in this observation?
> >
> >
> A KTable is not always materialized. For example, a KTable generated from
> `KTable#filter` or `KTable#mapValues` does not create a new materialized
> state store; instead, the state store of the `KTable` it was called on is
> used by anyone who wants to query it in joins.
>
> Moving forward, we are also trying to optimize the topology to only
> "logically" materialize a KTable when necessary; this is summarized in
> https://issues.apache.org/jira/browse/KAFKA-6761
>
>
> >
> > Thanks,
> > Adam
> >
> >
>
> --
> -- Guozhang
>
