Moved this KIP into status "inactive". Feel free to resume it at any time.
-Matthias

On 7/1/18 9:47 PM, Guozhang Wang wrote:
> Hello Adam,
>
> Sorry for being late on this thread. I've read through your updated wiki
> and here are some thoughts:
>
> * I agree with your assessed impediments. In fact, today, although the
> Global KTable has its own checkpoint files and restoration process,
> during its restoration it will always try to bootstrap the backing global
> store up to the Kafka topic's end offset before starting any
> processing. And hence on each machine node the checkpoint offsets For the
> proposed change that Global KTable also needs to trigger joins, it means
> its restoration process would not fit as well. On the other hand, even for
> Global KTable - Stream joins we do not have such guarantees either: imagine
> a crash, or even a graceful shutdown scenario. When the task is
> back online and the Global KTable is bootstrapped, it is not guaranteed to be
> at the same offset position where it stopped anyway. So I think one can
> argue that users of Global KTable - KTable joins should not expect these
> semantics either.
>
> * The other issue, which I mentioned above, is that the updates of the
> joins triggered by the Global KTable, and hence executed by the global
> thread, now need to be propagated into the downstream operators and, more
> importantly, follow the order of the join: i.e. if there is a record coming
> from the Global KTable, and then later another record coming from the other
> KTable. It means that the global thread and the stream thread need to
> be synchronized (note that today these threads run totally in parallel to
> each other).
>
>
> With that, I think we can 1) continue working on KIP-213 for local KTable
> joins, and 2) continue this KIP for Global KTable - KTable joins, while
> educating users that, similar to Global KTable - KStream joins, the global
> ktable will not trigger joins.
>
> Guozhang
>
>
> On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
>> Thanks for your help so far, guys.
>>
>> While I do think that I have a fairly reasonable way forward for
>> restructuring the topologies and threads, there is, unfortunately, what I
>> believe is a fatal flaw that cannot be easily resolved. I have updated the
>> page (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
>> ) with the impediments to the solution, all of which revolve around
>> ensuring that data consistency is maintained. It seems to me that
>> GlobalKTables are not the way forward here and that I may be better off
>> redirecting my efforts towards KIP-213 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).
>>
>> I would appreciate being proven wrong on my impediment listings, but if
>> there are no other ideas I think we should close this KIP and the
>> associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
>> simply performed by a stream to GKT join with a groupByKey and reduce to
>> form a state store, so I would see no need to keep it open otherwise
>> (unless just for the shorthand notation).
>>
>> Thanks again
>>
>> Adam
>>
>> On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hello Adam,
>>>
>>> Please see my comments inline.
>>>
>>> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guozhang
>>>>
>>>> *Re: Questions*
>>>> *1)* I do not yet have a solution to this, but I also did not look that
>>>> closely at it when I began this KIP. I admit that I was unaware of exactly
>>>> how the GlobalKTable worked alongside the KTable/KStream topologies. You
>>>> mention "It means the two topologies will be merged, and that merged
>>>> topology can only be executed as a single task, by a single thread." Is
>>>> the problem here that the merged topology would be parallelized to other
>>>> threads/instances? While I am becoming familiar with how the topologies are
>>>> created under the hood, I am not yet fully clear on the implications of
>>>> your statement. I will look into this further.
>>>>
>>>>
>>> Yes. The issue is that today each task is executed by a single thread only
>>> at any given time, and hence any state stores are only accessed by a single
>>> thread (except for interactive queries, and for global tables, where the
>>> global update thread writes to the global store and the local thread reads
>>> from the global store). If we let the global store update thread also
>>> trigger joins and send the results into the downstream operators,
>>> then the global store update thread could access any state
>>> stores in the subsequent part of the topology, breaking our current
>>> threading model.
>>>
>>>
>>>> *2)* "Do you mean that although we have a duplicated state store:
>>>> ModifiedEvents in addition to the original Events with only the enhanced
>>>> key, this is not avoidable anyways even if we do re-keying?" Yes, that is
>>>> correct, that is what I meant. I need to improve my knowledge around this
>>>> component too. I have been browsing the KIP-213 discussion thread and
>>>> looking at Jan's code.
>>>>
>>>> *Re: Comments*
>>>> *1)* Makes sense. I will update the diagram accordingly. Thanks!
>>>>
>>>> *2)* Wouldn't outer join require that we emit records from the right
>>>> GlobalKTable that have no match in the left KTable? This seems undefined to
>>>> me with the current proposal (above issues aside), since multiple threads
>>>> would be producing the same output event for a single GlobalKTable update.
>>>>
>>>>
>>> I was mainly considering the semantics of table-table joins, i.e.
>>> whether we should add this operator to our API. Implementation-wise, we
>>> will only have one global store update thread per instance, so there will
>>> not be multiple threads producing the same output, but there would still be
>>> other issues that we should indeed consider, as mentioned above. Again, this
>>> comment is not about implementation, but about whether, API-wise, it is
>>> desirable to add it.
>>>
>>>
>>>>
>>>> Questions for you both:
>>>> Q1) Is a KTable always materialized? I am looking at the code under the
>>>> hood, and it seems to me that it's either materialized with an explicit
>>>> Materialized object, or it's given an anonymous name and the default serdes
>>>> are used. Am I correct in this observation?
>>>>
>>>>
>>> A KTable is not always materialized. For example, a KTable generated from
>>> `KTable#filter` or `KTable#mapValues` does not create a new materialized
>>> state store; instead, we use the caller `KTable`'s state store for anyone who
>>> wants to query it in joins.
>>>
>>> Moving forward, we are also trying to optimize the topology to only
>>> "logically" materialize a KTable when necessary; this is summarized in
>>> https://issues.apache.org/jira/browse/KAFKA-6761
>>>
>>>
>>>>
>>>> Thanks,
>>>> Adam
>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>
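Adam's workaround, a KTable-driven join expressed in the DSL roughly as `table.toStream().join(globalTable, keySelector, joiner).groupByKey().reduce((oldV, newV) -> newV)`, has exactly the one-sided semantics Guozhang suggests documenting. The sketch below illustrates this with a small model. It is plain Java with no Kafka dependency. The class and method names (`KTableDrivenGlobalJoin`, `onGlobalUpdate`, `onTableUpdate`) are hypothetical and only mimic the roles of `KeyValueMapper`, `ValueJoiner`, and a latest-value `Reducer`. Threading, serdes, and tombstones are not modeled. A GlobalKTable update only refreshes the lookup store and never emits a result, so the reduced value stays stale until the next KTable-side update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model (no Kafka dependency) of
//   table.toStream().join(globalTable, keySelector, joiner).groupByKey().reduce((oldV, newV) -> newV)
// Only the triggering semantics are modeled; threading, serdes and tombstones are not.
class KTableDrivenGlobalJoin {

    private final Map<String, String> globalStore = new HashMap<>(); // stands in for the GlobalKTable's store
    private final Map<String, String> joinedStore = new HashMap<>(); // stands in for the reduce() state store
    private final List<String> emitted = new ArrayList<>();          // join results sent downstream, in order

    private final BiFunction<String, String, String> keySelector; // like a KeyValueMapper: (key, value) -> global key
    private final BiFunction<String, String, String> joiner;      // like a ValueJoiner: (value, globalValue) -> result

    KTableDrivenGlobalJoin(BiFunction<String, String, String> keySelector,
                           BiFunction<String, String, String> joiner) {
        this.keySelector = keySelector;
        this.joiner = joiner;
    }

    // Global-side update: refreshes the lookup store only; it never triggers a join.
    void onGlobalUpdate(String globalKey, String globalValue) {
        globalStore.put(globalKey, globalValue);
    }

    // Table-side update: looks up the current global value and, on a match, emits and reduces.
    void onTableUpdate(String key, String value) {
        String globalValue = globalStore.get(keySelector.apply(key, value));
        if (globalValue == null) {
            return; // inner-join semantics: no match, no output (any earlier result for this key is kept)
        }
        String joined = joiner.apply(value, globalValue);
        emitted.add(key + "=" + joined);
        joinedStore.put(key, joined); // reduce((oldV, newV) -> newV): keep the latest result per key
    }

    String lookup(String key) {
        return joinedStore.get(key);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Table values look like "item|customerId"; the global key is the part after '|'.
        KTableDrivenGlobalJoin join = new KTableDrivenGlobalJoin(
                (k, v) -> v.substring(v.indexOf('|') + 1),
                (v, g) -> v + "+" + g);

        join.onGlobalUpdate("c1", "Alice");
        join.onTableUpdate("o1", "book|c1");
        System.out.println(join.lookup("o1")); // book|c1+Alice

        // The global side changes, but nothing is re-emitted, so the stored result is stale.
        join.onGlobalUpdate("c1", "Alicia");
        System.out.println(join.lookup("o1")); // book|c1+Alice

        // Only the next table-side update picks up the new global value.
        join.onTableUpdate("o1", "pen|c1");
        System.out.println(join.lookup("o1")); // pen|c1+Alicia
        System.out.println(join.emitted());    // [o1=book|c1+Alice, o1=pen|c1+Alicia]
    }
}
```

Unlike a true table-table join, this approach never retracts a result. A KTable row that stops matching simply leaves its previous joined value in the reduced store.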