Note that while I agree with the initial proposal (withKeySerdes, withJoinType, etc), I don't agree with things like .materialize(), .enableCaching(), .enableLogging().
The former maintain the declarative DSL, while the latter break the declarative part by mixing system decisions into the DSL. I think there is a difference between the two proposals.

Eno

> On 22 Jun 2017, at 03:46, Guozhang Wang <wangg...@gmail.com> wrote:
>
> I have been thinking about reducing all these overloaded functions for
> stateful operations (there are some other places that introduce overloaded
> functions, but let's focus on these only in this discussion). What I used to
> have in mind was to use some "materialize" function on the KTables, like:
>
> ---------------------------------------
>
> // specifying the topology
>
> KStream stream1 = builder.stream();
> // do not allow passing in a state store supplier here any more
> KTable table1 = stream1.groupBy(...).aggregate(initializer, aggregator,
>     sessionMerger, sessionWindows);
>
> // additional specs along with the topology above
>
> table1.materialize("queryableStoreName"); // or..
> table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
> // add the metrics / logging / caching / windowing functionalities on top of the store, or..
> table1.materialize(stateStoreSupplier);
> table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..
>
> ---------------------------------------
>
> But thinking about it more, I feel Damian's first proposal is better since
> my proposal would likely break the concatenation (e.g. we may not be
> able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
> want to use different specs for the intermediate filtered KTable).
>
> But since this is an incompatible change, and we are going to remove the
> compatibility annotations soon, it means we only have one chance and we
> really have to make it right.
> So I'd call on everyone to try rewriting your examples / demo code with the
> proposed new API and see if it feels natural. For example, if I want to use
> a different storage engine than the default RocksDB engine, how could I
> easily specify that with the proposed APIs?
>
> Meanwhile Damian, could you provide a formal set of APIs for people to
> exercise? Also, could you briefly describe how custom storage engines could
> be swapped in with the above APIs?
>
> Guozhang
>
> On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> To make it clear, it’s outlined by Damian, I just copy pasted what he told
>> me in person :)
>>
>> Eno
>>
>>> On Jun 21, 2017, at 4:40 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>
>>> +1 for the approach outlined above by Eno.
>>>
>>> On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Thanks Eno.
>>>>
>>>> Yes, I agree. We could apply this same approach to most of the operations
>>>> where we have multiple overloads, i.e., we have a single method for each
>>>> operation that takes the required parameters and everything else is
>>>> specified as you have done above.
>>>>
>>>> On Wed, 21 Jun 2017 at 16:24 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>
>>>>> (cc’ing user-list too)
>>>>>
>>>>> Given that we already have StateStoreSuppliers that are configurable using
>>>>> the fluent-like API, it’s probably worth discussing the other examples with
>>>>> joins and serdes first, since those have many overloads and are in need of
>>>>> some TLC.
>>>>>
>>>>> So following your example, I guess you’d have something like:
>>>>> .join()
>>>>>    .withKeySerdes(…)
>>>>>    .withValueSerdes(…)
>>>>>    .withJoinType("outer")
>>>>>
>>>>> etc?
>>>>>
>>>>> I like the approach since it still remains declarative and it’d reduce the
>>>>> number of overloads by quite a bit.
>>>>>
>>>>> Eno
>>>>>
>>>>>> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'd like to get a discussion going around some of the API choices we've
>>>>>> made in the DSL, in particular those that relate to stateful operations
>>>>>> (though this could expand).
>>>>>> As it stands we lean heavily on overloaded methods in the API, i.e., there
>>>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
>>>>>> feel it is only going to get worse as we add more optional params. In
>>>>>> particular we've had some requests to be able to turn caching off, or
>>>>>> change log configs, on a per-operator basis (note this can be done now if
>>>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>>>>>>
>>>>>> So this is a bit of an open question. How can we change the DSL overloads
>>>>>> so that it flows, is simple to use and understand, and is easily extended
>>>>>> in the future?
>>>>>>
>>>>>> One option would be to use a fluent API approach for providing the optional
>>>>>> params, so something like this:
>>>>>>
>>>>>> groupedStream.count()
>>>>>>    .withStoreName("name")
>>>>>>    .withCachingEnabled(false)
>>>>>>    .withLoggingEnabled(config)
>>>>>>    .table()
>>>>>>
>>>>>> Another option would be to provide a Builder to the count method, so it
>>>>>> would look something like this:
>>>>>>
>>>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>>
>>>>>> Another option is to say: Hey we don't need this, what are you on about!
>>>>>>
>>>>>> The above has focussed on state store related overloads, but the same ideas
>>>>>> could be applied to joins etc, where we presently have many join methods
>>>>>> and many overloads.
>>>>>>
>>>>>> Anyway, I look forward to hearing your opinions.
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>
> --
> -- Guozhang
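
To make the fluent option from Damian's original mail a bit more concrete, here is a minimal sketch in plain Java of how optional parameters could replace a pile of overloads. Everything in it is hypothetical: CountSpec is a name invented for illustration and is not a Kafka Streams class, the defaults (caching and logging on, no store name) are assumptions, and only the method names withStoreName, withCachingEnabled and withLoggingEnabled are borrowed from the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration only; NOT part of Kafka Streams.
 * An immutable holder for the optional settings of a count() operation.
 * Each with...() call returns a new instance, so a partially configured
 * spec can be reused without one caller affecting another.
 */
final class CountSpec {
    private final String storeName;            // null = no explicit name given (assumed default)
    private final boolean cachingEnabled;      // assumed default: true
    private final boolean loggingEnabled;      // assumed default: true
    private final Map<String, String> logConfig;

    private CountSpec(String storeName, boolean cachingEnabled,
                      boolean loggingEnabled, Map<String, String> logConfig) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.logConfig = Collections.unmodifiableMap(new HashMap<>(logConfig));
    }

    /** Starting point with the assumed defaults. */
    static CountSpec defaults() {
        return new CountSpec(null, true, true, Collections.<String, String>emptyMap());
    }

    CountSpec withStoreName(String name) {
        return new CountSpec(name, cachingEnabled, loggingEnabled, logConfig);
    }

    CountSpec withCachingEnabled(boolean enabled) {
        return new CountSpec(storeName, enabled, loggingEnabled, logConfig);
    }

    /** Turns logging on and records the given changelog settings. */
    CountSpec withLoggingEnabled(Map<String, String> config) {
        return new CountSpec(storeName, cachingEnabled, true, config);
    }

    String storeName() { return storeName; }
    boolean cachingEnabled() { return cachingEnabled; }
    boolean loggingEnabled() { return loggingEnabled; }
    Map<String, String> logConfig() { return logConfig; }

    @Override
    public String toString() {
        return "CountSpec{storeName=" + storeName
                + ", cachingEnabled=" + cachingEnabled
                + ", loggingEnabled=" + loggingEnabled
                + ", logConfig=" + logConfig + "}";
    }
}

class FluentCountSketch {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("retention.ms", "86400000"); // example value only

        // One entry point plus optional with...() calls instead of many overloads.
        CountSpec spec = CountSpec.defaults()
                .withStoreName("name")
                .withCachingEnabled(false)
                .withLoggingEnabled(config);

        System.out.println(spec);
    }
}
```

Adding a new optional setting in this style means adding one with...() method rather than doubling the overload count. Whether flags such as caching and logging belong in the DSL at all is the separate question raised at the top of this thread, and the sketch does not settle it.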