1/2: Sounds good, let's remove the joins within KGlobalTable for now.

3. I see, makes sense.

Unfortunately, since TopologyBuilder is a public class, we cannot separate
its internal-only functions (build / buildWithGlobalTables / etc.) from the
user-facing functions (stream / table / etc.). We need to consider
refactoring this interface sooner rather than later.

4/6. OK.


Guozhang


On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for your input. Answers below, but i'm thinking we should remove
> joins from GlobalKTables for the time being and re-visit if necessary in
> the future.
>
> 1. with a global table the joins are never really materialized (at least
> how i see it), rather they are just views on the existing global tables.
> I've deliberately taken this approach so we don't have to create yet
> another State Store and changelog topic etc. These all consume resources
> that i believe are unnecessary. So, i don't really see the point of having
> a materialize method. Further, one of the major benefits of joining two
> global tables is being able to query them via Interactive Queries. For this
> you need the name, so i think it makes sense to provide it with the join.
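
A minimal sketch of the "join as a view" idea in point 1, written in plain Java with no Kafka dependency. JoinView and its fields are hypothetical names used only for illustration; they are not part of the KIP or the Streams API. The point is that nothing is stored for the join result: each lookup is resolved against the two backing tables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class GlobalJoinViewSketch {

    /** Read-only join view (hypothetical): no store of its own, lookups go to the backing tables. */
    static final class JoinView<K, V, K1, V1, R> {
        private final Map<K, V> left;                 // stands in for the left global table
        private final Map<K1, V1> right;              // stands in for the right global table
        private final BiFunction<K, V, K1> keyMapper; // left (key, value) -> right key
        private final BiFunction<V, V1, R> joiner;    // combines the two values

        JoinView(Map<K, V> left, Map<K1, V1> right,
                 BiFunction<K, V, K1> keyMapper, BiFunction<V, V1, R> joiner) {
            this.left = left;
            this.right = right;
            this.keyMapper = keyMapper;
            this.joiner = joiner;
        }

        /** Inner-join lookup by the left table's key; returns null if either side is missing. */
        R get(K key) {
            V leftValue = left.get(key);
            if (leftValue == null) {
                return null;
            }
            V1 rightValue = right.get(keyMapper.apply(key, leftValue));
            return rightValue == null ? null : joiner.apply(leftValue, rightValue);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> persons = new HashMap<>();
        persons.put(1, "device-2");            // person 1 references device "device-2"
        Map<String, String> devices = new HashMap<>();
        devices.put("device-2", "phone");

        JoinView<Integer, String, String, String, String> view =
            new JoinView<>(persons, devices, (k, v) -> v, (v, v1) -> v + ":" + v1);

        System.out.println(view.get(1));       // resolved on demand from both tables
        devices.put("device-2", "tablet");     // an update to a backing table...
        System.out.println(view.get(1));       // ...is visible without any extra store
    }
}
```

Because the view holds only references to the two tables, there is no second copy of the data and no changelog topic to maintain, which is the resource argument made above.
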
>
> 2. This has been discussed already in this thread (with Michael), and
> outerJoin is deliberately not part of the KIP. To be able to join both
> ways, as you suggest, requires that both inputs are able to map to the same
> key. This is not always going to be possible, i.e., relationships can be
> one way, so for that reason i felt it was best to not go down that path as
> we'd not be able to resolve it at the time that
> globalTable.join(otherGlobalTable,...) was called, and this would result in
> possible confusion. Also, to support this we'd need to physically
> materialize a StateStore that represents the join (which i think is a waste
> of resources), or, we'd need to provide another interface where we can map
> from the key of the resulting global table to the keys of both of the
> joined tables.
>
> 3. The intention is that the GlobalKTables are in a single topology that is
> owned and updated by a single thread. So yes it is necessary that they can
> be created separately.
>
> 4. Bootstrapping and maintaining of the state of GlobalKTables are done on
> a single thread. This thread will run simultaneously with the current
> StreamThreads. It doesn't make sense to move the bootstrapping of the
> StandbyTasks to this thread as they are logically part of a StreamThread,
> they are 'assigned' to the StreamThread. With GlobalKTables there is no
> assignment as such, the thread just maintains all of them.
>
> 5. Yes i'll update the KIP - the state directory will be under the same
> path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
> directory, i.e., global_state, rather than being a task directory.
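
A small sketch of the directory layout described in point 5, assuming the global state lives directly under the configured state directory in a folder named global_state, next to the per-task folders. The helper names and the example task id are illustrative only, and the exact on-disk layout in the final implementation may differ.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class StateDirLayoutSketch {

    /** Directory for global table state: one per instance, not per task (name taken from the thread). */
    static Path globalStateDir(Path stateDir) {
        return stateDir.resolve("global_state");
    }

    /** Directory for a regular task's state, keyed by its task id (illustrative). */
    static Path taskDir(Path stateDir, String taskId) {
        return stateDir.resolve(taskId);
    }

    public static void main(String[] args) {
        // Stand-in for the value configured via StreamsConfig.STATE_DIR_CONFIG.
        Path stateDir = Paths.get("/tmp/kafka-streams");
        System.out.println(globalStateDir(stateDir));
        System.out.println(taskDir(stateDir, "0_1"));
    }
}
```
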
>
> 6. The whole point of GlobalKTables is to have a copy of ALL of the data on
> each node. I don't think it makes sense to be able to reset the starting
> position.
>
> Thanks,
> Damian
>
> On Tue, 20 Dec 2016 at 20:00 Guozhang Wang <wangg...@gmail.com> wrote:
>
> > One more thing to add:
> >
> > 6. For KGlobalTable, it is always bootstrapped from the beginning while for
> > other KTables, we are enabling users to override their resetting position
> > as in
> >
> > https://github.com/apache/kafka/pull/2007
> >
> > Should we consider doing the same for KGlobalTable as well?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Thanks for the very well written proposal, and sorry for the very-late
> > > review. I have a few comments here:
> > >
> > > 1. We are introducing a "queryableViewName" in the GlobalTable join
> > > results, while I'm wondering if we should just add a more general function
> > > like "materialize" to KTable and KGlobalTable with the name to be used in
> > > queries?
> > >
> > > 2. For KGlobalTable's own "join" and "leftJoin": since we are only passing
> > > the KeyValueMapper<K, V, K1> keyMapper it seems that for either case only
> > > the left hand side will logically "trigger" the join, which is different to
> > > KTable's join semantics. I'm wondering if it would be more consistent to
> > > have them as:
> > >
> > >
> > > <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
> > >                                     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > >                                     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > >                                     final ValueJoiner<V, V1, R> joiner,
> > >                                     final String queryableViewName);
> > >
> > > <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
> > >                                          final KeyValueMapper<K, V, K1> leftkeyMapper,
> > >                                          final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > >                                          final ValueJoiner<V, V1, R> joiner,
> > >                                          final String queryableViewName);
> > >
> > > <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
> > >                                         final KeyValueMapper<K, V, K1> keyMapper,
> > >                                         final ValueJoiner<V, V1, R> joiner,
> > >                                         final String queryableViewName);
> > >
> > >
> > > I.e. add another directional key mapper to join and also to outerJoin.
> > >
> > >
> > > 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> > > have a separate function from "TopologyBuilder.build" itself? With global
> > > tables, are there any scenarios where we want to build the topology without
> > > the embedded global tables (i.e. still calling "build")?
> > >
> > > 4. As for implementation, you mentioned that global table bootstrapping
> > > will be done in another dedicated thread. Could we also consider moving the
> > > logic of bootstrapping the standby-replica state stores into this thread as
> > > well, which can then leverage the existing "restoreConsumer" that does
> > > not participate in the consumer group protocol? By doing this I think we
> > > can still avoid thread-synchronization while making the logic clearer
> > > (ideally the standby restoration does not really need to be part of the
> > > stream thread's main loops).
> > >
> > > 5. Also for the global table's state directory, I'm assuming it will not
> > > be under the per-task directory as it is per instance. But could you
> > > elaborate a bit in the wiki about its directory as well? Also could we
> > > consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along
> > > with this feature since we may need to change the directory path / storage
> > > schema formats for these different types of stores moving forward.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <damian....@gmail.com>
> wrote:
> > >
> > >> Thanks for the update Michael.
> > >>
> > >> I just wanted to add that there is one crucial piece of information that
> > >> i've failed to add (I apologise).
> > >>
> > >> To me, the join between 2 Global Tables just produces a view on top of the
> > >> underlying tables (this is the same as it works for KTables today). So that
> > >> means there is no Physical StateStore that backs the join result, it is
> > >> just a Virtual StateStore that knows how to resolve the join when it is
> > >> required. I've deliberately taken this path so that we don't end up having
> > >> yet another copy of the data, stored on local disk, and sent to another
> > >> change-log topic. This also reduces the memory overhead from creating
> > >> RocksDBStores and reduces load on the Thread based caches we have. So it is
> > >> a resource optimization.
> > >>
> > >> So while it is technically possible to support outer joins, we would need
> > >> to physically materialize the StateStore (and create a changelog-topic for
> > >> it), or, we'd need to provide another interface where the user could map
> > >> from the outerJoin key to both of the other table keys. This is because the
> > >> key of the outerJoin table could be either the key of the lhs table, or the
> > >> rhs tables, or something completely different.
> > >>
> > >> With this and what you have mentioned above in mind i think we should park
> > >> outerJoin support for this KIP and re-visit if and when we need it in the
> > >> future.
> > >>
> > >> I'll update the KIP with this.
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mich...@confluent.io>
> wrote:
> > >>
> > >> > Damian and I briefly chatted offline (thanks, Damian!), and here's the
> > >> > summary of my thoughts and conclusion.
> > >> >
> > >> > TL;DR: Let's skip outer join support for global tables.
> > >> >
> > >> > In more detail:
> > >> >
> > >> > - We agreed that, technically, we can add OUTER JOIN support. However,
> > >> > outer joins only work if certain preconditions are met.  The preconditions
> > >> > are IMHO similar/the same as we have for the normal, partitioned KTables
> > >> > (e.g. having matching keys and co-partitioned data for the tables), but in
> > >> > the case of global tables the user would need to meet all these
> > >> > preconditions in one big swing when specifying the params for the outer
> > >> > join call.  Even so, you'd only know at run-time whether the preconditions
> > >> > were actually met properly.
> > >> >
> > >> > - Hence it's quite likely that users will be confused about these
> > >> > preconditions and how to meet them, and -- from what we can tell -- use
> > >> > cases / user demand for outer joins have been rare.
> > >> >
> > >> > - So, long story short, even though we could add outer join support we'd
> > >> > suggest to skip it for global tables.  If we subsequently learn that there
> > >> > is a lot of user interest in that functionality, we still have the option
> > >> > to add it in the future.
> > >> >
> > >> >
> > >> > Best,
> > >> > Michael
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <damian....@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Hi Michael,
> > >> > >
> > >> > > I don't see how that helps?
> > >> > >
> > >> > > Let's say we have tables Person(id, device_id, name, ...), Device(id,
> > >> > > person_id, type, ...), and both are keyed with the same type. And we have
> > >> > > a stream that, for the sake of simplicity, has both person_id and
> > >> > > device_id (i know this is a bit contrived!)
> > >> > > so our join
> > >> > > person = builder.globalTable(...);
> > >> > > device = builder.globalTable(...);
> > >> > > personDevice = builder.outerJoin(device, ...);
> > >> > >
> > >> > > someStream = builder.stream(..);
> > >> > > // which id do i use to join with? person.id? device.id?
> > >> > > someStream.leftJoin(personDevice, ...)
> > >> > >
> > >> > > // Interactive Query on the view generated by the join of person and device
> > >> > > personDeviceStore = streams.store("personDevice",...);
> > >> > > // person.id? device.id?
> > >> > > personDeviceStore.get(someId);
> > >> > >
> > >> > > We get records
> > >> > > person id=1, device_id=2 ,...
> > >> > > device id=2, person_id=1, ...
> > >> > > stream person_id = 1, device_id = 2
> > >> > >
> > >> > > We could do the join between the GlobalTables both ways as each side could
> > >> > > map to the other side's key, but when i'm accessing the resulting table,
> > >> > > personDevice, what is the key? The person.id? The device.id? It can't be
> > >> > > both of them.
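
To make the key ambiguity above concrete, here is a small plain-Java sketch (no Kafka dependency). The Row class, the helper methods and the sample ids are all made up for illustration. Both tables are keyed by Integer, as in the Person/Device example, and the outer-join result is indexed once by person id and once by device id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OuterJoinKeySketch {

    /** One row of a hypothetical outer-join result; either side may be null. */
    static final class Row {
        final Integer personId;
        final Integer deviceId;

        Row(Integer personId, Integer deviceId) {
            this.personId = personId;
            this.deviceId = deviceId;
        }

        @Override
        public String toString() {
            return "(person=" + personId + ", device=" + deviceId + ")";
        }
    }

    /** Outer join of person -> device_id against device -> person_id. */
    static List<Row> outerJoin(Map<Integer, Integer> personToDevice,
                               Map<Integer, Integer> deviceToPerson) {
        List<Row> rows = new ArrayList<>();
        for (Map.Entry<Integer, Integer> p : personToDevice.entrySet()) {
            Integer device = deviceToPerson.containsKey(p.getValue()) ? p.getValue() : null;
            rows.add(new Row(p.getKey(), device));          // matched, or left-only
        }
        for (Integer deviceId : deviceToPerson.keySet()) {
            if (!personToDevice.containsValue(deviceId)) {
                rows.add(new Row(null, deviceId));          // right-only
            }
        }
        return rows;
    }

    /** Index the rows by person id; rows without a person are unreachable. */
    static Map<Integer, Row> byPersonId(List<Row> rows) {
        Map<Integer, Row> index = new LinkedHashMap<>();
        for (Row r : rows) {
            if (r.personId != null) index.put(r.personId, r);
        }
        return index;
    }

    /** Index the rows by device id; rows without a device are unreachable. */
    static Map<Integer, Row> byDeviceId(List<Row> rows) {
        Map<Integer, Row> index = new LinkedHashMap<>();
        for (Row r : rows) {
            if (r.deviceId != null) index.put(r.deviceId, r);
        }
        return index;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> persons = new LinkedHashMap<>();
        persons.put(1, 2);   // person 1 owns device 2
        persons.put(2, 9);   // person 2 references a device that does not exist
        Map<Integer, Integer> devices = new LinkedHashMap<>();
        devices.put(2, 1);   // device 2 belongs to person 1
        devices.put(7, 5);   // device 7 is not referenced by any person

        List<Row> rows = outerJoin(persons, devices);
        // The same lookup key, 2, resolves to different rows depending on the key space.
        System.out.println(byPersonId(rows).get(2));
        System.out.println(byDeviceId(rows).get(2));
    }
}
```

Whichever key space is chosen, some rows of the outer join cannot be looked up at all, which is why either a materialized store with its own key or an extra key-mapping interface would be needed.
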
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mich...@confluent.io>
> > >> wrote:
> > >> > >
> > >> > > > The key type returned by both KeyValueMappers (in the current trunk
> > >> > > > version, that type is named `R`) would need to be the same for this to
> > >> > > > work.
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <
> damian....@gmail.com>
> > >> > wrote:
> > >> > > >
> > >> > > > > Michael,
> > >> > > > >
> > >> > > > > We can only support outerJoin if both tables are keyed the same way.
> > >> > > > > Let's say for example you can map both ways, however, the key for each
> > >> > > > > table is of a different type. So t1 is long and t2 is string - what is
> > >> > > > > the key type of the resulting GlobalKTable? So when you subsequently
> > >> > > > > join to this table, and do a lookup on it, which key are you using?
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Damian
> > >> > > > >
> > >> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <
> mich...@confluent.io>
> > >> > wrote:
> > >> > > > >
> > >> > > > > > Damian,
> > >> > > > > >
> > >> > > > > > yes, that makes sense.
> > >> > > > > >
> > >> > > > > > But I am still wondering:  In your example, there's no prior knowledge
> > >> > > > > > "can I map from t1->t2" that Streams can leverage for joining t1 and t2
> > >> > > > > > other than blindly relying on the user to provide an appropriate
> > >> > > > > > KeyValueMapper for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we
> > >> > > > > > allow the user to provide a KeyValueMapper from t1->t2 (Streams does not
> > >> > > > > > know at compile time whether this mapping will actually work), then we
> > >> > > > > > can also allow the user to provide a corresponding "reverse" mapper from
> > >> > > > > > t2->t1.  That is, we could say that an outer join between two global
> > >> > > > > > tables IS supported, but if and only if the user provides two
> > >> > > > > > KeyValueMappers, one for t1->t2 and one for t2->t1.
> > >> > > > > >
> > >> > > > > > The left join t1->t2 (which is supported in the KIP), in general, works
> > >> > > > > > only because of the existence of the user-provided KeyValueMapper from
> > >> > > > > > t1->t2.  The outer join, as you point out, cannot be satisfied as easily
> > >> > > > > > because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise
> > >> > > > > > the outer join won't work.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <
> > >> damian....@gmail.com>
> > >> > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi Michael,
> > >> > > > > > >
> > >> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > >> > > > > > > t1{
> > >> > > > > > >  int key;
> > >> > > > > > >  string t2_id;
> > >> > > > > > >  ...
> > >> > > > > > > }
> > >> > > > > > >
> > >> > > > > > > t2 {
> > >> > > > > > >   string key;
> > >> > > > > > >   ..
> > >> > > > > > > }
> > >> > > > > > > If we create global tables out of these we'd get:
> > >> > > > > > > GlobalKTable<Integer, ?> t1;
> > >> > > > > > > GlobalKTable<String, ?> t2;
> > >> > > > > > >
> > >> > > > > > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in order
> > >> > > > > > > to perform the join we need to use a KeyValueMapper to extract the t2
> > >> > > > > > > key from the t1 value.
> > >> > > > > > >
> > >> > > > > > > Does that make sense?
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Damian
> > >> > > > > > >
> > >> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <
> > >> mich...@confluent.io>
> > >> > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > >> > > > > > > > > won't work the other way around.
> > >> > > > > > > >
> > >> > > > > > > > Care to elaborate why it won't work the other way around? If, for example,
> > >> > > > > > > > we swapped the call from leftTable.join(rightTable) to
> > >> > > > > > > > rightTable.join(leftTable), that join would work, too. Perhaps I am
> > >> > > > > > > > missing something though. :-)
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <
> > >> > > damian....@gmail.com>
> > >> > > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Hi Matthias,
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks for the feedback.
> > >> > > > > > > > >
> > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > >> > > > > > > > > won't work the other way around.
> > >> > > > > > > > >
> > >> > > > > > > > > On the bootstrapping concern. If the application is failing before
> > >> > > > > > > > > bootstrapping finishes, the problem is likely to be related to a terminal
> > >> > > > > > > > > exception, i.e., running out of disk space, corrupt state stores etc. In
> > >> > > > > > > > > these cases, we wouldn't want the application to continue. So i think this
> > >> > > > > > > > > is ok.
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks,
> > >> > > > > > > > > Damian
> > >> > > > > > > > >
> > >> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> > >> > > > matth...@confluent.io
> > >> > > > > >
> > >> > > > > > > > wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > >> > > > > > > > > >
> > >> > > > > > > > > > A few comments:
> > >> > > > > > > > > >
> > >> > > > > > > > > >  - why is there no outer-join for GlobalKTables
> > >> > > > > > > > > >  - on bootstrapping GlobalKTable, could it happen that this never
> > >> > > > > > > > > > finishes if the application fails before bootstrapping finishes and new
> > >> > > > > > > > > > data gets written at the same time? Do we need to guard against this
> > >> > > > > > > > > > (seems to be a very rare corner case, so maybe not required)?
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > -Matthias
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > >> > > > > > > > > > > Hi all,
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Looking forward to your feedback.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Thanks,
> > >> > > > > > > > > > > Damian
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > *Michael G. Noll*
> > >> > > > > > Product Manager | Confluent
> > >> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > >> > > > > >
> > >> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog <http://www.confluent.io/blog>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
