> However, I don't understand your argument about putting aggregate()
> after the withXX() -- all the calls to withXX() set optional parameters
> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> be quite confusion for developers.
>
>
I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.


>
> -Matthias
>
> On 6/28/17 2:55 AM, Damian Guy wrote:
> >> I also think that mixing optional parameters with configs is a bad idea.
> >> Have not proposal for this atm but just wanted to mention it. Hope to
> >> find some time to come up with something.
> >>
> >>
> > Yes, i don't like the mix of config either. But the only real config here
> > is the logging config - which we don't really need as it can already be
> > done via a custom StateStoreSupplier.
> >
> >
> >> What I don't like in the current proposal is the
> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> >> and .groupByKey() seems better. For clarity, we could rename to
> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> >> some better names).
> >>
> >>
> > it could be groupByKey(), groupBy() or something different bt
> >
> >
> >
> >> The proposed pattern "chains" grouping and aggregation too close
> >> together. I would rather separate both more than less, ie, do into the
> >> opposite direction.
> >>
> >> I am also wondering, if we could so something more "fluent". The initial
> >> proposal was like:
> >>
> >>>> groupedStream.count()
> >>>>    .withStoreName("name")
> >>>>    .withCachingEnabled(false)
> >>>>    .withLoggingEnabled(config)
> >>>>    .table()
> >>
> >> The .table() statement in the end was kinda alien.
> >>
> >
> > I agree, but then all of the withXXX methods need to be on KTable which
> is
> > worse in my opinion. You also need something that is going to "build" the
> > internal processors and add them to the topology.
> >
> >
> >> The current proposal put the count() into the end -- ie, the optional
> >> parameter for count() have to specified on the .grouped() call -- this
> >> does not seems to be the best way either.
> >>
> >>
> > I actually prefer this method as you are building a grouped stream that
> you
> > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..)
> etc
> > seems natural to me.
> >
> >
> >> I did not think this through in detail, but can't we just do the initial
> >> proposal with the .table() ?
> >>
> >> groupedStream.count().withStoreName("name").mapValues(...)
> >>
> >> Each .withXXX(...) return the current KTable and all the .withXXX() are
> >> just added to the KTable interface. Or do I miss anything why this wont'
> >> work or any obvious disadvantage?
> >>
> >>
> >>
> > See above.
> >
> >
> >>
> >> -Matthias
> >>
> >> On 6/22/17 4:06 AM, Damian Guy wrote:
> >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> >>> approach, but i think it is slightly nicer.
> >>> I agree with some of what Eno said about mixing configy stuff in the
> DSL,
> >>> but i think that enabling caching and enabling logging are things that
> >>> aren't actually config. I'd probably not add withLogConfig(...) (even
> >>> though it is below) as this is actually config and we already have a
> way
> >> of
> >>> doing that, via the StateStoreSupplier. Arguably we could use the
> >>> StateStoreSupplier for disabling caching etc, but as it stands that is
> a
> >>> bit of a tedious process for someone that just wants to use the default
> >>> storage engine, but not have caching enabled.
> >>>
> >>> There is also an orthogonal concern that Guozhang alluded to.... If you
> >>> want to plug in a custom storage engine and you want it to be logged
> etc,
> >>> you would currently need to implement that yourself. Ideally we can
> >> provide
> >>> a way where we will wrap the custom store with logging, metrics, etc. I
> >>> need to think about where this fits, it is probably more appropriate on
> >> the
> >>> Stores API.
> >>>
> >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> >>> // count with mapped key
> >>> final KTable<Long, Long> count = stream.grouped()
> >>>         .withKeyMapper(keyMapper)
> >>>         .withKeySerde(Serdes.Long())
> >>>         .withValueSerde(Serdes.String())
> >>>         .withQueryableName("my-store")
> >>>         .count();
> >>>
> >>> // windowed count
> >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> >>>         .withQueryableName("my-window-store")
> >>>         .windowed(TimeWindows.of(10L).until(10))
> >>>         .count();
> >>>
> >>> // windowed reduce
> >>> final Reducer<String> windowedReducer = null;
> >>> final KTable<Windowed<String>, String> windowedReduce =
> stream.grouped()
> >>>         .withQueryableName("my-window-store")
> >>>         .windowed(TimeWindows.of(10L).until(10))
> >>>         .reduce(windowedReducer);
> >>>
> >>> final Aggregator<String, String, Long> aggregator = null;
> >>> final Initializer<Long> init = null;
> >>>
> >>> // aggregate
> >>> final KTable<String, Long> aggregate = stream.grouped()
> >>>         .withQueryableName("my-aggregate-store")
> >>>         .aggregate(aggregator, init, Serdes.Long());
> >>>
> >>> final StateStoreSupplier<KeyValueStore<String, Long>>
> stateStoreSupplier
> >> = null;
> >>>
> >>> // aggregate with custom store
> >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> >>>         .withStateStoreSupplier(stateStoreSupplier)
> >>>         .aggregate(aggregator, init);
> >>>
> >>> // disable caching
> >>> stream.grouped()
> >>>         .withQueryableName("name")
> >>>         .withCachingEnabled(false)
> >>>         .count();
> >>>
> >>> // disable logging
> >>> stream.grouped()
> >>>         .withQueryableName("q")
> >>>         .withLoggingEnabled(false)
> >>>         .count();
> >>>
> >>> // override log config
> >>> final Reducer<String> reducer = null;
> >>> stream.grouped()
> >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> >>>         .reduce(reducer);
> >>>
> >>>
> >>> If anyone wants to play around with this you can find the code here:
> >>> https://github.com/dguy/kafka/tree/dsl-experiment
> >>>
> >>> Note: It won't actually work as most of the methods just return null.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>>
> >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> >>>
> >>>> Thanks Damian. I think both options have pros and cons. And both are
> >> better
> >>>> than overload abuse.
> >>>>
> >>>> The fluent API approach reads better, no mention of builder or build
> >>>> anywhere. The main downside is that the method signatures are a little
> >> less
> >>>> clear. By reading the method signature, one doesn't necessarily knows
> >> what
> >>>> it returns. Also, one needs to figure out the special method
> (`table()`
> >> in
> >>>> this case) that gives you what you actually care about (`KTable` in
> this
> >>>> case). Not major issues, but worth mentioning while doing the
> >> comparison.
> >>>>
> >>>> The builder approach avoids the issues mentioned above, but it doesn't
> >> read
> >>>> as well.
> >>>>
> >>>> Ismael
> >>>>
> >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I'd like to get a discussion going around some of the API choices
> we've
> >>>>> made in the DLS. In particular those that relate to stateful
> operations
> >>>>> (though this could expand).
> >>>>> As it stands we lean heavily on overloaded methods in the API, i.e,
> >> there
> >>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> and
> >> i
> >>>>> feel it is only going to get worse as we add more optional params. In
> >>>>> particular we've had some requests to be able to turn caching off, or
> >>>>> change log configs,  on a per operator basis (note this can be done
> now
> >>>> if
> >>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>>>>
> >>>>> So this is a bit of an open question. How can we change the DSL
> >> overloads
> >>>>> so that it flows, is simple to use and understand, and is easily
> >> extended
> >>>>> in the future?
> >>>>>
> >>>>> One option would be to use a fluent API approach for providing the
> >>>> optional
> >>>>> params, so something like this:
> >>>>>
> >>>>> groupedStream.count()
> >>>>>    .withStoreName("name")
> >>>>>    .withCachingEnabled(false)
> >>>>>    .withLoggingEnabled(config)
> >>>>>    .table()
> >>>>>
> >>>>>
> >>>>>
> >>>>> Another option would be to provide a Builder to the count method, so
> it
> >>>>> would look something like this:
> >>>>> groupedStream.count(new
> >>>>> CountBuilder("storeName").withCachingEnabled(false).build())
> >>>>>
> >>>>> Another option is to say: Hey we don't need this, what are you on
> >> about!
> >>>>>
> >>>>> The above has focussed on state store related overloads, but the same
> >>>> ideas
> >>>>> could  be applied to joins etc, where we presently have many join
> >> methods
> >>>>> and many overloads.
> >>>>>
> >>>>> Anyway, i look forward to hearing your opinions.
> >>>>>
> >>>>> Thanks,
> >>>>> Damian
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to