One more thing to add:

6. KGlobalTable is always bootstrapped from the beginning, whereas for
other KTables we are enabling users to override the reset position, as
in

https://github.com/apache/kafka/pull/2007

Should we consider doing the same for KGlobalTable as well?
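
To make the question concrete, here is a minimal, self-contained sketch of
the difference in behaviour. It does not use the Kafka API; the names
ResetPositionSketch, ResetPolicy, startOffset and globalTableStartOffset are
hypothetical and exist only for illustration. It assumes a source resumes
from a checkpoint when one exists and otherwise falls back to a configurable
policy, while the global table (as currently proposed) ignores any policy.

```java
import java.util.OptionalLong;

// Illustrative model only; none of these names come from Kafka Streams.
final class ResetPositionSketch {

    // Where to start reading when no position has been recorded yet.
    enum ResetPolicy { EARLIEST, LATEST }

    // A regular table source: a recorded position wins; otherwise the
    // user-chosen policy decides between the log start and the log end.
    static long startOffset(OptionalLong checkpoint, ResetPolicy policy,
                            long beginOffset, long endOffset) {
        if (checkpoint.isPresent()) {
            return checkpoint.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? beginOffset : endOffset;
    }

    // The global table as described in point 6: no policy parameter,
    // it always bootstraps from the beginning of the topic.
    static long globalTableStartOffset(long beginOffset) {
        return beginOffset;
    }

    public static void main(String[] args) {
        long begin = 5L;
        long end = 42L;
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, begin, end));
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.LATEST, begin, end));
        System.out.println(startOffset(OptionalLong.of(17L), ResetPolicy.LATEST, begin, end));
        System.out.println(globalTableStartOffset(begin));
    }
}
```

Doing "the same" for KGlobalTable would amount to giving
globalTableStartOffset a policy parameter like the one startOffset takes.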


Guozhang


On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks for the very well written proposal, and sorry for the very late
> review. I have a few comments here:
>
> 1. We are introducing a "queryableViewName" in the GlobalTable join
> results, but I'm wondering if we should instead add a more general function
> like "materialize" to KTable and KGlobalTable, taking the name to be used in
> queries?
>
> 2. For KGlobalTable's own "join" and "leftJoin": since we are only passing
> the KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only
> the left-hand side will logically "trigger" the join, which is different from
> KTable's join semantics. I'm wondering if it would be more consistent to
> have them as:
>
>
> <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
>                                     final KeyValueMapper<K, V, K1> leftkeyMapper,
>                                     final KeyValueMapper<K1, V1, K> rightkeyMapper,
>                                     final ValueJoiner<V, V1, R> joiner,
>                                     final String queryableViewName);
>
> <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
>                                          final KeyValueMapper<K, V, K1> leftkeyMapper,
>                                          final KeyValueMapper<K1, V1, K> rightkeyMapper,
>                                          final ValueJoiner<V, V1, R> joiner,
>                                          final String queryableViewName);
>
> <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
>                                         final KeyValueMapper<K, V, K1> keyMapper,
>                                         final ValueJoiner<V, V1, R> joiner,
>                                         final String queryableViewName);
>
>
> I.e. add another directional key mapper to join and also to outerJoin.
>
>
> 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> have a separate function from "TopologyBuilder.build" itself? With global
> tables, are there any scenarios in which we want to build the topology without
> the embedded global tables (i.e. still calling "build")?
>
> 4. As for implementation, you mentioned that global table bootstrapping
> will be done in another dedicated thread. Could we also consider moving the
> logic of bootstrapping the standby-replica state stores into this thread as
> well, which can then leverage the existing "restoreConsumer" that does
> not participate in the consumer group protocol? By doing this I think we
> can still avoid thread synchronization while making the logic clearer
> (ideally the standby restoration does not really need to be part of the
> stream thread's main loop).
>
> 5. Also for the global table's state directory, I'm assuming it will not
> be under the per-task directory as it is per instance. But could you
> elaborate a bit in the wiki about its directory as well? Also, could we
> consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along
> with this feature, since we may need to change the directory path / storage
> schema formats for these different types of stores moving forward?
>
>
>
> Guozhang
>
>
> On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Thanks for the update Michael.
>>
>> I just wanted to add that there is one crucial piece of information that
>> I've failed to add (I apologise).
>>
>> To me, the join between 2 Global Tables just produces a view on top of the
>> underlying tables (this is the same as how it works for KTables today). So
>> that means there is no physical StateStore that backs the join result; it is
>> just a virtual StateStore that knows how to resolve the join when it is
>> required. I've deliberately taken this path so that we don't end up having
>> yet another copy of the data, stored on local disk, and sent to another
>> change-log topic. This also reduces the memory overhead from creating
>> RocksDBStores and reduces load on the thread-based caches we have. So it is
>> a resource optimization.
>>
>> So while it is technically possible to support outer joins, we would need
>> to physically materialize the StateStore (and create a changelog topic for
>> it), or we'd need to provide another interface where the user could map
>> from the outerJoin key to both of the other table keys. This is because the
>> key of the outerJoin table could be either the key of the lhs table, or the
>> key of the rhs table, or something completely different.
>>
>> With this and what you have mentioned above in mind, I think we should park
>> outerJoin support for this KIP and revisit it if and when we need it in the
>> future.
>>
>> I'll update the KIP with this.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mich...@confluent.io> wrote:
>>
>> > Damian and I briefly chatted offline (thanks, Damian!), and here's the
>> > summary of my thoughts and conclusion.
>> >
>> > TL;DR: Let's skip outer join support for global tables.
>> >
>> > In more detail:
>> >
>> > - We agreed that, technically, we can add OUTER JOIN support.  However,
>> > outer joins only work if certain preconditions are met.  The preconditions
>> > are IMHO similar to, or the same as, those we have for the normal,
>> > partitioned KTables (e.g. having matching keys and co-partitioned data for
>> > the tables), but in the case of global tables the user would need to meet
>> > all these preconditions in one go when specifying the params for the outer
>> > join call.  Even so, you'd only know at run-time whether the preconditions
>> > were actually met properly.
>> >
>> > - Hence it's quite likely that users will be confused about these
>> > preconditions and how to meet them, and -- from what we can tell -- use
>> > cases / user demand for outer joins have been rare.
>> >
>> > - So, long story short, even though we could add outer join support, we'd
>> > suggest skipping it for global tables.  If we subsequently learn that there
>> > is a lot of user interest in that functionality, we still have the option to
>> > add it in the future.
>> >
>> >
>> > Best,
>> > Michael
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <damian....@gmail.com> wrote:
>> >
>> > > Hi Michael,
>> > >
>> > > I don't see how that helps?
>> > >
>> > > Let's say we have tables Person(id, device_id, name, ...), Device(id,
>> > > person_id, type, ...), and both are keyed with the same type. And we have
>> > > a stream that, for the sake of simplicity, has both person_id and
>> > > device_id (I know this is a bit contrived!), so our join is:
>> > > person = builder.globalTable(...);
>> > > device = builder.globalTable(...);
>> > > personDevice = person.outerJoin(device, ...);
>> > >
>> > > someStream = builder.stream(..);
>> > > // which id do i use to join with? person.id? device.id?
>> > > someStream.leftJoin(personDevice, ...)
>> > >
>> > > // Interactive Query on the view generated by the join of person and device
>> > > personDeviceStore = streams.store("personDevice",...);
>> > > // person.id? device.id?
>> > > personDeviceStore.get(someId);
>> > >
>> > > We get records
>> > > person id=1, device_id=2 ,...
>> > > device id=2, person_id=1, ...
>> > > stream person_id = 1, device_id = 2
>> > >
>> > > We could do the join between the GlobalTables both ways, as each side
>> > > could map to the other side's key, but when I'm accessing the resulting
>> > > table, personDevice, what is the key? The person.id? The device.id? It
>> > > can't be both of them.
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > >
>> > >
>> > >
>> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mich...@confluent.io> wrote:
>> > >
>> > > > The key type returned by both KeyValueMappers (in the current trunk
>> > > > version, that type is named `R`) would need to be the same for this to
>> > > > work.
>> > > >
>> > > >
>> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > >
>> > > > > Michael,
>> > > > >
>> > > > > We can only support outerJoin if both tables are keyed the same way.
>> > > > > Let's say, for example, you can map both ways; however, the key for
>> > > > > each table is of a different type. So t1 is long and t2 is string -
>> > > > > what is the key type of the resulting GlobalKTable? So when you
>> > > > > subsequently join to this table, and do a lookup on it, which key
>> > > > > are you using?
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mich...@confluent.io> wrote:
>> > > > >
>> > > > > > Damian,
>> > > > > >
>> > > > > > yes, that makes sense.
>> > > > > >
>> > > > > > But I am still wondering:  In your example, there's no prior
>> > > > > > knowledge "can I map from t1->t2" that Streams can leverage for
>> > > > > > joining t1 and t2 other than blindly relying on the user to provide
>> > > > > > an appropriate KeyValueMapper for K1/V1 of t1 -> K2/V2 of t2.  In
>> > > > > > other words, if we allow the user to provide a KeyValueMapper from
>> > > > > > t1->t2 (Streams does not know at compile time whether this mapping
>> > > > > > will actually work), then we can also allow the user to provide a
>> > > > > > corresponding "reverse" mapper from t2->t1.  That is, we could say
>> > > > > > that an outer join between two global tables IS supported, but if
>> > > > > > and only if the user provides two KeyValueMappers, one for t1->t2
>> > > > > > and one for t2->t1.
>> > > > > >
>> > > > > > The left join t1->t2 (which is supported in the KIP), in general,
>> > > > > > works only because of the existence of the user-provided
>> > > > > > KeyValueMapper from t1->t2.  The outer join, as you point out,
>> > > > > > cannot be satisfied as easily because Streams must know two
>> > > > > > mappers, t1->t2 plus t2->t1 -- otherwise the outer join won't work.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Michael,
>> > > > > > >
>> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
>> > > > > > > t1{
>> > > > > > >  int key;
>> > > > > > >  string t2_id;
>> > > > > > >  ...
>> > > > > > > }
>> > > > > > >
>> > > > > > > t2 {
>> > > > > > >   string key;
>> > > > > > >   ..
>> > > > > > > }
>> > > > > > > If we create global tables out of these we'd get:
>> > > > > > > GlobalKTable<Integer, ?> t1;
>> > > > > > > GlobalKTable<String, ?> t2;
>> > > > > > >
>> > > > > > > So the join can only go in 1 direction, i.e., from t1 -> t2, as
>> > > > > > > in order to perform the join we need to use a KeyValueMapper to
>> > > > > > > extract the t2 key from the t1 value.
>> > > > > > >
>> > > > > > > Does that make sense?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Damian
>> > > > > > >
>> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mich...@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be
>> > > > > > > > > keyed differently. So you need to use the key from the left
>> > > > > > > > > side of the join along with the KeyValueMapper to resolve the
>> > > > > > > > > right side of the join. This won't work the other way around.
>> > > > > > > >
>> > > > > > > > Care to elaborate why it won't work the other way around?  If,
>> > > > > > > > for example, we swapped the call from
>> > > > > > > > leftTable.join(rightTable) to rightTable.join(leftTable), that
>> > > > > > > > join would work, too.  Perhaps I am missing something though. :-)
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Matthias,
>> > > > > > > > >
>> > > > > > > > > Thanks for the feedback.
>> > > > > > > > >
>> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be
>> > > > > > > > > keyed differently. So you need to use the key from the left
>> > > > > > > > > side of the join along with the KeyValueMapper to resolve the
>> > > > > > > > > right side of the join. This won't work the other way around.
>> > > > > > > > >
>> > > > > > > > > On the bootstrapping concern: if the application is failing
>> > > > > > > > > before bootstrapping finishes, the problem is likely to be
>> > > > > > > > > related to a terminal exception, i.e., running out of disk
>> > > > > > > > > space, corrupt state stores, etc. In these cases, we wouldn't
>> > > > > > > > > want the application to continue. So I think this is ok.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Damian
>> > > > > > > > >
>> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matth...@confluent.io> wrote:
>> > > > > > > > >
>> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
>> > > > > > > > > >
>> > > > > > > > > > A few comments:
>> > > > > > > > > >
>> > > > > > > > > >  - why is there no outer-join for GlobalKTables?
>> > > > > > > > > >  - on bootstrapping GlobalKTable, could it happen that this
>> > > > > > > > > > never finishes if the application fails before bootstrapping
>> > > > > > > > > > finishes and new data gets written at the same time? Do we
>> > > > > > > > > > need to guard against this (seems to be a very rare corner
>> > > > > > > > > > case, so maybe not required)?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > -Matthias
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
>> > > > > > > > > > > Hi all,
>> > > > > > > > > > >
>> > > > > > > > > > > I would like to start the discussion on KIP-99:
>> > > > > > > > > > >
>> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
>> > > > > > > > > > >
>> > > > > > > > > > > Looking forward to your feedback.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Damian
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > *Michael G. Noll*
>> > > > > > Product Manager | Confluent
>> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
>> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
>> > > > > > <http://www.confluent.io/blog>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
