Thanks Kyle.

On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> Hi Damian,
>
> >>>> When trying to program in the fluent API that has been discussed most it
> >>>> feels difficult to know when you will actually get an object you can reuse.
> >>>> What if I make one KGroupedStream that I want to reuse, is it legal to
> >>>> reuse it or does this approach expect you to call grouped each time?
>
> >> I'd anticipate that once you have a KGroupedStream you can re-use it as you
> >> can today.
>
> You said it yourself in another post that the grouped stream is effectively a
> no-op until a count, reduce, or aggregate. The way I see it you wouldn't be able
> to reuse anything except KStreams and KTables, because most of this fluent api
> would continue returning this (this being the builder object currently being
> manipulated). So, if you ever store a reference to anything but KStreams and
> KTables and you use it in two different ways, then it's possible you make
> conflicting withXXX() calls on the same builder.
>

Not necessarily true. It could return a new instance of the builder, i.e., the
builders being immutable. So if you held a reference to the builder it would
always be the same as it was when it was created.
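
For illustration only, a minimal sketch of what an immutable grouped-stream
"builder" could look like (hypothetical and heavily simplified; just the two
serde fields are shown):

import org.apache.kafka.common.serialization.Serde;

// Sketch: every withXXX() returns a new instance instead of mutating the receiver.
public final class GroupedStream<K, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    GroupedStream(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    public GroupedStream<K, V> withKeySerde(final Serde<K> keySerde) {
        // new instance; `this` is left untouched
        return new GroupedStream<>(keySerde, this.valueSerde);
    }

    public GroupedStream<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new GroupedStream<>(this.keySerde, valueSerde);
    }
}

With that shape, groupedStreamWithDefaultSerdes in the example below is unaffected
by the withKeySerde(…).withValueSerde(…) calls, because they produce a new instance
rather than modifying the one it points to.
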
> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>     groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>
> I'll admit that this shouldn't happen, but some user is going to do it
> eventually…
> Depending on implementation, uses of groupedStreamWithDefaultSerdes would most
> likely be equivalent to the version withDeclaredSerdes. One workaround would be
> to always make copies of the config objects you are building, but this approach
> has its own problem, because now we have to identify which configs are
> equivalent so we don't create repeated processors.
>
> The point of this long-winded example is that we always have to be thinking
> about all of the possible ways it could be misused by a user (causing them to
> see hard-to-diagnose problems).
>

Exactly! That is the point of the discussion really.

> In my attempt at a couple of methods with builders I feel that I could
> confidently say the user couldn't really mess it up.
>
> // Count
> KTable<String, Long> count =
>     kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>
> The kGroupedStream is reusable, and if they attempted to reuse the Count for
> some reason it would throw an error message saying that a store named
> "my-store" already exists.
>

Yes, I agree, and I think using builders is my preferred pattern.
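
As a rough illustration of that failure mode (a sketch only: it borrows the Count
type from Kyle's proposal further down and assumes a queryableStoreName() getter
on it):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.kstream.KTable;

// Sketch: the grouped stream remembers which store names it has materialised, so
// passing the same Count (and therefore the same store name) twice fails fast.
public class KGroupedStreamSketch<K, V> {
    private final Set<String> usedStoreNames = new HashSet<>();

    public KTable<K, Long> count(final Count count) {
        final String storeName = count.queryableStoreName(); // assumed getter on Count
        if (!usedStoreNames.add(storeName)) {
            throw new IllegalStateException(
                "A state store named '" + storeName + "' already exists");
        }
        return null; // real implementation would add the count processor + store to the topology here
    }
}
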
Cheers,
Damian

> Thanks,
> Kyle
>
> From: Damian Guy
> Sent: Thursday, June 29, 2017 3:59 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> Hi Kyle,
>
> Thanks for your input. Really appreciated.
>
> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
> > I like more of a builder pattern even though others have voiced against it.
> > The reason I like it is because it makes it clear to the user that a call to
> > KGroupedStream#count will return a KTable, not some intermediate class that I
> > need to understand.
>
> Yes, that makes sense.
>
> > When trying to program in the fluent API that has been discussed most it
> > feels difficult to know when you will actually get an object you can reuse.
> > What if I make one KGroupedStream that I want to reuse, is it legal to
> > reuse it or does this approach expect you to call grouped each time?
>
> I'd anticipate that once you have a KGroupedStream you can re-use it as you
> can today.
>
> > This question doesn't pop into my head at all in the builder pattern; I
> > assume I can reuse everything.
> > Finally, I like .groupByKey and .groupBy(KeyValueMapper), not a big fan of
> > the grouped.
>
> Yes, grouped() was more for demonstration and because groupBy() and
> groupByKey() were taken! So I'd imagine the api would actually want to be
> groupByKey(/** no required args **/).withOptionalArg() and
> groupBy(KeyValueMapper m).withOptionalArg(...) - of course this all depends
> on maintaining backward compatibility.
>
> > Unfortunately, the below approach would require at least 2 (probably 3)
> > overloads of each count, reduce, and aggregate (one returning a KTable and
> > one returning a KTable with a Windowed key; we would probably want to split
> > windowed and session-windowed for ease of implementation).
> > Obviously not exhaustive, but enough for you to get the picture. Count,
> > Reduce, and Aggregate supply 3 static methods to initialize the builder:
> >
> > // Count
> > KTable<String, Long> count =
> >     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
> >
> > // Windowed Count
> > KTable<Windowed<String>, Long> windowedCount =
> >     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
> >
> > // Session Count
> > KTable<Windowed<String>, Long> sessionCount =
> >     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>
> Above and below, I think I'd prefer it to be:
>
> groupedStream.count(/** non windowed count **/)
> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)
>
> > // Reduce
> > Reducer<Long> reducer;
> > KTable<String, Long> reduce = groupedStream.reduce(reducer,
> >     Reduce.reduce().withQueryableStoreName("my-store"));
> >
> > // Aggregate Windowed with Custom Store
> > Initializer<String> initializer;
> > Aggregator<String, Long, String> aggregator;
> > KTable<Windowed<String>, String> aggregate =
> >     groupedStream.aggregate(initializer, aggregator,
> >         Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
> >
> > // Cogroup SessionWindowed
> > KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
> >     .cogroup(groupedStream2, aggregator2)
> >     .aggregate(initializer, aggregator,
> >         Aggregate.sessionWindowed(SessionWindows.with(10L),
> >             sessionMerger).withQueryableStoreName("my-store"));
> >
> > public class Count {
> >
> >     public static class Windowed extends Count {
> >         private Windows windows;
> >     }
> >     public static class SessionWindowed extends Count {
> >         private SessionWindows sessionWindows;
> >     }
> >
> >     public static Count count();
> >     public static Windowed windowed(Windows windows);
> >     public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
> >
> >     // All withXXX(...) methods.
> > }
> >
> > public class KGroupedStream {
> >     public KTable<K, Long> count(Count count);
> >     public KTable<Windowed<K>, Long> count(Count.Windowed count);
> >     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
> >     …
> > }
> >
> > Thanks,
> > Kyle
> >
> > From: Guozhang Wang
> > Sent: Wednesday, June 28, 2017 7:45 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
> >
> > I played the current proposal a bit with
> > https://github.com/dguy/kafka/tree/dsl-experiment, and here are my
> > observations:
> >
> > 1. Personally I prefer
> >
> >     "stream.group(mapper) / stream.groupByKey()"
> >
> > over
> >
> >     "stream.group().withKeyMapper(mapper) / stream.group()"
> >
> > since 1) withKeyMapper is not enforced programmatically though it is not
> > "really" optional like the others, and 2) syntax-wise it reads more naturally.
> >
> > I think it is okay to add the APIs in (
> > https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> > ) in KGroupedStream.
> >
> > 2. For the "withStateStoreSupplier" API, is the user supposed to pass in the
> > inner-most state store supplier (e.g. the one whose get() returns the
> > RocksDBStore), or is it supposed to be the outer-most supplier with
> > logging / metrics / etc? I think it would be more useful to only require
> > users to pass in the inner state store supplier, while specifying caching /
> > logging through other APIs.
> >
> > In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> > allowing users to call other APIs like "withQueryableName" multiple times,
> > but "withStateStoreSupplier" only once at the end. Why is that?
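
To make the second reading concrete, a pseudo-usage sketch of the proposed API
(names follow the dsl-experiment branch; rocksDbSupplier() is a hypothetical
helper returning just the bare, inner store):

// Sketch: only the inner, e.g. RocksDB-backed, supplier is handed to the DSL.
final StateStoreSupplier<KeyValueStore<String, Long>> inner = rocksDbSupplier();

final KTable<String, Long> counts = stream.grouped()
        .withStateStoreSupplier(inner)
        .withQueryableName("my-store")
        .count();

The DSL would then layer the changelog and caching wrappers on top of the inner
store itself, unless they are switched off via withLoggingEnabled(false) /
withCachingEnabled(false).
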
> > 3. The current DSL seems to be only for aggregations, what about joins?
> >
> > 4. I think it is okay to keep the "withLogConfig": for the StateStoreSupplier
> > it will still be user code specifying the topology, so I do not see there is a
> > big difference.
> >
> > 5. "WindowedGroupedStream"'s withStateStoreSupplier should take the windowed
> > state store supplier to enforce typing?
> >
> > Below are minor ones:
> >
> > 6. "withQueryableName": maybe better "withQueryableStateName"?
> >
> > 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
> >
> > Guozhang
> >
> > On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > I see your point about "when to add the processor to the topology". That is
> > > indeed an issue. Not sure if we could allow "updates" to the topology...
> > >
> > > I don't see any problem with having all the withXX() in the KTable interface
> > > -- but this might be subjective.
> > >
> > > However, I don't understand your argument about putting aggregate() after
> > > the withXX() -- all the calls to withXX() set optional parameters for
> > > aggregate() and not for groupBy() -- but a groupBy().withXX() indicates that
> > > the withXX() belongs to the groupBy(). IMHO, this might be quite confusing
> > > for developers.
> > >
> > > -Matthias
> > >
> > > On 6/28/17 2:55 AM, Damian Guy wrote:
> > > >> I also think that mixing optional parameters with configs is a bad idea.
> > > >> Have no proposal for this atm but just wanted to mention it. Hope to find
> > > >> some time to come up with something.
> > > >>
> > > > Yes, I don't like the mix of config either. But the only real config here
> > > > is the logging config - which we don't really need, as it can already be
> > > > done via a custom StateStoreSupplier.
> > > >
> > > >> What I don't like in the current proposal is the .grouped().withKeyMapper()
> > > >> -- the current solution with .groupBy(...) and .groupByKey() seems better.
> > > >> For clarity, we could rename to .groupByNewKey(...) and .groupByCurrentKey()
> > > >> (even if we should find some better names).
> > > >>
> > > > It could be groupByKey(), groupBy() or something different, but...
> > > >
> > > >> The proposed pattern "chains" grouping and aggregation too close together.
> > > >> I would rather separate both more than less, i.e., go in the opposite
> > > >> direction.
> > > >>
> > > >> I am also wondering if we could do something more "fluent". The initial
> > > >> proposal was like:
> > > >>
> > > >>>> groupedStream.count()
> > > >>>>    .withStoreName("name")
> > > >>>>    .withCachingEnabled(false)
> > > >>>>    .withLoggingEnabled(config)
> > > >>>>    .table()
> > > >>
> > > >> The .table() statement at the end was kinda alien.
> > > >>
> > > > I agree, but then all of the withXXX methods need to be on KTable, which is
> > > > worse in my opinion. You also need something that is going to "build" the
> > > > internal processors and add them to the topology.
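
For illustration, a stripped-down sketch of that point (hypothetical names): the
withXXX() calls only accumulate options, and it is the terminal call -- count()
here, or .table() in the earlier proposal -- that finally has everything it needs
to create the processor and state store and add them to the topology.

import org.apache.kafka.streams.kstream.KTable;

// Sketch: withXXX() methods just collect options; nothing touches the topology
// until the terminal count() is called.
public class CountOptionsSketch<K> {
    private String storeName;
    private boolean cachingEnabled = true;

    public CountOptionsSketch<K> withStoreName(final String storeName) {
        this.storeName = storeName;
        return this;
    }

    public CountOptionsSketch<K> withCachingEnabled(final boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    // Terminal operation: only at this point are all options known, so only here
    // can the count processor and its state store (using storeName/cachingEnabled)
    // be built and wired into the topology.
    public KTable<K, Long> count() {
        return null; // real implementation would add processor + store to the topology here
    }
}
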
> > > >> The current proposal puts the count() at the end -- i.e., the optional
> > > >> parameters for count() have to be specified on the .grouped() call --
> > > >> this does not seem to be the best way either.
> > > >>
> > > > I actually prefer this method, as you are building a grouped stream that
> > > > you will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..)
> > > > etc seems natural to me.
> > > >
> > > >> I did not think this through in detail, but can't we just do the initial
> > > >> proposal without the .table()?
> > > >>
> > > >>    groupedStream.count().withStoreName("name").mapValues(...)
> > > >>
> > > >> Each .withXXX(...) returns the current KTable and all the .withXXX() are
> > > >> just added to the KTable interface. Or do I miss anything why this won't
> > > >> work, or any obvious disadvantage?
> > > >>
> > > > See above.
> > > >
> > > >> -Matthias
> > > >>
> > > >> On 6/22/17 4:06 AM, Damian Guy wrote:
> > > >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> > > >>> approach, but I think it is slightly nicer.
> > > >>> I agree with some of what Eno said about mixing configy stuff in the DSL,
> > > >>> but I think that enabling caching and enabling logging are things that
> > > >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> > > >>> though it is below) as this is actually config and we already have a way
> > > >>> of doing that, via the StateStoreSupplier. Arguably we could use the
> > > >>> StateStoreSupplier for disabling caching etc, but as it stands that is a
> > > >>> bit of a tedious process for someone that just wants to use the default
> > > >>> storage engine, but not have caching enabled.
> > > >>>
> > > >>> There is also an orthogonal concern that Guozhang alluded to.... If you
> > > >>> want to plug in a custom storage engine and you want it to be logged etc,
> > > >>> you would currently need to implement that yourself. Ideally we can
> > > >>> provide a way where we will wrap the custom store with logging, metrics,
> > > >>> etc. I need to think about where this fits; it is probably more
> > > >>> appropriate on the Stores API.
> > > >>>
> > > >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> > > >>> // count with mapped key
> > > >>> final KTable<Long, Long> count = stream.grouped()
> > > >>>         .withKeyMapper(keyMapper)
> > > >>>         .withKeySerde(Serdes.Long())
> > > >>>         .withValueSerde(Serdes.String())
> > > >>>         .withQueryableName("my-store")
> > > >>>         .count();
> > > >>>
> > > >>> // windowed count
> > > >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> > > >>>         .withQueryableName("my-window-store")
> > > >>>         .windowed(TimeWindows.of(10L).until(10))
> > > >>>         .count();
> > > >>>
> > > >>> // windowed reduce
> > > >>> final Reducer<String> windowedReducer = null;
> > > >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> > > >>>         .withQueryableName("my-window-store")
> > > >>>         .windowed(TimeWindows.of(10L).until(10))
> > > >>>         .reduce(windowedReducer);
> > > >>>
> > > >>> final Aggregator<String, String, Long> aggregator = null;
> > > >>> final Initializer<Long> init = null;
> > > >>>
> > > >>> // aggregate
> > > >>> final KTable<String, Long> aggregate = stream.grouped()
> > > >>>         .withQueryableName("my-aggregate-store")
> > > >>>         .aggregate(aggregator, init, Serdes.Long());
> > > >>>
> > > >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> > > >>>
> > > >>> // aggregate with custom store
> > > >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> > > >>>         .withStateStoreSupplier(stateStoreSupplier)
> > > >>>         .aggregate(aggregator, init);
> > > >>>
> > > >>> // disable caching
> > > >>> stream.grouped()
> > > >>>         .withQueryableName("name")
> > > >>>         .withCachingEnabled(false)
> > > >>>         .count();
> > > >>>
> > > >>> // disable logging
> > > >>> stream.grouped()
> > > >>>         .withQueryableName("q")
> > > >>>         .withLoggingEnabled(false)
> > > >>>         .count();
> > > >>>
> > > >>> // override log config
> > > >>> final Reducer<String> reducer = null;
> > > >>> stream.grouped()
> > > >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> > > >>>         .reduce(reducer);
> > > >>>
> > > >>> If anyone wants to play around with this you can find the code here:
> > > >>> https://github.com/dguy/kafka/tree/dsl-experiment
> > > >>>
> > > >>> Note: It won't actually work as most of the methods just return null.
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> > > >>>
> > > >>>> Thanks Damian. I think both options have pros and cons. And both are
> > > >>>> better than overload abuse.
> > > >>>>
> > > >>>> The fluent API approach reads better; there is no mention of builder or
> > > >>>> build anywhere. The main downside is that the method signatures are a
> > > >>>> little less clear. By reading the method signature, one doesn't
> > > >>>> necessarily know what it returns. Also, one needs to figure out the
> > > >>>> special method (`table()` in this case) that gives you what you actually
> > > >>>> care about (`KTable` in this case). Not major issues, but worth
> > > >>>> mentioning while doing the comparison.
> > > >>>>
> > > >>>> The builder approach avoids the issues mentioned above, but it doesn't
> > > >>>> read as well.
> > > >>>>
> > > >>>> Ismael
> > > >>>>
> > > >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> I'd like to get a discussion going around some of the API choices we've
> > > >>>>> made in the DSL, in particular those that relate to stateful operations
> > > >>>>> (though this could expand).
> > > >>>>> As it stands we lean heavily on overloaded methods in the API, i.e.,
> > > >>>>> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> > > >>>>> and I feel it is only going to get worse as we add more optional params.
> > > >>>>> In particular we've had some requests to be able to turn caching off, or
> > > >>>>> change log configs, on a per-operator basis (note this can be done now
> > > >>>>> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> > > >>>>>
> > > >>>>> So this is a bit of an open question. How can we change the DSL
> > > >>>>> overloads so that it flows, is simple to use and understand, and is
> > > >>>>> easily extended in the future?
> > > >>>>>
> > > >>>>> One option would be to use a fluent API approach for providing the
> > > >>>>> optional params, so something like this:
> > > >>>>>
> > > >>>>> groupedStream.count()
> > > >>>>>    .withStoreName("name")
> > > >>>>>    .withCachingEnabled(false)
> > > >>>>>    .withLoggingEnabled(config)
> > > >>>>>    .table()
> > > >>>>>
> > > >>>>> Another option would be to provide a Builder to the count method, so it
> > > >>>>> would look something like this:
> > > >>>>>
> > > >>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
> > > >>>>>
> > > >>>>> Another option is to say: Hey, we don't need this, what are you on about!
> > > >>>>>
> > > >>>>> The above has focussed on state store related overloads, but the same
> > > >>>>> ideas could be applied to joins etc, where we presently have many join
> > > >>>>> methods and many overloads.
> > > >>>>>
> > > >>>>> Anyway, I look forward to hearing your opinions.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Damian
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
> > --
> > -- Guozhang