Thanks Kyle.

On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> Hi Damian,
>
> >>>> When trying to program in the fluent API that has been discussed most it
> >>>> feels difficult to know when you will actually get an object you can reuse.
> >>>> What if I make one KGroupedStream that I want to reuse, is it legal to
> >>>> reuse it or does this approach expect you to call grouped each time?
>
> >> I'd anticipate that once you have a KGroupedStream you can re-use it as you
> >> can today.
>
> You said it yourself in another post that the grouped stream is
> effectively a no-op until a count, reduce, or aggregate. The way I see it
> you wouldn’t be able to reuse anything except KStreams and KTables, because
> most of this fluent api would continue returning this (this being the
> builder object currently being manipulated).
> So, if you ever store a reference to anything but KStreams and KTables and
> you use it in two different ways then it's possible you make conflicting
> withXXX() calls on the same builder.
>
>
Not necessarily true. It could return a new instance of the builder, i.e.,
the builders would be immutable. So if you held a reference to a builder it
would always be in the same state as when it was created.
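
A minimal sketch of the immutable-builder idea, assuming a hypothetical
GroupedConfig class (not part of Kafka) that keeps serde names as plain
strings for brevity. Each withXXX() call returns a new instance, so a
stored reference never changes underneath you:

```java
import java.util.Objects;

class ImmutableBuilderDemo {
    public static void main(String[] args) {
        GroupedConfig withDefaults = GroupedConfig.defaults();
        GroupedConfig withDeclared =
            withDefaults.withKeySerde("long").withValueSerde("string");
        // The original reference still reports the default serde.
        System.out.println(withDefaults.keySerde() + " / " + withDeclared.keySerde());
        // prints "default / long"
    }
}

// Hypothetical stand-in for a grouped-stream builder; not a Kafka class.
final class GroupedConfig {
    private final String keySerde;
    private final String valueSerde;

    private GroupedConfig(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static GroupedConfig defaults() {
        return new GroupedConfig("default", "default");
    }

    // Returns a fresh instance and leaves the receiver untouched.
    GroupedConfig withKeySerde(String keySerde) {
        return new GroupedConfig(Objects.requireNonNull(keySerde), valueSerde);
    }

    GroupedConfig withValueSerde(String valueSerde) {
        return new GroupedConfig(keySerde, Objects.requireNonNull(valueSerde));
    }

    String keySerde() { return keySerde; }
    String valueSerde() { return valueSerde; }
}
```

The cost of this design is one extra allocation per withXXX() call, which
should be negligible at topology-construction time.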


> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
> groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>
> I’ll admit that this shouldn’t happen but some user is going to do it
> eventually…
> Depending on the implementation, uses of groupedStreamWithDefaultSerdes would
> most likely be equivalent to the version with declared serdes. One
> workaround would be to always make copies of the config objects you are
> building, but this approach has its own problem because now we have to
> identify which configs are equivalent so we don’t create repeated
> processors.
>
> The point of this long-winded example is that we always have to be
> thinking about all of the possible ways it could be misused by a user
> (causing them to see hard-to-diagnose problems).
>

Exactly! That is the point of the discussion really.
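
The misuse described above can be reproduced with a small sketch,
assuming a hypothetical MutableGrouped builder (not a Kafka class) whose
withXXX() methods mutate the receiver and return this:

```java
class MutableBuilderPitfall {
    public static void main(String[] args) {
        MutableGrouped withDefaults = new MutableGrouped();
        MutableGrouped withDeclared = withDefaults.withKeySerde("long");
        // Both variables alias one object, so the "default" view changed too.
        System.out.println(withDefaults == withDeclared); // true
        System.out.println(withDefaults.keySerde());      // long
    }
}

// Hypothetical mutable builder whose withXXX() methods return `this`.
final class MutableGrouped {
    private String keySerde = "default";

    MutableGrouped withKeySerde(String keySerde) {
        this.keySerde = keySerde; // mutates the shared instance
        return this;
    }

    String keySerde() { return keySerde; }
}
```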


>
> In my attempt at a couple methods with builders I feel that I could
> confidently say the user couldn’t really mess it up.
> > // Count
> > KTable<String, Long> count =
> > kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
> The kGroupedStream is reusable and if they attempted to reuse the Count
> for some reason it would throw an error message saying that a store named
> “my-store” already exists.
>
>
Yes, I agree, and I think builders are my preferred pattern.
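
As a rough illustration of the "store already exists" check, here is a
sketch assuming a hypothetical StoreRegistry that stands in for whatever
tracks state store names in the topology (the class name and the error
message are made up for this example):

```java
import java.util.HashSet;
import java.util.Set;

class StoreNameDemo {
    public static void main(String[] args) {
        StoreRegistry registry = new StoreRegistry();
        registry.register("my-store");
        try {
            registry.register("my-store"); // e.g. the same Count config reused
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical registry standing in for the topology's set of store names.
final class StoreRegistry {
    private final Set<String> names = new HashSet<>();

    void register(String storeName) {
        // Set.add returns false when the element is already present.
        if (!names.add(storeName)) {
            throw new IllegalStateException(
                "A store named '" + storeName + "' already exists");
        }
    }
}
```

Failing fast at registration time turns an accidental reuse into an
immediate, easy-to-read error.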

Cheers,
Damian


> Thanks,
> Kyle
>
> From: Damian Guy
> Sent: Thursday, June 29, 2017 3:59 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> Hi Kyle,
>
> Thanks for your input. Really appreciated.
>
> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
>
> > I like more of a builder pattern even though others have voiced opposition
> > to it. The reason I like it is that it makes it clear to the user that a
> > call to KGroupedStream#count will return a KTable, not some intermediate
> > class that I need to understand.
> >
>
> Yes, that makes sense.
>
>
> > When trying to program in the fluent API that has been discussed most it
> > feels difficult to know when you will actually get an object you can reuse.
> > What if I make one KGroupedStream that I want to reuse, is it legal to
> > reuse it or does this approach expect you to call grouped each time?
>
>
> I'd anticipate that once you have a KGroupedStream you can re-use it as you
> can today.
>
>
> > This question doesn’t pop into my head at all in the builder pattern; I
> > assume I can reuse everything.
> > Finally, I like .groupByKey and .groupBy(KeyValueMapper); I'm not a big fan
> > of the grouped.
> >
>
> Yes, grouped() was more for demonstration and because groupBy() and
> groupByKey() were taken! So I'd imagine the API would actually want to be
> groupByKey(/** no required args***/).withOptionalArg() and
> groupBy(KeyValueMapper m).withOptionalArg(...). Of course this all depends
> on maintaining backward compatibility.
>
>
> > Unfortunately, the below approach would require at least 2 (probably 3)
> > overloads (one for returning a KTable and one for returning a KTable with
> > Windowed Key, probably would want to split windowed and sessionwindowed for
> > ease of implementation) of each count, reduce, and aggregate.
> > Obviously not exhaustive but enough for you to get the picture. Count,
> > Reduce, and Aggregate supply 3 static methods to initialize the builder:
> > // Count
> > KTable<String, Long> count =
> > groupedStream.count(Count.count().withQueryableStoreName("my-store"));
> >
> > // Windowed Count
> > KTable<Windowed<String>, Long> windowedCount =
> > groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
> >
> > // Session Count
> > KTable<Windowed<String>, Long> sessionCount =
> > groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
> >
> >
> Above and below, I think I'd prefer it to be:
> groupedStream.count(/** non windowed count**/)
> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)
>
>
>
>
> > // Reduce
> > Reducer<Long> reducer;
> > KTable<String, Long> reduce = groupedStream.reduce(reducer,
> > Reduce.reduce().withQueryableStoreName("my-store"));
> >
> > // Aggregate Windowed with Custom Store
> > Initializer<String> initializer;
> > Aggregator<String, Long, String> aggregator;
> > KTable<Windowed<String>, String> aggregate =
> > groupedStream.aggregate(initializer, aggregator,
> > Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
> >
> > // Cogroup SessionWindowed
> > KTable<Windowed<String>, String> cogrouped = groupedStream1.cogroup(aggregator1)
> >         .cogroup(groupedStream2, aggregator2)
> >         .aggregate(initializer, aggregator,
> > Aggregate.sessionWindowed(SessionWindows.with(10L),
> > sessionMerger).withQueryableStoreName("my-store"));
> >
> >
> >
> > public class Count {
> >
> >     public static class Windowed extends Count {
> >         private Windows windows;
> >     }
> >     public static class SessionWindowed extends Count {
> >         private SessionWindows sessionWindows;
> >     }
> >
> >     public static Count count();
> >     public static Windowed windowed(Windows windows);
> >     public static SessionWindowed sessionWindowed(SessionWindows
> > sessionWindows);
> >
> >     // All withXXX(...) methods.
> > }
> >
> > public class KGroupedStream {
> >     public KTable<K, Long> count(Count count);
> >     public KTable<Windowed<K>, Long> count(Count.Windowed count);
> >     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
> > …
> > }
> >
> >
> > Thanks,
> > Kyle
> >
> > From: Guozhang Wang
> > Sent: Wednesday, June 28, 2017 7:45 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
> >
> > I played with the current proposal a bit using
> > https://github.com/dguy/kafka/tree/dsl-experiment,
> > and here are my observations:
> >
> > 1. Personally I prefer
> >
> >     "stream.group(mapper) / stream.groupByKey()"
> >
> > over
> >
> >     "stream.group().withKeyMapper(mapper) / stream.group()"
> >
> > since 1) withKeyMapper is not enforced programmatically though it is not
> > "really" optional like the others, and 2) syntax-wise it reads more naturally.
> >
> > I think it is okay to add the APIs in
> > https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> > to KGroupedStream.
> >
> >
> > 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
> > the innermost state store supplier (e.g. the one whose get() returns
> > RocksDBStore), or is it supposed to be the outermost supplier with
> > logging / metrics / etc.? I think it would be more useful to only require
> > users to pass in the inner state store supplier while specifying caching /
> > logging through other APIs.
> >
> > In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> > allowing users to call other APIs like "withQueryableName" multiple times,
> > but to call "withStateStoreSupplier" only once, at the end. Why is that?
> >
> >
> > 3. The current DSL seems to be only for aggregations, what about joins?
> >
> >
> > 4. I think it is okay to keep the "withLogConfig": for the
> > StateStoreSupplier it will still be user code specifying the topology, so I
> > do not see that there is a big difference.
> >
> >
> > 5. "WindowedGroupedStream"'s withStateStoreSupplier should take the
> > windowed state store supplier to enforce typing?
> >
> >
> > Below are minor ones:
> >
> > 6. "withQueryableName": maybe better "withQueryableStateName"?
> >
> > 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
> >
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I see your point about "when to add the processor to the topology". That
> > > is indeed an issue. Not sure if we could allow "updates" to the
> > > topology...
> > >
> > > I don't see any problem with having all the withXX() in the KTable interface
> > > -- but this might be subjective.
> > >
> > >
> > > However, I don't understand your argument about putting aggregate()
> > > after the withXX() -- all the calls to withXX() set optional parameters
> > > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > > be quite confusing for developers.
> > >
> > >
> > > -Matthias
> > >
> > > On 6/28/17 2:55 AM, Damian Guy wrote:
> > > >> I also think that mixing optional parameters with configs is a bad idea.
> > > >> Have no proposal for this atm but just wanted to mention it. Hope to
> > > >> find some time to come up with something.
> > > >>
> > > >>
> > > > Yes, I don't like the mix of config either. But the only real config here
> > > > is the logging config - which we don't really need as it can already be
> > > > done via a custom StateStoreSupplier.
> > > >
> > > >
> > > >> What I don't like in the current proposal is the
> > > >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> > > >> and .groupByKey() seems better. For clarity, we could rename to
> > > >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> > > >> some better names).
> > > >>
> > > >>
> > > > it could be groupByKey(), groupBy() or something different bt
> > > >
> > > >
> > > >
> > > >> The proposed pattern "chains" grouping and aggregation too close
> > > >> together. I would rather separate both more than less, i.e., go in the
> > > >> opposite direction.
> > > >>
> > > >> I am also wondering if we could do something more "fluent". The initial
> > > >> proposal was like:
> > > >>
> > > >>>> groupedStream.count()
> > > >>>>    .withStoreName("name")
> > > >>>>    .withCachingEnabled(false)
> > > >>>>    .withLoggingEnabled(config)
> > > >>>>    .table()
> > > >>
> > > >> The .table() statement in the end was kinda alien.
> > > >>
> > > >
> > > > I agree, but then all of the withXXX methods need to be on KTable, which is
> > > > worse in my opinion. You also need something that is going to "build" the
> > > > internal processors and add them to the topology.
> > > >
> > > >
> > > >> The current proposal puts the count() at the end -- i.e., the optional
> > > >> parameters for count() have to be specified on the .grouped() call -- this
> > > >> does not seem to be the best way either.
> > > >>
> > > >>
> > > > I actually prefer this method as you are building a grouped stream that you
> > > > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc.
> > > > seems natural to me.
> > > >
> > > >
> > > >> I did not think this through in detail, but can't we just do the initial
> > > >> proposal with the .table() ?
> > > >>
> > > >> groupedStream.count().withStoreName("name").mapValues(...)
> > > >>
> > > >> Each .withXXX(...) returns the current KTable and all the .withXXX() are
> > > >> just added to the KTable interface. Or am I missing a reason why this won't
> > > >> work, or any obvious disadvantage?
> > > >>
> > > >>
> > > >>
> > > > See above.
> > > >
> > > >
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 6/22/17 4:06 AM, Damian Guy wrote:
> > > >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> > > >>> approach, but I think it is slightly nicer.
> > > >>> I agree with some of what Eno said about mixing configy stuff in the DSL,
> > > >>> but I think that enabling caching and enabling logging are things that
> > > >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> > > >>> though it is below) as this is actually config and we already have a way of
> > > >>> doing that, via the StateStoreSupplier. Arguably we could use the
> > > >>> StateStoreSupplier for disabling caching etc., but as it stands that is a
> > > >>> bit of a tedious process for someone that just wants to use the default
> > > >>> storage engine, but not have caching enabled.
> > > >>>
> > > >>> There is also an orthogonal concern that Guozhang alluded to.... If you
> > > >>> want to plug in a custom storage engine and you want it to be logged etc.,
> > > >>> you would currently need to implement that yourself. Ideally we can provide
> > > >>> a way where we will wrap the custom store with logging, metrics, etc. I
> > > >>> need to think about where this fits; it is probably more appropriate on the
> > > >>> Stores API.
> > > >>>
> > > >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> > > >>> // count with mapped key
> > > >>> final KTable<Long, Long> count = stream.grouped()
> > > >>>         .withKeyMapper(keyMapper)
> > > >>>         .withKeySerde(Serdes.Long())
> > > >>>         .withValueSerde(Serdes.String())
> > > >>>         .withQueryableName("my-store")
> > > >>>         .count();
> > > >>>
> > > >>> // windowed count
> > > >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> > > >>>         .withQueryableName("my-window-store")
> > > >>>         .windowed(TimeWindows.of(10L).until(10))
> > > >>>         .count();
> > > >>>
> > > >>> // windowed reduce
> > > >>> final Reducer<String> windowedReducer = null;
> > > >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> > > >>>         .withQueryableName("my-window-store")
> > > >>>         .windowed(TimeWindows.of(10L).until(10))
> > > >>>         .reduce(windowedReducer);
> > > >>>
> > > >>> final Aggregator<String, String, Long> aggregator = null;
> > > >>> final Initializer<Long> init = null;
> > > >>>
> > > >>> // aggregate
> > > >>> final KTable<String, Long> aggregate = stream.grouped()
> > > >>>         .withQueryableName("my-aggregate-store")
> > > >>>         .aggregate(aggregator, init, Serdes.Long());
> > > >>>
> > > >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> > > >>>
> > > >>> // aggregate with custom store
> > > >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> > > >>>         .withStateStoreSupplier(stateStoreSupplier)
> > > >>>         .aggregate(aggregator, init);
> > > >>>
> > > >>> // disable caching
> > > >>> stream.grouped()
> > > >>>         .withQueryableName("name")
> > > >>>         .withCachingEnabled(false)
> > > >>>         .count();
> > > >>>
> > > >>> // disable logging
> > > >>> stream.grouped()
> > > >>>         .withQueryableName("q")
> > > >>>         .withLoggingEnabled(false)
> > > >>>         .count();
> > > >>>
> > > >>> // override log config
> > > >>> final Reducer<String> reducer = null;
> > > >>> stream.grouped()
> > > >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> > > >>>         .reduce(reducer);
> > > >>>
> > > >>>
> > > >>> If anyone wants to play around with this you can find the code here:
> > > >>> https://github.com/dguy/kafka/tree/dsl-experiment
> > > >>>
> > > >>> Note: It won't actually work as most of the methods just return null.
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>>
> > > >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> > > >>>
> > > >>>> Thanks Damian. I think both options have pros and cons. And both are better
> > > >>>> than overload abuse.
> > > >>>>
> > > >>>> The fluent API approach reads better, no mention of builder or build
> > > >>>> anywhere. The main downside is that the method signatures are a little less
> > > >>>> clear. By reading the method signature, one doesn't necessarily know what
> > > >>>> it returns. Also, one needs to figure out the special method (`table()` in
> > > >>>> this case) that gives you what you actually care about (`KTable` in this
> > > >>>> case). Not major issues, but worth mentioning while doing the comparison.
> > > >>>>
> > > >>>> The builder approach avoids the issues mentioned above, but it doesn't read
> > > >>>> as well.
> > > >>>>
> > > >>>> Ismael
> > > >>>>
> > > >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> I'd like to get a discussion going around some of the API choices we've
> > > >>>>> made in the DSL. In particular those that relate to stateful operations
> > > >>>>> (though this could expand).
> > > >>>>> As it stands we lean heavily on overloaded methods in the API, e.g., there
> > > >>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
> > > >>>>> feel it is only going to get worse as we add more optional params. In
> > > >>>>> particular we've had some requests to be able to turn caching off, or
> > > >>>>> change log configs, on a per-operator basis (note this can be done now if
> > > >>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> > > >>>>>
> > > >>>>> So this is a bit of an open question. How can we change the DSL overloads
> > > >>>>> so that it flows, is simple to use and understand, and is easily extended
> > > >>>>> in the future?
> > > >>>>>
> > > >>>>> One option would be to use a fluent API approach for providing the optional
> > > >>>>> params, so something like this:
> > > >>>>>
> > > >>>>> groupedStream.count()
> > > >>>>>    .withStoreName("name")
> > > >>>>>    .withCachingEnabled(false)
> > > >>>>>    .withLoggingEnabled(config)
> > > >>>>>    .table()
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Another option would be to provide a Builder to the count method, so it
> > > >>>>> would look something like this:
> > > >>>>> groupedStream.count(new
> > > >>>>> CountBuilder("storeName").withCachingEnabled(false).build())
> > > >>>>>
> > > >>>>> Another option is to say: Hey we don't need this, what are you on about!
> > > >>>>>
> > > >>>>> The above has focussed on state store related overloads, but the same ideas
> > > >>>>> could be applied to joins etc., where we presently have many join methods
> > > >>>>> and many overloads.
> > > >>>>>
> > > >>>>> Anyway, I look forward to hearing your opinions.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Damian
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
> >
>
>
