We now have

> public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic, final Consumed<K, V> options)
This would prevent us from writing code like builder.stream("topic", Consumed.with(...)); I think we need the methods

StreamsBuilder#stream(String topic);
StreamsBuilder#stream(String topic, Consumed options);

Or am I missing anything?

-Matthias

On 8/24/17 1:53 AM, Damian Guy wrote:
> I've updated the KIP to reflect Bill's comment and also to make
> StreamsBuilder methods have topic as the first param, i.e.,
> StreamsBuilder#stream no longer accepts varargs.
>
> On Thu, 24 Aug 2017 at 09:12 Damian Guy <damian....@gmail.com> wrote:
>
>> On Thu, 24 Aug 2017 at 02:49 Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> I have a couple of comments but otherwise it LGTM:
>>>
>>> 1. For these two functions in StreamsBuilder, the topic String is set as
>>> the second parameter, in between the two options. Would it be better to
>>> set it as the first or the last one instead?
>>>
>> It would be better as the first, but then it is different to the
>> #stream() methods due to varargs.
>>
>>> public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>>>
>>> public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>>>
>>> I understand that we cannot do it for the first parameter because of the
>>> vararg type. So I'd suggest either
>>>
>>> a) set it as the last parameter, but then it is inconsistent with other
>>> functions like these:
>>>
>>> void to(final String topic, final Produced<K, V> options);
>>>
>>> KTable<K, V> through(final String topic, final Materialized<K, V> options);
>>>
>>> b) only allow one single topic name parameter in StreamsBuilder.stream(),
>>> since in practice we do not see too many usages of multiple topics, plus it
>>> can be semi-supported with "merge" as we move it from StreamsBuilder to
>>> KStream (KAFKA-5765).
>>>
>> Perhaps this is the better approach.
>>
>>> 2. KGroupedStream's function:
>>>
>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>                              final Aggregator<? super K, ? super V, VR> aggregator,
>>>                              final Serde<VR> aggValueSerde,
>>>                              final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
>>>
>>> The "aggValueSerde" seems not needed?
>>>
>>> 3. +1 on `KGroupedStream` vs. `GroupedKStream`. I think KGroupedStream was
>>> a bad name in hindsight. I personally feel we should just correct it with
>>> a new class and deprecate / remove the old one before 1.0.0, but that
>>> could be in its own KIP.
>>>
>> The problem with this is that we'd need to add new `groupBy` and
>> `groupByKey` methods that return `GroupedKStream`; we can't change the
>> existing ones as that would break compatibility. So what would we name
>> these methods?
>>
>>> Guozhang
>>>
>>> On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> We already have GlobalKTable and I can't rename KGroupedStream, which
>>>> really should be GroupedKStream. So I think we should name new things
>>>> correctly, i.e., WindowedKStream etc., and fix the others when we can.
>>>>
>>>> On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> About KGroupedStream vs GroupedKStream: shouldn't we keep the naming
>>>>> convention consistent? And if we change the naming scheme, just change
>>>>> all at once? I personally don't care which naming scheme is better, but
>>>>> I think consistency is super important!
>>>>>
>>>>> About Bill's comment: I agree, and had a similar thought.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/23/17 12:24 PM, Bill Bejeck wrote:
>>>>>> Thanks for all the work on this KIP, Damian.
>>>>>>
>>>>>> Both `Produced` and `Joined` have a `with` method accepting all parameters,
>>>>>> but `Consumed` doesn't. Should we add one for consistency?
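The parameter-order questions above (Matthias asking for `StreamsBuilder#stream(String topic)` and `stream(String topic, Consumed options)`, Guozhang's point 1 about varargs, and Bill's all-parameters `with`) come down to a Java rule: a varargs parameter may only be the last one. The sketch below is a minimal illustration under that assumption and does not use Kafka Streams. `FakeSerde`, `ConsumedSketch`, and `BuilderSketch` are hypothetical stand-ins, and the `offsetReset` string only approximates a real option.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for a serde; not Kafka's Serde interface.
final class FakeSerde<T> {
    final String name;

    FakeSerde(String name) {
        this.name = name;
    }
}

// Hypothetical options object loosely modelled on the `Consumed` idea in this thread.
final class ConsumedSketch<K, V> {
    final FakeSerde<K> keySerde;   // null means "use the configured default"
    final FakeSerde<V> valueSerde; // null means "use the configured default"
    final String offsetReset;      // hypothetical stand-in for an offset-reset policy

    private ConsumedSketch(FakeSerde<K> keySerde, FakeSerde<V> valueSerde, String offsetReset) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.offsetReset = offsetReset;
    }

    // All-parameters factory, in the spirit of Bill's consistency suggestion.
    static <K, V> ConsumedSketch<K, V> with(FakeSerde<K> keySerde, FakeSerde<V> valueSerde, String offsetReset) {
        return new ConsumedSketch<>(keySerde, valueSerde, offsetReset);
    }

    static <K, V> ConsumedSketch<K, V> with(FakeSerde<K> keySerde, FakeSerde<V> valueSerde) {
        return new ConsumedSketch<>(keySerde, valueSerde, null);
    }
}

// Hypothetical builder: the topic always comes first and there are no varargs.
final class BuilderSketch {
    // Java only allows a varargs parameter in the last position, so
    // stream(String... topics, ConsumedSketch<K, V> c) would not compile.
    // Single-topic and Collection overloads let the topic lead instead.
    String stream(String topic) {
        return stream(Collections.singletonList(topic), null);
    }

    <K, V> String stream(String topic, ConsumedSketch<K, V> consumed) {
        return stream(Collections.singletonList(topic), consumed);
    }

    <K, V> String stream(Collection<String> topics, ConsumedSketch<K, V> consumed) {
        // A real builder would add a source node; this sketch only describes it.
        String key = (consumed == null || consumed.keySerde == null) ? "default" : consumed.keySerde.name;
        return "source" + topics + " keySerde=" + key;
    }
}

class BuilderSketchDemo {
    public static void main(String[] args) {
        BuilderSketch builder = new BuilderSketch();
        System.out.println(builder.stream("orders"));
        System.out.println(builder.stream("orders",
                ConsumedSketch.with(new FakeSerde<String>("string"), new FakeSerde<Long>("long"))));
        System.out.println(builder.stream(Arrays.asList("a", "b"),
                ConsumedSketch.with(new FakeSerde<String>("s"), new FakeSerde<String>("s"), "earliest")));
    }
}
```

With the topic leading, `builder.stream("orders", ConsumedSketch.with(...))` resolves to the two-argument overload, which is the call shape Matthias asks for; multiple topics still go through the `Collection` overload.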
>>>>>>
>>>>>> Thanks,
>>>>>> Bill
>>>>>>
>>>>>> On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> KIP has been updated. Thanks
>>>>>>>
>>>>>>> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>>> KStream:
>>>>>>>>> leftJoin and outerJoin for KStream/KTable join should not have a
>>>>>>>>> `JoinWindows` parameter
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>> Nit: TopologyBuilder -> Topology
>>>>>>>>
>>>>>>>> Ack
>>>>>>>>
>>>>>>>>> Nit: new class Serialized lists static method #with twice
>>>>>>>>
>>>>>>>> Ack
>>>>>>>>
>>>>>>>>> WindowedKStream -> for consistency we should either have GroupedKStream
>>>>>>>>> or KWindowedStream... (similar argument for SessionWindowedKStream)
>>>>>>>>
>>>>>>>> We can't rename KGroupedStream -> GroupedKStream without breaking
>>>>>>>> compatibility. So we are stuck with it for now. Hopefully in the future
>>>>>>>> we can rename KGroupedStream to GroupedKStream.
>>>>>>>>
>>>>>>>>> KGroupedStream
>>>>>>>>> -> why do we use a different name for `sessionWindowedBy()` -- seems to
>>>>>>>>> be cleaner to call both methods `windowedBy()`
>>>>>>>>
>>>>>>>> I beg to differ that it is cleaner either way!
>>>>>>>>
>>>>>>>>> StreamsBuilder#stream -> parameter order is confusing... We should have
>>>>>>>>> Pattern as second parameter to align both methods.
>>>>>>>>
>>>>>>>> Ack
>>>>>>>>
>>>>>>>>> StreamsBuilder#table/globalTable -> move parameter `Consumed` as first
>>>>>>>>> parameter to align with `#stream`
>>>>>>>>
>>>>>>>> Ack
>>>>>>>>
>>>>>>>>> Produced#with(Serde, Serde)
>>>>>>>>> Produced#with(StreamPartitioner, Serde, Serde)
>>>>>>>>> -> should StreamPartitioner be the third argument instead of the first?
>>>>>>>>
>>>>>>>> Sure
>>>>>>>>
>>>>>>>>> Consumed:
>>>>>>>>> Why do we need 3 different names for the 3 static methods? I would just
>>>>>>>>> call all of them `with()`. Current names sound clumsy to me. And a
>>>>>>>>> plain `with()` also aligns with the naming of static methods of other
>>>>>>>>> classes.
>>>>>>>>
>>>>>>>> I disagree that the names sound clumsy! But yes, they should be aligned
>>>>>>>> with the others.
>>>>>>>>
>>>>>>>>> I guess we are also deprecating a bunch of methods for
>>>>>>>>> KStream/KTable/KGroupedStream/KGroupedTable and should mention which
>>>>>>>>> ones? There is just one sentence "Deprecate the existing overloads.", but
>>>>>>>>> we don't deprecate all existing ones. I personally don't care too much if
>>>>>>>>> we spell the deprecated methods out explicitly, but right now it's not
>>>>>>>>> consistent as we only list methods we add.
>>>>>>>>>
>>>>>>>>> Should we deprecate `StateStoreSupplier`?
>>>>>>>>
>>>>>>>> Yep
>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 8/22/17 6:55 AM, Damian Guy wrote:
>>>>>>>>>> I've just updated the KIP with some additional changes targeted at
>>>>>>>>>> StreamsBuilder
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>> Got it, thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> Does it still make sense to have one static constructor for each spec,
>>>>>>>>>>>> with one constructor having only one parameter to make it more usable,
>>>>>>>>>>>> i.e. as a user I do not need to give all parameters if I only want to
>>>>>>>>>>>> override one of them?
>>>>>>>>>>>> Maybe we can just name the constructors `with`, but I'm not sure if Java
>>>>>>>>>>>> distinguishes
>>>>>>>>>>>>
>>>>>>>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
>>>>>>>>>>>> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
>>>>>>>>>>>>
>>>>>>>>>>>> as two function signatures.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> No, that won't work. That is why we have all options, i.e., on Produced:
>>>>>>>>>>>
>>>>>>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
>>>>>>>>>>> public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
>>>>>>>>>>> public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
>>>>>>>>>>> public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
>>>>>>>>>>> public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
>>>>>>>>>>>
>>>>>>>>>>> So if you only want to set one option, you can just use the function that
>>>>>>>>>>> takes one argument.
>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Damian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the proposal, I had a few comments on the APIs:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Printed#withFile seems not needed, as users should always specify
>>>>>>>>>>>>>> whether it is to sysOut or to File at the beginning.
>>>>>>>>>>>>>> In addition, on second thought, I think serdes are not useful for prints
>>>>>>>>>>>>>> anyway, since we assume `toString` is provided except for byte arrays,
>>>>>>>>>>>>>> which we will handle specially.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another comment about Printed in general: it differs from the other options
>>>>>>>>>>>>>> in that it is a required option rather than an optional one, since it
>>>>>>>>>>>>>> includes the toSysOut / toFile specs. What are the pros and cons of
>>>>>>>>>>>>>> including these two in the option, and hence making it required, rather
>>>>>>>>>>>>>> than leaving them at the API layer and making Printed optional, for
>>>>>>>>>>>>>> mapper / label only?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> It isn't required, as we will still have the no-arg print() which will just
>>>>>>>>>>>>> go to sysout as it does now.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.1 KStream#through / to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should have an overloaded function without Produced?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes - we already have those so they are not part of the KIP, i.e.,
>>>>>>>>>>>>> through(topic)
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.2 KStream#groupBy / groupByKey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should have an overloaded function without Serialized?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, as above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should have an overloaded function without Materialized?
>>>>>>>>>>>>>
>>>>>>>>>>>>> As above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.4 KStream#join
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should have an overloaded function without Joined?
>>>>>>>>>>>>>
>>>>>>>>>>>>> As above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.5 Each of KTable's operators:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should have an overloaded function without Produced / Serialized /
>>>>>>>>>>>>>> Materialized?
>>>>>>>>>>>>>
>>>>>>>>>>>>> As above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3.1 Produced: the static functions have overlaps, which seems
>>>>>>>>>>>>>> unnecessary. I'd suggest just having the following three static
>>>>>>>>>>>>>> functions, with another three similar member functions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The key idea is that by using the same function name for static
>>>>>>>>>>>>>> constructors and member functions, users do not need to remember what
>>>>>>>>>>>>>> the differences are but can call these functions in any order they want,
>>>>>>>>>>>>>> and later calls on the same spec will win over earlier calls.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That would be great if Java supported it, but it doesn't. You can't have
>>>>>>>>>>>>> static and member functions with the same signature.
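Both of Damian's Java answers (no `with(Serde<K>)` / `with(Serde<V>)` pair, and no static and member functions sharing a signature) can be shown without Kafka. This is a minimal sketch, not the KIP's final API. `ProducedSketch`, `SerdeStub`, and `PartitionerStub` are hypothetical placeholders. The single-option static factories get distinct names, while the member "withers" carry a `with` prefix, so calls can still be chained in any order with later calls winning, which is close to what Guozhang asked for. The partitioner is placed last in `with(...)`, following Matthias's question about argument order.

```java
// Hypothetical serde stand-in; not Kafka's Serde interface.
final class SerdeStub<T> {
    final String name;

    SerdeStub(String name) {
        this.name = name;
    }
}

// Hypothetical partitioner stand-in; not Kafka's StreamPartitioner.
interface PartitionerStub<K, V> {
    int partition(K key, V value, int numPartitions);
}

final class ProducedSketch<K, V> {
    final SerdeStub<K> keySerdeSetting;
    final SerdeStub<V> valueSerdeSetting;
    final PartitionerStub<K, V> partitionerSetting;

    private ProducedSketch(SerdeStub<K> k, SerdeStub<V> v, PartitionerStub<K, V> p) {
        keySerdeSetting = k;
        valueSerdeSetting = v;
        partitionerSetting = p;
    }

    // Declaring both with(SerdeStub<K>) and with(SerdeStub<V>) would not compile:
    // the two methods erase to the same signature. Hence one name per option.
    static <K, V> ProducedSketch<K, V> keySerde(SerdeStub<K> keySerde) {
        return new ProducedSketch<>(keySerde, null, null);
    }

    static <K, V> ProducedSketch<K, V> valueSerde(SerdeStub<V> valueSerde) {
        return new ProducedSketch<>(null, valueSerde, null);
    }

    static <K, V> ProducedSketch<K, V> streamPartitioner(PartitionerStub<K, V> partitioner) {
        return new ProducedSketch<>(null, null, partitioner);
    }

    // All-options factories; the partitioner is the last argument here.
    static <K, V> ProducedSketch<K, V> with(SerdeStub<K> keySerde, SerdeStub<V> valueSerde) {
        return new ProducedSketch<>(keySerde, valueSerde, null);
    }

    static <K, V> ProducedSketch<K, V> with(SerdeStub<K> keySerde, SerdeStub<V> valueSerde,
                                            PartitionerStub<K, V> partitioner) {
        return new ProducedSketch<>(keySerde, valueSerde, partitioner);
    }

    // Member methods cannot reuse the static names with the same parameter types
    // (the declarations would clash), so they carry a `with` prefix. Each returns
    // a copy, so a later call overrides an earlier one for the same option.
    ProducedSketch<K, V> withKeySerde(SerdeStub<K> keySerde) {
        return new ProducedSketch<>(keySerde, valueSerdeSetting, partitionerSetting);
    }

    ProducedSketch<K, V> withValueSerde(SerdeStub<V> valueSerde) {
        return new ProducedSketch<>(keySerdeSetting, valueSerde, partitionerSetting);
    }

    ProducedSketch<K, V> withStreamPartitioner(PartitionerStub<K, V> partitioner) {
        return new ProducedSketch<>(keySerdeSetting, valueSerdeSetting, partitioner);
    }
}

class ProducedSketchDemo {
    public static void main(String[] args) {
        ProducedSketch<String, Long> produced = ProducedSketch.<String, Long>keySerde(new SerdeStub<>("string"))
                .withValueSerde(new SerdeStub<>("long"))
                .withKeySerde(new SerdeStub<>("bytes")); // the later key serde wins
        System.out.println(produced.keySerdeSetting.name + "/" + produced.valueSerdeSetting.name);
    }
}
```

Running `ProducedSketchDemo` prints `bytes/long`: the second key-serde call replaced the first, and the value serde set in between was kept.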
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3.2 Serialized: similarly
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>>>>>>>>>>>>>
>>>>>>>>>>>>> As above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, it has a final Serde<V> otherValueSerde in one of its static
>>>>>>>>>>>>>> constructors; is that intentional?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nope: thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3.3 Joined: similarly, keep the static constructor signatures the same as
>>>>>>>>>>>>>> its corresponding member fields.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As above
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its static
>>>>>>>>>>>>>> constructors with only two `as` as they are today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced by
>>>>>>>>>>>>>> BytesStoreSupplier? It seems some more description is lacking here. Also,
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>
>>>>>>>>>>>>> No modifications to StateStoreSupplier. It is superseded by
>>>>>>>>>>>>> BytesStoreSupplier.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K, V, S extends StateStore> Materialized<K, V, S>
>>>>>>>>>>>>>> as(final StateStoreSupplier<S> supplier)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is the parameter of type BytesStoreSupplier?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yep - thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Updated link:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've put together a KIP to make some changes to the KafkaStreams DSL that
>>>>>>>>>>>>>>>> will hopefully allow us to:
>>>>>>>>>>>>>>>> 1) reduce the explosion of overloads
>>>>>>>>>>>>>>>> 2) add new features without having to continue adding more overloads
>>>>>>>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines and wrap
>>>>>>>>>>>>>>>> them with logging, caching etc if desired
>>>>>>>>>>>>>>>> 4) enable per-operator caching rather than global caching without having
>>>>>>>>>>>>>>>> to resort to supplying a StateStoreSupplier when you just want to turn
>>>>>>>>>>>>>>>> caching off.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The KIP is here:
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>
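Goals 1) and 4) of the original proposal (fewer overloads, and per-operator caching without supplying a custom `StateStoreSupplier`) amount to moving optional settings into a single options object. The sketch below is a minimal illustration of that pattern. It assumes hypothetical `MaterializedSketch` and `GroupedSketch` classes rather than the real Kafka Streams types, and it counts in an in-memory `TreeMap` instead of a state store, so the caching and logging flags are only recorded, not acted on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical options object loosely modelled on the Materialized idea in KIP-182;
// it is not the Kafka Streams class.
final class MaterializedSketch {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    private MaterializedSketch(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }

    // Assumed defaults for this sketch: caching and logging both on.
    static MaterializedSketch as(String storeName) {
        return new MaterializedSketch(storeName, true, true);
    }

    // Turning caching off for one operator needs no custom store supplier.
    MaterializedSketch withCachingDisabled() {
        return new MaterializedSketch(storeName, false, loggingEnabled);
    }

    MaterializedSketch withLoggingDisabled() {
        return new MaterializedSketch(storeName, cachingEnabled, false);
    }

    @Override
    public String toString() {
        return storeName + "(caching=" + cachingEnabled + ", logging=" + loggingEnabled + ")";
    }
}

// Hypothetical grouped-stream stand-in: two count() overloads cover every
// combination of store name, caching, and logging.
final class GroupedSketch {
    private final List<String> keys;
    String lastStore = "none"; // records the store settings used by the last count()

    GroupedSketch(List<String> keys) {
        this.keys = keys;
    }

    Map<String, Long> count() {
        return count(MaterializedSketch.as("count-store")); // hypothetical default name
    }

    Map<String, Long> count(MaterializedSketch materialized) {
        lastStore = materialized.toString();
        Map<String, Long> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}

class MaterializedSketchDemo {
    public static void main(String[] args) {
        GroupedSketch grouped = new GroupedSketch(Arrays.asList("a", "b", "a"));
        Map<String, Long> counts = grouped.count(MaterializedSketch.as("clicks").withCachingDisabled());
        System.out.println(counts + " " + grouped.lastStore);
    }
}
```

Running `MaterializedSketchDemo` prints `{a=2, b=1} clicks(caching=false, logging=true)`. Adding a further option here would mean one more method on the options object, not another `count` overload.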