> However, I don't understand your argument about putting aggregate()
> after the withXX() -- all the calls to withXX() set optional parameters
> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> be quite confusing for developers.
>
>
I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc. functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final count()/reduce()/aggregate()
call has any of the params that are required/specific to that function.
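Here is a minimal sketch of that point in plain Java. Grouped, StoreOptions and Table are hypothetical names, not the Kafka Streams API, and an in-memory list of records stands in for the stream. The withXXX() calls only record store settings. Nothing is computed until a terminal count() or reduce() runs, and that terminal call is also where operation-specific arguments, such as the reducer, are passed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch, not the Kafka Streams API: the grouped stream only
// collects optional store settings; nothing is computed until a terminal operation.
class GroupedStreamSketch {

    // Store options that apply to any terminal operation (illustrative names).
    static final class StoreOptions {
        final String queryableName;     // null means "not queryable"
        final boolean cachingEnabled;
        final boolean loggingEnabled;

        StoreOptions(String queryableName, boolean cachingEnabled, boolean loggingEnabled) {
            this.queryableName = queryableName;
            this.cachingEnabled = cachingEnabled;
            this.loggingEnabled = loggingEnabled;
        }
    }

    // Result of a terminal operation: the computed values plus the options used.
    static final class Table<K, V> {
        final Map<K, V> values;
        final StoreOptions options;

        Table(Map<K, V> values, StoreOptions options) {
            this.values = values;
            this.options = options;
        }
    }

    static final class Grouped<K, V> {
        private final List<Map.Entry<K, V>> records; // in-memory stand-in for a stream
        private String queryableName = null;
        private boolean cachingEnabled = true;       // assumed defaults
        private boolean loggingEnabled = true;

        Grouped(List<Map.Entry<K, V>> records) {
            this.records = records;
        }

        // Optional params: each call only records a setting and returns this.
        Grouped<K, V> withQueryableName(String name) {
            this.queryableName = name;
            return this;
        }

        Grouped<K, V> withCachingEnabled(boolean enabled) {
            this.cachingEnabled = enabled;
            return this;
        }

        Grouped<K, V> withLoggingEnabled(boolean enabled) {
            this.loggingEnabled = enabled;
            return this;
        }

        private StoreOptions options() {
            return new StoreOptions(queryableName, cachingEnabled, loggingEnabled);
        }

        // Terminal operation: the records are only processed here.
        Table<K, Long> count() {
            Map<K, Long> counts = new LinkedHashMap<>();
            for (Map.Entry<K, V> r : records) {
                counts.merge(r.getKey(), 1L, Long::sum);
            }
            return new Table<>(counts, options());
        }

        // Terminal operation whose specific argument (the reducer) is passed directly.
        Table<K, V> reduce(BinaryOperator<V> reducer) {
            Map<K, V> reduced = new LinkedHashMap<>();
            for (Map.Entry<K, V> r : records) {
                reduced.merge(r.getKey(), r.getValue(), reducer);
            }
            return new Table<>(reduced, options());
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records =
                List.of(Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
        Table<String, Long> counts = new Grouped<>(records)
                .withQueryableName("my-store")
                .withCachingEnabled(false)
                .count();
        System.out.println(counts.values + " caching=" + counts.options.cachingEnabled);
        // prints: {a=2, b=1} caching=false
    }
}
```

The same Grouped instance could end in reduce(...) instead of count(), and the store settings would carry over unchanged. That is the sense in which they belong to the grouped stream rather than to one particular operation.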
>
> -Matthias
>
> On 6/28/17 2:55 AM, Damian Guy wrote:
> >> I also think that mixing optional parameters with configs is a bad idea.
> >> Have no proposal for this atm but just wanted to mention it. Hope to
> >> find some time to come up with something.
> >>
> >>
> > Yes, I don't like the mix of config either. But the only real config here
> > is the logging config - which we don't really need as it can already be
> > done via a custom StateStoreSupplier.
> >
> >
> >> What I don't like in the current proposal is the
> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> >> and .groupByKey() seems better. For clarity, we could rename to
> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> >> some better names).
> >>
> >>
> > It could be groupByKey(), groupBy() or something different but
> >
> >
> >
> >> The proposed pattern "chains" grouping and aggregation too close
> >> together. I would rather separate both more than less, i.e., go in the
> >> opposite direction.
> >>
> >> I am also wondering if we could do something more "fluent". The initial
> >> proposal was like:
> >>
> >>>> groupedStream.count()
> >>>>    .withStoreName("name")
> >>>>    .withCachingEnabled(false)
> >>>>    .withLoggingEnabled(config)
> >>>>    .table()
> >>
> >> The .table() statement at the end was kinda alien.
> >>
> >
> > I agree, but then all of the withXXX methods need to be on KTable, which is
> > worse in my opinion. You also need something that is going to "build" the
> > internal processors and add them to the topology.
> >
> >
> >> The current proposal puts the count() at the end -- i.e., the optional
> >> parameters for count() have to be specified on the .grouped() call -- this
> >> does not seem to be the best way either.
> >>
> >>
> > I actually prefer this method as you are building a grouped stream that you
> > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc.
> > seems natural to me.
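For comparison, here is a sketch of the initial count().withXXX().table() proposal quoted above. The names are again hypothetical rather than Kafka Streams classes, and withLoggingEnabled is simplified to a boolean. count() returns a pending description, and table() is the one call that "builds" the processor and adds it to a stand-in topology. A chain that never reaches table() therefore adds nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "count().withXXX().table()" style: count() returns a
// pending description and table() is the step that builds the processor and adds
// it to the topology. Names are illustrative, not Kafka Streams classes.
class TerminalTableSketch {

    // Stand-in for a topology: records the processor nodes added to it.
    static final class Topology {
        final List<String> nodes = new ArrayList<>();
    }

    // Handle returned once the count has been built.
    static final class Table {
        final String storeName;

        Table(String storeName) {
            this.storeName = storeName;
        }
    }

    static final class PendingCount {
        private final Topology topology;
        private String storeName = "count-store"; // hypothetical default name
        private boolean cachingEnabled = true;
        private boolean loggingEnabled = true;

        PendingCount(Topology topology) {
            this.topology = topology;
        }

        PendingCount withStoreName(String name) {
            this.storeName = name;
            return this;
        }

        PendingCount withCachingEnabled(boolean enabled) {
            this.cachingEnabled = enabled;
            return this;
        }

        PendingCount withLoggingEnabled(boolean enabled) {
            this.loggingEnabled = enabled;
            return this;
        }

        // The "build" step: without this call nothing reaches the topology.
        Table table() {
            topology.nodes.add("COUNT(store=" + storeName
                    + ", caching=" + cachingEnabled
                    + ", logging=" + loggingEnabled + ")");
            return new Table(storeName);
        }
    }

    static final class GroupedStream {
        private final Topology topology;

        GroupedStream(Topology topology) {
            this.topology = topology;
        }

        PendingCount count() {
            return new PendingCount(topology);
        }
    }

    public static void main(String[] args) {
        Topology topology = new Topology();
        GroupedStream grouped = new GroupedStream(topology);

        grouped.count().withStoreName("forgotten"); // never finalized: no node added
        Table t = grouped.count()
                .withStoreName("name")
                .withCachingEnabled(false)
                .table();

        System.out.println(topology.nodes + " -> " + t.storeName);
        // prints: [COUNT(store=name, caching=false, logging=true)] -> name
    }
}
```

The sketch shows why the terminal table() call is needed in this style. If the withXXX() methods lived on the returned table type instead, there would be no clear moment at which the store settings become final.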
> >
> >
> >> I did not think this through in detail, but can't we just do the initial
> >> proposal with the .table()?
> >>
> >> groupedStream.count().withStoreName("name").mapValues(...)
> >>
> >> Each .withXXX(...) returns the current KTable and all the .withXXX() are
> >> just added to the KTable interface. Or am I missing a reason why this won't
> >> work, or any obvious disadvantage?
> >>
> >>
> >>
> > See above.
> >
> >
> >>
> >> -Matthias
> >>
> >> On 6/22/17 4:06 AM, Damian Guy wrote:
> >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> >>> approach, but I think it is slightly nicer.
> >>> I agree with some of what Eno said about mixing config-y stuff in the DSL,
> >>> but I think that enabling caching and enabling logging are things that
> >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> >>> though it is below) as this is actually config and we already have a way of
> >>> doing that, via the StateStoreSupplier. Arguably we could use the
> >>> StateStoreSupplier for disabling caching etc., but as it stands that is a
> >>> bit of a tedious process for someone that just wants to use the default
> >>> storage engine, but not have caching enabled.
> >>>
> >>> There is also an orthogonal concern that Guozhang alluded to... If you
> >>> want to plug in a custom storage engine and you want it to be logged etc.,
> >>> you would currently need to implement that yourself. Ideally we can provide
> >>> a way where we will wrap the custom store with logging, metrics, etc. I
> >>> need to think about where this fits; it is probably more appropriate on the
> >>> Stores API.
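The wrapping idea in the paragraph above is essentially the decorator pattern. Here is a minimal sketch, assuming a hypothetical SimpleStore interface rather than the real Kafka Streams store interfaces. A user-supplied store is wrapped by a change-logging layer and a metrics layer, so the custom engine gets both without implementing them itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the library wraps a user-supplied store with logging and
// metrics decorators. SimpleStore and the wrappers are illustrative, not Kafka types.
class StoreWrappingSketch {

    interface SimpleStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    // A "custom storage engine" supplied by the user.
    static final class UserStore<K, V> implements SimpleStore<K, V> {
        private final Map<K, V> data = new HashMap<>();
        public void put(K key, V value) { data.put(key, value); }
        public V get(K key) { return data.get(key); }
    }

    // Decorator that records every write (stand-in for a changelog topic).
    static final class ChangeLoggingStore<K, V> implements SimpleStore<K, V> {
        private final SimpleStore<K, V> inner;
        final List<String> changelog = new ArrayList<>();

        ChangeLoggingStore(SimpleStore<K, V> inner) { this.inner = inner; }

        public void put(K key, V value) {
            inner.put(key, value);
            changelog.add(key + "=" + value);
        }

        public V get(K key) { return inner.get(key); }
    }

    // Decorator that counts operations (stand-in for store metrics).
    static final class MeteredStore<K, V> implements SimpleStore<K, V> {
        private final SimpleStore<K, V> inner;
        long puts = 0;
        long gets = 0;

        MeteredStore(SimpleStore<K, V> inner) { this.inner = inner; }

        public void put(K key, V value) { puts++; inner.put(key, value); }
        public V get(K key) { gets++; return inner.get(key); }
    }

    // What the library could do with any supplied store: metrics outermost, logging inside.
    static <K, V> MeteredStore<K, V> wrap(SimpleStore<K, V> custom, boolean loggingEnabled) {
        SimpleStore<K, V> inner = loggingEnabled ? new ChangeLoggingStore<K, V>(custom) : custom;
        return new MeteredStore<K, V>(inner);
    }

    public static void main(String[] args) {
        MeteredStore<String, Long> store = wrap(new UserStore<String, Long>(), true);
        store.put("a", 1L);
        store.put("a", 2L);
        System.out.println(store.get("a") + " puts=" + store.puts); // prints: 2 puts=2
    }
}
```

Because each layer only depends on the SimpleStore interface, the same wrap(...) step works for the default engine and for a custom one. The user only has to supply the innermost store.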
> >>>
> >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> >>> // count with mapped key
> >>> final KTable<Long, Long> count = stream.grouped()
> >>>         .withKeyMapper(keyMapper)
> >>>         .withKeySerde(Serdes.Long())
> >>>         .withValueSerde(Serdes.String())
> >>>         .withQueryableName("my-store")
> >>>         .count();
> >>>
> >>> // windowed count
> >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> >>>         .withQueryableName("my-window-store")
> >>>         .windowed(TimeWindows.of(10L).until(10))
> >>>         .count();
> >>>
> >>> // windowed reduce
> >>> final Reducer<String> windowedReducer = null;
> >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> >>>         .withQueryableName("my-window-store")
> >>>         .windowed(TimeWindows.of(10L).until(10))
> >>>         .reduce(windowedReducer);
> >>>
> >>> final Aggregator<String, String, Long> aggregator = null;
> >>> final Initializer<Long> init = null;
> >>>
> >>> // aggregate
> >>> final KTable<String, Long> aggregate = stream.grouped()
> >>>         .withQueryableName("my-aggregate-store")
> >>>         .aggregate(aggregator, init, Serdes.Long());
> >>>
> >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> >>>
> >>> // aggregate with custom store
> >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> >>>         .withStateStoreSupplier(stateStoreSupplier)
> >>>         .aggregate(aggregator, init);
> >>>
> >>> // disable caching
> >>> stream.grouped()
> >>>         .withQueryableName("name")
> >>>         .withCachingEnabled(false)
> >>>         .count();
> >>>
> >>> // disable logging
> >>> stream.grouped()
> >>>         .withQueryableName("q")
> >>>         .withLoggingEnabled(false)
> >>>         .count();
> >>>
> >>> // override log config
> >>> final Reducer<String> reducer = null;
> >>> stream.grouped()
> >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> >>>         .reduce(reducer);
> >>>
> >>>
> >>> If anyone wants to play around with this you can find the code here:
> >>> https://github.com/dguy/kafka/tree/dsl-experiment
> >>>
> >>> Note: It won't actually work as most of the methods just return null.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>>
> >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> >>>
> >>>> Thanks Damian. I think both options have pros and cons. And both are better
> >>>> than overload abuse.
> >>>>
> >>>> The fluent API approach reads better, no mention of builder or build
> >>>> anywhere. The main downside is that the method signatures are a little less
> >>>> clear. By reading the method signature, one doesn't necessarily know what
> >>>> it returns. Also, one needs to figure out the special method (`table()` in
> >>>> this case) that gives you what you actually care about (`KTable` in this
> >>>> case). Not major issues, but worth mentioning while doing the comparison.
> >>>>
> >>>> The builder approach avoids the issues mentioned above, but it doesn't read
> >>>> as well.
> >>>>
> >>>> Ismael
> >>>>
> >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I'd like to get a discussion going around some of the API choices we've
> >>>>> made in the DSL. In particular those that relate to stateful operations
> >>>>> (though this could expand).
> >>>>> As it stands we lean heavily on overloaded methods in the API, i.e.,
> >>>>> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> >>>>> and I feel it is only going to get worse as we add more optional params. In
> >>>>> particular we've had some requests to be able to turn caching off, or
> >>>>> change log configs, on a per-operator basis (note this can be done now
> >>>>> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>>>>
> >>>>> So this is a bit of an open question.
> >>>>> How can we change the DSL overloads so that it flows, is simple to use
> >>>>> and understand, and is easily extended in the future?
> >>>>>
> >>>>> One option would be to use a fluent API approach for providing the
> >>>>> optional params, so something like this:
> >>>>>
> >>>>> groupedStream.count()
> >>>>>    .withStoreName("name")
> >>>>>    .withCachingEnabled(false)
> >>>>>    .withLoggingEnabled(config)
> >>>>>    .table()
> >>>>>
> >>>>>
> >>>>>
> >>>>> Another option would be to provide a Builder to the count method, so it
> >>>>> would look something like this:
> >>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
> >>>>>
> >>>>> Another option is to say: Hey, we don't need this, what are you on about!
> >>>>>
> >>>>> The above has focussed on state store related overloads, but the same ideas
> >>>>> could be applied to joins etc., where we presently have many join methods
> >>>>> and many overloads.
> >>>>>
> >>>>> Anyway, I look forward to hearing your opinions.
> >>>>>
> >>>>> Thanks,
> >>>>> Damian
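The Builder option from the original mail could look roughly like the sketch below. CountBuilder comes from the example above. CountOptions, the default values, and the String returned by count() are assumptions made only to keep the sketch self-contained. The required store name goes in the builder's constructor and the optional settings go in withXXX() methods. A single count(CountOptions) method then replaces a family of count(...) overloads.

```java
import java.util.Objects;

// Hypothetical sketch of the builder option: one count(CountOptions) method takes
// an immutable options object instead of many count(...) overloads.
// CountBuilder/CountOptions are illustrative names, not Kafka Streams classes.
class CountBuilderSketch {

    // Immutable result of build(); only the builder can create it.
    static final class CountOptions {
        final String storeName;
        final boolean cachingEnabled;
        final boolean loggingEnabled;

        private CountOptions(CountBuilder b) {
            this.storeName = b.storeName;
            this.cachingEnabled = b.cachingEnabled;
            this.loggingEnabled = b.loggingEnabled;
        }
    }

    static final class CountBuilder {
        private final String storeName;        // required: passed to the constructor
        private boolean cachingEnabled = true; // optional: defaults assumed here
        private boolean loggingEnabled = true;

        CountBuilder(String storeName) {
            this.storeName = Objects.requireNonNull(storeName, "storeName");
        }

        CountBuilder withCachingEnabled(boolean enabled) {
            this.cachingEnabled = enabled;
            return this;
        }

        CountBuilder withLoggingEnabled(boolean enabled) {
            this.loggingEnabled = enabled;
            return this;
        }

        CountOptions build() {
            return new CountOptions(this);
        }
    }

    // Stand-in for KGroupedStream.count: a single entry point. A new option
    // extends the builder instead of adding another overload here.
    static String count(CountOptions options) {
        return "count(store=" + options.storeName
                + ", caching=" + options.cachingEnabled
                + ", logging=" + options.loggingEnabled + ")";
    }

    public static void main(String[] args) {
        String description = count(new CountBuilder("storeName").withCachingEnabled(false).build());
        System.out.println(description); // prints: count(store=storeName, caching=false, logging=true)
    }
}
```

The trade-off Ismael points out still applies. The call site carries an extra `new CountBuilder(...)` and `build()`, so it reads less fluently than the chained style, but the signature of count(...) states exactly what it takes and returns.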