Thank you very much for your feedback.

There is still a lot of discussion about the Named interface.
On the one hand we should provide consistency across the Streams API, and on
the other hand we should not break the semantics, as John pointed out.

Guozhang, I'm sorry, but I'm a little confused; maybe I missed something.
In your comment you suggested that:
* Produced/Consumed/Suppressed should extend Named
* Named should have a package-private method to get the specified processor
name internally (processorName())
* Finally, we should end up with something like: Named -> XXX ->
XXXInternal, or Named -> Produced -> ProducedInternal

The objective behind that is to:
* consolidate the internal method processorName()
* consolidate the withName method that now exists in Produced,
Consumed and Suppressed.

But Named is an interface, so we can't define a package-private method on
it. Also, Produced and ProducedInternal, for example, are not in the same
package, so a package-private method doesn't really help.
In addition, if we add the withName method to the Named interface, this can
become confusing for developers because the action interfaces (ValueMapper,
Reducer, etc.) extend it.
The interface would look like:

public interface Named<T extends Named<T>> {
    default String name() {
        return null;
    }
    default Named<T> withName(final String name) {
        return null;
    }
...
}
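
To make the concern concrete, here is a minimal sketch, assuming a simplified
stand-in for an action interface (ValueMapperSketch is hypothetical and is not
the real Kafka ValueMapper). It shows that a plain lambda would still compile,
yet would expose a withName() that cannot actually rename anything:

```java
// Simplified copy of the interface discussed above.
interface Named<T extends Named<T>> {
    default String name() {
        return null;
    }
    default Named<T> withName(final String name) {
        return null;
    }
}

// Hypothetical stand-in for an action interface such as ValueMapper.
interface ValueMapperSketch<V, VR> extends Named<ValueMapperSketch<V, VR>> {
    VR apply(V value);
}

class NamedConfusionDemo {
    // Returns what a caller gets back from withName() on a plain lambda.
    static Object renameLambda() {
        final ValueMapperSketch<String, String> upper = v -> v.toUpperCase();
        return upper.withName("TO-UPPERCASE");
    }

    public static void main(final String[] args) {
        // The lambda still compiles: apply() is the only abstract method.
        final ValueMapperSketch<String, String> upper = v -> v.toUpperCase();
        System.out.println(upper.apply("kafka"));
        // withName() is visible on the lambda, but the default returns null.
        System.out.println(renameLambda());
    }
}
```

In this sketch the only way to get a working withName() on an action would be
to write a full class, which is the kind of surprise I would like to avoid.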

So maybe, instead of adding another method to Named, we could create a new
package-private class that could be extended by
Produced/Consumed/Joined/Suppressed. For example:
import java.util.Objects;

class SettableName<T extends SettableName<T>> implements Named {

    protected String processorName;

    SettableName(final SettableName<T> settable) {
        this(Objects.requireNonNull(settable, "settable can't be null").name());
    }

    SettableName(final String processorName) {
        this.processorName = processorName;
    }

    @Override
    public String name() {
        return processorName;
    }

    // The cast is safe as long as every subclass passes itself as T.
    @SuppressWarnings("unchecked")
    public T withName(final String processorName) {
        this.processorName = processorName;
        return (T) this;
    }
}

That way, we would get: public class Produced extends
SettableName<Produced> { ...
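
As a rough, self-contained sketch of how this could behave (the names Named,
ProducedSketch, withKeySerde and keySerdeName are assumptions made up for the
illustration; this is not the real Produced class), note that withName()
returns the concrete subtype, so the chained calls keep their type:

```java
// Hypothetical read-only naming interface, standing in for the KIP's Named.
interface Named {
    String name();
}

// Package-private base class that holds the mutable processor name.
class SettableName<T extends SettableName<T>> implements Named {
    protected String processorName;

    SettableName(final String processorName) {
        this.processorName = processorName;
    }

    @Override
    public String name() {
        return processorName;
    }

    // Safe as long as each subclass passes itself as the type argument T.
    @SuppressWarnings("unchecked")
    public T withName(final String processorName) {
        this.processorName = processorName;
        return (T) this;
    }
}

// Toy config object; NOT org.apache.kafka.streams.kstream.Produced.
class ProducedSketch extends SettableName<ProducedSketch> {
    private String keySerdeName;

    ProducedSketch() {
        super(null);
    }

    ProducedSketch withKeySerde(final String keySerdeName) {
        this.keySerdeName = keySerdeName;
        return this;
    }

    String keySerdeName() {
        return keySerdeName;
    }
}

class SettableNameDemo {
    public static void main(final String[] args) {
        // withName() returns ProducedSketch, so withKeySerde() can follow it.
        final ProducedSketch produced = new ProducedSketch()
            .withName("my-sink")
            .withKeySerde("string");
        System.out.println(produced.name() + " / " + produced.keySerdeName());
    }
}
```

The action interfaces would stay untouched in this variant, so lambdas keep
working and only the config objects gain withName().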

WDYT?


On Tue, Dec 11, 2018 at 2:46 AM Guozhang Wang <wangg...@gmail.com> wrote:

> I had one meta comment on the PR:
> https://github.com/apache/kafka/pull/5909#discussion_r240447153
>
> On Mon, Dec 10, 2018 at 5:22 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Florian,
> >
> > I hope it's ok if I ask a few questions at this late stage...
> >
> > Comment 1 ======
> >
> > It seems like the proposal is to add a new "Named" interface that is
> > intended to be mixed in with the existing API objects at various points.
> >
> > Just to preface some of my comments, it looks like your KIP was created
> > quite a while ago, so the API may have changed somewhat since you
> started.
> >
> > As I see the API, there are a few different kinds of DSL method
> arguments:
> > * functions: things like Initializer, Aggregator, ValueJoiner,
> > ForEachAction... All of these are essentially Streams-flavored Function
> > interfaces with different arities, type bounds, and semantics.
> > * config objects: things like Produced, Consumed, Joined, Grouped...
> These
> > are containers for configurations, where the target of the configuration
> is
> > the operation itself
> > * raw configurations: things like a raw topic-name string and
> Materialized:
> > These are configurations for operations that have no config object, and
> for
> > various reasons, we didn't make one. The distinguishing feature is that
> the
> > target of the configuration is not the operation itself, but some aspect
> of
> > it. For example, in Materialized, we are not setting the caching behavior
> > of, for example, an aggregation; we're setting the caching behavior of a
> > materialized state store attached to the aggregation.
> >
> > It seems like choosing to mix the Named interface in with the functions
> has
> > a couple of unfortunate side-effects:
> > * Aggregator is not the only function passed to any of the relevant
> > aggregate methods, so it seems a little arbitrary to pick that function
> > over Initializer or Merger.
> > * As you noted, branch() takes an array of Predicate, so we just ignore
> the
> > provided name(s), even though Predicate names are used elsewhere.
> > * Not all things that we want to name have function arguments, notably
> > source and sink, so we'd switch paradigms and use the config object
> > instead.
> > * Adding an extra method to the function interfaces means that those are
> no
> > longer SAM interfaces. You proposed to add a default implementation, so
> we
> > could still pass a lambda if we don't want to set the name, but if we
> *do*
> > want to set the name, we can no longer use lambdas.
> >
> > I think the obvious other choice would be to mix Named in with the config
> > objects instead, but this has one main downside of its own...
> > * not every operator we wish to name has a config object. I don't know if
> > everyone involved is comfortable with adding a config object to every
> > operator that's missing one.
> >
> > Personally, I favor moving toward a more consistent state that's forward
> > compatible with any further changes we wish to make. I *think* that
> giving
> > every operator two forms (one with no config and one with a config
> object)
> > would be such an API.
> >
> > Comment 2 =========
> >
> > Finally, just a minor comment: the static method in Named wouldn't work
> > properly as defined. Assuming that we mix Named in with Produced, for
> > example, we'd need to be able to use it like:
> > >  kStream.to("out", Produced.with("myOut"))
> > This doesn't work because with() returns a Named, but we need a Produced.
> >
> > We can pull off a builder method in the interface, but not a static
> method.
> > To define a builder method in the interface that returns an instance of
> the
> > concrete subtype, you have to use the "curiously recurring generic"
> > pattern.
> >
> > It would look like:
> >
> > public interface Named<N extends Named<N>> {
> >   String name();
> >   N withName(String name);
> > }
> >
> > You can see where the name of the pattern comes from ;)
> > An implementation would then look like:
> >
> > public class Produced implements Named<Produced> {
> >   private String name;
> >   public String name() { return name; }
> >   public Produced withName(final String name) { this.name = name; return this; }
> > }
> >
> > Note that the generic parameter gets filled in properly in the
> implementing
> > class, so that you get the right return type out.
> >
> > It doesn't work at all with a static factory method at the interface
> level,
> > so it would be up to Produced to define a static factory if it wants to
> > present one.
> >
> > ======
> >
> > Those are my two feedbacks!
> >
> > I hope you find this helpful, rather than frustrating. I'm sorry I didn't
> > get a chance to comment sooner.
> >
> > Thanks for the KIP, I think it will be much nicer to be able to name the
> > processor nodes.
> >
> > -John
> >
> > On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Florian,
> > >
> > > I've made a pass over the PR. There are some comments that are related
> to
> > > the function names which may be affecting the KIP wiki page, but
> overall
> > I
> > > think it looks good already.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Thanks Florian! I will take a look at the PR.
> > > >
> > > >
> > > >
> > > > On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <
> > > fhussonn...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Matthias,
> > > >>
> > > >> Sorry I was absent for a while. I have started a new PR for this
> KIP.
> > It
> > > >> is
> > > >> still in progress for now. I'm working on it.
> > > >> https://github.com/apache/kafka/pull/5909
> > > >>
> > > > >> On Fri, Oct 19, 2018 at 8:13 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>
> > > >> > What is the status of this KIP?
> > > >> >
> > > >> > -Matthias
> > > >> >
> > > >> > On 7/19/18 5:17 PM, Guozhang Wang wrote:
> > > >> > > Hello Florian,
> > > >> > >
> > > >> > > Sorry for being late... Found myself keep apologizing for late
> > > replies
> > > >> > > these days. But I do want to push this KIP's progress forward
> as I
> > > >> see it
> > > >> > > very important and helpful feature for extensibility.
> > > >> > >
> > > >> > > About the exceptions, I've gone through them and hopefully it is
> > an
> > > >> > > exhaustive list:
> > > >> > >
> > > >> > > 1. KTable#toStream()
> > > >> > > 2. KStream#merge(KStream)
> > > >> > > 3. KStream#process() / transform() / transformValues()
> > > >> > > 4. KGroupedTable / KGroupedStream#count()
> > > >> > >
> > > >> > >
> > > >> > > Here's my reasoning:
> > > >> > >
> > > >> > > * It is okay not letting users to override the name for 1/2,
> since
> > > >> they
> > > >> > are
> > > >> > > too trivial to be useful for debugging, plus their processor
> names
> > > >> would
> > > >> > > not determine any related topic / store names.
> > > >> > > * For 3, I'd vote for adding overloaded functions with Named.
> > > >> > > * For 4, if users really want to name the processor she can call
> > > >> > > aggregate() instead, so I think it is okay to skip this case.
> > > >> > >
> > > >> > >
> > > >> > > Guozhang
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <
> > > >> > fhussonn...@gmail.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi,
> > > >> > >>
> > > >> > >> The option #3 seems to be a good alternative and I find the API
> > > more
> > > >> > >> elegant (thanks John).
> > > >> > >>
> > > >> > >> But, we still have the need to overload some methods either
> > because
> > > >> > they do
> > > >> > >> not accept an action instance or because they are translated to
> > > >> multiple
> > > >> > >> processors.
> > > >> > >>
> > > >> > >> For example, this is the case for methods branch() and merge().
> > We
> > > >> could
> > > >> > >> introduce a new interface Named (or maybe a different name ?)
> > with
> > > a
> > > >> > method
> > > >> > >> name(). All action interfaces could extend this one to
> implement
> > > the
> > > >> > option
> > > >> > >> 3).
> > > >> > >> This would result by having the following overloads  :
> > > >> > >>
> > > > >> > >> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> > > > >> > >> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);
> > > >> > >>
> > > >> > >> N.B : The list above is  not exhaustive
> > > >> > >>
> > > >> > >> ---------
> > > >> > >> user's code will become :
> > > >> > >>
> > > >> > >>         KStream<String, Integer> stream =
> builder.stream("test");
> > > >> > >>         KStream<String, Integer>[] branches =
> > > >> > >> stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
> > > >> > >>                 Predicate.named("STREAM-PAIR-VALUE", (k, v) ->
> v
> > %
> > > 2
> > > >> ==
> > > >> > >> 0),
> > > >> > >>                 Predicate.named("STREAM-IMPAIR-VALUE", (k, v)
> ->
> > v
> > > %
> > > >> 2
> > > >> > !=
> > > >> > >> 0));
> > > >> > >>
> > > >> > >>         branches[0].to("pair");
> > > >> > >>         branches[1].to("impair");
> > > >> > >> ---------
> > > >> > >>
> > > >> > >> This is a mix of the options 3) and 1)
> > > >> > >>
> > > > >> > >> On Fri, Jul 6, 2018 at 10:58 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> > >>
> > > >> > >>> Hi folks, just to summarize the options we have so far:
> > > >> > >>>
> > > >> > >>> 1) Add a new "as" for KTable / KStream, plus adding new fields
> > for
> > > >> > >>> operators-returns-void control objects (the current wiki's
> > > >> proposal).
> > > >> > >>>
> > > >> > >>> Pros: no more overloads.
> > > >> > >>> Cons: a bit departing with the current high-level API design
> of
> > > the
> > > >> > DSL,
> > > >> > >>> plus, the inconsistency between operators-returns-void and
> > > >> > >>> operators-not-return-voids.
> > > >> > >>>
> > > >> > >>> 2) Add overloaded functions for all operators, that accepts a
> > new
> > > >> > control
> > > >> > >>> object "Described".
> > > >> > >>>
> > > >> > >>> Pros: consistent with current APIs.
> > > >> > >>> Cons: lots of overloaded functions to add.
> > > >> > >>>
> > > >> > >>> 3) Add another default function in the interface (thank you
> J8!)
> > > as
> > > >> > John
> > > >> > >>> proposed.
> > > >> > >>>
> > > >> > >>> Pros: no overloaded functions, no "Described".
> > > >> > >>> Cons: do we lose lambda functions really (seems not if we
> > provide
> > > a
> > > >> > >> "named"
> > > >> > >>> for each func)? Plus "Described" may be more extensible than a
> > > >> single
> > > >> > >>> `String`.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> My principle of considering which one is better depends
> > primarily
> > > on
> > > >> > "how
> > > >> > >>> to make advanced users easily use the additional API, while
> > > keeping
> > > >> it
> > > >> > >>> hidden from normal users who do not care at all". For that
> > > purpose I
> > > >> > >> think
> > > >> > >>> 3) > 1) > 2).
> > > >> > >>>
> > > >> > >>> One caveat though, is that changing the interface would not be
> > > >> > >>> binary-compatible though source-compatible, right? I.e. users
> > need
> > > >> to
> > > >> > >>> recompile their code though no changes needed.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Another note: for 3), if we really want to keep extensibility
> of
> > > >> > >> Described
> > > >> > >>> we could do sth. like:
> > > >> > >>>
> > > >> > >>> ---------
> > > >> > >>>
> > > >> > >>> public interface Predicate<K, V> {
> > > >> > >>>     // existing method
> > > >> > >>>     boolean test(final K key, final V value);
> > > >> > >>>
> > > >> > >>>     // new default method adds the ability to name the
> predicate
> > > >> > >>>     default Described described() {
> > > >> > >>>         return new Described(null);
> > > >> > >>>     }
> > > >> > >>> }
> > > >> > >>>
> > > >> > >>> ----------
> > > >> > >>>
> > > >> > >>> where user's code becomes:
> > > >> > >>>
> > > > >> > >>> // note: `named` now just sets a Described("key") in "described()"
> > > > >> > >>> stream.filter(named("key", (k, v) -> true));
> > > > >> > >>>
> > > > >> > >>> stream.filter(described(Described.as("key" /* , any other fancy parameters in the future */), (k, v) -> true));
> > > >> > >>> ----------
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> I feel it is not much likely that we'd need to extend it
> further
> > > in
> > > >> the
> > > >> > >>> future, so just a `String` would be good enough. But just
> > listing
> > > >> all
> > > >> > >>> possibilities here.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Guozhang
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <
> j...@confluent.io
> > >
> > > >> > wrote:
> > > >> > >>>
> > > >> > >>>> Hi Florian,
> > > >> > >>>>
> > > >> > >>>> Sorry I'm late to the party, but I missed the message
> > originally.
> > > >> > >>>>
> > > >> > >>>> Regarding the names, it's probably a good idea to stick to
> the
> > > same
> > > >> > >>>> character set we're currently using: letters, numbers, and
> > > hyphens.
> > > >> > The
> > > >> > >>>> names are used in Kafka topics, files and folders, and
> RocksDB
> > > >> > >> databases,
> > > >> > >>>> and we also need them to work with the file systems of
> Windows,
> > > >> Linux,
> > > >> > >>> and
> > > >> > >>>> MacOS. My opinion is that with a situation like that, it's
> > better
> > > >> to
> > > >> > be
> > > >> > >>>> conservative. It might also be a good idea to impose an upper
> > > >> limit on
> > > >> > >>> name
> > > >> > >>>> length to avoid running afoul of any of those systems.
> > > >> > >>>>
> > > >> > >>>> ---
> > > >> > >>>>
> > > >> > >>>> It seems like there's a small debate between 1) adding a new
> > > >> method to
> > > >> > >>>> KStream (and maybe KTable) to modify its name after the fact,
> > or
> > > 2)
> > > >> > >>>> piggy-backing on the config objects where they exist and
> adding
> > > one
> > > >> > >> where
> > > >> > >>>> they don't. To me, #2 is the better alternative even though
> it
> > > >> > produces
> > > >> > >>>> more overloads and may be a bit awkward in places.
> > > >> > >>>>
> > > >> > >>>> The reason is simply that #1 is a high-level departure from
> the
> > > >> > >>>> graph-building paradigm we're using in the DSL. Consider:
> > > >> > >>>>
> > > >> > >>>> Graph.node1(config).node2(config)
> > > >> > >>>>
> > > >> > >>>> vs
> > > >> > >>>>
> > > >> > >>>> Graph.node1().config().node2().config()
> > > >> > >>>>
> > > >> > >>>> We could have done either, but we picked the former. I think
> > it's
> > > >> > >>> probably
> > > >> > >>>> a good goal to try and stick to it so that developers can
> > develop
> > > >> and
> > > >> > >>> rely
> > > >> > >>>> on their instincts for how the DSL will behave.
> > > >> > >>>>
> > > >> > >>>> I do want to present one alternative to adding new config
> > > objects:
> > > >> we
> > > >> > >> can
> > > >> > >>>> just add a "name()" method to all our "action" interfaces.
> For
> > > >> > example,
> > > >> > >>>> I'll demonstrate how we can add a "name" to Predicate and
> then
> > > use
> > > >> it
> > > >> > >> to
> > > >> > >>>> name a "KStream#filter" DSL operator:
> > > >> > >>>>
> > > >> > >>>> public interface Predicate<K, V> {
> > > >> > >>>>     // existing method
> > > >> > >>>>     boolean test(final K key, final V value);
> > > >> > >>>>
> > > >> > >>>>     // new default method adds the ability to name the
> > predicate
> > > >> > >>>>     default String name() {
> > > >> > >>>>         return null;
> > > >> > >>>>     }
> > > >> > >>>>
> > > >> > >>>>     // new static factory method adds the ability to wrap
> > lambda
> > > >> > >>> predicates
> > > >> > >>>> with a named predicate
> > > >> > >>>>     static <K, V> Predicate<K, V> named(final String name,
> > final
> > > >> > >>>> Predicate<K, V> predicate) {
> > > >> > >>>>         return new Predicate<K, V>() {
> > > >> > >>>>             @Override
> > > >> > >>>>             public boolean test(final K key, final V value) {
> > > >> > >>>>                 return predicate.test(key, value);
> > > >> > >>>>             }
> > > >> > >>>>
> > > >> > >>>>             @Override
> > > >> > >>>>             public String name() {
> > > >> > >>>>                 return name;
> > > >> > >>>>             }
> > > >> > >>>>         };
> > > >> > >>>>     }
> > > >> > >>>> }
> > > >> > >>>>
> > > >> > >>>> Then, here's how it would look to use it:
> > > >> > >>>>
> > > >> > >>>> // Anonymous predicates continue to work just fine
> > > >> > >>>> stream.filter((k, v) -> true);
> > > >> > >>>>
> > > > >> > >>>> // Devs can swap in a Predicate that implements the name() method.
> > > >> > >>>> stream.filter(new Predicate<Object, Object>() {
> > > >> > >>>>     @Override
> > > >> > >>>>     public boolean test(final Object key, final Object
> value) {
> > > >> > >>>>         return true;
> > > >> > >>>>     }
> > > >> > >>>>
> > > >> > >>>>     @Override
> > > >> > >>>>     public String name() {
> > > >> > >>>>         return "hey";
> > > >> > >>>>     }
> > > >> > >>>> });
> > > >> > >>>>
> > > > >> > >>>> // Or they can wrap their existing lambda using the static factory method
> > > >> > >>>> stream.filter(named("key", (k, v) -> true));
> > > >> > >>>>
> > > >> > >>>> Just a thought.
> > > >> > >>>>
> > > >> > >>>> Overall, I think it's really valuable to be able to name the
> > > >> > >> processors,
> > > >> > >>>> for all the reasons you mentioned in the KIP. So thank you
> for
> > > >> > >>> introducing
> > > >> > >>>> this!
> > > >> > >>>>
> > > >> > >>>> Thanks,
> > > >> > >>>> -John
> > > >> > >>>>
> > > >> > >>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <
> > > >> > >> fhussonn...@gmail.com
> > > >> > >>>>
> > > >> > >>>> wrote:
> > > >> > >>>>
> > > > >> > >>>>> Hi, thank you very much for all your suggestions. I've started to
> > > > >> > >>>>> update the KIP (
> > > > >> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > >> > >>>>> ).
> > > >> > >>>>> Also, I propose to rename the Processed class into
> Described -
> > > >> this
> > > >> > >>> will
> > > >> > >>>> be
> > > >> > >>>>> more meaningful (but this is just a detail).
> > > >> > >>>>>
> > > >> > >>>>> I'm OK to not enforcing uppercase for specific names but
> > should
> > > we
> > > >> > >>> allow
> > > >> > >>>>> arbitrary names with whitespaces for example ? Currently, I
> > > can't
> > > >> > >> tell
> > > >> > >>> if
> > > >> > >>>>> this can lead to some side effects ?
> > > >> > >>>>>
> > > > >> > >>>>> On Mon, Jun 11, 2018 at 1:31 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >> > >>>>>
> > > >> > >>>>>> Just catching up on this thread.
> > > >> > >>>>>>
> > > >> > >>>>>> I like the general idea. Couple of comments:
> > > >> > >>>>>>
> > > >> > >>>>>>  - I think that adding `Processed` (or maybe a different
> > name?)
> > > >> is
> > > >> > >> a
> > > >> > >>>>>> valid proposal for stateless operators that only have a
> > single
> > > >> > >>> overload
> > > >> > >>>>>> atm. It would align with the overall API design.
> > > >> > >>>>>>
> > > >> > >>>>>>  - for all methods with multiple existing overloads, we can
> > > >> > >> consider
> > > >> > >>> to
> > > >> > >>>>>> extend `Consumed`, `Produced`, `Materialized` etc to take
> an
> > > >> > >>> additional
> > > >> > >>>>>> processor name (not sure atm how elegant this is; we would
> > need
> > > >> to
> > > >> > >>>>>> "play" with the API a little bit; the advantage would be,
> > that
> > > we
> > > >> > >> do
> > > >> > >>>> not
> > > >> > >>>>>> add more overloads what seems to be key for this KIP)
> > > >> > >>>>>>
> > > >> > >>>>>>  - operators return void: while I agree that the "name
> first"
> > > >> > >>> chaining
> > > >> > >>>>>> idea is not very intuitive, it might still work, if we name
> > the
> > > >> > >>> method
> > > >> > >>>>>> correctly (again, we would need to "play" with the API a
> > little
> > > >> bit
> > > >> > >>> to
> > > >> > >>>>> see)
> > > >> > >>>>>>
> > > >> > >>>>>>  - for DSL operators that are translated to multiple nodes:
> > it
> > > >> > >> might
> > > >> > >>>>>> make sense to use the specified operator name as prefix and
> > add
> > > >> > >>>>>> reasonable suffixes. For example, a join translates into 5
> > > >> > >> operators
> > > >> > >>>>>> that could be name "name-left-store-processor",
> > > >> > >>>>>> "name-left-join-processor", "name-right-store-processor",
> > > >> > >>>>>> "name-right-join-processor", and
> "name-join-merge-processor"
> > > (or
> > > >> > >>>>>> similar). Maybe just using numbers might also work.
> > > >> > >>>>>>
> > > >> > >>>>>>  - I think, we should strip the number suffixes if a user
> > > >> provides
> > > >> > >>>> names
> > > >> > >>>>>>
> > > >> > >>>>>>  - enforcing upper case seems to be tricky: for example, we
> > do
> > > >> not
> > > >> > >>>>>> enforce upper case for store names and we cannot easily
> > change
> > > it
> > > >> > >> as
> > > >> > >>> it
> > > >> > >>>>>> would break compatibility -- thus, for consistency reasons
> we
> > > >> might
> > > >> > >>> not
> > > >> > >>>>>> want to do this
> > > >> > >>>>>>
> > > >> > >>>>>>  - for better understand of the impact of the KIP, it would
> > be
> > > >> > >> quite
> > > >> > >>>>>> helpful if you would list all method names that are
> affected
> > in
> > > >> the
> > > >> > >>> KIP
> > > >> > >>>>>> (ie, list all newly added overloads)
> > > >> > >>>>>>
> > > >> > >>>>>>
> > > >> > >>>>>> -Matthias
> > > >> > >>>>>>
> > > >> > >>>>>>
> > > >> > >>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > > >> > >>>>>>> Hi Florian,
> > > >> > >>>>>>>
> > > >> > >>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to
> allow
> > > >> > >>>> modifying
> > > >> > >>>>>> the
> > > >> > >>>>>>> processor name after the operator is fine as long as we do
> > the
> > > >> > >>> check
> > > >> > >>>>>> again
> > > >> > >>>>>>> when modifying that. In fact, we are having some topology
> > > >> > >>>> optimization
> > > >> > >>>>>>> going on which may modify processor names in the final
> > > topology
> > > >> > >>>>> anyways (
> > > >> > >>>>>>> https://github.com/apache/kafka/pull/4983). Semantically
> I
> > > >> think
> > > >> > >>> it
> > > >> > >>>> is
> > > >> > >>>>>>> easier to understand to developers than "deciding the
> > > processor
> > > >> > >>> name
> > > >> > >>>>> for
> > > >> > >>>>>>> the next operator".
> > > >> > >>>>>>>
> > > >> > >>>>>>> Re 2: Yeah I'm thinking that for operators that translates
> > to
> > > >> > >>>> multiple
> > > >> > >>>>>>> processor names, we can still use the provided "hint" to
> > name
> > > >> the
> > > >> > >>>>>> processor
> > > >> > >>>>>>> names, e.g. for Joins we can name them as `join-foo-this`
> > and
> > > >> > >>>>>>> `join-foo-that` etc if user calls `as("foo")`.
> > > >> > >>>>>>>
> > > >> > >>>>>>> Re 3: The motivation I had about removing the suffix is
> that
> > > it
> > > >> > >> has
> > > >> > >>>>> huge
> > > >> > >>>>>>> restrictions on topology compatibilities: consider if user
> > > code
> > > >> > >>>> added a
> > > >> > >>>>>> new
> > > >> > >>>>>>> operator, or library does some optimization to remove some
> > > >> > >>> operators,
> > > >> > >>>>> the
> > > >> > >>>>>>> suffix indexing may be changed for a large amount of the
> > > >> > >> processor
> > > >> > >>>>> names:
> > > >> > >>>>>>> this will in turn change the internal state store names,
> as
> > > well
> > > >> > >> as
> > > >> > >>>>>>> internal topic names as well, making the new application
> > > >> topology
> > > >> > >>> to
> > > >> > >>>> be
> > > >> > >>>>>>> incompatible with the ones. One rationale I had about this
> > KIP
> > > >> is
> > > >> > >>>> that
> > > >> > >>>>>>> aligned this effort, moving forward we can allow users to
> > > >> > >> customize
> > > >> > >>>>>>> internal names so that they can still be reused even with
> > > >> > >> topology
> > > >> > >>>>>> changes
> > > >> > >>>>>>> (e.g. KIP-230), so I think removing the suffix index would
> > be
> > > >> > >> more
> > > >> > >>>>>>> applicable in the long run.
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>> Guozhang
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <
> > > >> > >>>>>> fhussonn...@gmail.com>
> > > >> > >>>>>>> wrote:
> > > >> > >>>>>>>
> > > >> > >>>>>>>> Hi ,
> > > >> > >>>>>>>> Thank you very much for your feedback.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> 1/
> > > >> > >>>>>>>> I agree that overloading most of the methods with a
> > Processed
> > > >> is
> > > >> > >>> not
> > > >> > >>>>>> ideal.
> > > >> > >>>>>>>> I've started modifying the KStream API and I got to the
> > same
> > > >> > >>>>> conclusion.
> > > >> > >>>>>>>> Also ading a new method directly to KStreamImpl and
> > > KTableImpl
> > > >> > >>>> classes
> > > >> > >>>>>>>> seems to be a better option.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> However a processor name cannot be redefined after
> calling
> > an
> > > >> > >>>> operator
> > > >> > >>>>>> (or
> > > >> > >>>>>>>> maybe I miss something in the code).
> > > >> > >>>>>>>> From my understanding, this will only set the KStream
> name
> > > >> > >>> property
> > > >> > >>>>> not
> > > >> > >>>>>> the
> > > >> > >>>>>>>> processor name previsouly added to the topology builder -
> > > >> > >> leading
> > > >> > >>> to
> > > >> > >>>>>>>> InvalidTopology exception.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> So the new method should actually defines the name of the
> > > next
> > > >> > >>>>>> processor :
> > > >> > >>>>>>>> Below is an example :
> > > >> > >>>>>>>>
> > > > >> > >>>>>>>> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > > > >> > >>>>>>>>       .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> I think this approach could solve the cases for methods
> > > >> > >> returning
> > > >> > >>>>> void ?
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> Regarding this new method we have two possible
> > > implementations
> > > >> :
> > > >> > >>>>>>>>
> > > >> > >>>>>>>>    1. Adding a method like : withName(String
> processorName)
> > > >> > >>>>>>>>    2. or adding a method accepting an Processed object :
> > > >> > >>>>> as(Processed).
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> I think solution 2. is preferable as the Processed class
> > > could
> > > >> > >> be
> > > >> > >>>>>> enriched
> > > >> > >>>>>>>> further (in futur).
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> 2/
> > > >> > >>>>>>>> As Guozhang said some operators add internal processors.
> > > >> > >>>>>>>> For example the branch() method create one KStreamBranch
> > > >> > >> processor
> > > >> > >>>> to
> > > >> > >>>>>> route
> > > >> > >>>>>>>> records and one KStreamPassThrough processor for each
> > branch.
> > > >> > >>>>>>>> In that situation only the parent processor can be named.
> > For
> > > >> > >>>> children
> > > >> > >>>>>>>> processors we could keep the current behaviour that add a
> > > >> suffix
> > > >> > >>>> (i.e
> > > >> > >>>>>>>> KSTREAM-BRANCHCHILD-)
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> This also the case for the join() method that result to
> > > adding
> > > >> > >>>>> multiple
> > > >> > >>>>>>>> processors to the topology (windowing, left/right joins
> > and a
> > > >> > >>> merge
> > > >> > >>>>>>>> processor).
> > > >> > >>>>>>>> I think, like for the branch method users could only
> > define a
> > > >> > >>>>> processor
> > > >> > >>>>>>>> name prefix.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> 3/
> > > >> > >>>>>>>> I think we should  still added a suffix like
> "-0000000000"
> > to
> > > >> > >>>>> processor
> > > >> > >>>>>>>> name and enforce uppercases as this will keep some
> > > consistency
> > > >> > >>> with
> > > >> > >>>>> the
> > > >> > >>>>>>>> ones generated by the API.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> 4/
> > > >> > >>>>>>>> Yes, the KTable interface should be modified like KStream
> > to
> > > >> > >> allow
> > > >> > >>>>>> custom
> > > >> > >>>>>>>> processor names definition.
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> Thanks,
> > > >> > >>>>>>>>
> > > >> > >>>>>>>>
> > > > >> > >>>>>>>> On Thu, May 31, 2018 at 7:18 PM Damian Guy <damian....@gmail.com> wrote:
> > > >> > >>>>>>>>
> > > >> > >>>>>>>>> Hi Florian,
> > > >> > >>>>>>>>>
> > > >> > >>>>>>>>> Thanks for the KIP. What about KTable and other DSL
> > > >> interfaces?
> > > >> > >>>> Will
> > > >> > >>>>>> they
> > > >> > >>>>>>>>> not want to be able to do the same thing?
> > > >> > >>>>>>>>> It would be good to see a complete set of the public API
> > > >> > >> changes.
> > > >> > >>>>>>>>>
> > > >> > >>>>>>>>> Cheers,
> > > >> > >>>>>>>>> Damian
> > > >> > >>>>>>>>>
> > > >> > >>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <
> > > >> wangg...@gmail.com
> > > >> > >>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>>>>>
> > > >> > >>>>>>>>>> Hello Florian,
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> Thanks for the KIP. I have some meta feedbacks on the
> > > >> > >> proposal:
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> 1. You mentioned that this `Processed` object will be
> > > >> > >>>>>>>>>> added to a new overloaded variant of all the stateless
> > > >> > >>>>>>>>>> operators. What about the stateful operators? I would
> > > >> > >>>>>>>>>> like to hear your opinion if you have thought about
> > > >> > >>>>>>>>>> that. Note that stateful operators will usually be
> > > >> > >>>>>>>>>> mapped to multiple processor node names, so we probably
> > > >> > >>>>>>>>>> need to come up with some way to define all their names.
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> 2. I share Bill's concern about adding lots of new
> > > >> > >>>>>>>>>> overloaded functions to the stateless operators, as we
> > > >> > >>>>>>>>>> have just spent quite some effort trimming them since
> > > >> > >>>>>>>>>> the 1.0.0 release. If the goal is just to provide some
> > > >> > >>>>>>>>>> "hints" on the generated processor node names, not to
> > > >> > >>>>>>>>>> strictly enforce the exact names to be generated, then
> > > >> > >>>>>>>>>> how about we just add a new function to the `KStream`
> > > >> > >>>>>>>>>> and `KTable` classes like "as(Processed)", with the
> > > >> > >>>>>>>>>> semantics "the latest operators that generate this
> > > >> > >>>>>>>>>> KStream / KTable will be named according to this hint"?
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> The only caveat is that for operators like `KStream#to`
> > > >> > >>>>>>>>>> and `KStream#print` that return void, this alternative
> > > >> > >>>>>>>>>> would not work. But for the current operators:
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> a. KStream#print,
> > > >> > >>>>>>>>>> b. KStream#foreach,
> > > >> > >>>>>>>>>> c. KStream#to,
> > > >> > >>>>>>>>>> d. KStream#process
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> I personally feel that, except for `KStream#process`,
> > > >> > >>>>>>>>>> users would not usually bother to override their names,
> > > >> > >>>>>>>>>> and for `KStream#process` we could add an overloaded
> > > >> > >>>>>>>>>> variant with the additional Processed object.
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> 3. In your example, a suffix like "-0000000000" is still
> > > >> > >>>>>>>>>> added to the processor names. Is this intentional? If
> > > >> > >>>>>>>>>> yes, why? (I thought that with user-specified processor
> > > >> > >>>>>>>>>> name hints we would no longer add a suffix to
> > > >> > >>>>>>>>>> distinguish different nodes of the same type.)
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> Guozhang
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck
> > > >> > >>>>>>>>>> <bbej...@gmail.com> wrote:
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>> Hi Florian,
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> Thanks for the KIP. I think being able to add more
> > > >> > >>>>>>>>>>> context to the processor names would be useful.
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> I like the idea of adding a "withProcessorName" to
> > > >> > >>>>>>>>>>> Produced, Consumed and Joined.
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> But instead of adding the "Processed" parameter to a
> > > >> > >>>>>>>>>>> large percentage of the methods, which would result in
> > > >> > >>>>>>>>>>> overloaded methods (which we removed quite a bit of
> > > >> > >>>>>>>>>>> with KIP-182), what do you think of adding a method
> > > >> > >>>>>>>>>>> "withName(String processorName)" to the AbstractStream
> > > >> > >>>>>>>>>>> class? BTW I'm not married to the method name, it's the
> > > >> > >>>>>>>>>>> best I can do off the top of my head.
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> For the methods that return void, we'd have to add a
> > > >> > >>>>>>>>>>> parameter, but that would at least cut down on the
> > > >> > >>>>>>>>>>> number of overloaded methods in the API.
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> Just my 2 cents.
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> Thanks,
> > > >> > >>>>>>>>>>> Bill
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois
> > > >> > >>>>>>>>>>> <fhussonn...@gmail.com> wrote:
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>>> Hi,
> > > >> > >>>>>>>>>>>>
> > > >> > >>>>>>>>>>>> I would like to start a new discussion on the
> > > >> > >>>>>>>>>>>> following KIP:
> > > >> > >>>>>>>>>>>>
> > > >> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > >> > >>>>>>>>>>>>
> > > >> > >>>>>>>>>>>> This is still a draft.
> > > >> > >>>>>>>>>>>>
> > > >> > >>>>>>>>>>>> Looking forward to your feedback.
> > > >> > >>>>>>>>>>>> --
> > > >> > >>>>>>>>>>>> Florian HUSSONNOIS
> > > >> > >>>>>>>>>>>>
> > > >> > >>>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>> --
> > > >> > >>>>>>>>>> -- Guozhang
> > > >> > >>>>>>>>>>
> > > >> > >>>>>>>>>
> > > >> > >>>>>>>>
> > > >> > >>>>>>>>
> > > >> > >>>>>>>> --
> > > >> > >>>>>>>> Florian HUSSONNOIS
> > > >> > >>>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>>
> > > >> > >>>>>>
> > > >> > >>>>>>
> > > >> > >>>>>
> > > >> > >>>>> --
> > > >> > >>>>> Florian HUSSONNOIS
> > > >> > >>>>>
> > > >> > >>>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> --
> > > >> > >>> -- Guozhang
> > > >> > >>>
> > > >> > >>
> > > >> > >>
> > > >> > >> --
> > > >> > >> Florian HUSSONNOIS
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >> >
> > > >>
> > > >> --
> > > >> Florian HUSSONNOIS
> > > >>
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
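
For reference, the naming scheme from point 3/ quoted above (keep a suffix
like "-0000000000" on the user-supplied name and enforce uppercase) could be
sketched roughly as follows. This is only an illustration under assumptions:
ProcessorNameSketch and newName are hypothetical names, not part of the Kafka
Streams API, and the per-topology counter is my assumption about how the
index would be assigned.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, NOT part of Kafka Streams: it only illustrates the
// "uppercase prefix + 10-digit suffix" scheme discussed in the thread.
public final class ProcessorNameSketch {

    // Assumed: one counter per topology, incremented for every new node.
    private final AtomicInteger index = new AtomicInteger(0);

    // Builds "<PREFIX>-<zero-padded 10-digit index>" from a user-supplied prefix.
    public String newName(final String userPrefix) {
        final String prefix = userPrefix.toUpperCase(Locale.ROOT);
        return String.format(Locale.ROOT, "%s-%010d", prefix, index.getAndIncrement());
    }

    public static void main(final String[] args) {
        final ProcessorNameSketch names = new ProcessorNameSketch();
        // Reusing the same prefix still yields distinct node names.
        System.out.println(names.newName("filter-orders")); // FILTER-ORDERS-0000000000
        System.out.println(names.newName("filter-orders")); // FILTER-ORDERS-0000000001
    }
}
```

With such a scheme the user only provides a hint (the prefix), and the suffix
still keeps two nodes with the same hint distinct.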


-- 
Florian HUSSONNOIS
