I had one meta comment on the PR:
https://github.com/apache/kafka/pull/5909#discussion_r240447153

On Mon, Dec 10, 2018 at 5:22 PM John Roesler <j...@confluent.io> wrote:

> Hi Florian,
>
> I hope it's ok if I ask a few questions at this late stage...
>
> Comment 1 ======
>
> It seems like the proposal is to add a new "Named" interface that is
> intended to be mixed in with the existing API objects at various points.
>
> Just to preface some of my comments, it looks like your KIP was created
> quite a while ago, so the API may have changed somewhat since you started.
>
> As I see the API, there are a few different kinds of DSL method arguments:
> * functions: things like Initializer, Aggregator, ValueJoiner,
> ForEachAction... All of these are essentially Streams-flavored Function
> interfaces with different arities, type bounds, and semantics.
> * config objects: things like Produced, Consumed, Joined, Grouped... These
> are containers for configurations, where the target of the configuration is
> the operation itself
> * raw configurations: things like a raw topic-name string and Materialized:
> These are configurations for operations that have no config object, and for
> various reasons, we didn't make one. The distinguishing feature is that the
> target of the configuration is not the operation itself, but some aspect of
> it. For example, in Materialized, we are not setting the caching behavior
> of, for example, an aggregation; we're setting the caching behavior of a
> materialized state store attached to the aggregation.
>
> It seems like choosing to mix the Named interface in with the functions has
> a couple of unfortunate side-effects:
> * Aggregator is not the only function passed to any of the relevant
> aggregate methods, so it seems a little arbitrary to pick that function
> over Initializer or Merger.
> * As you noted, branch() takes an array of Predicate, so we just ignore the
> provided name(s), even though Predicate names are used elsewhere.
> * Not all things that we want to name have function arguments, notably
> source and sink, so we'd switch paradigms and use the config object
> instead.
> * Adding an extra method to the function interfaces means that those are no
> longer SAM interfaces. You proposed to add a default implementation, so we
> could still pass a lambda if we don't want to set the name, but if we *do*
> want to set the name, we can no longer use lambdas.
>
> I think the obvious other choice would be to mix Named in with the config
> objects instead, but this has one main downside of its own...
> * not every operator we wish to name has a config object. I don't know if
> everyone involved is comfortable with adding a config object to every
> operator that's missing one.
>
> Personally, I favor moving toward a more consistent state that's forward
> compatible with any further changes we wish to make. I *think* that giving
> every operator two forms (one with no config and one with a config object)
> would be such an API.
>
> Comment 2 =========
>
> Finally, just a minor comment: the static method in Named wouldn't work
> properly as defined. Assuming that we mix Named in with Produced, for
> example, we'd need to be able to use it like:
>   kStream.to("out", Produced.with("myOut"))
> This doesn't work because with() returns a Named, but we need a Produced.
>
> We can pull off a builder method in the interface, but not a static method.
> To define a builder method in the interface that returns an instance of the
> concrete subtype, you have to use the "curiously recurring generic"
> pattern.
>
> It would look like:
>
> public interface Named<N extends Named<N>> {
>   String name();
>   N withName(String name);
> }
>
> You can see where the name of the pattern comes from ;)
> An implementation would then look like:
>
> public class Produced implements Named<Produced> {
>   private String name;
>   public String name() { return name; }
>   public Produced withName(final String name) { this.name = name; return this; }
> }
>
> Note that the generic parameter gets filled in properly in the implementing
> class, so that you get the right return type out.
>
> It doesn't work at all with a static factory method at the interface level,
> so it would be up to Produced to define a static factory if it wants to
> present one.
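A self-contained sketch of the self-typed pattern described above may make the return-type point concrete. The `Produced` class here is a hypothetical stand-in (not the real Kafka class), and `as(...)`, `withKeySerde(...)` and `keySerde()` are names assumed purely for illustration:

```java
public class NamedSketch {

    /** Self-typed interface: N is the concrete implementing type. */
    interface Named<N extends Named<N>> {
        String name();
        N withName(String name);
    }

    /** Hypothetical stand-in for a config object such as Produced. */
    static final class Produced implements Named<Produced> {
        private String name;
        private String keySerde; // placeholder for an unrelated config field

        // The static factory lives on the concrete class, not on the interface.
        static Produced as(final String name) {
            return new Produced().withName(name);
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public Produced withName(final String name) {
            this.name = name;
            return this;
        }

        Produced withKeySerde(final String keySerde) {
            this.keySerde = keySerde;
            return this;
        }

        String keySerde() {
            return keySerde;
        }
    }

    public static void main(final String[] args) {
        // withName() returns Produced, so Produced-specific builders still chain after it.
        final Produced produced = Produced.as("myOut").withKeySerde("string");
        System.out.println(produced.name() + " / " + produced.keySerde());
    }
}
```

Because `withName` is declared to return `N`, the call chain keeps the concrete type without any cast.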
>
> ======
>
> Those are my two pieces of feedback!
>
> I hope you find this helpful, rather than frustrating. I'm sorry I didn't
> get a chance to comment sooner.
>
> Thanks for the KIP, I think it will be much nicer to be able to name the
> processor nodes.
>
> -John
>
> On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Florian,
> >
> > I've made a pass over the PR. There are some comments related to the
> > function names which may affect the KIP wiki page, but overall I think
> > it looks good already.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Thanks Florian! I will take a look at the PR.
> > >
> > >
> > >
> > > On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Sorry I was absent for a while. I have started a new PR for this KIP.
> > >> It is still in progress for now. I'm working on it.
> > >> https://github.com/apache/kafka/pull/5909
> > >>
> > >> On Fri, Oct 19, 2018 at 8:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>
> > >> > What is the status of this KIP?
> > >> >
> > >> > -Matthias
> > >> >
> > >> > On 7/19/18 5:17 PM, Guozhang Wang wrote:
> > >> > > Hello Florian,
> > >> > >
> > >> > > Sorry for being late... I find myself apologizing for late replies a
> > >> > > lot these days. But I do want to push this KIP's progress forward, as I
> > >> > > see it as a very important and helpful feature for extensibility.
> > >> > >
> > >> > > About the exceptions, I've gone through them and hopefully this is an
> > >> > > exhaustive list:
> > >> > >
> > >> > > 1. KTable#toStream()
> > >> > > 2. KStream#merge(KStream)
> > >> > > 3. KStream#process() / transform() / transformValues()
> > >> > > 4. KGroupedTable / KGroupedStream#count()
> > >> > >
> > >> > >
> > >> > > Here's my reasoning:
> > >> > >
> > >> > > * It is okay not to let users override the name for 1/2, since they are
> > >> > > too trivial to be useful for debugging, plus their processor names would
> > >> > > not determine any related topic / store names.
> > >> > > * For 3, I'd vote for adding overloaded functions with Named.
> > >> > > * For 4, if a user really wants to name the processor, she can call
> > >> > > aggregate() instead, so I think it is okay to skip this case.
> > >> > >
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >>
> > >> > >> Option #3 seems to be a good alternative, and I find the API more
> > >> > >> elegant (thanks John).
> > >> > >>
> > >> > >> But we still need to overload some methods, either because they do
> > >> > >> not accept an action instance or because they are translated to
> > >> > >> multiple processors.
> > >> > >>
> > >> > >> For example, this is the case for the methods branch() and merge(). We
> > >> > >> could introduce a new interface Named (or maybe a different name?) with
> > >> > >> a method name(). All action interfaces could extend this one to
> > >> > >> implement option 3).
> > >> > >> This would result in the following overloads:
> > >> > >>
> > >> > >> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> > >> > >> KStream<K, V>[] branch(final Named name,
> > >> > >>                        final Predicate<? super K, ? super V>... predicates);
> > >> > >>
> > >> > >> N.B.: The list above is not exhaustive.
> > >> > >>
> > >> > >> ---------
> > >> > >> user's code will become :
> > >> > >>
> > >> > >>         KStream<String, Integer> stream = builder.stream("test");
> > >> > >>         KStream<String, Integer>[] branches =
> > >> > >>             stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
> > >> > >>                 Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
> > >> > >>                 Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
> > >> > >>
> > >> > >>         branches[0].to("pair");
> > >> > >>         branches[1].to("impair");
> > >> > >> ---------
> > >> > >>
> > >> > >> This is a mix of the options 3) and 1)
> > >> > >>
> > >> > >> On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi folks, just to summarize the options we have so far:
> > >> > >>>
> > >> > >>> 1) Add a new "as" for KTable / KStream, plus new fields for the
> > >> > >>> control objects of operators that return void (the current wiki's
> > >> > >>> proposal).
> > >> > >>>
> > >> > >>> Pros: no more overloads.
> > >> > >>> Cons: departs a bit from the current high-level API design of the DSL,
> > >> > >>> plus the inconsistency between operators that return void and
> > >> > >>> operators that do not.
> > >> > >>>
> > >> > >>> 2) Add overloaded functions for all operators that accept a new
> > >> > >>> control object "Described".
> > >> > >>>
> > >> > >>> Pros: consistent with current APIs.
> > >> > >>> Cons: lots of overloaded functions to add.
> > >> > >>>
> > >> > >>> 3) Add another default function in the interface (thank you J8!), as
> > >> > >>> John proposed.
> > >> > >>>
> > >> > >>> Pros: no overloaded functions, no "Described".
> > >> > >>> Cons: do we really lose lambda functions (seems not, if we provide a
> > >> > >>> "named" for each func)? Plus "Described" may be more extensible than a
> > >> > >>> single `String`.
> > >> > >>>
> > >> > >>>
> > >> > >>> My principle for deciding which one is better depends primarily on "how
> > >> > >>> to make advanced users easily use the additional API, while keeping it
> > >> > >>> hidden from normal users who do not care at all". For that purpose I
> > >> > >>> think 3) > 1) > 2).
> > >> > >>>
> > >> > >>> One caveat, though, is that changing the interface would not be
> > >> > >>> binary-compatible though source-compatible, right? I.e. users need to
> > >> > >>> recompile their code though no changes are needed.
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>> Another note: for 3), if we really want to keep the extensibility of
> > >> > >>> Described, we could do something like:
> > >> > >>>
> > >> > >>> ---------
> > >> > >>>
> > >> > >>> public interface Predicate<K, V> {
> > >> > >>>     // existing method
> > >> > >>>     boolean test(final K key, final V value);
> > >> > >>>
> > >> > >>>     // new default method adds the ability to name the predicate
> > >> > >>>     default Described described() {
> > >> > >>>         return new Described(null);
> > >> > >>>     }
> > >> > >>> }
> > >> > >>>
> > >> > >>> ----------
> > >> > >>>
> > >> > >>> where user's code becomes:
> > >> > >>>
> > >> > >>> // note `named` now just sets a Described("key") in "described()".
> > >> > >>> stream.filter(named("key", (k, v) -> true));
> > >> > >>>
> > >> > >>> stream.filter(described(Described.as("key" /* any other fancy
> > >> > >>> parameters in the future */), (k, v) -> true));
> > >> > >>> ----------
> > >> > >>>
> > >> > >>>
> > >> > >>> I feel it is not very likely that we'd need to extend it further in the
> > >> > >>> future, so just a `String` would be good enough. But I'm just listing
> > >> > >>> all possibilities here.
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>> Guozhang
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
> > >> > >>>
> > >> > >>>> Hi Florian,
> > >> > >>>>
> > >> > >>>> Sorry I'm late to the party, but I missed the message originally.
> > >> > >>>>
> > >> > >>>> Regarding the names, it's probably a good idea to stick to the same
> > >> > >>>> character set we're currently using: letters, numbers, and hyphens. The
> > >> > >>>> names are used in Kafka topics, files and folders, and RocksDB
> > >> > >>>> databases, and we also need them to work with the file systems of
> > >> > >>>> Windows, Linux, and MacOS. My opinion is that with a situation like
> > >> > >>>> that, it's better to be conservative. It might also be a good idea to
> > >> > >>>> impose an upper limit on name length to avoid running afoul of any of
> > >> > >>>> those systems.
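The conservative naming rule suggested above (letters, numbers, hyphens, plus a length cap) could be checked with something like the following sketch. The class name, method names, and the limit of 249 characters are all assumptions made for illustration; the thread does not settle on a specific number.

```java
import java.util.regex.Pattern;

public class ProcessorNameValidator {

    // Assumed limit, chosen only for illustration.
    static final int MAX_NAME_LENGTH = 249;

    // One or more ASCII letters, digits, or hyphens; nothing else.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9-]+");

    static boolean isValid(final String name) {
        return name != null
            && name.length() <= MAX_NAME_LENGTH
            && LEGAL_CHARS.matcher(name).matches();
    }

    static String validate(final String name) {
        if (!isValid(name)) {
            throw new IllegalArgumentException("Illegal processor name: " + name);
        }
        return name;
    }

    public static void main(final String[] args) {
        System.out.println(isValid("MAP-TO-UPPERCASE-1")); // accepted
        System.out.println(isValid("map to uppercase"));   // rejected: whitespace
    }
}
```

Rejecting whitespace and other punctuation up front avoids having to reason about how each downstream system (topics, directories, RocksDB) treats those characters.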
> > >> > >>>>
> > >> > >>>> ---
> > >> > >>>>
> > >> > >>>> It seems like there's a small debate between 1) adding a new method to
> > >> > >>>> KStream (and maybe KTable) to modify its name after the fact, or 2)
> > >> > >>>> piggy-backing on the config objects where they exist and adding one
> > >> > >>>> where they don't. To me, #2 is the better alternative even though it
> > >> > >>>> produces more overloads and may be a bit awkward in places.
> > >> > >>>>
> > >> > >>>> The reason is simply that #1 is a high-level departure from the
> > >> > >>>> graph-building paradigm we're using in the DSL. Consider:
> > >> > >>>>
> > >> > >>>> Graph.node1(config).node2(config)
> > >> > >>>>
> > >> > >>>> vs
> > >> > >>>>
> > >> > >>>> Graph.node1().config().node2().config()
> > >> > >>>>
> > >> > >>>> We could have done either, but we picked the former. I think it's
> > >> > >>>> probably a good goal to try and stick to it so that developers can
> > >> > >>>> develop and rely on their instincts for how the DSL will behave.
> > >> > >>>>
> > >> > >>>> I do want to present one alternative to adding new config objects: we
> > >> > >>>> can just add a "name()" method to all our "action" interfaces. For
> > >> > >>>> example, I'll demonstrate how we can add a "name" to Predicate and then
> > >> > >>>> use it to name a "KStream#filter" DSL operator:
> > >> > >>>>
> > >> > >>>> public interface Predicate<K, V> {
> > >> > >>>>     // existing method
> > >> > >>>>     boolean test(final K key, final V value);
> > >> > >>>>
> > >> > >>>>     // new default method adds the ability to name the predicate
> > >> > >>>>     default String name() {
> > >> > >>>>         return null;
> > >> > >>>>     }
> > >> > >>>>
> > >> > >>>>     // new static factory method adds the ability to wrap lambda
> > >> > >>>>     // predicates with a named predicate
> > >> > >>>>     static <K, V> Predicate<K, V> named(final String name,
> > >> > >>>>                                         final Predicate<K, V> predicate) {
> > >> > >>>>         return new Predicate<K, V>() {
> > >> > >>>>             @Override
> > >> > >>>>             public boolean test(final K key, final V value) {
> > >> > >>>>                 return predicate.test(key, value);
> > >> > >>>>             }
> > >> > >>>>
> > >> > >>>>             @Override
> > >> > >>>>             public String name() {
> > >> > >>>>                 return name;
> > >> > >>>>             }
> > >> > >>>>         };
> > >> > >>>>     }
> > >> > >>>> }
> > >> > >>>>
> > >> > >>>> Then, here's how it would look to use it:
> > >> > >>>>
> > >> > >>>> // Anonymous predicates continue to work just fine
> > >> > >>>> stream.filter((k, v) -> true);
> > >> > >>>>
> > >> > >>>> // Devs can swap in a Predicate that implements the name() method.
> > >> > >>>> stream.filter(new Predicate<Object, Object>() {
> > >> > >>>>     @Override
> > >> > >>>>     public boolean test(final Object key, final Object value) {
> > >> > >>>>         return true;
> > >> > >>>>     }
> > >> > >>>>
> > >> > >>>>     @Override
> > >> > >>>>     public String name() {
> > >> > >>>>         return "hey";
> > >> > >>>>     }
> > >> > >>>> });
> > >> > >>>>
> > >> > >>>> // Or they can wrap their existing lambda using the static factory method
> > >> > >>>> stream.filter(named("key", (k, v) -> true));
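For anyone who wants to try the idea outside of Kafka, here is a runnable sketch of the same shape. The nested `Predicate` mirrors the interface quoted above; `nodeNameFor` and the `KSTREAM-FILTER-` fallback are hypothetical stand-ins for whatever the DSL would do internally when no name is supplied.

```java
public class NamedPredicateSketch {

    /** Mirrors the proposed interface: one abstract method, so lambdas still work. */
    interface Predicate<K, V> {
        boolean test(K key, V value);

        // Default implementation: unnamed unless overridden.
        default String name() {
            return null;
        }

        // Wraps any predicate (including a lambda) with a name.
        static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
            return new Predicate<K, V>() {
                @Override
                public boolean test(final K key, final V value) {
                    return predicate.test(key, value);
                }

                @Override
                public String name() {
                    return name;
                }
            };
        }
    }

    // Hypothetical helper: use the user's name if present, else a generated one.
    static String nodeNameFor(final Predicate<?, ?> predicate, final int index) {
        final String name = predicate.name();
        return name != null ? name : String.format("KSTREAM-FILTER-%010d", index);
    }

    public static void main(final String[] args) {
        final Predicate<String, Integer> anonymous = (k, v) -> v % 2 == 0;
        final Predicate<String, Integer> named =
            Predicate.named("even-values", (k, v) -> v % 2 == 0);

        System.out.println(nodeNameFor(anonymous, 1));
        System.out.println(nodeNameFor(named, 2));
    }
}
```

The default method does not count against the single-abstract-method rule, which is why the plain lambda assignment still compiles.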
> > >> > >>>>
> > >> > >>>> Just a thought.
> > >> > >>>>
> > >> > >>>> Overall, I think it's really valuable to be able to name the
> > >> > >>>> processors, for all the reasons you mentioned in the KIP. So thank you
> > >> > >>>> for introducing this!
> > >> > >>>>
> > >> > >>>> Thanks,
> > >> > >>>> -John
> > >> > >>>>
> > >> > >>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>
> > >> > >>>>> Hi, thank you very much for all your suggestions. I've started to
> > >> > >>>>> update the KIP (
> > >> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >> > >>>>> ).
> > >> > >>>>> Also, I propose to rename the Processed class to Described - this will
> > >> > >>>>> be more meaningful (but this is just a detail).
> > >> > >>>>>
> > >> > >>>>> I'm OK with not enforcing uppercase for specific names, but should we
> > >> > >>>>> allow arbitrary names, with whitespace for example? Currently, I can't
> > >> > >>>>> tell whether this could lead to some side effects.
> > >> > >>>>>
> > >> > >>>>> On Mon, Jun 11, 2018 at 1:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >> > >>>>>
> > >> > >>>>>> Just catching up on this thread.
> > >> > >>>>>>
> > >> > >>>>>> I like the general idea. Couple of comments:
> > >> > >>>>>>
> > >> > >>>>>>  - I think that adding `Processed` (or maybe a different name?) is a
> > >> > >>>>>> valid proposal for stateless operators that only have a single overload
> > >> > >>>>>> atm. It would align with the overall API design.
> > >> > >>>>>>
> > >> > >>>>>>  - for all methods with multiple existing overloads, we can consider
> > >> > >>>>>> extending `Consumed`, `Produced`, `Materialized` etc. to take an
> > >> > >>>>>> additional processor name (not sure atm how elegant this is; we would
> > >> > >>>>>> need to "play" with the API a little bit; the advantage would be that
> > >> > >>>>>> we do not add more overloads, which seems to be key for this KIP)
> > >> > >>>>>>
> > >> > >>>>>>  - operators that return void: while I agree that the "name first"
> > >> > >>>>>> chaining idea is not very intuitive, it might still work if we name the
> > >> > >>>>>> method correctly (again, we would need to "play" with the API a little
> > >> > >>>>>> bit to see)
> > >> > >>>>>>
> > >> > >>>>>>  - for DSL operators that are translated to multiple nodes: it might
> > >> > >>>>>> make sense to use the specified operator name as a prefix and add
> > >> > >>>>>> reasonable suffixes. For example, a join translates into 5 operators
> > >> > >>>>>> that could be named "name-left-store-processor",
> > >> > >>>>>> "name-left-join-processor", "name-right-store-processor",
> > >> > >>>>>> "name-right-join-processor", and "name-join-merge-processor" (or
> > >> > >>>>>> similar). Maybe just using numbers might also work.
> > >> > >>>>>>
> > >> > >>>>>>  - I think we should strip the number suffixes if a user provides names
> > >> > >>>>>>
> > >> > >>>>>>  - enforcing upper case seems to be tricky: for example, we do not
> > >> > >>>>>> enforce upper case for store names and we cannot easily change it, as
> > >> > >>>>>> it would break compatibility -- thus, for consistency reasons we might
> > >> > >>>>>> not want to do this
> > >> > >>>>>>
> > >> > >>>>>>  - for a better understanding of the impact of the KIP, it would be
> > >> > >>>>>> quite helpful if you would list all method names that are affected in
> > >> > >>>>>> the KIP (i.e., list all newly added overloads)
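The prefix-plus-suffix idea for operators that expand into several nodes can be sketched as below. The five suffixes are the ones floated in the list above; the class and method names are hypothetical, and nothing here reflects how Kafka Streams actually names join processors.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinNodeNames {

    // Suffixes taken from the example in the discussion; purely illustrative.
    private static final String[] SUFFIXES = {
        "left-store-processor",
        "left-join-processor",
        "right-store-processor",
        "right-join-processor",
        "join-merge-processor"
    };

    /** Derives one node name per internal processor from a single user-supplied prefix. */
    static List<String> forJoin(final String prefix) {
        final List<String> names = new ArrayList<>();
        for (final String suffix : SUFFIXES) {
            names.add(prefix + "-" + suffix);
        }
        return names;
    }

    public static void main(final String[] args) {
        for (final String nodeName : forJoin("name")) {
            System.out.println(nodeName);
        }
    }
}
```

Since the derived names depend only on the user's prefix, they stay stable when unrelated operators are added to or removed from the topology.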
> > >> > >>>>>>
> > >> > >>>>>>
> > >> > >>>>>> -Matthias
> > >> > >>>>>>
> > >> > >>>>>>
> > >> > >>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > >> > >>>>>>> Hi Florian,
> > >> > >>>>>>>
> > >> > >>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying
> > >> > >>>>>>> the processor name after the operator is fine as long as we do the
> > >> > >>>>>>> check again when modifying it. In fact, we have some topology
> > >> > >>>>>>> optimization going on which may modify processor names in the final
> > >> > >>>>>>> topology anyway (https://github.com/apache/kafka/pull/4983).
> > >> > >>>>>>> Semantically, I think it is easier for developers to understand than
> > >> > >>>>>>> "deciding the processor name for the next operator".
> > >> > >>>>>>>
> > >> > >>>>>>> Re 2: Yeah, I'm thinking that for operators that translate to multiple
> > >> > >>>>>>> processor names, we can still use the provided "hint" to name the
> > >> > >>>>>>> processors, e.g. for joins we can name them `join-foo-this` and
> > >> > >>>>>>> `join-foo-that` etc. if the user calls `as("foo")`.
> > >> > >>>>>>>
> > >> > >>>>>>> Re 3: The motivation I had for removing the suffix is that it places
> > >> > >>>>>>> huge restrictions on topology compatibility: if user code adds a new
> > >> > >>>>>>> operator, or the library does some optimization to remove some
> > >> > >>>>>>> operators, the suffix indexing may change for a large number of the
> > >> > >>>>>>> processor names. This will in turn change the internal state store
> > >> > >>>>>>> names, as well as internal topic names, making the new application
> > >> > >>>>>>> topology incompatible with the old one. One rationale I had for this
> > >> > >>>>>>> KIP is that, aligned with this effort, moving forward we can allow
> > >> > >>>>>>> users to customize internal names so that they can still be reused even
> > >> > >>>>>>> with topology changes (e.g. KIP-230), so I think removing the suffix
> > >> > >>>>>>> index would be more applicable in the long run.
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>> Guozhang
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>>>>
> > >> > >>>>>>>> Hi ,
> > >> > >>>>>>>> Thank you very much for your feedback.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 1/
> > >> > >>>>>>>> I agree that overloading most of the methods with a Processed is not
> > >> > >>>>>>>> ideal. I've started modifying the KStream API and I came to the same
> > >> > >>>>>>>> conclusion. Also, adding a new method directly to the KStreamImpl and
> > >> > >>>>>>>> KTableImpl classes seems to be a better option.
> > >> > >>>>>>>>
> > >> > >>>>>>>> However, a processor name cannot be redefined after calling an operator
> > >> > >>>>>>>> (or maybe I'm missing something in the code).
> > >> > >>>>>>>> From my understanding, this would only set the KStream name property,
> > >> > >>>>>>>> not the processor name previously added to the topology builder -
> > >> > >>>>>>>> leading to an InvalidTopology exception.
> > >> > >>>>>>>>
> > >> > >>>>>>>> So the new method should actually define the name of the next
> > >> > >>>>>>>> processor. Below is an example:
> > >> > >>>>>>>>
> > >> > >>>>>>>> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > >> > >>>>>>>>       .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
> > >> > >>>>>>>>
> > >> > >>>>>>>> I think this approach could solve the cases for methods returning void?
> > >> > >>>>>>>>
> > >> > >>>>>>>> Regarding this new method, we have two possible implementations:
> > >> > >>>>>>>>
> > >> > >>>>>>>>    1. Adding a method like: withName(String processorName)
> > >> > >>>>>>>>    2. or adding a method accepting a Processed object: as(Processed).
> > >> > >>>>>>>>
> > >> > >>>>>>>> I think solution 2. is preferable, as the Processed class could be
> > >> > >>>>>>>> enriched further (in the future).
> > >> > >>>>>>>>
> > >> > >>>>>>>> 2/
> > >> > >>>>>>>> As Guozhang said, some operators add internal processors.
> > >> > >>>>>>>> For example, the branch() method creates one KStreamBranch processor to
> > >> > >>>>>>>> route records and one KStreamPassThrough processor for each branch.
> > >> > >>>>>>>> In that situation only the parent processor can be named. For child
> > >> > >>>>>>>> processors we could keep the current behaviour, which adds a suffix
> > >> > >>>>>>>> (i.e. KSTREAM-BRANCHCHILD-).
> > >> > >>>>>>>>
> > >> > >>>>>>>> This is also the case for the join() method, which results in adding
> > >> > >>>>>>>> multiple processors to the topology (windowing, left/right joins and a
> > >> > >>>>>>>> merge processor).
> > >> > >>>>>>>> I think that, as for the branch method, users could only define a
> > >> > >>>>>>>> processor name prefix.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 3/
> > >> > >>>>>>>> I think we should still add a suffix like "-0000000000" to the
> > >> > >>>>>>>> processor name and enforce uppercase, as this will keep some
> > >> > >>>>>>>> consistency with the ones generated by the API.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 4/
> > >> > >>>>>>>> Yes, the KTable interface should be modified like KStream to allow
> > >> > >>>>>>>> the definition of custom processor names.
> > >> > >>>>>>>>
> > >> > >>>>>>>> Thanks,
> > >> > >>>>>>>>
> > >> > >>>>>>>>
> > >> > >>>>>>>> On Thu, May 31, 2018 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:
> > >> > >>>>>>>>
> > >> > >>>>>>>>> Hi Florian,
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> Thanks for the KIP. What about KTable and other DSL interfaces? Will
> > >> > >>>>>>>>> they not want to be able to do the same thing?
> > >> > >>>>>>>>> It would be good to see a complete set of the public API changes.
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> Cheers,
> > >> > >>>>>>>>> Damian
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > >>>>>>>>>
> > >> > >>>>>>>>>> Hello Florian,
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> Thanks for the KIP. I have some meta feedback on the proposal:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 1. You mentioned that this `Processed` object will be added to a new
> > >> > >>>>>>>>>> overloaded variant of all the stateless operators; what about the
> > >> > >>>>>>>>>> stateful operators? I would like to hear your opinion if you have
> > >> > >>>>>>>>>> thought about that: note that stateful operators will usually be mapped
> > >> > >>>>>>>>>> to multiple processor node names, so we probably need to come up with
> > >> > >>>>>>>>>> some way to define all their names.
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 2. I share Bill's concern about adding lots of new overloaded functions
> > >> > >>>>>>>>>> to the stateless operators, as we have just spent quite some effort in
> > >> > >>>>>>>>>> trimming them since the 1.0.0 release. If the goal is just to provide
> > >> > >>>>>>>>>> some "hints" on the generated processor node names, not to strictly
> > >> > >>>>>>>>>> enforce the exact names to be generated, then how about we just add a
> > >> > >>>>>>>>>> new function to the `KStream` and `KTable` classes like
> > >> > >>>>>>>>>> "as(Processed)", with the semantics "the latest operators that
> > >> > >>>>>>>>>> generate this KStream / KTable will be named according to this hint".
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> The only caveat is that for operators like `KStream#to` and
> > >> > >>>>>>>>>> `KStream#print` that return void, this alternative would not work. But
> > >> > >>>>>>>>>> for the current operators:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> a. KStream#print,
> > >> > >>>>>>>>>> b. KStream#foreach,
> > >> > >>>>>>>>>> c. KStream#to,
> > >> > >>>>>>>>>> d. KStream#process
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> I personally feel that, except for `KStream#process`, users would not
> > >> > >>>>>>>>>> usually bother to override their names, and for `KStream#process` we
> > >> > >>>>>>>>>> could add an overloaded variant with the additional Processed object.
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 3. In your example, the processor names still have a suffix like
> > >> > >>>>>>>>>> "-0000000000" added; is this intentional? If yes, why (I thought that
> > >> > >>>>>>>>>> with user-specified processor name hints we would no longer add a
> > >> > >>>>>>>>>> suffix to distinguish different nodes of the same type)?
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> Guozhang
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>> Hi Florian,
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Thanks for the KIP.  I think being able to add more context to the
> > >> > >>>>>>>>>>> processor names would be useful.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed
> > >> > >>>>>>>>>>> and Joined.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> But instead of adding the "Processed" parameter to a large percentage
> > >> > >>>>>>>>>>> of the methods, which would result in overloaded methods (which we
> > >> > >>>>>>>>>>> removed quite a bit of with KIP-182), what do you think of adding a
> > >> > >>>>>>>>>>> method "withName(String processorName)" to the AbstractStream class?
> > >> > >>>>>>>>>>> BTW I'm not married to the method name, it's the best I can do off the
> > >> > >>>>>>>>>>> top of my head.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> For the methods that return void, we'd have to add a parameter, but
> > >> > >>>>>>>>>>> that would at least cut down on the number of overloaded methods in the
> > >> > >>>>>>>>>>> API.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Just my 2 cents.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Thanks,
> > >> > >>>>>>>>>>> Bill
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>>> Hi,
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> I would like to start a new discussion on the following KIP:
> > >> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> This is still a draft.
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> Looking forward to your feedback.
> > >> > >>>>>>>>>>>> --
> > >> > >>>>>>>>>>>> Florian HUSSONNOIS
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> --
> > >> > >>>>>>>>>> -- Guozhang
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>
> > >> > >>>>>>>>
> > >> > >>>>>>>>
> > >> > >>>>>>>> --
> > >> > >>>>>>>> Florian HUSSONNOIS
> > >> > >>>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>>
> > >> > >>>>>>
> > >> > >>>>>>
> > >> > >>>>>
> > >> > >>>>> --
> > >> > >>>>> Florian HUSSONNOIS
> > >> > >>>>>
> > >> > >>>>
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>> --
> > >> > >>> -- Guozhang
> > >> > >>>
> > >> > >>
> > >> > >>
> > >> > >> --
> > >> > >> Florian HUSSONNOIS
> > >> > >>
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >> >
> > >>
> > >> --
> > >> Florian HUSSONNOIS
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
