I have been thinking about reducing all these overloaded functions for
stateful operations (there are some other places that introduces overloaded
functions but let's focus on these only in this discussion), what I used to
have is to use some "materialize" function on the KTables, like:

---------------------------------------

// specifying the topology

KStream stream1 = builder.stream();
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
sessionMerger, sessionWindows);  // do not allow to pass-in a state store
supplier here any more

// additional specs along with the topology above

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging();
// or..
table1.materialize(stateStoreSupplier); // add the metrics / logging /
caching / windowing functionalities on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
etc..

---------------------------------------

But thinking about it more, I feel Damian's first proposal is better since
my proposal would likely to break the concatenation (e.g. we may not be
able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
want to use different specs for the intermediate filtered KTable).


But since this is a incompatibility change, and we are going to remove the
compatibility annotations soon it means we only have one chance and we
really have to make it right. So I'd call out for anyone try to rewrite
your examples / demo code with the proposed new API and see if it feel
natural, for example, if I want to use a different storage engine than the
default rockDB engine how could I easily specify that with the proposed
APIs?

Meanwhile Damian could you provide a formal set of APIs for people to
exercise on them? Also could you briefly describe how custom storage
engines could be swapped in with the above APIs?



Guozhang


On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> To make it clear, it’s outlined by Damian, I just copy pasted what he told
> me in person :)
>
> Eno
>
> > On Jun 21, 2017, at 4:40 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > +1 for the approach outlined above by Eno.
> >
> > On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy <damian....@gmail.com>
> wrote:
> >
> >> Thanks Eno.
> >>
> >> Yes i agree. We could apply this same approach to most of the operations
> >> where we have multiple overloads, i.e., we have a single method for each
> >> operation that takes the required parameters and everything else is
> >> specified as you have done above.
> >>
> >> On Wed, 21 Jun 2017 at 16:24 Eno Thereska <eno.there...@gmail.com>
> wrote:
> >>
> >>> (cc’ing user-list too)
> >>>
> >>> Given that we already have StateStoreSuppliers that are configurable
> >> using
> >>> the fluent-like API, probably it’s worth discussing the other examples
> >> with
> >>> joins and serdes first since those have many overloads and are in need
> of
> >>> some TLC.
> >>>
> >>> So following your example, I guess you’d have something like:
> >>> .join()
> >>>   .withKeySerdes(…)
> >>>   .withValueSerdes(…)
> >>>   .withJoinType(“outer”)
> >>>
> >>> etc?
> >>>
> >>> I like the approach since it still remains declarative and it’d reduce
> >> the
> >>> number of overloads by quite a bit.
> >>>
> >>> Eno
> >>>
> >>>> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I'd like to get a discussion going around some of the API choices
> we've
> >>>> made in the DLS. In particular those that relate to stateful
> operations
> >>>> (though this could expand).
> >>>> As it stands we lean heavily on overloaded methods in the API, i.e,
> >> there
> >>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and
> >> i
> >>>> feel it is only going to get worse as we add more optional params. In
> >>>> particular we've had some requests to be able to turn caching off, or
> >>>> change log configs,  on a per operator basis (note this can be done
> now
> >>> if
> >>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>>>
> >>>> So this is a bit of an open question. How can we change the DSL
> >> overloads
> >>>> so that it flows, is simple to use and understand, and is easily
> >> extended
> >>>> in the future?
> >>>>
> >>>> One option would be to use a fluent API approach for providing the
> >>> optional
> >>>> params, so something like this:
> >>>>
> >>>> groupedStream.count()
> >>>>  .withStoreName("name")
> >>>>  .withCachingEnabled(false)
> >>>>  .withLoggingEnabled(config)
> >>>>  .table()
> >>>>
> >>>>
> >>>>
> >>>> Another option would be to provide a Builder to the count method, so
> it
> >>>> would look something like this:
> >>>> groupedStream.count(new
> >>>> CountBuilder("storeName").withCachingEnabled(false).build())
> >>>>
> >>>> Another option is to say: Hey we don't need this, what are you on
> >> about!
> >>>>
> >>>> The above has focussed on state store related overloads, but the same
> >>> ideas
> >>>> could  be applied to joins etc, where we presently have many join
> >> methods
> >>>> and many overloads.
> >>>>
> >>>> Anyway, i look forward to hearing your opinions.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>
> >>>
> >>
>
>


-- 
-- Guozhang

Reply via email to