Hi Kyle,

Thanks for your input. Really appreciated.

On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> I like more of a builder pattern even though others have voiced against
> it. The reason I like it is because it makes it clear to the user that a
> call to KGroupedStream#count will return a KTable not some intermediate
> class that I need to understand.
>

Yes, that makes sense.


> When trying to program in the fluent API that has been discussed most it
> feels difficult to know when you will actually get an object you can reuse.
> What if I make one KGroupedStream that I want to reuse, is it legal to
> reuse it or does this approach expect you to call grouped each time?


I'd anticipate that once you have a KGroupedStream you can re-use it as you
can today.
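
As a rough illustration of the reuse point, here is a minimal sketch in plain
Java, not the Streams API. GroupedSketch, withStoreName and count are
hypothetical names. The assumption is that every withXXX call returns a new
object instead of mutating the receiver, so a grouped stream held in a
variable can be reused freely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a grouped stream; NOT the Kafka Streams API.
final class GroupedSketch {
    private final List<String> keys;  // record keys that were already grouped
    private final String storeName;   // optional argument, null when unset

    GroupedSketch(List<String> keys, String storeName) {
        this.keys = new ArrayList<>(keys); // defensive copy
        this.storeName = storeName;
    }

    // Returns a copy; the receiver is left unchanged and stays reusable.
    GroupedSketch withStoreName(String name) {
        return new GroupedSketch(keys, name);
    }

    String storeName() {
        return storeName;
    }

    // Counts occurrences per key, standing in for count() returning a KTable.
    Map<String, Long> count() {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

With this shape, calling withStoreName on a shared instance never affects
other users of that instance.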


> This question doesn’t pop into my head at all in the builder pattern; I
> assume I can reuse everything.
> Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a big fan of
> the grouped.
>
Yes, grouped() was more for demonstration, and because groupBy() and
groupByKey() were taken! So I'd imagine the API would actually want to be
groupByKey(/* no required args */).withOptionalArg() and
groupBy(KeyValueMapper m).withOptionalArg(...). Of course, this all depends
on maintaining backward compatibility.
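
To show the shape of groupByKey() and groupBy(mapper) followed by optional
arguments, here is a small sketch in plain Java. StreamSketch,
GroupingSketch, withQueryableName and withCachingEnabled are all hypothetical
stand-ins, not the Streams API: required arguments go in the grouping call,
everything optional is chained afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class StreamSketch {
    private final List<String[]> records = new ArrayList<>(); // {key, value} pairs

    StreamSketch add(String key, String value) {
        records.add(new String[] {key, value});
        return this;
    }

    // No required arguments: group by the existing key.
    GroupingSketch groupByKey() {
        return groupBy(kv -> kv[0]);
    }

    // One required argument: the mapper that selects the grouping key.
    GroupingSketch groupBy(Function<String[], String> mapper) {
        List<String> keys = new ArrayList<>();
        for (String[] kv : records) {
            keys.add(mapper.apply(kv));
        }
        return new GroupingSketch(keys, null, true);
    }
}

final class GroupingSketch {
    final List<String> keys;
    final String queryableName;   // optional, null when unset
    final boolean cachingEnabled; // optional, defaults to true

    GroupingSketch(List<String> keys, String queryableName, boolean cachingEnabled) {
        this.keys = keys;
        this.queryableName = queryableName;
        this.cachingEnabled = cachingEnabled;
    }

    // Each optional setter returns a new object with one field changed.
    GroupingSketch withQueryableName(String name) {
        return new GroupingSketch(keys, name, cachingEnabled);
    }

    GroupingSketch withCachingEnabled(boolean enabled) {
        return new GroupingSketch(keys, queryableName, enabled);
    }
}
```

Usage would read as stream.groupByKey().withQueryableName("my-store") or
stream.groupBy(kv -> kv[1]).withCachingEnabled(false), leaving the existing
method names in place.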


> Unfortunately, the below approach would require at least 2 (probably 3)
> overloads (one for returning a KTable and one for returning a KTable with
> Windowed Key, probably would want to split windowed and sessionwindowed for
> ease of implementation) of each count, reduce, and aggregate.
> Obviously not exhaustive but enough for you to get the picture. Count,
> Reduce, and Aggregate supply 3 static methods to initialize the builder:
> // Count
> KTable<String, Long> count =
> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>
> // Windowed Count
> KTable<Windowed<String>, Long> windowedCount =
> groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>
> // Session Count
> KTable<Windowed<String>, Long> sessionCount =
> groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>
>
Above and below, I think I'd prefer it to be:
groupedStream.count(/* non-windowed count */)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)
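
The point of putting windowed(...) before count(...) is that each receiver
type has exactly one count() with one return type, so no overloads are needed
to tell the windowed and non-windowed results apart. A minimal sketch in
plain Java, with hypothetical TimedGroupSketch and WindowedGroupSketch
classes (not the Streams API), and assuming tumbling windows over
non-negative millisecond timestamps:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; NOT the Kafka Streams API.
final class TimedGroupSketch {
    private final long[] timestamps; // event times in ms for a single key

    TimedGroupSketch(long... timestamps) {
        this.timestamps = timestamps.clone();
    }

    // Non-windowed count: a single total.
    long count() {
        return timestamps.length;
    }

    // Switches to a windowed view; count() on that view returns per-window totals.
    WindowedGroupSketch windowed(long sizeMs) {
        return new WindowedGroupSketch(timestamps, sizeMs);
    }
}

final class WindowedGroupSketch {
    private final long[] timestamps;
    private final long sizeMs;

    WindowedGroupSketch(long[] timestamps, long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.timestamps = timestamps.clone();
        this.sizeMs = sizeMs;
    }

    // Tumbling windows keyed by window start; assumes non-negative timestamps.
    Map<Long, Long> count() {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = (ts / sizeMs) * sizeMs;
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }
}
```

Here group.count() yields a plain total while group.windowed(10).count()
yields a map keyed by window start, and the compiler picks the return type
from the receiver alone.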




> // Reduce
> Reducer<Long> reducer;
> KTable<String, Long> reduce = groupedStream.reduce(reducer,
> Reduce.reduce().withQueryableStoreName("my-store"));
>
> // Aggregate Windowed with Custom Store
> Initializer<String> initializer;
> Aggregator<String, Long, String> aggregator;
> KTable<Windowed<String>, String> aggregate =
> groupedStream.aggregate(initializer, aggregator,
> Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>
> // Cogroup SessionWindowed
> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>         .cogroup(groupedStream2, aggregator2)
>         .aggregate(initializer, aggregator,
> Aggregate.sessionWindowed(SessionWindows.with(10L),
> sessionMerger).withQueryableStoreName("my-store"));
>
>
>
> public class Count {
>
>     public static class Windowed extends Count {
>         private Windows windows;
>     }
>     public static class SessionWindowed extends Count {
>         private SessionWindows sessionWindows;
>     }
>
>     public static Count count();
>     public static Windowed windowed(Windows windows);
>     public static SessionWindowed sessionWindowed(SessionWindows
> sessionWindows);
>
>     // All withXXX(...) methods.
> }
>
> public class KGroupedStream {
>     public KTable<K, Long> count(Count count);
>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
> …
> }
>
>
> Thanks,
> Kyle
>
> From: Guozhang Wang
> Sent: Wednesday, June 28, 2017 7:45 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> I played with the current proposal a bit using
> https://github.com/dguy/kafka/tree/dsl-experiment,
> and here are my observations:
>
> 1. Personally I prefer
>
>     "stream.group(mapper) / stream.groupByKey()"
>
> over
>
>     "stream.group().withKeyMapper(mapper) / stream.group()"
>
> Since 1) withKeyMapper is not enforced programmatically though it is not
> "really" optional like others, 2) syntax-wise it reads more natural.
>
> I think it is okay to add the APIs in (
>
> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> )
> in KGroupedStream.
>
>
> 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
> the innermost state store supplier (e.g. the one whose get() returns
> RocksDBStore), or is it supposed to be the outermost supplier with
> logging / metrics / etc? I think it would be more useful to only require
> users to pass in the inner state store supplier while specifying caching /
> logging through other APIs.
>
> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> allowing users to call other APIs like "withQueryableName" multiple times,
> but to call "withStateStoreSupplier" only once at the end. Why is that?
>
>
> 3. The current DSL seems to be only for aggregations, what about joins?
>
>
> 4. I think it is okay to keep the "withLogConfig": for the
> StateStoreSupplier it will still be user code specifying the topology so I
> do not see there is a big difference.
>
>
> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
> windowed state store supplier to enforce typing?
>
>
> Below are minor ones:
>
> 6. "withQueryableName": maybe better "withQueryableStateName"?
>
> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>
>
>
> Guozhang
>
>
>
> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I see your point about "when to add the processor to the topology". That
> > is indeed an issue. Not sure if we could allow "updates" to the
> > topology...
> >
> > I don't see any problem with having all the withXX() in KTable interface
> > -- but this might be subjective.
> >
> >
> > However, I don't understand your argument about putting aggregate()
> > after the withXX() -- all the calls to withXX() set optional parameters
> > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > be quite confusing for developers.
> >
> >
> > -Matthias
> >
> > On 6/28/17 2:55 AM, Damian Guy wrote:
> > >> I also think that mixing optional parameters with configs is a bad idea.
> > >> Have no proposal for this atm but just wanted to mention it. Hope to
> > >> find some time to come up with something.
> > >>
> > >>
> > > Yes, I don't like the mix of config either. But the only real config here
> > > is the logging config, which we don't really need as it can already be
> > > done via a custom StateStoreSupplier.
> > >
> > >
> > >> What I don't like in the current proposal is the
> > >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> > >> and .groupByKey() seems better. For clarity, we could rename to
> > >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> > >> some better names).
> > >>
> > >>
> > > it could be groupByKey(), groupBy() or something different bt
> > >
> > >
> > >
> > >> The proposed pattern "chains" grouping and aggregation too close
> > >> together. I would rather separate both more than less, i.e., go in the
> > >> opposite direction.
> > >>
> > >> I am also wondering if we could do something more "fluent". The initial
> > >> proposal was like:
> > >>
> > >>>> groupedStream.count()
> > >>>>    .withStoreName("name")
> > >>>>    .withCachingEnabled(false)
> > >>>>    .withLoggingEnabled(config)
> > >>>>    .table()
> > >>
> > >> The .table() statement in the end was kinda alien.
> > >>
> > >
> > > I agree, but then all of the withXXX methods need to be on KTable, which is
> > > worse in my opinion. You also need something that is going to "build" the
> > > internal processors and add them to the topology.
> > >
> > >
> > >> The current proposal puts the count() at the end, i.e., the optional
> > >> parameters for count() have to be specified on the .grouped() call. This
> > >> does not seem to be the best way either.
> > >>
> > >>
> > > I actually prefer this method as you are building a grouped stream that you
> > > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc
> > > seems natural to me.
> > >
> > >
> > >> I did not think this through in detail, but can't we just do the initial
> > >> proposal with the .table() ?
> > >>
> > >> groupedStream.count().withStoreName("name").mapValues(...)
> > >>
> > >> Each .withXXX(...) returns the current KTable and all the .withXXX() are
> > >> just added to the KTable interface. Or do I miss anything why this won't
> > >> work, or any obvious disadvantage?
> > >>
> > >>
> > >>
> > > See above.
> > >
> > >
> > >>
> > >> -Matthias
> > >>
> > >> On 6/22/17 4:06 AM, Damian Guy wrote:
> > >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> > >>> approach, but I think it is slightly nicer.
> > >>> I agree with some of what Eno said about mixing configy stuff in the DSL,
> > >>> but I think that enabling caching and enabling logging are things that
> > >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> > >>> though it is below) as this is actually config and we already have a way
> > >>> of doing that, via the StateStoreSupplier. Arguably we could use the
> > >>> StateStoreSupplier for disabling caching etc, but as it stands that is a
> > >>> bit of a tedious process for someone that just wants to use the default
> > >>> storage engine, but not have caching enabled.
> > >>>
> > >>> There is also an orthogonal concern that Guozhang alluded to... If you
> > >>> want to plug in a custom storage engine and you want it to be logged etc,
> > >>> you would currently need to implement that yourself. Ideally we can provide
> > >>> a way where we will wrap the custom store with logging, metrics, etc. I
> > >>> need to think about where this fits; it is probably more appropriate on the
> > >>> Stores API.
> > >>>
> > >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> > >>> // count with mapped key
> > >>> final KTable<Long, Long> count = stream.grouped()
> > >>>         .withKeyMapper(keyMapper)
> > >>>         .withKeySerde(Serdes.Long())
> > >>>         .withValueSerde(Serdes.String())
> > >>>         .withQueryableName("my-store")
> > >>>         .count();
> > >>>
> > >>> // windowed count
> > >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .count();
> > >>>
> > >>> // windowed reduce
> > >>> final Reducer<String> windowedReducer = null;
> > >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .reduce(windowedReducer);
> > >>>
> > >>> final Aggregator<String, String, Long> aggregator = null;
> > >>> final Initializer<Long> init = null;
> > >>>
> > >>> // aggregate
> > >>> final KTable<String, Long> aggregate = stream.grouped()
> > >>>         .withQueryableName("my-aggregate-store")
> > >>>         .aggregate(aggregator, init, Serdes.Long());
> > >>>
> > >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> > >>>
> > >>> // aggregate with custom store
> > >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> > >>>         .withStateStoreSupplier(stateStoreSupplier)
> > >>>         .aggregate(aggregator, init);
> > >>>
> > >>> // disable caching
> > >>> stream.grouped()
> > >>>         .withQueryableName("name")
> > >>>         .withCachingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // disable logging
> > >>> stream.grouped()
> > >>>         .withQueryableName("q")
> > >>>         .withLoggingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // override log config
> > >>> final Reducer<String> reducer = null;
> > >>> stream.grouped()
> > >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> > >>>         .reduce(reducer);
> > >>>
> > >>>
> > >>> If anyone wants to play around with this you can find the code here:
> > >>> https://github.com/dguy/kafka/tree/dsl-experiment
> > >>>
> > >>> Note: It won't actually work as most of the methods just return null.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>>
> > >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> > >>>
> > >>>> Thanks Damian. I think both options have pros and cons. And both are better
> > >>>> than overload abuse.
> > >>>>
> > >>>> The fluent API approach reads better, no mention of builder or build
> > >>>> anywhere. The main downside is that the method signatures are a little less
> > >>>> clear. By reading the method signature, one doesn't necessarily know what
> > >>>> it returns. Also, one needs to figure out the special method (`table()` in
> > >>>> this case) that gives you what you actually care about (`KTable` in this
> > >>>> case). Not major issues, but worth mentioning while doing the comparison.
> > >>>>
> > >>>> The builder approach avoids the issues mentioned above, but it doesn't read
> > >>>> as well.
> > >>>>
> > >>>> Ismael
> > >>>>
> > >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com>
> > >> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I'd like to get a discussion going around some of the API choices we've
> > >>>>> made in the DSL. In particular those that relate to stateful operations
> > >>>>> (though this could expand).
> > >>>>> As it stands we lean heavily on overloaded methods in the API, i.e., there
> > >>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
> > >>>>> feel it is only going to get worse as we add more optional params. In
> > >>>>> particular we've had some requests to be able to turn caching off, or
> > >>>>> change log configs, on a per-operator basis (note this can be done now if
> > >>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> > >>>>>
> > >>>>> So this is a bit of an open question. How can we change the DSL overloads
> > >>>>> so that it flows, is simple to use and understand, and is easily extended
> > >>>>> in the future?
> > >>>>>
> > >>>>> One option would be to use a fluent API approach for providing the optional
> > >>>>> params, so something like this:
> > >>>>>
> > >>>>> groupedStream.count()
> > >>>>>    .withStoreName("name")
> > >>>>>    .withCachingEnabled(false)
> > >>>>>    .withLoggingEnabled(config)
> > >>>>>    .table()
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Another option would be to provide a Builder to the count method, so it
> > >>>>> would look something like this:
> > >>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
> > >>>>>
> > >>>>> Another option is to say: Hey we don't need this, what are you on about!
> > >>>>>
> > >>>>> The above has focussed on state store related overloads, but the same ideas
> > >>>>> could be applied to joins etc, where we presently have many join methods
> > >>>>> and many overloads.
> > >>>>>
> > >>>>> Anyway, I look forward to hearing your opinions.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>
>
