One more thing to add:

6. A GlobalKTable is always bootstrapped from the beginning, whereas for other KTables we are enabling users to override the reset position, as in
https://github.com/apache/kafka/pull/2007
Should we consider doing the same for GlobalKTable as well?

Guozhang

On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks for the very well-written proposal, and sorry for the very late review. I have a few comments:
>
> 1. We are introducing a "queryableViewName" for the GlobalKTable join results. I'm wondering whether we should instead add a more general function, such as "materialize", to KTable and GlobalKTable that takes the name to be used in queries.
>
> 2. For GlobalKTable's own "join" and "leftJoin": since we only pass a KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only the left-hand side will logically "trigger" the join, which differs from KTable's join semantics. I'm wondering if it would be more consistent to have them as:
>
>     <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
>                                         final KeyValueMapper<K, V, K1> leftKeyMapper,
>                                         final KeyValueMapper<K1, V1, K> rightKeyMapper,
>                                         final ValueJoiner<V, V1, R> joiner,
>                                         final String queryableViewName);
>
>     <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
>                                              final KeyValueMapper<K, V, K1> leftKeyMapper,
>                                              final KeyValueMapper<K1, V1, K> rightKeyMapper,
>                                              final ValueJoiner<V, V1, R> joiner,
>                                              final String queryableViewName);
>
>     <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
>                                             final KeyValueMapper<K, V, K1> keyMapper,
>                                             final ValueJoiner<V, V1, R> joiner,
>                                             final String queryableViewName);
>
> i.e., add another directional key mapper to both join and outerJoin.
>
> 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to have a function separate from "TopologyBuilder.build" itself? With global tables, are there any scenarios in which we want to build the topology without the embedded global tables (i.e., still calling "build")?
>
> 4. As for the implementation, you mentioned that global table bootstrapping will be done in a dedicated thread. Could we also consider moving the logic for bootstrapping the standby-replica state stores into this thread? It could then leverage the existing "restoreConsumer", which does not participate in the consumer group protocol. By doing this, I think we can still avoid thread synchronization while making the logic clearer (ideally, standby restoration does not need to be part of the stream thread's main loop).
>
> 5. Regarding the global table's state directory: I'm assuming it will not be under the per-task directory, since it is per instance. Could you elaborate a bit on its directory in the wiki as well? Also, could we consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along with this feature, since we may need to change the directory path / storage schema formats for these different types of stores going forward?
>
> Guozhang
>
> On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Thanks for the update, Michael.
>>
>> I just wanted to add one crucial piece of information that I failed to mention earlier (I apologise).
>>
>> To me, the join between two global tables just produces a view on top of the underlying tables (this is the same as how it works for KTables today). That means no physical StateStore backs the join result. It is just a virtual StateStore that knows how to resolve the join when required. I've deliberately taken this path so that we don't end up with yet another copy of the data stored on local disk and sent to another changelog topic. It also reduces the memory overhead of creating RocksDBStores and the load on the thread-based caches we have. So it is a resource optimization.
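Damian describes the join result as a virtual store that resolves the join on demand instead of keeping another copy of the data. A minimal plain-Java sketch can illustrate that idea. It is not the Kafka Streams API or the KIP's implementation. `VirtualJoinView` is a hypothetical class, `java.util.Map` stands in for the two underlying global state stores, and `Function`/`BiFunction` stand in for `KeyValueMapper` and `ValueJoiner`. The data follows the t1/t2 topics discussed further down the thread, with each t1 value reduced to just its t2_id for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch: a read-only "virtual store" that resolves an inner join
// between two global tables lazily instead of materializing the result.
// Plain Maps stand in for the two underlying global state stores.
public class VirtualJoinView<K, V, K1, V1, R> {
    private final Map<K, V> left;               // left table, keyed by K
    private final Map<K1, V1> right;            // right table, keyed by K1
    private final Function<V, K1> keyMapper;    // derives the right-hand key from a left value
    private final BiFunction<V, V1, R> joiner;  // combines matching values

    public VirtualJoinView(Map<K, V> left, Map<K1, V1> right,
                           Function<V, K1> keyMapper, BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    // Resolves the join for one key at query time; nothing is copied or stored.
    // Returns null if either side is missing (inner-join semantics).
    public R get(K key) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null;
        }
        V1 rightValue = right.get(keyMapper.apply(leftValue));
        return rightValue == null ? null : joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        // t1 is keyed by int and its value is the t2_id; t2 is keyed by string.
        Map<Integer, String> t1 = new HashMap<>();
        Map<String, String> t2 = new HashMap<>();
        t1.put(1, "a");
        t2.put("a", "alpha");

        VirtualJoinView<Integer, String, String, String, String> view =
            new VirtualJoinView<>(t1, t2, t2Id -> t2Id, (l, r) -> l + ":" + r);

        System.out.println(view.get(1)); // a:alpha
        t2.put("a", "ALPHA");            // an update to t2 is visible immediately
        System.out.println(view.get(1)); // a:ALPHA
        System.out.println(view.get(2)); // null (no left record)
    }
}
```

Every lookup starts from a left-hand key and derives the right-hand key from the left value. As a result, the view can only be queried by t1's key, which matches the one-directional t1 -> t2 join Damian describes elsewhere in this thread.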
>>
>> So while it is technically possible to support outer joins, we would need to either physically materialize the StateStore (and create a changelog topic for it) or provide another interface through which the user could map from the outerJoin key to the keys of both other tables. This is because the key of the outerJoin table could be the key of the LHS table, the key of the RHS table, or something completely different.
>>
>> With this and what you mentioned above in mind, I think we should park outerJoin support for this KIP and revisit it if and when we need it in the future.
>>
>> I'll update the KIP with this.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mich...@confluent.io> wrote:
>>
>> > Damian and I briefly chatted offline (thanks, Damian!), and here's the summary of my thoughts and conclusion.
>> >
>> > TL;DR: Let's skip outer join support for global tables.
>> >
>> > In more detail:
>> >
>> > - We agreed that, technically, we can add OUTER JOIN support. However, outer joins only work if certain preconditions are met. The preconditions are IMHO similar to (or the same as) those we have for the normal, partitioned KTables (e.g., having matching keys and co-partitioned data for the tables). But in the case of global tables, the user would need to meet all these preconditions in one big swing when specifying the params for the outer join call. Even so, you'd only know at runtime whether the preconditions were actually met.
>> >
>> > - Hence it's quite likely that users will be confused about these preconditions and how to meet them. Also, from what we can tell, use cases / user demand for outer joins have been rare.
>> >
>> > - So, long story short, even though we could add outer join support, we'd suggest skipping it for global tables. If we subsequently learn that there is a lot of user interest in that functionality, we still have the option to add it in the future.
>> >
>> > Best,
>> > Michael
>> >
>> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <damian....@gmail.com> wrote:
>> >
>> > > Hi Michael,
>> > >
>> > > I don't see how that helps.
>> > >
>> > > Let's say we have tables Person(id, device_id, name, ...) and Device(id, person_id, type, ...), both keyed with the same type. And we have a stream that, for the sake of simplicity, has both person_id and device_id (I know this is a bit contrived!). So our join is:
>> > >
>> > >     person = builder.globalTable(...);
>> > >     device = builder.globalTable(...);
>> > >     personDevice = person.outerJoin(device, ...);
>> > >
>> > >     someStream = builder.stream(...);
>> > >     // which id do I use to join with? person.id? device.id?
>> > >     someStream.leftJoin(personDevice, ...);
>> > >
>> > >     // Interactive Query on the view generated by the join of person and device
>> > >     personDeviceStore = streams.store("personDevice", ...);
>> > >     // person.id? device.id?
>> > >     personDeviceStore.get(someId);
>> > >
>> > > We get records:
>> > >
>> > >     person id=1, device_id=2, ...
>> > >     device id=2, person_id=1, ...
>> > >     stream person_id=1, device_id=2
>> > >
>> > > We could do the join between the global tables both ways, as each side could map to the other side's key. But when I'm accessing the resulting table, personDevice, what is the key? The person.id? The device.id? It can't be both of them.
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mich...@confluent.io> wrote:
>> > >
>> > > > The key type returned by both KeyValueMappers (in the current trunk version, that type is named `R`) would need to be the same for this to work.
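Michael and Damian converge on a precondition: an outer join only has a well-defined result key when both tables are keyed by the same type. That precondition can be sketched as a lazily resolved lookup in plain Java. This is an illustrative assumption, not proposed KIP code. `SameKeyOuterJoin` and `outerJoinGet` are hypothetical names, `Map<K, V1>` and `Map<K, V2>` stand in for the two global tables, and `Optional.empty()` marks a side with no matching record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical sketch: an outer-join lookup resolved on demand. It compiles only
// because both tables share the key type K; with differently typed keys
// (e.g. Integer vs. String) there would be no single key to look up by.
public class SameKeyOuterJoin {

    // Returns the joined value for `key`, or null if neither table contains it.
    // A side without a matching record is passed to the joiner as Optional.empty().
    static <K, V1, V2, R> R outerJoinGet(Map<K, V1> left, Map<K, V2> right, K key,
                                         BiFunction<Optional<V1>, Optional<V2>, R> joiner) {
        Optional<V1> leftValue = Optional.ofNullable(left.get(key));
        Optional<V2> rightValue = Optional.ofNullable(right.get(key));
        if (!leftValue.isPresent() && !rightValue.isPresent()) {
            return null;
        }
        return joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        // Assumed example: both tables are keyed by a person id of type Long.
        Map<Long, String> names = new HashMap<>();
        Map<Long, String> devices = new HashMap<>();
        names.put(1L, "alice");
        devices.put(2L, "phone");

        BiFunction<Optional<String>, Optional<String>, String> joiner =
            (name, device) -> name.orElse("-") + "/" + device.orElse("-");

        System.out.println(outerJoinGet(names, devices, 1L, joiner)); // alice/-
        System.out.println(outerJoinGet(names, devices, 2L, joiner)); // -/phone
        System.out.println(outerJoinGet(names, devices, 3L, joiner)); // null
    }
}
```

Sharing the type `K` is necessary but not sufficient. Damian's Person/Device example shows that both tables can use the same key type while their keys still mean different things (a person id versus a device id). In that case, a lookup by one id against the other table returns unrelated records.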
>> > > >
>> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > >
>> > > > > Michael,
>> > > > >
>> > > > > We can only support outerJoin if both tables are keyed the same way. Let's say, for example, that you can map both ways, but the keys of the two tables are of different types: t1 is keyed by long and t2 by string. What is the key type of the resulting GlobalKTable? When you subsequently join to this table and do a lookup on it, which key are you using?
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mich...@confluent.io> wrote:
>> > > > >
>> > > > > > Damian,
>> > > > > >
>> > > > > > Yes, that makes sense.
>> > > > > >
>> > > > > > But I am still wondering: in your example, Streams has no prior knowledge of "can I map from t1->t2" that it can leverage for joining t1 and t2. It can only blindly rely on the user to provide an appropriate KeyValueMapper from K1/V1 of t1 to K2/V2 of t2. In other words, if we allow the user to provide a KeyValueMapper from t1->t2 (Streams does not know at compile time whether this mapping will actually work), then we can also allow the user to provide a corresponding "reverse" mapper from t2->t1. That is, we could say that an outer join between two global tables IS supported, but if and only if the user provides two KeyValueMappers, one for t1->t2 and one for t2->t1.
>> > > > > >
>> > > > > > In general, the left join t1->t2 (which is supported in the KIP) works only because the user provides a KeyValueMapper from t1->t2. The outer join, as you point out, cannot be satisfied as easily, because Streams must know two mappers, t1->t2 plus t2->t1. Otherwise, the outer join won't work.
>> > > > > >
>> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Michael,
>> > > > > > >
>> > > > > > > Sure. Say we have two input topics, t1 and t2, as below:
>> > > > > > >
>> > > > > > >     t1 {
>> > > > > > >         int key;
>> > > > > > >         string t2_id;
>> > > > > > >         ...
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     t2 {
>> > > > > > >         string key;
>> > > > > > >         ...
>> > > > > > >     }
>> > > > > > >
>> > > > > > > If we create global tables out of these, we'd get:
>> > > > > > >
>> > > > > > >     GlobalKTable<Integer, ?> t1;
>> > > > > > >     GlobalKTable<String, ?> t2;
>> > > > > > >
>> > > > > > > So the join can only go in one direction, i.e., from t1 -> t2, because in order to perform the join we need to use a KeyValueMapper to extract the t2 key from the t1 value.
>> > > > > > >
>> > > > > > > Does that make sense?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Damian
>> > > > > > >
>> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mich...@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > > There is no outer join for GlobalKTables, as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.
>> > > > > > > >
>> > > > > > > > Care to elaborate why it won't work the other way around? If, for example, we swapped the call from leftTable.join(rightTable) to rightTable.join(leftTable), that join would work, too. Perhaps I am missing something, though. :-)
>> > > > > > > >
>> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Matthias,
>> > > > > > > > >
>> > > > > > > > > Thanks for the feedback.
>> > > > > > > > >
>> > > > > > > > > There is no outer join for GlobalKTables, as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.
>> > > > > > > > >
>> > > > > > > > > On the bootstrapping concern: if the application fails before bootstrapping finishes, the problem is likely to be related to a terminal exception, e.g., running out of disk space, corrupt state stores, etc. In these cases, we wouldn't want the application to continue. So I think this is OK.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Damian
>> > > > > > > > >
>> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matth...@confluent.io> wrote:
>> > > > > > > > >
>> > > > > > > > > > Thanks for the KIP, Damian. Very nice motivating example!
>> > > > > > > > > >
>> > > > > > > > > > A few comments:
>> > > > > > > > > >
>> > > > > > > > > > - Why is there no outer join for GlobalKTables?
>> > > > > > > > > > - On bootstrapping a GlobalKTable: could bootstrapping never finish if the application fails before it completes and new data is written at the same time? Do we need to guard against this (it seems to be a very rare corner case, so maybe it's not required)?
>> > > > > > > > > >
>> > > > > > > > > > -Matthias
>> > > > > > > > > >
>> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
>> > > > > > > > > > > Hi all,
>> > > > > > > > > > >
>> > > > > > > > > > > I would like to start the discussion on KIP-99:
>> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
>> > > > > > > > > > >
>> > > > > > > > > > > Looking forward to your feedback.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Damian
>> > > > > > >
>> > > > > > > --
>> > > > > > > *Michael G. Noll*
>> > > > > > > Product Manager | Confluent
>> > > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
>> > > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog <http://www.confluent.io/blog>
>
> -- Guozhang

-- Guozhang
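Matthias worries that bootstrapping might never finish while new data keeps arriving. One way to bound it is to replay the log only up to the end offset observed when bootstrapping starts, instead of chasing the log's live end. The sketch below is a hypothetical, single-partition simulation, not a description of how the KIP or Kafka Streams implements bootstrapping. `GlobalTableBootstrap`, `Rec`, and `bootstrap` are illustrative names. An `ArrayList` stands in for the topic partition, and a null value is treated as a delete. The `startOffset` parameter only gestures at the reset-position override Guozhang raises in point 6.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: bootstrap a table by replaying a log only up to the end
// offset captured when bootstrapping starts, so concurrent appends cannot keep
// it running forever. An ArrayList stands in for a single topic partition.
public class GlobalTableBootstrap {

    // A key/value record; a null value is treated as a delete (tombstone).
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays log[startOffset, endOffsetSnapshot) into a fresh map.
    // startOffset models a configurable reset position (0 = from the beginning).
    // concurrentWriter is invoked after each record to simulate a producer.
    static Map<String, String> bootstrap(List<Rec> log, int startOffset,
                                         Runnable concurrentWriter) {
        final int endOffsetSnapshot = log.size(); // captured once, before reading
        Map<String, String> table = new HashMap<>();
        for (int offset = startOffset; offset < endOffsetSnapshot; offset++) {
            Rec rec = log.get(offset);
            if (rec.value == null) {
                table.remove(rec.key);
            } else {
                table.put(rec.key, rec.value);
            }
            concurrentWriter.run();
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("a", "1"));
        log.add(new Rec("b", "2"));
        log.add(new Rec("a", null)); // tombstone for "a"

        // The writer appends one record per record read. A loop bounded by the
        // live log size would never terminate; this one stops at offset 3.
        Map<String, String> fromStart =
            bootstrap(log, 0, () -> log.add(new Rec("c", "3")));
        System.out.println(fromStart);  // {b=2}
        System.out.println(log.size()); // 6

        // A later bootstrap from offset 1, with no concurrent writes, sees the
        // records appended in the meantime.
        Map<String, String> fromOffset1 = bootstrap(log, 1, () -> { });
        System.out.println(fromOffset1); // {b=2, c=3}
    }
}
```

The three "c" records appended during the first call are excluded from its result. In this model, they are left for whatever runs after bootstrapping returns, as the second call illustrates.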